Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions src/main/java/com/qiniu/storage/ResumeUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.qiniu.http.Client;
import com.qiniu.http.Response;
import com.qiniu.storage.model.ResumeBlockInfo;
import com.qiniu.util.Crc32;
import com.qiniu.util.StringMap;
import com.qiniu.util.StringUtils;
import com.qiniu.util.UrlSafeBase64;
Expand Down Expand Up @@ -45,6 +46,7 @@ public final class ResumeUploader {
private final RecordHelper helper;
private FileInputStream file;
private String host;
private int retryMax;

ResumeUploader(Client client, String upToken, String key, File file,
StringMap params, String mime, Recorder recorder, Configuration configuration) {
Expand All @@ -62,6 +64,7 @@ public final class ResumeUploader {
this.recorder = recorder;
this.modifyTime = f.lastModified();
helper = new RecordHelper();
retryMax = configuration.retryMax;
}

public Response upload() throws QiniuException {
Expand Down Expand Up @@ -91,8 +94,9 @@ public Response upload() throws QiniuException {
throw new QiniuException(e);
}

// long crc = Crc32.bytes(blockBuffer, 0, blockSize);
long crc = Crc32.bytes(blockBuffer, 0, blockSize);
Response response = null;
QiniuException temp = null;
try {
response = makeBlock(blockBuffer, blockSize);
} catch (QiniuException e) {
Expand All @@ -101,25 +105,38 @@ public Response upload() throws QiniuException {
}
if (e.response == null || e.response.needRetry()) {
retry = true;
temp = e;
} else {
close();
throw e;
}
}

if (!retry) {
ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class);
if (blockInfo0.crc32 != crc) {
retry = true;
temp = new QiniuException(new Exception("block's crc32 is not match"));
}
}

if (retry) {
try {
response = makeBlock(blockBuffer, blockSize);
retry = false;
} catch (QiniuException e) {
if (retryMax > 0) {
retryMax--;
try {
response = makeBlock(blockBuffer, blockSize);
retry = false;
} catch (QiniuException e) {
close();
throw e;
}
} else {
close();
throw e;
throw temp;
}

}
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
//TODO check return crc32
// if blockInfo.crc32 != crc{}

ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
contexts[contextIndex++] = blockInfo.ctx;
uploaded += blockSize;
helper.record(uploaded);
Expand Down Expand Up @@ -147,7 +164,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
private void close() {
try {
file.close();
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
Expand Down
39 changes: 28 additions & 11 deletions src/main/java/com/qiniu/storage/StreamUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.qiniu.http.Client;
import com.qiniu.http.Response;
import com.qiniu.storage.model.ResumeBlockInfo;
import com.qiniu.util.Crc32;
import com.qiniu.util.StringMap;
import com.qiniu.util.StringUtils;
import com.qiniu.util.UrlSafeBase64;
Expand All @@ -28,6 +29,7 @@ public final class StreamUploader {
private final InputStream stream;
private long size;
private String host;
private int retryMax;

StreamUploader(Client client, String upToken, String key, InputStream stream,
StringMap params, String mime, Configuration configuration) {
Expand All @@ -40,6 +42,7 @@ public final class StreamUploader {
this.contexts = new ArrayList<>();
this.blockBuffer = new byte[Constants.BLOCK_SIZE];
this.stream = stream;
retryMax = configuration.retryMax;
}

public Response upload() throws QiniuException {
Expand All @@ -54,9 +57,11 @@ public Response upload() throws QiniuException {

while (size == 0) {
int bufferIndex = 0;
int blockSize = 0;
while (ret != -1 && bufferIndex != blockBuffer.length) {
try {
ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex);
blockSize = blockBuffer.length - bufferIndex;
ret = stream.read(blockBuffer, bufferIndex, blockSize);
} catch (IOException e) {
close();
throw new QiniuException(e);
Expand All @@ -75,7 +80,9 @@ public Response upload() throws QiniuException {
}
}

long crc = Crc32.bytes(blockBuffer, 0, blockSize);
Response response = null;
QiniuException temp = null;
try {
response = makeBlock(blockBuffer, bufferIndex);
} catch (QiniuException e) {
Expand All @@ -84,24 +91,34 @@ public Response upload() throws QiniuException {
}
if (e.response == null || e.response.needRetry()) {
retry = true;
temp = e;
} else {
close();
throw e;
}
}
if (!retry) {
ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class);
if (blockInfo0.crc32 != crc) {
retry = true;
temp = new QiniuException(new Exception("block's crc32 is not match"));
}
}
if (retry) {
try {
response = makeBlock(blockBuffer, bufferIndex);
retry = false;
} catch (QiniuException e) {
close();
throw e;
if (retryMax > 0) {
retryMax--;
try {
response = makeBlock(blockBuffer, bufferIndex);
retry = false;
} catch (QiniuException e) {
close();
throw e;
}
} else {
throw temp;
}

}
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
//TODO check return crc32
// if blockInfo.crc32 != crc{}
contexts.add(blockInfo.ctx);
uploaded += bufferIndex;
}
Expand All @@ -126,7 +143,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
private void close() {
try {
stream.close();
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/test/java/com/qiniu/streaming/StreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@
import static org.junit.Assert.*;

/**
* Created by bailong on 16/9/22.
* Created by bailong on 16/9/22
*/
public class StreamingTest {
private Auth auth = TestConfig.testAuth;
private Auth auth = null;

{
try {
auth = Auth.create(System.getenv("ak"), System.getenv("sk"));
} catch (Exception e) {
auth = TestConfig.testAuth;
}
}

private String hub = "pilisdktest";
private String streamKeyPrefix = "pilijava" + System.currentTimeMillis();
private StreamingManager manager = new StreamingManager(auth, hub);
Expand Down