From 6c274a93481259088c0d0d0a720a8480c3a0d04d Mon Sep 17 00:00:00 2001 From: Sxci Date: Thu, 22 Dec 2016 12:36:27 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=20=E5=88=86=E7=89=87=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=EF=BC=8C=E6=AF=8F=E4=B8=80=E7=89=87=E6=B7=BB=E5=8A=A0=20crc32?= =?UTF-8?q?=20=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qiniu/storage/ResumeUploader.java | 39 +++++++++++++------ .../com/qiniu/storage/StreamUploader.java | 39 +++++++++++++------ 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/qiniu/storage/ResumeUploader.java b/src/main/java/com/qiniu/storage/ResumeUploader.java index b3d398f23..6cded46ce 100644 --- a/src/main/java/com/qiniu/storage/ResumeUploader.java +++ b/src/main/java/com/qiniu/storage/ResumeUploader.java @@ -6,6 +6,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.model.ResumeBlockInfo; +import com.qiniu.util.Crc32; import com.qiniu.util.StringMap; import com.qiniu.util.StringUtils; import com.qiniu.util.UrlSafeBase64; @@ -45,6 +46,7 @@ public final class ResumeUploader { private final RecordHelper helper; private FileInputStream file; private String host; + private int retryMax; ResumeUploader(Client client, String upToken, String key, File file, StringMap params, String mime, Recorder recorder, Configuration configuration) { @@ -62,6 +64,7 @@ public final class ResumeUploader { this.recorder = recorder; this.modifyTime = f.lastModified(); helper = new RecordHelper(); + retryMax = configuration.retryMax; } public Response upload() throws QiniuException { @@ -91,8 +94,9 @@ public Response upload() throws QiniuException { throw new QiniuException(e); } -// long crc = Crc32.bytes(blockBuffer, 0, blockSize); + long crc = Crc32.bytes(blockBuffer, 0, blockSize); Response response = null; + QiniuException temp = null; try { response = makeBlock(blockBuffer, blockSize); } catch (QiniuException e) { @@ -101,25 +105,38 @@ public Response upload() throws QiniuException { } if (e.response == null || e.response.needRetry()) { retry = true; + temp = e; } else { close(); throw e; } } + + if (!retry) { + ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class); + if (blockInfo0.crc32 != crc) { + retry = true; + temp = new QiniuException(new Exception("block's crc32 is not match")); + } + } + if (retry) { - try { - response = makeBlock(blockBuffer, blockSize); - retry = false; - } catch (QiniuException e) { + if (retryMax > 0) { + retryMax--; + try { + response = makeBlock(blockBuffer, blockSize); + retry = false; + } catch (QiniuException e) { + close(); + throw e; + } + } else { close(); - throw e; + throw temp; } - } - ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); - //TODO check return crc32 - // if blockInfo.crc32 != crc{} + ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); contexts[contextIndex++] = blockInfo.ctx; uploaded += blockSize; helper.record(uploaded); @@ -147,7 +164,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException { private void close() { try { file.close(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } diff --git a/src/main/java/com/qiniu/storage/StreamUploader.java b/src/main/java/com/qiniu/storage/StreamUploader.java index c600b63a0..0306c407a 100644 --- a/src/main/java/com/qiniu/storage/StreamUploader.java +++ b/src/main/java/com/qiniu/storage/StreamUploader.java @@ -5,6 +5,7 @@ import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.model.ResumeBlockInfo; +import com.qiniu.util.Crc32; import com.qiniu.util.StringMap; import com.qiniu.util.StringUtils; import com.qiniu.util.UrlSafeBase64; @@ -28,6 +29,7 @@ public final class StreamUploader { private final InputStream stream; private long size; private String host; + private int retryMax; StreamUploader(Client client, String upToken, String key, InputStream stream, StringMap params, String mime, Configuration configuration) { @@ -40,6 +42,7 @@ public final class StreamUploader { this.contexts = new ArrayList<>(); this.blockBuffer = new byte[Constants.BLOCK_SIZE]; this.stream = stream; + retryMax = configuration.retryMax; } public Response upload() throws QiniuException { @@ -54,9 +57,11 @@ public Response upload() throws QiniuException { while (size == 0) { int bufferIndex = 0; + int blockSize = 0; while (ret != -1 && bufferIndex != blockBuffer.length) { try { - ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex); + blockSize = blockBuffer.length - bufferIndex; + ret = stream.read(blockBuffer, bufferIndex, blockSize); } catch (IOException e) { close(); throw new QiniuException(e); @@ -75,7 +80,9 @@ public Response upload() throws QiniuException { } } + long crc = Crc32.bytes(blockBuffer, 0, blockSize); Response response = null; + QiniuException temp = null; try { response = makeBlock(blockBuffer, bufferIndex); } catch (QiniuException e) { @@ -84,24 +91,34 @@ public Response upload() throws QiniuException { } if (e.response == null || e.response.needRetry()) { retry = true; + temp = e; } else { close(); throw e; } } + if (!retry) { + ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class); + if (blockInfo0.crc32 != crc) { + retry = true; + temp = new QiniuException(new Exception("block's crc32 is not match")); + } + } if (retry) { - try { - response = makeBlock(blockBuffer, bufferIndex); - retry = false; - } catch (QiniuException e) { - close(); - throw e; + if (retryMax > 0) { + retryMax--; + try { + response = makeBlock(blockBuffer, bufferIndex); + retry = false; + } catch (QiniuException e) { + close(); + throw e; + } + } else { + throw temp; } - } ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); - //TODO check return crc32 - // if blockInfo.crc32 != crc{} contexts.add(blockInfo.ctx); uploaded += bufferIndex; } @@ -126,7 +143,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException { private void close() { try { stream.close(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } From 407637d26cd1678cea592b4180373be2af6d7c30 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 9 Jan 2017 10:19:48 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=20=E8=8E=B7=E5=BE=97=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E7=9A=84=20auth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/java/com/qiniu/streaming/StreamingTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/qiniu/streaming/StreamingTest.java b/src/test/java/com/qiniu/streaming/StreamingTest.java index 93596d8de..a442b1b70 100644 --- a/src/test/java/com/qiniu/streaming/StreamingTest.java +++ b/src/test/java/com/qiniu/streaming/StreamingTest.java @@ -15,7 +15,16 @@ * Created by bailong on 16/9/22. */ public class StreamingTest { - private Auth auth = TestConfig.testAuth; + private Auth auth = null; + + { + try { + auth = Auth.create(System.getenv("ak"), System.getenv("sk")); + } catch (Exception e) { + auth = TestConfig.testAuth; + } + } + private String hub = "pilisdktest"; private String streamKeyPrefix = "pilijava" + System.currentTimeMillis(); private StreamingManager manager = new StreamingManager(auth, hub); From aa8a9031676e8c583d515b79aebe10646f55e0f1 Mon Sep 17 00:00:00 2001 From: Sxci Date: Mon, 9 Jan 2017 10:21:12 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=20=E8=8E=B7=E5=BE=97=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E7=9A=84=20auth=20[ci=20skip]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/java/com/qiniu/streaming/StreamingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/qiniu/streaming/StreamingTest.java b/src/test/java/com/qiniu/streaming/StreamingTest.java index a442b1b70..968aa568b 100644 --- a/src/test/java/com/qiniu/streaming/StreamingTest.java +++ b/src/test/java/com/qiniu/streaming/StreamingTest.java @@ -12,7 +12,7 @@ import static org.junit.Assert.*; /** - * Created by bailong on 16/9/22. + * Created by bailong on 16/9/22 */ public class StreamingTest { private Auth auth = null;