From 63a979d846c176ab94d44c9fa98ee61b27e880ce Mon Sep 17 00:00:00 2001 From: lewzylu Date: Wed, 5 Aug 2020 20:47:06 +0800 Subject: [PATCH] feature-multi-thread-upload --- src/Qcloud/Cos/Client.php | 2 +- src/Qcloud/Cos/Copy.php | 11 +- src/Qcloud/Cos/MultipartUpload.php | 156 +++++++++++++++-------------- src/Qcloud/Cos/Service.php | 12 ++- 4 files changed, 96 insertions(+), 85 deletions(-) diff --git a/src/Qcloud/Cos/Client.php b/src/Qcloud/Cos/Client.php index ae292639..9dc25dc6 100644 --- a/src/Qcloud/Cos/Client.php +++ b/src/Qcloud/Cos/Client.php @@ -151,7 +151,7 @@ public function getObjectUrl($bucket, $key, $expires = null, array $args = array public function upload($bucket, $key, $body, $options = array()) { $body = Psr7\stream_for($body); - $options['PartSize'] = isset($options['PartSize']) ? $options['PartSize'] : MultipartUpload::MIN_PART_SIZE; + $options['PartSize'] = isset($options['PartSize']) ? $options['PartSize'] : MultipartUpload::DEFAULT_PART_SIZE; if ($body->getSize() < $options['PartSize']) { $rt = $this->putObject(array( 'Bucket' => $bucket, diff --git a/src/Qcloud/Cos/Copy.php b/src/Qcloud/Cos/Copy.php index 98ed075d..661df592 100644 --- a/src/Qcloud/Cos/Copy.php +++ b/src/Qcloud/Cos/Copy.php @@ -6,9 +6,6 @@ use GuzzleHttp\Pool; class Copy { - /** - * const var: part size from 1MB to 5GB, and max parts of 10000 are allowed for each upload. - */ const MIN_PART_SIZE = 1048576; const MAX_PART_SIZE = 5368709120; const DEFAULT_PART_SIZE = 52428800; @@ -79,7 +76,7 @@ public function uploadParts($uploadId) { 'CopySource'=> $copySourcePath, 'CopySourceRange' => 'bytes='.((string)$offset).'-'.(string)($offset+$partSize - 1), ); - if(!isset($parts[$partNumber])) { + if(!isset($this->parts[$partNumber])) { $command = $this->client->getCommand('uploadPartCopy', $params); $request = $this->client->commandToRequestTransformer($command); $this->commandList[$index] = $command; @@ -103,14 +100,14 @@ public function uploadParts($uploadId) { }, 'rejected' => function ($reason, $index) { + $index = $index += 1; $retry = 2; for ($i = 1; $i <= $retry; $i++) { - $index = $index += 1; try { - $rt =$this->client->execute($commandList[$index]); + $rt =$this->client->execute($this->commandList[$index]); $part = array('PartNumber' => $index, 'ETag' => $rt['ETag']); $this->parts[$index] = $part; - } catch(Exception $e) { + } catch(\Exception $e) { if ($i == $retry) { throw($e); } diff --git a/src/Qcloud/Cos/MultipartUpload.php b/src/Qcloud/Cos/MultipartUpload.php index 94cc5bbd..604ffe4d 100644 --- a/src/Qcloud/Cos/MultipartUpload.php +++ b/src/Qcloud/Cos/MultipartUpload.php @@ -3,65 +3,94 @@ namespace Qcloud\Cos; use Qcloud\Cos\Exception\CosException; +use GuzzleHttp\Pool; class MultipartUpload { - /** - * const var: part size from 1MB to 5GB, and max parts of 10000 are allowed for each upload. - */ const MIN_PART_SIZE = 1048576; const MAX_PART_SIZE = 5368709120; const DEFAULT_PART_SIZE = 52428800; const MAX_PARTS = 10000; private $client; - private $body; private $options; private $partSize; + private $parts; + private $body; public function __construct($client, $body, $options = array()) { - $this->client = $client; + $minPartSize = $options['PartSize']; + unset($options['PartSize']); $this->body = $body; + $this->client = $client; $this->options = $options; - $this->partSize = $this->calculatePartSize($options['PartSize']); - unset($options['PartSize']); + $this->partSize = $this->calculatePartSize($minPartSize); + $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10; + $this->parts = []; + $this->partNumberList = []; } - public function performUploading() { - $rt = $this->initiateMultipartUpload(); - $uploadId = $rt['UploadId']; - $partNumber = 1; - $parts = array(); - for (;;) { - if ($this->body->eof()) { - break; - } - $body = $this->body->read($this->partSize); - if (empty($body)) { - break; + $uploadId= $this->initiateMultipartUpload(); + $this->uploadParts($uploadId); + foreach ( $this->parts as $key => $row ){ + $num1[$key] = $row ['PartNumber']; + $num2[$key] = $row ['ETag']; + } + array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts); + return $this->client->completeMultipartUpload(array( + 'Bucket' => $this->options['Bucket'], + 'Key' => $this->options['Key'], + 'UploadId' => $uploadId, + 'Parts' => $this->parts) + ); + + } + public function uploadParts($uploadId) { + $uploadRequests = function ($uploadId) { + $partNumber = 1; + $index = 1; + for ( ; ; $partNumber ++) { + if ($this->body->eof()) { + break; + } + $body = $this->body->read($this->partSize); + if (empty($body)) { + break; + } + if (isset($this->parts[$partNumber])) { + continue; + } + $this->partNumberList[$index] = $partNumber; + $params = array( + 'Bucket' => $this->options['Bucket'], + 'Key' => $this->options['Key'], + 'UploadId' => $uploadId, + 'PartNumber' => $partNumber, + 'Body' => $body + ); + if(!isset($this->parts[$partNumber])) { + $command = $this->client->getCommand('uploadPart', $params); + $request = $this->client->commandToRequestTransformer($command); + $index ++; + yield $request; + } } - $result = $this->client->uploadPart(array( - 'Bucket' => $this->options['Bucket'], - 'Key' => $this->options['Key'], - 'Body' => $body, - 'UploadId' => $uploadId, - 'PartNumber' => $partNumber)); - if (md5($body) != substr($result['ETag'], 1, -1)){ - throw new CosException("ETag check inconsistency"); + }; + $pool = new Pool($this->client->httpClient, $uploadRequests($uploadId), [ + 'concurrency' => $this->concurrency, + 'fulfilled' => function ($response, $index) { + $index = $index + 1; + $partNumber = $this->partNumberList[$index]; + $etag = $response->getHeaders()["ETag"][0]; + $part = array('PartNumber' => $partNumber, 'ETag' => $etag); + $this->parts[$partNumber] = $part; + }, + + 'rejected' => function ($reason, $index) { + throw($reason); } - $part = array('PartNumber' => $partNumber, 'ETag' => $result['ETag']); - array_push($parts, $part); - ++$partNumber; - } - try { - $rt = $this->client->completeMultipartUpload(array( - 'Bucket' => $this->options['Bucket'], - 'Key' => $this->options['Key'], - 'UploadId' => $uploadId, - 'Parts' => $parts)); - } catch(\Exception $e){ - throw $e; - } - return $rt; + ]); + $promise = $pool->promise(); + $promise->wait(); } public function resumeUploading() { @@ -73,54 +102,35 @@ public function resumeUploading() { $parts = array(); if (count($rt['Parts']) > 0) { foreach ($rt['Parts'] as $part) { - $parts[$part['PartNumber'] - 1] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']); + $this->parts[$part['PartNumber']] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']); } } - for ($partNumber = 1;;++$partNumber) { - if ($this->body->eof()) { - break; - } - $body = $this->body->read($this->partSize); - - if (array_key_exists($partNumber-1, $parts)){ - - if (md5($body) != substr($parts[$partNumber-1]['ETag'], 1, -1)){ - throw new CosException("ETag check inconsistency"); - } - continue; - } - - $result = $this->client->uploadPart(array( - 'Bucket' => $this->options['Bucket'], - 'Key' => $this->options['Key'], - 'Body' => $body, - 'UploadId' => $uploadId, - 'PartNumber' => $partNumber)); - if (md5($body) != substr($result['ETag'], 1, -1)){ - throw new CosException("ETag check inconsistency"); - } - $parts[$partNumber-1] = array('PartNumber' => $partNumber, 'ETag' => $result['ETag']); - + $this->uploadParts($uploadId); + foreach ( $this->parts as $key => $row ){ + $num1[$key] = $row ['PartNumber']; + $num2[$key] = $row ['ETag']; } - $rt = $this->client->completeMultipartUpload(array( + array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts); + return $this->client->completeMultipartUpload(array( 'Bucket' => $this->options['Bucket'], 'Key' => $this->options['Key'], 'UploadId' => $uploadId, - 'Parts' => $parts)); - return $rt; + 'Parts' => $this->parts) + ); } - private function calculatePartSize($minPartSize) { + private function calculatePartSize($minPartSize) + { $partSize = intval(ceil(($this->body->getSize() / self::MAX_PARTS))); $partSize = max($minPartSize, $partSize); $partSize = min($partSize, self::MAX_PART_SIZE); $partSize = max($partSize, self::MIN_PART_SIZE); - return $partSize; } private function initiateMultipartUpload() { $result = $this->client->createMultipartUpload($this->options); - return $result; + return $result['UploadId']; } + } diff --git a/src/Qcloud/Cos/Service.php b/src/Qcloud/Cos/Service.php index 4705c0c1..2b805a3e 100644 --- a/src/Qcloud/Cos/Service.php +++ b/src/Qcloud/Cos/Service.php @@ -3430,13 +3430,17 @@ public static function getService() { 'type' => 'object', 'properties' => array( 'Status' => array( - 'type' => 'string'), + 'type' => 'string' + ), 'Name' => array( - 'type' => 'string'), + 'type' => 'string' + ), 'Type' => array( - 'type' => 'string'), + 'type' => 'string' + ), 'ForcedReplacement' => array( - 'type' => 'string'), + 'type' => 'string' + ), ), ), ),