From acf0420cb2642482f8b36c7dd3030993d4f0ad1f Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 29 Jul 2020 17:45:16 +0800 Subject: [PATCH 01/13] add resumable download file --- qcloud_cos/cos_client.py | 27 +++- qcloud_cos/resumable_downloader.py | 190 +++++++++++++++++++++++++++++ qcloud_cos/streambody.py | 31 +++++ ut/test.py | 4 + 4 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 qcloud_cos/resumable_downloader.py diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 7028e73b..7bcc4833 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -25,6 +25,7 @@ from .cos_exception import CosServiceError from .version import __version__ from .select_event_stream import EventStream +from .resumable_downloader import ResumableDownLoader logger = logging.getLogger(__name__) @@ -185,7 +186,7 @@ def __init__(self, conf, retry=1, session=None): else: self._session = session - def get_conf(): + def get_conf(self): """获取配置""" return self._conf @@ -2942,6 +2943,30 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, already_exist_parts[part_num] = part['ETag'] return True + def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, EnableCRC=False, **Kwargs): + """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 + + :param Bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :param LocalFilePath(string): 本地文件路径名. + :param PartSize(int): 分块的大小设置,单位为MB. + :param MAXThread(int): 并发上传的最大线程数. + :param EnableCRC(bool): 校验下载文件与源文件是否一致 + :param kwargs(dict): 设置请求headers. + """ + logger.debug("Start to download file, bucket: {0}, key: {1}, dest_filename: {2}, part_size: {3}MB, " + "max_thread: {4}".format(Bucket, Key, DestFilePath, PartSize, MAZThread)) + + object_info = self.head_object(Bucket, Key) + file_size = object_info['Content-Length'] + if file_size <= 1024*1024*20: + response = self.get_object(Bucket, Key) + response['Body'].get_stream_to_file(DestFilePath) + return + + downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAZThread, EnableCRC, **Kwargs) + downloader.start() + def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, **kwargs): """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py new file mode 100644 index 00000000..b5a0cc18 --- /dev/null +++ b/qcloud_cos/resumable_downloader.py @@ -0,0 +1,190 @@ +import json +import os +import sys +import threading +import logging +import uuid +import hashlib +import crcmod +from .cos_comm import * +from .streambody import StreamBody +from .cos_threadpool import SimpleThreadPool +logger = logging.getLogger(__name__) + +class ResumableDownLoader(object): + def __init__(self, cos_client, bucket, key, dest_filename, object_info, part_size=20, max_thread=5, enable_crc=False, **kwargs): + self.__cos_client = cos_client + self.__bucket = bucket + self.__key = key + self.__dest_file_path = os.path.abspath(dest_filename) + self.__object_info = object_info + self.__max_thread = max_thread + self.__enable_crc = enable_crc + self.__headers = kwargs + + self.__max_part_count = 100 # 取决于服务端是否对并发有限制 + self.__min_part_size = 1024 * 1024 # 1M + self.__part_size = self.__determine_part_size_internal(int(object_info['Content-Length']), part_size) + self.__finished_parts = [] + self.__lock = threading.Lock() + self.__record = None #记录当前的上下文 + self.__dump_record_dir = os.path.join(os.path.expanduser('~'), '.cos_download_tmp_file') + + record_filename = self.__get_record_filename(bucket, key, self.__dest_file_path) + self.__record_filepath = os.path.join(self.__dump_record_dir, record_filename) + self.__tmp_file = None + + if not os.path.exists(self.__dump_record_dir): + os.makedirs(self.__dump_record_dir) + + logger.debug('resumale downloader init finish, bucket: {0}, key: {1}'.format(bucket, key)) + + def start(self): + logger.debug('start resumable downloade, bucket: {0}, key: {1}'.format(self.__bucket, self.__key)) + self.__load_record() # 从record文件中恢复读取上下文 + + assert self.__tmp_file + open(self.__tmp_file, 'a').close() + + parts_need_to_download = self.__get_parts_need_to_download() + logger.debug('parts_need_to_download: {0}'.format(parts_need_to_download)) + pool = SimpleThreadPool(self.__max_thread) + for part in parts_need_to_download: + part_range = "bytes=" + str(part.start) + "-" + str(part.start + part.length - 1) + headers = dict.copy(self.__headers) + headers["Range"] = part_range + pool.add_task(self.__download_part, part, headers) + + pool.wait_completion() + result = pool.get_result() + if not result['success_all']: + raise CosClientError('some upload_part fail after max_retry, please upload_file again') + + if os.path.exists(self.__dest_file_path): + os.remove(self.__dest_file_path) + os.rename(self.__tmp_file, self.__dest_file_path) + + if self.__enable_crc: + self.__check_crc() + + self.__del_record() + logger.debug('download success, bucket: {0}, key: {1}'.format(self.__bucket, self.__key)) + + def __get_record_filename(self, bucket, key, dest_file_path): + return '{0}_{1}.{2}'.format(self.__bucket, self.__key, get_md5(self.__dest_file_path)) + + def __determine_part_size_internal(self, file_size, part_size): + real_part_size = part_size * 1024 * 1024 # MB + if real_part_size < self.__min_part_size: + real_part_size = self.__min_part_size + + while real_part_size * self.__max_part_count < file_size: + real_part_size = real_part_size * 2 + logger.debug('finish to determine part size, file_size: {0}, part_size: {1}'.format(file_size, real_part_size)) + return real_part_size + + def __splite_to_parts(self): + parts = [] + file_size = int(self.__object_info['Content-Length']) + num_parts = (file_size + self.__part_size - 1) / self.__part_size + for i in range(num_parts): + start = i * self.__part_size + if i == num_parts - 1: + length = file_size - start + else: + length = self.__part_size + + parts.append(PartInfo(i + 1, start, length)) + return parts + + def __get_parts_need_to_download(self): + all_set = set(self.__splite_to_parts()) + logger.debug('all_set: {0}'.format(len(all_set))) + finished_set = set(self.__finished_parts) + logger.debug('finished_set: {0}'.format(len(finished_set))) + return list(all_set - finished_set) + + def __download_part(self, part, headers): + with open(self.__tmp_file, 'rb+') as f: + f.seek(part.start, 0) + range = None + traffic_limit = None + if 'Range' in headers: + range = headers['Range'] + + if 'TrafficLimit' in headers: + traffic_limit = headers['TrafficLimit'] + logger.debug("part_id: {0}, part_range: {1}, traffic_limit:{2}".format(part.part_id, range, traffic_limit)) + result = self.__cos_client.get_object(Bucket=self.__bucket, Key=self.__key, **headers) + result["Body"].pget_stream_to_file(f, part.start, part.length) + + self.__finish_part(part) + + def __finish_part(self, part): + logger.debug('download part finished,bucket: {0}, key: {1}, part_id: {2}'. + format(self.__bucket, self.__key, part.part_id)) + with self.__lock: + self.__finished_parts.append(part) + self.__record['parts'].append({'part_id': part.part_id, + 'start': part.start, + 'length': part.length}) + self.__dump_record(self.__record) + + def __dump_record(self, record): + with open(self.__record_filepath, 'w') as f: + json.dump(record, f) + logger.debug('dump record to {0}, bucket: {1}, key: {2}'. + format(self.__record_filepath, self.__bucket, self.__key)) + + def __load_record(self): + record = None + + if os.path.exists(self.__record_filepath): + with open(self.__record_filepath, 'r') as f: + record = json.load(f) + self.__part_size = record['part_size'] + self.__tmp_file = record['tmp_filename'] + if not os.path.exists(self.__tmp_file): + record = None + self.__tmp_file = None + else: + self.__finished_parts = list(PartInfo(p['part_id'], p['start'], p['length']) for p in record['parts']) + logger.debug('load record: finished parts nums: {0}'.format(len(self.__finished_parts))) + self.__record = record + + if not record: + self.__tmp_file = "{file_name}_{uuid}".format(file_name=self.__dest_file_path, uuid=uuid.uuid4().hex) + record = {'bucket': self.__bucket, 'key': self.__key, 'tmp_filename':self.__tmp_file, + 'part_size': self.__part_size, 'parts':[]} + self.__record = record + self.__dump_record(record) + + def __del_record(self): + os.remove(self.__record_filepath) + logger.debug('ResumableDownLoader delete record_file, path: {0}'.format(self.__record_filepath)) + + def __check_crc(self): + logger.debug('start to check crc') + c64 = crcmod.mkCrcFun(0x142F0E1EBA9EA3693L, initCrc=0L, xorOut=0xffffffffffffffffL, rev=True) + with open(self.__dest_file_path,'rb') as f: + local_crc64 = str(c64(f.read())) + object_crc64 = self.__object_info['x-cos-hash-crc64ecma'] + if local_crc64 is not None and object_crc64 is not None and local_crc64 != object_crc64: + raise CosClientError('crc of client: {0} is mismatch with cos: {1}'.format(local_crc64, object_crc64)) + +class PartInfo(object): + def __init__(self, part_id, start, length): + self.part_id = part_id + self.start = start + self.length = length + + def __eq__(self, other): + return self.__key() == other.__key() + + def __hash__(self): + return hash(self.__key()) + + def __key(self): + return self.part_id, self.start, self.length + + \ No newline at end of file diff --git a/qcloud_cos/streambody.py b/qcloud_cos/streambody.py index e373e807..3bce1055 100644 --- a/qcloud_cos/streambody.py +++ b/qcloud_cos/streambody.py @@ -53,3 +53,34 @@ def get_stream_to_file(self, file_name, auto_decompress=False): if os.path.exists(file_name): os.remove(file_name) os.rename(tmp_file_name, file_name) + + def pget_stream_to_file(self, fdst, offset, expected_len, auto_decompress=False): + """保存流到本地文件的offset偏移""" + use_chunked = False + use_encoding = False + if 'Transfer-Encoding' in self._rt.headers and self._rt.headers['Transfer-Encoding'] == "chunked": + use_chunked = True + elif 'Content-Length' not in self._rt.headers: + raise IOError("download failed without Content-Length header or Transfer-Encoding header") + + if 'Content-Encoding' in self._rt.headers: + use_encoding = True + read_len = 0 + fdst.seek(offset, 0) + + if use_encoding and not auto_decompress: + chunk = self._rt.raw.read(1024) + while chunk: + read_len += len(chunk) + fdst.write(chunk) + chunk = self._rt.raw.read(1024) + else: + for chunk in self._rt.iter_content(chunk_size=1024): + if chunk: + read_len += len(chunk) + fdst.write(chunk) + + + if not use_chunked and not (use_encoding and auto_decompress) and read_len != expected_len: + raise IOError("download failed with incomplete file") + diff --git a/ut/test.py b/ut/test.py index 2de93b58..a214e512 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1164,6 +1164,10 @@ def _test_get_object_sensitive_content_recognition(): print(response) assert response +def test_download_file(): + """测试断点续传下载接口""" + client.download_file(test_bucket, test_object, 'test_download_file.local') + client.download_file(test_bucket, test_object, 'test_download_rraffic_limit.local', TrafficLimit=1024) if __name__ == "__main__": setUp() From cd6e19fcda5ab1d8e0a7df96d6afbdf6aee42241 Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 29 Jul 2020 17:45:16 +0800 Subject: [PATCH 02/13] add resumable download file --- qcloud_cos/cos_client.py | 27 +++- qcloud_cos/resumable_downloader.py | 192 +++++++++++++++++++++++++++++ qcloud_cos/streambody.py | 31 +++++ ut/test.py | 4 + 4 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 qcloud_cos/resumable_downloader.py diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 7028e73b..7bcc4833 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -25,6 +25,7 @@ from .cos_exception import CosServiceError from .version import __version__ from .select_event_stream import EventStream +from .resumable_downloader import ResumableDownLoader logger = logging.getLogger(__name__) @@ -185,7 +186,7 @@ def __init__(self, conf, retry=1, session=None): else: self._session = session - def get_conf(): + def get_conf(self): """获取配置""" return self._conf @@ -2942,6 +2943,30 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, already_exist_parts[part_num] = part['ETag'] return True + def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, EnableCRC=False, **Kwargs): + """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 + + :param Bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :param LocalFilePath(string): 本地文件路径名. + :param PartSize(int): 分块的大小设置,单位为MB. + :param MAXThread(int): 并发上传的最大线程数. + :param EnableCRC(bool): 校验下载文件与源文件是否一致 + :param kwargs(dict): 设置请求headers. + """ + logger.debug("Start to download file, bucket: {0}, key: {1}, dest_filename: {2}, part_size: {3}MB, " + "max_thread: {4}".format(Bucket, Key, DestFilePath, PartSize, MAZThread)) + + object_info = self.head_object(Bucket, Key) + file_size = object_info['Content-Length'] + if file_size <= 1024*1024*20: + response = self.get_object(Bucket, Key) + response['Body'].get_stream_to_file(DestFilePath) + return + + downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAZThread, EnableCRC, **Kwargs) + downloader.start() + def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, **kwargs): """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py new file mode 100644 index 00000000..c20c6851 --- /dev/null +++ b/qcloud_cos/resumable_downloader.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- + +import json +import os +import sys +import threading +import logging +import uuid +import hashlib +import crcmod +from .cos_comm import * +from .streambody import StreamBody +from .cos_threadpool import SimpleThreadPool +logger = logging.getLogger(__name__) + +class ResumableDownLoader(object): + def __init__(self, cos_client, bucket, key, dest_filename, object_info, part_size=20, max_thread=5, enable_crc=False, **kwargs): + self.__cos_client = cos_client + self.__bucket = bucket + self.__key = key + self.__dest_file_path = os.path.abspath(dest_filename) + self.__object_info = object_info + self.__max_thread = max_thread + self.__enable_crc = enable_crc + self.__headers = kwargs + + self.__max_part_count = 100 # 取决于服务端是否对并发有限制 + self.__min_part_size = 1024 * 1024 # 1M + self.__part_size = self.__determine_part_size_internal(int(object_info['Content-Length']), part_size) + self.__finished_parts = [] + self.__lock = threading.Lock() + self.__record = None #记录当前的上下文 + self.__dump_record_dir = os.path.join(os.path.expanduser('~'), '.cos_download_tmp_file') + + record_filename = self.__get_record_filename(bucket, key, self.__dest_file_path) + self.__record_filepath = os.path.join(self.__dump_record_dir, record_filename) + self.__tmp_file = None + + if not os.path.exists(self.__dump_record_dir): + os.makedirs(self.__dump_record_dir) + + logger.debug('resumale downloader init finish, bucket: {0}, key: {1}'.format(bucket, key)) + + def start(self): + logger.debug('start resumable downloade, bucket: {0}, key: {1}'.format(self.__bucket, self.__key)) + self.__load_record() # 从record文件中恢复读取上下文 + + assert self.__tmp_file + open(self.__tmp_file, 'a').close() + + parts_need_to_download = self.__get_parts_need_to_download() + logger.debug('parts_need_to_download: {0}'.format(parts_need_to_download)) + pool = SimpleThreadPool(self.__max_thread) + for part in parts_need_to_download: + part_range = "bytes=" + str(part.start) + "-" + str(part.start + part.length - 1) + headers = dict.copy(self.__headers) + headers["Range"] = part_range + pool.add_task(self.__download_part, part, headers) + + pool.wait_completion() + result = pool.get_result() + if not result['success_all']: + raise CosClientError('some upload_part fail after max_retry, please upload_file again') + + if os.path.exists(self.__dest_file_path): + os.remove(self.__dest_file_path) + os.rename(self.__tmp_file, self.__dest_file_path) + + if self.__enable_crc: + self.__check_crc() + + self.__del_record() + logger.debug('download success, bucket: {0}, key: {1}'.format(self.__bucket, self.__key)) + + def __get_record_filename(self, bucket, key, dest_file_path): + return '{0}_{1}.{2}'.format(self.__bucket, self.__key, get_md5(self.__dest_file_path)) + + def __determine_part_size_internal(self, file_size, part_size): + real_part_size = part_size * 1024 * 1024 # MB + if real_part_size < self.__min_part_size: + real_part_size = self.__min_part_size + + while real_part_size * self.__max_part_count < file_size: + real_part_size = real_part_size * 2 + logger.debug('finish to determine part size, file_size: {0}, part_size: {1}'.format(file_size, real_part_size)) + return real_part_size + + def __splite_to_parts(self): + parts = [] + file_size = int(self.__object_info['Content-Length']) + num_parts = (file_size + self.__part_size - 1) / self.__part_size + for i in range(num_parts): + start = i * self.__part_size + if i == num_parts - 1: + length = file_size - start + else: + length = self.__part_size + + parts.append(PartInfo(i + 1, start, length)) + return parts + + def __get_parts_need_to_download(self): + all_set = set(self.__splite_to_parts()) + logger.debug('all_set: {0}'.format(len(all_set))) + finished_set = set(self.__finished_parts) + logger.debug('finished_set: {0}'.format(len(finished_set))) + return list(all_set - finished_set) + + def __download_part(self, part, headers): + with open(self.__tmp_file, 'rb+') as f: + f.seek(part.start, 0) + range = None + traffic_limit = None + if 'Range' in headers: + range = headers['Range'] + + if 'TrafficLimit' in headers: + traffic_limit = headers['TrafficLimit'] + logger.debug("part_id: {0}, part_range: {1}, traffic_limit:{2}".format(part.part_id, range, traffic_limit)) + result = self.__cos_client.get_object(Bucket=self.__bucket, Key=self.__key, **headers) + result["Body"].pget_stream_to_file(f, part.start, part.length) + + self.__finish_part(part) + + def __finish_part(self, part): + logger.debug('download part finished,bucket: {0}, key: {1}, part_id: {2}'. + format(self.__bucket, self.__key, part.part_id)) + with self.__lock: + self.__finished_parts.append(part) + self.__record['parts'].append({'part_id': part.part_id, + 'start': part.start, + 'length': part.length}) + self.__dump_record(self.__record) + + def __dump_record(self, record): + with open(self.__record_filepath, 'w') as f: + json.dump(record, f) + logger.debug('dump record to {0}, bucket: {1}, key: {2}'. + format(self.__record_filepath, self.__bucket, self.__key)) + + def __load_record(self): + record = None + + if os.path.exists(self.__record_filepath): + with open(self.__record_filepath, 'r') as f: + record = json.load(f) + self.__part_size = record['part_size'] + self.__tmp_file = record['tmp_filename'] + if not os.path.exists(self.__tmp_file): + record = None + self.__tmp_file = None + else: + self.__finished_parts = list(PartInfo(p['part_id'], p['start'], p['length']) for p in record['parts']) + logger.debug('load record: finished parts nums: {0}'.format(len(self.__finished_parts))) + self.__record = record + + if not record: + self.__tmp_file = "{file_name}_{uuid}".format(file_name=self.__dest_file_path, uuid=uuid.uuid4().hex) + record = {'bucket': self.__bucket, 'key': self.__key, 'tmp_filename':self.__tmp_file, + 'part_size': self.__part_size, 'parts':[]} + self.__record = record + self.__dump_record(record) + + def __del_record(self): + os.remove(self.__record_filepath) + logger.debug('ResumableDownLoader delete record_file, path: {0}'.format(self.__record_filepath)) + + def __check_crc(self): + logger.debug('start to check crc') + c64 = crcmod.mkCrcFun(0x142F0E1EBA9EA3693L, initCrc=0L, xorOut=0xffffffffffffffffL, rev=True) + with open(self.__dest_file_path,'rb') as f: + local_crc64 = str(c64(f.read())) + object_crc64 = self.__object_info['x-cos-hash-crc64ecma'] + if local_crc64 is not None and object_crc64 is not None and local_crc64 != object_crc64: + raise CosClientError('crc of client: {0} is mismatch with cos: {1}'.format(local_crc64, object_crc64)) + +class PartInfo(object): + def __init__(self, part_id, start, length): + self.part_id = part_id + self.start = start + self.length = length + + def __eq__(self, other): + return self.__key() == other.__key() + + def __hash__(self): + return hash(self.__key()) + + def __key(self): + return self.part_id, self.start, self.length + + diff --git a/qcloud_cos/streambody.py b/qcloud_cos/streambody.py index e373e807..3bce1055 100644 --- a/qcloud_cos/streambody.py +++ b/qcloud_cos/streambody.py @@ -53,3 +53,34 @@ def get_stream_to_file(self, file_name, auto_decompress=False): if os.path.exists(file_name): os.remove(file_name) os.rename(tmp_file_name, file_name) + + def pget_stream_to_file(self, fdst, offset, expected_len, auto_decompress=False): + """保存流到本地文件的offset偏移""" + use_chunked = False + use_encoding = False + if 'Transfer-Encoding' in self._rt.headers and self._rt.headers['Transfer-Encoding'] == "chunked": + use_chunked = True + elif 'Content-Length' not in self._rt.headers: + raise IOError("download failed without Content-Length header or Transfer-Encoding header") + + if 'Content-Encoding' in self._rt.headers: + use_encoding = True + read_len = 0 + fdst.seek(offset, 0) + + if use_encoding and not auto_decompress: + chunk = self._rt.raw.read(1024) + while chunk: + read_len += len(chunk) + fdst.write(chunk) + chunk = self._rt.raw.read(1024) + else: + for chunk in self._rt.iter_content(chunk_size=1024): + if chunk: + read_len += len(chunk) + fdst.write(chunk) + + + if not use_chunked and not (use_encoding and auto_decompress) and read_len != expected_len: + raise IOError("download failed with incomplete file") + diff --git a/ut/test.py b/ut/test.py index 2de93b58..a214e512 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1164,6 +1164,10 @@ def _test_get_object_sensitive_content_recognition(): print(response) assert response +def test_download_file(): + """测试断点续传下载接口""" + client.download_file(test_bucket, test_object, 'test_download_file.local') + client.download_file(test_bucket, test_object, 'test_download_rraffic_limit.local', TrafficLimit=1024) if __name__ == "__main__": setUp() From 058881d2eb442533ea25922dc64c07cf4bb82851 Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Thu, 30 Jul 2020 10:37:05 +0800 Subject: [PATCH 03/13] add funtion of resumable downloader --- qcloud_cos/resumable_downloader.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py index c20c6851..cf207a29 100644 --- a/qcloud_cos/resumable_downloader.py +++ b/qcloud_cos/resumable_downloader.py @@ -1,5 +1,8 @@ +<<<<<<< HEAD # -*- coding: utf-8 -*- +======= +>>>>>>> acf0420cb2642482f8b36c7dd3030993d4f0ad1f import json import os import sys @@ -189,4 +192,8 @@ def __hash__(self): def __key(self): return self.part_id, self.start, self.length +<<<<<<< HEAD +======= + +>>>>>>> acf0420cb2642482f8b36c7dd3030993d4f0ad1f From c924cc8f93b97bf21926b3f566d5c0533ba14dcc Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Thu, 30 Jul 2020 10:50:33 +0800 Subject: [PATCH 04/13] add funtion of resumable doweloader --- qcloud_cos/cos_client.py | 2 +- ut/test.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 7bcc4833..ef881d32 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2944,7 +2944,7 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, return True def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, EnableCRC=False, **Kwargs): - """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 + """小于等于20MB的文件简单下载,大于20MB的文件使用续传下载 :param Bucket(string): 存储桶名称. :param key(string): 分块上传路径名. diff --git a/ut/test.py b/ut/test.py index a214e512..d30bbee4 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1167,7 +1167,8 @@ def _test_get_object_sensitive_content_recognition(): def test_download_file(): """测试断点续传下载接口""" client.download_file(test_bucket, test_object, 'test_download_file.local') - client.download_file(test_bucket, test_object, 'test_download_rraffic_limit.local', TrafficLimit=1024) + client.download_file(test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit=1024) + client.download_file(test_bucket, test_object, 'test_download_crc.local', EnableCRC=True) if __name__ == "__main__": setUp() From c36f7a60cc1f29631f5cc099af000724b4491ec8 Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Thu, 30 Jul 2020 11:02:27 +0800 Subject: [PATCH 05/13] Update resumable_downloader.py --- qcloud_cos/resumable_downloader.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py index 5602567b..8d9933b8 100644 --- a/qcloud_cos/resumable_downloader.py +++ b/qcloud_cos/resumable_downloader.py @@ -188,13 +188,3 @@ def __hash__(self): def __key(self): return self.part_id, self.start, self.length - -<<<<<<< HEAD -<<<<<<< HEAD - -======= - ->>>>>>> acf0420cb2642482f8b36c7dd3030993d4f0ad1f -======= - ->>>>>>> acf0420cb2642482f8b36c7dd3030993d4f0ad1f From e47fbc0d4a71a0b8fe0d07247087095668b65aee Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Fri, 31 Jul 2020 11:40:15 +0800 Subject: [PATCH 06/13] Update test.py --- ut/test.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/ut/test.py b/ut/test.py index d30bbee4..a6bc6ed4 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1166,9 +1166,51 @@ def _test_get_object_sensitive_content_recognition(): def test_download_file(): """测试断点续传下载接口""" + #测试普通下载 client.download_file(test_bucket, test_object, 'test_download_file.local') + if os.path.exists('test_download_file.local'): + os.remove('test_download_file.local') + + # 测试限速下载 client.download_file(test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit=1024) + if os.path.exists('test_download_traffic_limit.local'): + os.remove('test_download_traffic_limit.local') + + # 测试crc64校验开关 client.download_file(test_bucket, test_object, 'test_download_crc.local', EnableCRC=True) + if os.path.exists('test_download_crc.local'): + os.remove('test_download_crc.local') + + # 测试源文件的md5与下载下来后的文件md5 + file_size = 25 # MB + file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000)) + file_name = "tmp" + file_id + "_" + str(file_size) + "MB" + gen_file(file_name, file_size) + + source_file_md5 = None + dest_file_md5 = None + with open(file_name, 'rb') as f: + source_file_md5 = get_raw_md5(f.read()) + + client.put_object_from_local_file( + Bucket=test_bucket, + LocalFilePath=file_name, + Key=file_name + ) + + client.download_file(test_bucket, file_name, 'test_download_md5.local') + if os.path.exists('test_download_md5.local'): + with open('test_download_md5.local', 'rb') as f: + dest_file_md5 = get_raw_md5(f.read()) + assert source_file_md5 and dest_file_md5 and source_file_md5 == dest_file_md5 + + # 释放资源 + client.delete_object( + Bucket=test_bucket, + Key=file_name + ) + if os.path.exists(file_name): + os.remove(file_name) if __name__ == "__main__": setUp() From 725fd52da0537c9237dd27564b8d192784b09fa5 Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Fri, 31 Jul 2020 11:42:11 +0800 Subject: [PATCH 07/13] Update cos_client.py --- qcloud_cos/cos_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index ef881d32..0b368e63 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2960,7 +2960,7 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, Ena object_info = self.head_object(Bucket, Key) file_size = object_info['Content-Length'] if file_size <= 1024*1024*20: - response = self.get_object(Bucket, Key) + response = self.get_object(Bucket, Key, **Kwargs) response['Body'].get_stream_to_file(DestFilePath) return From 16d683d3dc4571e312e54c6544b5deceae51750c Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Tue, 4 Aug 2020 09:46:49 +0800 Subject: [PATCH 08/13] Update cos_client.py --- qcloud_cos/cos_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 0b368e63..43680238 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2947,10 +2947,10 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, Ena """小于等于20MB的文件简单下载,大于20MB的文件使用续传下载 :param Bucket(string): 存储桶名称. - :param key(string): 分块上传路径名. - :param LocalFilePath(string): 本地文件路径名. - :param PartSize(int): 分块的大小设置,单位为MB. - :param MAXThread(int): 并发上传的最大线程数. + :param key(string): COS文件的路径名. + :param DestFilePath(string): 下载文件的目的路径. + :param PartSize(int): 分块下载的大小设置,单位为MB. + :param MAXThread(int): 并发下载的最大线程数. :param EnableCRC(bool): 校验下载文件与源文件是否一致 :param kwargs(dict): 设置请求headers. """ From 55951de0963234ffb42bf9195656dae3eaaa107a Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Tue, 4 Aug 2020 09:48:27 +0800 Subject: [PATCH 09/13] Update resumable_downloader.py --- qcloud_cos/resumable_downloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py index 8d9933b8..92f255ec 100644 --- a/qcloud_cos/resumable_downloader.py +++ b/qcloud_cos/resumable_downloader.py @@ -60,7 +60,7 @@ def start(self): pool.wait_completion() result = pool.get_result() if not result['success_all']: - raise CosClientError('some upload_part fail after max_retry, please upload_file again') + raise CosClientError('some download_part fail after max_retry, please downloade_file again') if os.path.exists(self.__dest_file_path): os.remove(self.__dest_file_path) From ef557f815351d41cf9a9cc45e80d3c2283497d1f Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Tue, 4 Aug 2020 09:52:21 +0800 Subject: [PATCH 10/13] Update test.py --- ut/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ut/test.py b/ut/test.py index a6bc6ed4..5d74fdae 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1172,7 +1172,7 @@ def test_download_file(): os.remove('test_download_file.local') # 测试限速下载 - client.download_file(test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit=1024) + client.download_file(test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit='819200') if os.path.exists('test_download_traffic_limit.local'): os.remove('test_download_traffic_limit.local') From 7edc78723dc8921378de8df94358c63b0403fd0b Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Tue, 4 Aug 2020 11:18:57 +0800 Subject: [PATCH 11/13] Update resumable_downloader.py --- qcloud_cos/resumable_downloader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py index 92f255ec..46cf9de3 100644 --- a/qcloud_cos/resumable_downloader.py +++ b/qcloud_cos/resumable_downloader.py @@ -73,7 +73,8 @@ def start(self): logger.debug('download success, bucket: {0}, key: {1}'.format(self.__bucket, self.__key)) def __get_record_filename(self, bucket, key, dest_file_path): - return '{0}_{1}.{2}'.format(self.__bucket, self.__key, get_md5(self.__dest_file_path)) + dest_file_path_md5 = hashlib.md5(dest_file_path).hexdigest() + return '{0}_{1}.{2}'.format(self.__bucket, self.__key, dest_file_path_md5) def __determine_part_size_internal(self, file_size, part_size): real_part_size = part_size * 1024 * 1024 # MB From 6bda36376eb9befc7d668c0496b02b001b5265d5 Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Tue, 4 Aug 2020 11:21:08 +0800 Subject: [PATCH 12/13] Update test.py --- ut/test.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ut/test.py b/ut/test.py index 5d74fdae..12151f61 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1167,17 +1167,17 @@ def _test_get_object_sensitive_content_recognition(): def test_download_file(): """测试断点续传下载接口""" #测试普通下载 - client.download_file(test_bucket, test_object, 'test_download_file.local') + client.download_file(copy_test_bucket, test_object, 'test_download_file.local') if os.path.exists('test_download_file.local'): os.remove('test_download_file.local') # 测试限速下载 - client.download_file(test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit='819200') + client.download_file(copy_test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit='819200') if os.path.exists('test_download_traffic_limit.local'): os.remove('test_download_traffic_limit.local') # 测试crc64校验开关 - client.download_file(test_bucket, test_object, 'test_download_crc.local', EnableCRC=True) + client.download_file(copy_test_bucket, test_object, 'test_download_crc.local', EnableCRC=True) if os.path.exists('test_download_crc.local'): os.remove('test_download_crc.local') @@ -1193,12 +1193,12 @@ def test_download_file(): source_file_md5 = get_raw_md5(f.read()) client.put_object_from_local_file( - Bucket=test_bucket, + Bucket=copy_test_bucket, LocalFilePath=file_name, Key=file_name ) - client.download_file(test_bucket, file_name, 'test_download_md5.local') + client.download_file(copy_test_bucket, file_name, 'test_download_md5.local') if os.path.exists('test_download_md5.local'): with open('test_download_md5.local', 'rb') as f: dest_file_md5 = get_raw_md5(f.read()) @@ -1206,7 +1206,7 @@ def test_download_file(): # 释放资源 client.delete_object( - Bucket=test_bucket, + Bucket=copy_test_bucket, Key=file_name ) if os.path.exists(file_name): @@ -1237,6 +1237,7 @@ def test_download_file(): test_put_get_delete_bucket_domain() test_select_object() _test_get_object_sensitive_content_recognition() + test_download_file() """ tearDown() From 76bc9aaefa560e141e741b3acb30da39bb7992e8 Mon Sep 17 00:00:00 2001 From: jayzhenghan <68948288+jayzhenghan@users.noreply.github.com> Date: Wed, 5 Aug 2020 11:41:06 +0800 Subject: [PATCH 13/13] Update resumable_downloader.py --- qcloud_cos/resumable_downloader.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/qcloud_cos/resumable_downloader.py b/qcloud_cos/resumable_downloader.py index 46cf9de3..03f6fca5 100644 --- a/qcloud_cos/resumable_downloader.py +++ b/qcloud_cos/resumable_downloader.py @@ -74,7 +74,8 @@ def start(self): def __get_record_filename(self, bucket, key, dest_file_path): dest_file_path_md5 = hashlib.md5(dest_file_path).hexdigest() - return '{0}_{1}.{2}'.format(self.__bucket, self.__key, dest_file_path_md5) + key_md5 = hashlib.md5(key).hexdigest() + return '{0}_{1}.{2}'.format(bucket, key_md5, dest_file_path_md5) def __determine_part_size_internal(self, file_size, part_size): real_part_size = part_size * 1024 * 1024 # MB @@ -145,11 +146,19 @@ def __load_record(self): if os.path.exists(self.__record_filepath): with open(self.__record_filepath, 'r') as f: record = json.load(f) + + ret = self.__check_record(record) + # record记录是否跟head object的一致,不一致则删除 + if ret == False: + self.__del_record() + record = None + else: self.__part_size = record['part_size'] self.__tmp_file = record['tmp_filename'] if not os.path.exists(self.__tmp_file): record = None self.__tmp_file = None + self.__del_record() else: self.__finished_parts = list(PartInfo(p['part_id'], p['start'], p['length']) for p in record['parts']) logger.debug('load record: finished parts nums: {0}'.format(len(self.__finished_parts))) @@ -158,14 +167,20 @@ def __load_record(self): if not record: self.__tmp_file = "{file_name}_{uuid}".format(file_name=self.__dest_file_path, uuid=uuid.uuid4().hex) record = {'bucket': self.__bucket, 'key': self.__key, 'tmp_filename':self.__tmp_file, - 'part_size': self.__part_size, 'parts':[]} + 'mtime':self.__object_info['Last-Modified'], 'etag':self.__object_info['ETag'], + 'file_size':self.__object_info['Content-Length'], 'part_size': self.__part_size, 'parts':[]} self.__record = record self.__dump_record(record) + def __check_record(self, record): + return record['etag'] == self.__object_info['ETag'] and\ + record['mtime'] == self.__object_info['Last-Modified'] and\ + record['file_size'] == self.__object_info['Content-Length'] + def __del_record(self): - os.remove(self.__record_filepath) - logger.debug('ResumableDownLoader delete record_file, path: {0}'.format(self.__record_filepath)) - + os.remove(self.__record_filepath) + logger.debug('ResumableDownLoader delete record_file, path: {0}'.format(self.__record_filepath)) + def __check_crc(self): logger.debug('start to check crc') c64 = crcmod.mkCrcFun(0x142F0E1EBA9EA3693L, initCrc=0L, xorOut=0xffffffffffffffffL, rev=True)