Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Qcloud/Cos/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public function getObjectUrl($bucket, $key, $expires = null, array $args = array

public function upload($bucket, $key, $body, $options = array()) {
$body = Psr7\stream_for($body);
$options['PartSize'] = isset($options['PartSize']) ? $options['PartSize'] : MultipartUpload::MIN_PART_SIZE;
$options['PartSize'] = isset($options['PartSize']) ? $options['PartSize'] : MultipartUpload::DEFAULT_PART_SIZE;
if ($body->getSize() < $options['PartSize']) {
$rt = $this->putObject(array(
'Bucket' => $bucket,
Expand Down
11 changes: 4 additions & 7 deletions src/Qcloud/Cos/Copy.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
use GuzzleHttp\Pool;

class Copy {
/**
* const var: part size from 1MB to 5GB, and max parts of 10000 are allowed for each upload.
*/
const MIN_PART_SIZE = 1048576;
const MAX_PART_SIZE = 5368709120;
const DEFAULT_PART_SIZE = 52428800;
Expand Down Expand Up @@ -79,7 +76,7 @@ public function uploadParts($uploadId) {
'CopySource'=> $copySourcePath,
'CopySourceRange' => 'bytes='.((string)$offset).'-'.(string)($offset+$partSize - 1),
);
if(!isset($parts[$partNumber])) {
if(!isset($this->parts[$partNumber])) {
$command = $this->client->getCommand('uploadPartCopy', $params);
$request = $this->client->commandToRequestTransformer($command);
$this->commandList[$index] = $command;
Expand All @@ -103,14 +100,14 @@ public function uploadParts($uploadId) {
},

'rejected' => function ($reason, $index) {
$index = $index += 1;
$retry = 2;
for ($i = 1; $i <= $retry; $i++) {
$index = $index += 1;
try {
$rt =$this->client->execute($commandList[$index]);
$rt =$this->client->execute($this->commandList[$index]);
$part = array('PartNumber' => $index, 'ETag' => $rt['ETag']);
$this->parts[$index] = $part;
} catch(Exception $e) {
} catch(\Exception $e) {
if ($i == $retry) {
throw($e);
}
Expand Down
156 changes: 83 additions & 73 deletions src/Qcloud/Cos/MultipartUpload.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,94 @@
namespace Qcloud\Cos;

use Qcloud\Cos\Exception\CosException;
use GuzzleHttp\Pool;

class MultipartUpload {
/**
* const var: part size from 1MB to 5GB, and max parts of 10000 are allowed for each upload.
*/
const MIN_PART_SIZE = 1048576;
const MAX_PART_SIZE = 5368709120;
const DEFAULT_PART_SIZE = 52428800;
const MAX_PARTS = 10000;

private $client;
private $body;
private $options;
private $partSize;
private $parts;
private $body;

public function __construct($client, $body, $options = array()) {
$this->client = $client;
$minPartSize = $options['PartSize'];
unset($options['PartSize']);
$this->body = $body;
$this->client = $client;
$this->options = $options;
$this->partSize = $this->calculatePartSize($options['PartSize']);
unset($options['PartSize']);
$this->partSize = $this->calculatePartSize($minPartSize);
$this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
$this->parts = [];
$this->partNumberList = [];
}

public function performUploading() {
$rt = $this->initiateMultipartUpload();
$uploadId = $rt['UploadId'];
$partNumber = 1;
$parts = array();
for (;;) {
if ($this->body->eof()) {
break;
}
$body = $this->body->read($this->partSize);
if (empty($body)) {
break;
$uploadId= $this->initiateMultipartUpload();
$this->uploadParts($uploadId);
foreach ( $this->parts as $key => $row ){
$num1[$key] = $row ['PartNumber'];
$num2[$key] = $row ['ETag'];
}
array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts);
return $this->client->completeMultipartUpload(array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'UploadId' => $uploadId,
'Parts' => $this->parts)
);

}
public function uploadParts($uploadId) {
$uploadRequests = function ($uploadId) {
$partNumber = 1;
$index = 1;
for ( ; ; $partNumber ++) {
if ($this->body->eof()) {
break;
}
$body = $this->body->read($this->partSize);
if (empty($body)) {
break;
}
if (isset($this->parts[$partNumber])) {
continue;
}
$this->partNumberList[$index] = $partNumber;
$params = array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'UploadId' => $uploadId,
'PartNumber' => $partNumber,
'Body' => $body
);
if(!isset($this->parts[$partNumber])) {
$command = $this->client->getCommand('uploadPart', $params);
$request = $this->client->commandToRequestTransformer($command);
$index ++;
yield $request;
}
}
$result = $this->client->uploadPart(array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'Body' => $body,
'UploadId' => $uploadId,
'PartNumber' => $partNumber));
if (md5($body) != substr($result['ETag'], 1, -1)){
throw new CosException("ETag check inconsistency");
};
$pool = new Pool($this->client->httpClient, $uploadRequests($uploadId), [
'concurrency' => $this->concurrency,
'fulfilled' => function ($response, $index) {
$index = $index + 1;
$partNumber = $this->partNumberList[$index];
$etag = $response->getHeaders()["ETag"][0];
$part = array('PartNumber' => $partNumber, 'ETag' => $etag);
$this->parts[$partNumber] = $part;
},

'rejected' => function ($reason, $index) {
throw($reason);
}
$part = array('PartNumber' => $partNumber, 'ETag' => $result['ETag']);
array_push($parts, $part);
++$partNumber;
}
try {
$rt = $this->client->completeMultipartUpload(array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'UploadId' => $uploadId,
'Parts' => $parts));
} catch(\Exception $e){
throw $e;
}
return $rt;
]);
$promise = $pool->promise();
$promise->wait();
}

public function resumeUploading() {
Expand All @@ -73,54 +102,35 @@ public function resumeUploading() {
$parts = array();
if (count($rt['Parts']) > 0) {
foreach ($rt['Parts'] as $part) {
$parts[$part['PartNumber'] - 1] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']);
$this->parts[$part['PartNumber']] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']);
}
}
for ($partNumber = 1;;++$partNumber) {
if ($this->body->eof()) {
break;
}
$body = $this->body->read($this->partSize);

if (array_key_exists($partNumber-1, $parts)){

if (md5($body) != substr($parts[$partNumber-1]['ETag'], 1, -1)){
throw new CosException("ETag check inconsistency");
}
continue;
}

$result = $this->client->uploadPart(array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'Body' => $body,
'UploadId' => $uploadId,
'PartNumber' => $partNumber));
if (md5($body) != substr($result['ETag'], 1, -1)){
throw new CosException("ETag check inconsistency");
}
$parts[$partNumber-1] = array('PartNumber' => $partNumber, 'ETag' => $result['ETag']);

$this->uploadParts($uploadId);
foreach ( $this->parts as $key => $row ){
$num1[$key] = $row ['PartNumber'];
$num2[$key] = $row ['ETag'];
}
$rt = $this->client->completeMultipartUpload(array(
array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts);
return $this->client->completeMultipartUpload(array(
'Bucket' => $this->options['Bucket'],
'Key' => $this->options['Key'],
'UploadId' => $uploadId,
'Parts' => $parts));
return $rt;
'Parts' => $this->parts)
);
}

private function calculatePartSize($minPartSize) {
private function calculatePartSize($minPartSize)
{
$partSize = intval(ceil(($this->body->getSize() / self::MAX_PARTS)));
$partSize = max($minPartSize, $partSize);
$partSize = min($partSize, self::MAX_PART_SIZE);
$partSize = max($partSize, self::MIN_PART_SIZE);

return $partSize;
}

private function initiateMultipartUpload() {
$result = $this->client->createMultipartUpload($this->options);
return $result;
return $result['UploadId'];
}

}
12 changes: 8 additions & 4 deletions src/Qcloud/Cos/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -3430,13 +3430,17 @@ public static function getService() {
'type' => 'object',
'properties' => array(
'Status' => array(
'type' => 'string'),
'type' => 'string'
),
'Name' => array(
'type' => 'string'),
'type' => 'string'
),
'Type' => array(
'type' => 'string'),
'type' => 'string'
),
'ForcedReplacement' => array(
'type' => 'string'),
'type' => 'string'
),
),
),
),
Expand Down