diff --git a/src/main/java/com/qiniu/storage/ResumeUploader.java b/src/main/java/com/qiniu/storage/ResumeUploader.java index b3d398f23..6cded46ce 100644 --- a/src/main/java/com/qiniu/storage/ResumeUploader.java +++ b/src/main/java/com/qiniu/storage/ResumeUploader.java @@ -6,6 +6,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.model.ResumeBlockInfo; +import com.qiniu.util.Crc32; import com.qiniu.util.StringMap; import com.qiniu.util.StringUtils; import com.qiniu.util.UrlSafeBase64; @@ -45,6 +46,7 @@ public final class ResumeUploader { private final RecordHelper helper; private FileInputStream file; private String host; + private int retryMax; ResumeUploader(Client client, String upToken, String key, File file, StringMap params, String mime, Recorder recorder, Configuration configuration) { @@ -62,6 +64,7 @@ public final class ResumeUploader { this.recorder = recorder; this.modifyTime = f.lastModified(); helper = new RecordHelper(); + retryMax = configuration.retryMax; } public Response upload() throws QiniuException { @@ -91,8 +94,9 @@ public Response upload() throws QiniuException { throw new QiniuException(e); } -// long crc = Crc32.bytes(blockBuffer, 0, blockSize); + long crc = Crc32.bytes(blockBuffer, 0, blockSize); Response response = null; + QiniuException temp = null; try { response = makeBlock(blockBuffer, blockSize); } catch (QiniuException e) { @@ -101,25 +105,38 @@ public Response upload() throws QiniuException { } if (e.response == null || e.response.needRetry()) { retry = true; + temp = e; } else { close(); throw e; } } + + if (!retry) { + ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class); + if (blockInfo0.crc32 != crc) { + retry = true; + temp = new QiniuException(new Exception("block's crc32 is not match")); + } + } + if (retry) { - try { - response = makeBlock(blockBuffer, blockSize); - retry = false; - } catch (QiniuException e) { + if (retryMax > 0) { + retryMax--; + try { + response = makeBlock(blockBuffer, blockSize); + retry = false; + } catch (QiniuException e) { + close(); + throw e; + } + } else { close(); - throw e; + throw temp; } - } - ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); - //TODO check return crc32 - // if blockInfo.crc32 != crc{} + ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); contexts[contextIndex++] = blockInfo.ctx; uploaded += blockSize; helper.record(uploaded); @@ -147,7 +164,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException { private void close() { try { file.close(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } diff --git a/src/main/java/com/qiniu/storage/StreamUploader.java b/src/main/java/com/qiniu/storage/StreamUploader.java index c600b63a0..0306c407a 100644 --- a/src/main/java/com/qiniu/storage/StreamUploader.java +++ b/src/main/java/com/qiniu/storage/StreamUploader.java @@ -5,6 +5,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.model.ResumeBlockInfo; +import com.qiniu.util.Crc32; import com.qiniu.util.StringMap; import com.qiniu.util.StringUtils; import com.qiniu.util.UrlSafeBase64; @@ -28,6 +29,7 @@ public final class StreamUploader { private final InputStream stream; private long size; private String host; + private int retryMax; StreamUploader(Client client, String upToken, String key, InputStream stream, StringMap params, String mime, Configuration configuration) { @@ -40,6 +42,7 @@ public final class StreamUploader { this.contexts = new ArrayList<>(); this.blockBuffer = new byte[Constants.BLOCK_SIZE]; this.stream = stream; + retryMax = configuration.retryMax; } public Response upload() throws QiniuException { @@ -54,9 +57,11 @@ public Response upload() throws QiniuException { while (size == 0) { int bufferIndex = 0; + int blockSize = 0; while (ret != -1 && bufferIndex != blockBuffer.length) { try { - ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex); + blockSize = blockBuffer.length - bufferIndex; + ret = stream.read(blockBuffer, bufferIndex, blockSize); } catch (IOException e) { close(); throw new QiniuException(e); @@ -75,7 +80,9 @@ public Response upload() throws QiniuException { } } + long crc = Crc32.bytes(blockBuffer, 0, blockSize); Response response = null; + QiniuException temp = null; try { response = makeBlock(blockBuffer, bufferIndex); } catch (QiniuException e) { @@ -84,24 +91,34 @@ public Response upload() throws QiniuException { } if (e.response == null || e.response.needRetry()) { retry = true; + temp = e; } else { close(); throw e; } } + if (!retry) { + ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class); + if (blockInfo0.crc32 != crc) { + retry = true; + temp = new QiniuException(new Exception("block's crc32 is not match")); + } + } if (retry) { - try { - response = makeBlock(blockBuffer, bufferIndex); - retry = false; - } catch (QiniuException e) { - close(); - throw e; + if (retryMax > 0) { + retryMax--; + try { + response = makeBlock(blockBuffer, bufferIndex); + retry = false; + } catch (QiniuException e) { + close(); + throw e; + } + } else { + throw temp; } - } ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); - //TODO check return crc32 - // if blockInfo.crc32 != crc{} contexts.add(blockInfo.ctx); uploaded += bufferIndex; } @@ -126,7 +143,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException { private void close() { try { stream.close(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } diff --git a/src/test/java/com/qiniu/streaming/StreamingTest.java b/src/test/java/com/qiniu/streaming/StreamingTest.java index 93596d8de..968aa568b 100644 --- a/src/test/java/com/qiniu/streaming/StreamingTest.java +++ b/src/test/java/com/qiniu/streaming/StreamingTest.java @@ -12,10 +12,19 @@ import static org.junit.Assert.*; /** - * Created by bailong on 16/9/22. + * Created by bailong on 16/9/22 */ public class StreamingTest { - private Auth auth = TestConfig.testAuth; + private Auth auth = null; + + { + try { + auth = Auth.create(System.getenv("ak"), System.getenv("sk")); + } catch (Exception e) { + auth = TestConfig.testAuth; + } + } + private String hub = "pilisdktest"; private String streamKeyPrefix = "pilijava" + System.currentTimeMillis(); private StreamingManager manager = new StreamingManager(auth, hub);