diff --git a/README.rst b/README.rst index 176fa0e2..90936ff8 100644 --- a/README.rst +++ b/README.rst @@ -26,13 +26,12 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 .. code:: python - # 设置用户属性, 包括appid, secret_id, secret_key, region - appid = '100000' # 替换为用户的appid + # 设置用户属性, 包括secret_id, secret_key, region secret_id = 'xxxxxxxx' # 替换为用户的secret_id secret_key = 'xxxxxxx' # 替换为用户的secret_key   region = 'ap-beiging-1'   # 替换为用户的region token = '' # 使用临时秘钥需要传入Token,默认为空,可不填 - config = CosConfig(Appid=appid, Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) #获取配置对象 + config = CosConfig(Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) #获取配置对象 client = CosS3Client(config) #获取客户端对象 @@ -41,7 +40,7 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 ############################################################################ # 1. 上传单个文件 response = client.put_object( - Bucket='test01', + Bucket='test01-123456789', Body='TY'*1024*512*file_size, Key=file_name, CacheControl='no-cache', @@ -50,46 +49,46 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 # 2. 下载单个文件 response = client.get_object( - Bucket='test01', + Bucket='test01-123456789', Key=file_name, ) # 3. 获取文件属性 response = client.head_object( - Bucket='test01', + Bucket='test01-123456789', Key=file_name ) # 4. 删除单个文件 response = client.delete_object( - Bucket='test01', + Bucket='test01-123456789', Key=file_name ) # 5. 创建分片上传 response = client.create_multipart_upload( - Bucket='test01', + Bucket='test01-123456789', Key='multipartfile.txt', ) uploadid = get_id_from_xml(response.text) # 6. 删除分片上传 response = client.abort_multipart_upload( - Bucket='test01', + Bucket='test01-123456789', Key='multipartfile.txt', UploadId=uploadid ) # 7. 再次创建分片上传 response = client.create_multipart_upload( - Bucket='test01', + Bucket='test01-123456789', Key='multipartfile.txt', ) uploadid = response['UploadId'] # 8. 上传分片 response = client.upload_part( - Bucket='test01', + Bucket='test01-123456789', Key='multipartfile.txt', UploadId=uploadid, PartNumber=1, @@ -99,7 +98,7 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 # 9. 列出分片 response = clieent.list_parts( - Bucket='test01', + Bucket='test01-123456789', Key='mutilpartfile.txt', UploadId=uploadid ) @@ -107,7 +106,7 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 # 10. 完成分片上传 response = client.complete_multipart_upload( - Bucket='test01', + Bucket='test01-123456789', Key='multipartfile.txt', UploadId=uploadid, MultipartUpload={'Part': lst} # 超过1000个分块,请本地保存分块信息,再complete @@ -119,16 +118,16 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 ############################################################################ # 1. 创建Bucket response = client.create_bucket( - Bucket='test02', + Bucket='test02-123456789', ACL='public-read' ) # 2. 删除Bucket response = client.delete_bucket( - Bucket='test02' + Bucket='test02-123456789' ) # 3. 获取文件列表 response = client.list_objects( - Bucket='test01' + Bucket='test01-123456789' ) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 9ea8e3bd..81bf5c23 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -16,6 +16,8 @@ from xml2dict import Xml2Dict from dicttoxml import dicttoxml from cos_auth import CosS3Auth +from cos_comm import * +from cos_threadpool import SimpleThreadPool from cos_exception import CosClientError from cos_exception import CosServiceError @@ -29,194 +31,56 @@ reload(sys) sys.setdefaultencoding('utf-8') -# kwargs中params到http headers的映射 -maplist = { - 'ContentLength': 'Content-Length', - 'ContentMD5': 'Content-MD5', - 'ContentType': 'Content-Type', - 'CacheControl': 'Cache-Control', - 'ContentDisposition': 'Content-Disposition', - 'ContentEncoding': 'Content-Encoding', - 'ContentLanguage': 'Content-Language', - 'Expires': 'Expires', - 'ResponseContentType': 'response-content-type', - 'ResponseContentLanguage': 'response-content-language', - 'ResponseExpires': 'response-expires', - 'ResponseCacheControl': 'response-cache-control', - 'ResponseContentDisposition': 'response-content-disposition', - 'ResponseContentEncoding': 'response-content-encoding', - 'Metadata': 'Metadata', - 'ACL': 'x-cos-acl', - 'GrantFullControl': 'x-cos-grant-full-control', - 'GrantWrite': 'x-cos-grant-write', - 'GrantRead': 'x-cos-grant-read', - 'StorageClass': 'x-cos-storage-class', - 'Range': 'Range', - 'IfMatch': 'If-Match', - 'IfNoneMatch': 'If-None-Match', - 'IfModifiedSince': 'If-Modified-Since', - 'IfUnmodifiedSince': 'If-Unmodified-Since', - 'CopySourceIfMatch': 'x-cos-copy-source-If-Match', - 'CopySourceIfNoneMatch': 'x-cos-copy-source-If-None-Match', - 'CopySourceIfModifiedSince': 'x-cos-copy-source-If-Modified-Since', - 'CopySourceIfUnmodifiedSince': 'x-cos-copy-source-If-Unmodified-Since', - 'VersionId': 'x-cos-version-id', - } - - -def to_unicode(s): - if isinstance(s, unicode): - return s - else: - return s.decode('utf-8') - - -def get_md5(data): - m2 = hashlib.md5(data) - MD5 = base64.standard_b64encode(m2.digest()) - return MD5 - - -def dict_to_xml(data): - """V5使用xml格式,将输入的dict转换为xml""" - doc = xml.dom.minidom.Document() - root = doc.createElement('CompleteMultipartUpload') - doc.appendChild(root) - - if 'Part' not in data.keys(): - raise CosClientError("Invalid Parameter, Part Is Required!") - - for i in data['Part']: - nodePart = doc.createElement('Part') - - if 'PartNumber' not in i.keys(): - raise CosClientError("Invalid Parameter, PartNumber Is Required!") - - nodeNumber = doc.createElement('PartNumber') - nodeNumber.appendChild(doc.createTextNode(str(i['PartNumber']))) - - if 'ETag' not in i.keys(): - raise CosClientError("Invalid Parameter, ETag Is Required!") - - nodeETag = doc.createElement('ETag') - nodeETag.appendChild(doc.createTextNode(str(i['ETag']))) - - nodePart.appendChild(nodeNumber) - nodePart.appendChild(nodeETag) - root.appendChild(nodePart) - return doc.toxml('utf-8') - - -def xml_to_dict(data, origin_str="", replace_str=""): - """V5使用xml格式,将response中的xml转换为dict""" - root = xml.etree.ElementTree.fromstring(data) - xmldict = Xml2Dict(root) - xmlstr = str(xmldict) - xmlstr = xmlstr.replace("{http://www.qcloud.com/document/product/436/7751}", "") - xmlstr = xmlstr.replace("{http://www.w3.org/2001/XMLSchema-instance}", "") - if origin_str: - xmlstr = xmlstr.replace(origin_str, replace_str) - xmldict = eval(xmlstr) - return xmldict - - -def get_id_from_xml(data, name): - """解析xml中的特定字段""" - tree = xml.dom.minidom.parseString(data) - root = tree.documentElement - result = root.getElementsByTagName(name) - # use childNodes to get a list, if has no child get itself - return result[0].childNodes[0].nodeValue - - -def mapped(headers): - """S3到COS参数的一个映射""" - _headers = dict() - for i in headers.keys(): - if i in maplist: - _headers[maplist[i]] = headers[i] - else: - raise CosClientError('No Parameter Named '+i+' Please Check It') - return _headers - - -def format_xml(data, root, lst=list()): - """将dict转换为xml""" - xml_config = dicttoxml(data, item_func=lambda x: x, custom_root=root, attr_type=False) - for i in lst: - xml_config = xml_config.replace(i+i, i) - return xml_config - - -def format_region(region): - """格式化地域""" - if region.find('cos.') != -1: - return region # 传入cos.ap-beijing-1这样显示加上cos.的region - if region == 'cn-north' or region == 'cn-south' or region == 'cn-east' or region == 'cn-south-2' or region == 'cn-southwest' or region == 'sg': - return region # 老域名不能加cos. - # 支持v4域名映射到v5 - if region == 'cossh': - return 'cos.ap-shanghai' - if region == 'cosgz': - return 'cos.ap-guangzhou' - if region == 'cosbj': - return 'cos.ap-beijing' - if region == 'costj': - return 'cos.ap-beijing-1' - if region == 'coscd': - return 'cos.ap-chengdu' - if region == 'cossgp': - return 'cos.ap-singapore' - if region == 'coshk': - return 'cos.ap-hongkong' - if region == 'cosca': - return 'cos.na-toronto' - if region == 'cosger': - return 'cos.eu-frankfurt' - - return 'cos.' + region # 新域名加上cos. - class CosConfig(object): """config类,保存用户相关信息""" - def __init__(self, Appid, Region, Access_id, Access_key, Token=None): + def __init__(self, Region, Access_id, Access_key, Appid='', Scheme='http', Token=None, Timeout=None): """初始化,保存用户的信息 :param Appid(string): 用户APPID. :param Region(string): 地域信息. :param Access_id(string): 秘钥SecretId. :param Access_key(string): 秘钥SecretKey. + :param Scheme(string): http/https. :param Token(string): 临时秘钥使用的token. + :param Timeout(int): http超时时间. """ self._appid = Appid self._region = format_region(Region) self._access_id = Access_id self._access_key = Access_key + self._scheme = Scheme self._token = Token + self._timeout = Timeout logger.info("config parameter-> appid: {appid}, region: {region}".format( appid=Appid, region=Region)) - def uri(self, bucket, path=None): + def uri(self, bucket, path=None, scheme=None, region=None): """拼接url :param bucket(string): 存储桶名称. :param path(string): 请求COS的路径. :return(string): 请求COS的URL地址. """ + bucket = format_bucket(bucket, self._appid) + if scheme is None: + scheme = self._scheme + if region is None: + region = self._region if path: if path[0] == '/': path = path[1:] - url = u"http://{bucket}-{uid}.{region}.myqcloud.com/{path}".format( + url = u"{scheme}://{bucket}.{region}.myqcloud.com/{path}".format( + scheme=scheme, bucket=to_unicode(bucket), - uid=self._appid, - region=self._region, + region=region, path=to_unicode(path) ) else: - url = u"http://{bucket}-{uid}.{region}.myqcloud.com/".format( + url = u"{scheme}://{bucket}.{region}.myqcloud.com/".format( + scheme=self._scheme, bucket=to_unicode(bucket), - uid=self._appid, region=self._region ) return url @@ -238,7 +102,7 @@ def __init__(self, conf, retry=1, session=None): else: self._session = session - def get_auth(self, Method, Bucket, Key='', Expired=300, headers={}, params={}): + def get_auth(self, Method, Bucket, Key='', Expired=300, Headers={}, Params={}): """获取签名 :param Method(string): http method,如'PUT','GET'. @@ -250,12 +114,14 @@ def get_auth(self, Method, Bucket, Key='', Expired=300, headers={}, params={}): :return (string): 计算出的V5签名. """ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')) - r = Request(Method, url, headers=headers, params=params) - auth = CosS3Auth(self._conf._access_id, self._conf._access_key, Key, params, Expired) + r = Request(Method, url, headers=Headers, params=Params) + auth = CosS3Auth(self._conf._access_id, self._conf._access_key, Key, Params, Expired) return auth(r).headers['Authorization'] def send_request(self, method, url, timeout=30, **kwargs): """封装request库发起http请求""" + if self._conf._timeout is not None: # 用户自定义超时时间 + timeout = self._conf._timeout if self._conf._token is not None: kwargs['headers']['x-cos-security-token'] = self._conf._token kwargs['headers']['User-Agent'] = 'cos-python-sdk-v5' @@ -314,6 +180,7 @@ def put_object(self, Bucket, Body, Key, **kwargs): logger.info("put object, url=:{url} ,headers=:{headers}".format( url=url, headers=headers)) + Body = deal_with_empty_file_stream(Body) rt = self.send_request( method='PUT', url=url, @@ -333,6 +200,11 @@ def get_object(self, Bucket, Key, **kwargs): :return(dict): 下载成功返回的结果,包含Body对应的StreamBody,可以获取文件流或下载文件到本地. """ headers = mapped(kwargs) + params = {} + for key in headers.keys(): + if key.startswith("response"): + params[key] = headers[key] + headers.pop(key) url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')) logger.info("get object, url=:{url} ,headers=:{headers}".format( url=url, @@ -342,6 +214,7 @@ def get_object(self, Bucket, Key, **kwargs): url=url, stream=True, auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key), + params=params, headers=headers) response = rt.headers @@ -382,6 +255,40 @@ def delete_object(self, Bucket, Key, **kwargs): headers=headers) return None + def delete_objects(self, Bucket, Delete={}, **kwargs): + """文件批量删除接口,单次最多支持1000个object + + :param Bucket(string): 存储桶名称. + :param Delete(dict): 批量删除的object信息. + :param kwargs(dict): 设置请求headers. + :return(dict): 批量删除的结果. + """ + lst = ['', ''] # 类型为list的标签 + xml_config = format_xml(data=Delete, root='Delete', lst=lst) + headers = mapped(kwargs) + headers['Content-MD5'] = get_md5(xml_config) + headers['Content-Type'] = 'application/xml' + url = self._conf.uri(bucket=Bucket, path="?delete") + logger.info("put bucket replication, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='POST', + url=url, + data=xml_config, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key), + headers=headers) + data = xml_to_dict(rt.text) + if 'Deleted' in data.keys() and not isinstance(data['Deleted'], list): + lst = [] + lst.append(data['Deleted']) + data['Deleted'] = lst + if 'Error' in data.keys() and not isinstance(data['Error'], list): + lst = [] + lst.append(data['Error']) + data['Error'] = lst + return data + def head_object(self, Bucket, Key, **kwargs): """获取文件信息 @@ -402,35 +309,6 @@ def head_object(self, Bucket, Key, **kwargs): headers=headers) return rt.headers - def gen_copy_source_url(self, CopySource): - """拼接拷贝源url""" - if 'Appid' in CopySource.keys(): - appid = CopySource['Appid'] - else: - raise CosClientError('CopySource Need Parameter Appid') - if 'Bucket' in CopySource.keys(): - bucket = CopySource['Bucket'] - else: - raise CosClientError('CopySource Need Parameter Bucket') - if 'Region' in CopySource.keys(): - region = CopySource['Region'] - region = format_region(region) - else: - raise CosClientError('CopySource Need Parameter Region') - if 'Key' in CopySource.keys(): - path = CopySource['Key'] - if path and path[0] == '/': - path = path[1:] - else: - raise CosClientError('CopySource Need Parameter Key') - url = "{bucket}-{uid}.{region}.myqcloud.com/{path}".format( - bucket=bucket, - uid=appid, - region=region, - path=path - ) - return url - def copy_object(self, Bucket, Key, CopySource, CopyStatus='Copy', **kwargs): """文件拷贝,文件信息修改 @@ -446,7 +324,7 @@ def copy_object(self, Bucket, Key, CopySource, CopyStatus='Copy', **kwargs): for i in headers['Metadata'].keys(): headers[i] = headers['Metadata'][i] headers.pop('Metadata') - headers['x-cos-copy-source'] = self.gen_copy_source_url(CopySource) + headers['x-cos-copy-source'] = gen_copy_source_url(CopySource) if CopyStatus != 'Copy' and CopyStatus != 'Replaced': raise CosClientError('CopyStatus must be Copy or Replaced') headers['x-cos-metadata-directive'] = CopyStatus @@ -462,6 +340,35 @@ def copy_object(self, Bucket, Key, CopySource, CopyStatus='Copy', **kwargs): data = xml_to_dict(rt.text) return data + def upload_part_copy(self, Bucket, Key, PartNumber, UploadId, CopySource, CopySourceRange='', **kwargs): + """拷贝指定文件至分块上传 + + :param Bucket(string): 存储桶名称. + :param Key(string): 上传COS路径. + :param PartNumber(int): 上传分块的编号. + :param UploadId(string): 分块上传创建的UploadId. + :param CopySource(dict): 拷贝源,包含Appid,Bucket,Region,Key. + :param CopySourceRange(string): 拷贝源的字节范围,bytes=first-last。 + :param kwargs(dict): 设置请求headers. + :return(dict): 拷贝成功的结果. + """ + headers = mapped(kwargs) + headers['x-cos-copy-source'] = gen_copy_source_url(CopySource) + headers['x-cos-copy-source-range'] = CopySourceRange + url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?partNumber={PartNumber}&uploadId={UploadId}".format( + PartNumber=PartNumber, + UploadId=UploadId)) + logger.info("upload part copy, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='PUT', + url=url, + headers=headers, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key)) + data = xml_to_dict(rt.text) + return data + def create_multipart_upload(self, Bucket, Key, **kwargs): """创建分片上传,适用于大文件上传 @@ -507,6 +414,7 @@ def upload_part(self, Bucket, Key, Body, PartNumber, UploadId, **kwargs): logger.info("put object, url=:{url} ,headers=:{headers}".format( url=url, headers=headers)) + Body = deal_with_empty_file_stream(Body) rt = self.send_request( method='PUT', url=url, @@ -694,14 +602,14 @@ def delete_bucket(self, Bucket, **kwargs): headers=headers) return None - def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="", EncodingType="", **kwargs): + def list_objects(self, Bucket, Prefix="", Delimiter="", Marker="", MaxKeys=1000, EncodingType="", **kwargs): """获取文件列表 :param Bucket(string): 存储桶名称. + :param Prefix(string): 设置匹配文件的前缀. :param Delimiter(string): 分隔符. :param Marker(string): 从marker开始列出条目. :param MaxKeys(int): 设置单次返回最大的数量,最大为1000. - :param Prefix(string): 设置匹配文件的前缀. :param EncodingType(string): 设置返回结果编码方式,只能设置为url. :param kwargs(dict): 设置请求headers. :return(dict): 文件的相关信息,包括Etag等信息. @@ -712,10 +620,10 @@ def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="", url=url, headers=headers)) params = { + 'prefix': Prefix, 'delimiter': Delimiter, 'marker': Marker, - 'max-keys': MaxKeys, - 'prefix': Prefix + 'max-keys': MaxKeys } if EncodingType: if EncodingType != 'url': @@ -735,6 +643,92 @@ def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="", data['Contents'] = lst return data + def list_objects_versions(self, Bucket, Prefix="", Delimiter="", KeyMarker="", VersionIdMarker="", MaxKeys=1000, EncodingType="", **kwargs): + """获取文件列表 + + :param Bucket(string): 存储桶名称. + :param Prefix(string): 设置匹配文件的前缀. + :param Delimiter(string): 分隔符. + :param KeyMarker(string): 从KeyMarker指定的Key开始列出条目. + :param VersionIdMarker(string): 从VersionIdMarker指定的版本开始列出条目. + :param MaxKeys(int): 设置单次返回最大的数量,最大为1000. + :param EncodingType(string): 设置返回结果编码方式,只能设置为url. + :param kwargs(dict): 设置请求headers. + :return(dict): 文件的相关信息,包括Etag等信息. + """ + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path='?versions') + logger.info("list objects versions, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + params = { + 'prefix': Prefix, + 'delimiter': Delimiter, + 'key-marker': KeyMarker, + 'version-id-marker': VersionIdMarker, + 'max-keys': MaxKeys + } + if EncodingType: + if EncodingType != 'url': + raise CosClientError('EncodingType must be url') + params['encoding-type'] = EncodingType + rt = self.send_request( + method='GET', + url=url, + params=params, + headers=headers, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key)) + + data = xml_to_dict(rt.text) + if 'Version' in data.keys() and isinstance(data['Version'], dict): # 只有一个Version,将dict转为list,保持一致 + lst = [] + lst.append(data['Version']) + data['Version'] = lst + return data + + def list_multipart_uploads(self, Bucket, Prefix="", Delimiter="", KeyMarker="", UploadIdMarker="", MaxUploads=1000, EncodingType="", **kwargs): + """获取Bucket中正在进行的分块上传 + + :param Bucket(string): 存储桶名称. + :param Prefix(string): 设置匹配文件的前缀. + :param Delimiter(string): 分隔符. + :param KeyMarker(string): 从KeyMarker指定的Key开始列出条目. + :param UploadIdMarker(string): 从UploadIdMarker指定的UploadID开始列出条目. + :param MaxUploads(int): 设置单次返回最大的数量,最大为1000. + :param EncodingType(string): 设置返回结果编码方式,只能设置为url. + :param kwargs(dict): 设置请求headers. + :return(dict): 文件的相关信息,包括Etag等信息. + """ + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path='?uploads') + logger.info("get multipart uploads, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + params = { + 'prefix': Prefix, + 'delimiter': Delimiter, + 'key-marker': KeyMarker, + 'upload-id-marker': UploadIdMarker, + 'max-uploads': MaxUploads + } + if EncodingType: + if EncodingType != 'url': + raise CosClientError('EncodingType must be url') + params['encoding-type'] = EncodingType + rt = self.send_request( + method='GET', + url=url, + params=params, + headers=headers, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key)) + + data = xml_to_dict(rt.text) + if 'Upload' in data.keys() and isinstance(data['Upload'], dict): # 只有一个Upload,将dict转为list,保持一致 + lst = [] + lst.append(data['Upload']) + data['Upload'] = lst + return data + def head_bucket(self, Bucket, **kwargs): """确认bucket是否存在 @@ -842,6 +836,7 @@ def put_bucket_cors(self, Bucket, CORSConfiguration={}, **kwargs): def get_bucket_cors(self, Bucket, **kwargs): """获取bucket CORS + :param Bucket(string): 存储桶名称. :param kwargs(dict): 设置请求headers. :return(dict): 获取Bucket对应的跨域配置. @@ -892,6 +887,7 @@ def delete_bucket_cors(self, Bucket, **kwargs): def put_bucket_lifecycle(self, Bucket, LifecycleConfiguration={}, **kwargs): """设置bucket LifeCycle + :param Bucket(string): 存储桶名称. :param LifecycleConfiguration(dict): 设置Bucket的生命周期规则. :param kwargs(dict): 设置请求headers. @@ -959,6 +955,7 @@ def delete_bucket_lifecycle(self, Bucket, **kwargs): def put_bucket_versioning(self, Bucket, Status, **kwargs): """设置bucket版本控制 + :param Bucket(string): 存储桶名称. :param Status(string): 设置Bucket版本控制的状态,可选值为'Enabled'|'Suspended'. :param kwargs(dict): 设置请求headers. @@ -1024,6 +1021,74 @@ def get_bucket_location(self, Bucket, **kwargs): data['LocationConstraint'] = root.text return data + def put_bucket_replication(self, Bucket, ReplicationConfiguration={}, **kwargs): + """设置bucket跨区域复制配置 + + :param Bucket(string): 存储桶名称. + :param ReplicationConfiguration(dict): 设置Bucket的跨区域复制规则. + :param kwargs(dict): 设置请求headers. + :return: None. + """ + lst = ['', ''] # 类型为list的标签 + xml_config = format_xml(data=ReplicationConfiguration, root='ReplicationConfiguration', lst=lst) + headers = mapped(kwargs) + headers['Content-MD5'] = get_md5(xml_config) + headers['Content-Type'] = 'application/xml' + url = self._conf.uri(bucket=Bucket, path="?replication") + logger.info("put bucket replication, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='PUT', + url=url, + data=xml_config, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key), + headers=headers) + return None + + def get_bucket_replication(self, Bucket, **kwargs): + """获取bucket 跨区域复制配置 + + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求headers. + :return(dict): Bucket对应的跨区域复制配置. + """ + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path="?replication") + logger.info("get bucket replication, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='GET', + url=url, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key), + headers=headers) + data = xml_to_dict(rt.text) + if 'Rule' in data.keys() and not isinstance(data['Rule'], list): + lst = [] + lst.append(data['Rule']) + data['Rule'] = lst + return data + + def delete_bucket_replication(self, Bucket, **kwargs): + """删除bucket 跨区域复制配置 + + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求headers. + :return: None. + """ + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path="?replication") + logger.info("delete bucket replication, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='DELETE', + url=url, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key), + headers=headers) + return None + # service interface begin def list_buckets(self, **kwargs): """列出所有bucket @@ -1045,5 +1110,184 @@ def list_buckets(self, **kwargs): data['Buckets']['Bucket'] = lst return data + # Advanced interface + def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst): + """从本地文件中读取分块, 上传单个分块,将结果记录在md5——list中 + + :param bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :param local_path(string): 本地文件路径名. + :param offset(int): 读取本地文件的分块偏移量. + :param size(int): 读取本地文件的分块大小. + :param part_num(int): 上传分块的序号. + :param uploadid(string): 分块上传的uploadid. + :param md5_lst(list): 保存上传成功分块的MD5和序号. + :return: None. + """ + with open(local_path, 'rb') as fp: + fp.seek(offset, 0) + data = fp.read(size) + rt = self.upload_part(bucket, key, data, part_num, uploadid) + md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']}) + return None + + def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kwargs): + """小于等于100MB的文件简单上传,大于等于100MB的文件使用分块上传 + + :param Bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :param LocalFilePath(string): 本地文件路径名. + :param PartSize(int): 分块的大小设置. + :param MAXThread(int): 并发上传的最大线程数. + :param kwargs(dict): 设置请求headers. + :return: None. + """ + file_size = os.path.getsize(LocalFilePath) + if file_size <= 1024*1024*100: + with open(LocalFilePath, 'rb') as fp: + rt = self.put_object(Bucket=Bucket, Key=Key, Body=fp, **kwargs) + return rt + else: + part_size = 1024*1024*PartSize # 默认按照10MB分块,最大支持100G的文件,超过100G的分块数固定为10000 + last_size = 0 # 最后一块可以小于1MB + parts_num = file_size / part_size + last_size = file_size % part_size + + if last_size != 0: + parts_num += 1 + if parts_num > 10000: + parts_num = 10000 + part_size = file_size / parts_num + last_size = file_size % parts_num + last_size += part_size + + # 创建分块上传 + rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs) + uploadid = rt['UploadId'] + + # 上传分块 + offset = 0 # 记录文件偏移量 + lst = list() # 记录分块信息 + pool = SimpleThreadPool(MAXThread) + + for i in range(1, parts_num+1): + if i == parts_num: # 最后一块 + pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst) + else: + pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst) + offset += part_size + + pool.wait_completion() + lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列 + + # 完成分片上传 + try: + rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst}) + except Exception as e: + abort_response = self.abort_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid) + raise e + return rt + + def _inner_head_object(self, CopySource): + """查询源文件的长度""" + bucket, path, region = get_copy_source_info(CopySource) + url = self._conf.uri(bucket=bucket, path=quote(path, '/-_.~'), scheme=self._conf._scheme, region=region) + rt = self.send_request( + method='HEAD', + url=url, + auth=CosS3Auth(self._conf._access_id, self._conf._access_key, path), + headers={}) + return int(rt.headers['Content-Length']) + + def _upload_part_copy(self, bucket, key, part_number, upload_id, copy_source, copy_source_range, md5_lst): + """拷贝指定文件至分块上传,记录结果到lst中去 + + :param bucket(string): 存储桶名称. + :param key(string): 上传COS路径. + :param part_number(int): 上传分块的编号. + :param upload_id(string): 分块上传创建的UploadId. + :param copy_source(dict): 拷贝源,包含Appid,Bucket,Region,Key. + :param copy_source_range(string): 拷贝源的字节范围,bytes=first-last。 + :param md5_lst(list): 保存上传成功分块的MD5和序号. + :return: None. + """ + rt = self.upload_part_copy(bucket, key, part_number, upload_id, copy_source, copy_source_range) + md5_lst.append({'PartNumber': part_number, 'ETag': rt['ETag']}) + return None + + def _check_same_region(self, dst_region, CopySource): + if 'Region' in CopySource.keys(): + src_region = CopySource['Region'] + src_region = format_region(src_region) + else: + raise CosClientError('CopySource Need Parameter Region') + if src_region == dst_region: + return True + return False + + def copy(self, Bucket, Key, CopySource, CopyStatus='Copy', PartSize=10, MAXThread=5, **kwargs): + """文件拷贝,小于5G的文件调用copy_object,大于等于5G的文件调用分块上传的upload_part_copy + + :param Bucket(string): 存储桶名称. + :param Key(string): 上传COS路径. + :param CopySource(dict): 拷贝源,包含Appid,Bucket,Region,Key. + :param CopyStatus(string): 拷贝状态,可选值'Copy'|'Replaced'. + :param PartSize(int): 分块的大小设置. + :param MAXThread(int): 并发上传的最大线程数. + :param kwargs(dict): 设置请求headers. + :return(dict): 拷贝成功的结果. + """ + # 同园区直接走copy_object + if self._check_same_region(self._conf._region, CopySource): + response = self.copy_object(Bucket=Bucket, Key=Key, CopySource=CopySource, CopyStatus=CopyStatus, **kwargs) + return response + + # 不同园区查询拷贝源object的content-length + file_size = self._inner_head_object(CopySource) + # 如果源文件大小小于5G,则直接调用copy_object接口 + if file_size < SINGLE_UPLOAD_LENGTH: + response = self.copy_object(Bucket=Bucket, Key=Key, CopySource=CopySource, CopyStatus=CopyStatus, **kwargs) + return response + + # 如果源文件大小大于等于5G,则先创建分块上传,在调用upload_part + part_size = 1024*1024*PartSize # 默认按照10MB分块 + last_size = 0 # 最后一块可以小于1MB + parts_num = file_size / part_size + last_size = file_size % part_size + if last_size != 0: + parts_num += 1 + if parts_num > 10000: + parts_num = 10000 + part_size = file_size / parts_num + last_size = file_size % parts_num + last_size += part_size + # 创建分块上传 + rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs) + uploadid = rt['UploadId'] + + # 上传分块拷贝 + offset = 0 # 记录文件偏移量 + lst = list() # 记录分块信息 + pool = SimpleThreadPool(MAXThread) + + for i in range(1, parts_num+1): + if i == parts_num: # 最后一块 + copy_range = gen_copy_source_range(offset, file_size-1) + pool.add_task(self._upload_part_copy, Bucket, Key, i, uploadid, CopySource, copy_range, lst) + else: + copy_range = gen_copy_source_range(offset, offset+part_size-1) + pool.add_task(self._upload_part_copy, Bucket, Key, i, uploadid, CopySource, copy_range, lst) + offset += part_size + + pool.wait_completion() + lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列 + # 完成分片上传 + try: + rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst}) + except Exception as e: + abort_response = self.abort_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid) + raise e + return rt + if __name__ == "__main__": pass diff --git a/qcloud_cos/cos_comm.py b/qcloud_cos/cos_comm.py new file mode 100644 index 00000000..55196299 --- /dev/null +++ b/qcloud_cos/cos_comm.py @@ -0,0 +1,233 @@ +# -*- coding=utf-8 + +import hashlib +import base64 +import os +import io +import sys +import xml.dom.minidom +import xml.etree.ElementTree +from urllib import quote +from xml2dict import Xml2Dict +from dicttoxml import dicttoxml +from cos_exception import CosClientError +from cos_exception import CosServiceError + +SINGLE_UPLOAD_LENGTH = 5*1024*1024*1024 # 单次上传文件最大为5G +# kwargs中params到http headers的映射 +maplist = { + 'ContentLength': 'Content-Length', + 'ContentMD5': 'Content-MD5', + 'ContentType': 'Content-Type', + 'CacheControl': 'Cache-Control', + 'ContentDisposition': 'Content-Disposition', + 'ContentEncoding': 'Content-Encoding', + 'ContentLanguage': 'Content-Language', + 'Expires': 'Expires', + 'ResponseContentType': 'response-content-type', + 'ResponseContentLanguage': 'response-content-language', + 'ResponseExpires': 'response-expires', + 'ResponseCacheControl': 'response-cache-control', + 'ResponseContentDisposition': 'response-content-disposition', + 'ResponseContentEncoding': 'response-content-encoding', + 'Metadata': 'Metadata', + 'ACL': 'x-cos-acl', + 'GrantFullControl': 'x-cos-grant-full-control', + 'GrantWrite': 'x-cos-grant-write', + 'GrantRead': 'x-cos-grant-read', + 'StorageClass': 'x-cos-storage-class', + 'Range': 'Range', + 'IfMatch': 'If-Match', + 'IfNoneMatch': 'If-None-Match', + 'IfModifiedSince': 'If-Modified-Since', + 'IfUnmodifiedSince': 'If-Unmodified-Since', + 'CopySourceIfMatch': 'x-cos-copy-source-If-Match', + 'CopySourceIfNoneMatch': 'x-cos-copy-source-If-None-Match', + 'CopySourceIfModifiedSince': 'x-cos-copy-source-If-Modified-Since', + 'CopySourceIfUnmodifiedSince': 'x-cos-copy-source-If-Unmodified-Since', + 'VersionId': 'x-cos-version-id', + } + + +def to_unicode(s): + if isinstance(s, unicode): + return s + else: + return s.decode('utf-8') + + +def get_md5(data): + m2 = hashlib.md5(data) + MD5 = base64.standard_b64encode(m2.digest()) + return MD5 + + +def dict_to_xml(data): + """V5使用xml格式,将输入的dict转换为xml""" + doc = xml.dom.minidom.Document() + root = doc.createElement('CompleteMultipartUpload') + doc.appendChild(root) + + if 'Part' not in data.keys(): + raise CosClientError("Invalid Parameter, Part Is Required!") + + for i in data['Part']: + nodePart = doc.createElement('Part') + + if 'PartNumber' not in i.keys(): + raise CosClientError("Invalid Parameter, PartNumber Is Required!") + + nodeNumber = doc.createElement('PartNumber') + nodeNumber.appendChild(doc.createTextNode(str(i['PartNumber']))) + + if 'ETag' not in i.keys(): + raise CosClientError("Invalid Parameter, ETag Is Required!") + + nodeETag = doc.createElement('ETag') + nodeETag.appendChild(doc.createTextNode(str(i['ETag']))) + + nodePart.appendChild(nodeNumber) + nodePart.appendChild(nodeETag) + root.appendChild(nodePart) + return doc.toxml('utf-8') + + +def xml_to_dict(data, origin_str="", replace_str=""): + """V5使用xml格式,将response中的xml转换为dict""" + root = xml.etree.ElementTree.fromstring(data) + xmldict = Xml2Dict(root) + xmlstr = str(xmldict) + xmlstr = xmlstr.replace("{http://www.qcloud.com/document/product/436/7751}", "") + xmlstr = xmlstr.replace("{http://www.w3.org/2001/XMLSchema-instance}", "") + if origin_str: + xmlstr = xmlstr.replace(origin_str, replace_str) + xmldict = eval(xmlstr) + return xmldict + + +def get_id_from_xml(data, name): + """解析xml中的特定字段""" + tree = xml.dom.minidom.parseString(data) + root = tree.documentElement + result = root.getElementsByTagName(name) + # use childNodes to get a list, if has no child get itself + return result[0].childNodes[0].nodeValue + + +def mapped(headers): + """S3到COS参数的一个映射""" + _headers = dict() + for i in headers.keys(): + if i in maplist: + _headers[maplist[i]] = headers[i] + else: + raise CosClientError('No Parameter Named '+i+' Please Check It') + return _headers + + +def format_xml(data, root, lst=list()): + """将dict转换为xml""" + xml_config = dicttoxml(data, item_func=lambda x: x, custom_root=root, attr_type=False) + for i in lst: + xml_config = xml_config.replace(i+i, i) + return xml_config + + +def format_region(region): + """格式化地域""" + if region.find('cos.') != -1: + return region # 传入cos.ap-beijing-1这样显示加上cos.的region + if region == 'cn-north' or region == 'cn-south' or region == 'cn-east' or region == 'cn-south-2' or region == 'cn-southwest' or region == 'sg': + return region # 老域名不能加cos. + # 支持v4域名映射到v5 + if region == 'cossh': + return 'cos.ap-shanghai' + if region == 'cosgz': + return 'cos.ap-guangzhou' + if region == 'cosbj': + return 'cos.ap-beijing' + if region == 'costj': + return 'cos.ap-beijing-1' + if region == 'coscd': + return 'cos.ap-chengdu' + if region == 'cossgp': + return 'cos.ap-singapore' + if region == 'coshk': + return 'cos.ap-hongkong' + if region == 'cosca': + return 'cos.na-toronto' + if region == 'cosger': + return 'cos.eu-frankfurt' + + return 'cos.' + region # 新域名加上cos. + + +def format_bucket(bucket, appid): + """兼容新老bucket长短命名,appid为空默认为长命名,appid不为空则认为是短命名""" + if not isinstance(bucket, str): + raise CosClientError("bucket is not str") + # appid为空直接返回bucket + if not appid: + return bucket + # appid不为空,检查是否以-appid结尾 + if bucket.endswith("-"+appid): + return bucket + return bucket + "-" + appid + + +def get_copy_source_info(CopySource): + """获取拷贝源的所有信息""" + appid = "" + if 'Appid' in CopySource.keys(): + appid = CopySource['Appid'] + if 'Bucket' in CopySource.keys(): + bucket = CopySource['Bucket'] + bucket = format_bucket(bucket, appid) + else: + raise CosClientError('CopySource Need Parameter Bucket') + if 'Region' in CopySource.keys(): + region = CopySource['Region'] + region = format_region(region) + else: + raise CosClientError('CopySource Need Parameter Region') + if 'Key' in CopySource.keys(): + path = CopySource['Key'] + else: + raise CosClientError('CopySource Need Parameter Key') + return bucket, path, region + + +def gen_copy_source_url(CopySource): + """拼接拷贝源url""" + bucket, path, region = get_copy_source_info(CopySource) + if path and path[0] == '/': + path = path[1:] + url = "{bucket}.{region}.myqcloud.com/{path}".format( + bucket=bucket, + region=region, + path=path + ) + return url + + +def gen_copy_source_range(begin_range, end_range): + """拼接bytes=begin-end形式的字符串""" + range = "bytes={first}-{end}".format( + first=begin_range, + end=end_range + ) + return range + + +def deal_with_empty_file_stream(data): + """对于文件流的剩余长度为0的情况下,返回空字节流""" + if hasattr(data, 'fileno') and hasattr(data, 'tell'): + try: + fileno = data.fileno() + total_length = os.fstat(fileno).st_size + current_position = data.tell() + if total_length - current_position == 0: + return "" + except io.UnsupportedOperation: + return "" + return data diff --git a/qcloud_cos/cos_threadpool.py b/qcloud_cos/cos_threadpool.py index f5410db0..d87d6e96 100644 --- a/qcloud_cos/cos_threadpool.py +++ b/qcloud_cos/cos_threadpool.py @@ -20,7 +20,9 @@ def __init__(self, task_queue, *args, **kwargs): def run(self): while True: func, args, kwargs = self._task_queue.get() - + # 判断线程是否需要退出 + if func is None: + return try: ret = func(*args, **kwargs) self._succ_task_num += 1 @@ -53,6 +55,7 @@ def add_task(self, func, *args, **kwargs): if not self._active: with self._lock: if not self._active: + self._workers = [] self._active = True for i in range(self._num_threads): @@ -65,6 +68,11 @@ def add_task(self, func, *args, **kwargs): def wait_completion(self): self._queue.join() self._finished = True + # 已经结束的任务, 需要将线程都退出, 防止卡死 + for i in range(self._num_threads): + self._queue.put((None, None, None)) + + self._active = False def get_result(self): assert self._finished diff --git a/qcloud_cos/demo.py b/qcloud_cos/demo.py index 7720cf3f..f1fd38af 100644 --- a/qcloud_cos/demo.py +++ b/qcloud_cos/demo.py @@ -10,32 +10,30 @@ # cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224 -# 设置用户属性, 包括appid, secret_id, secret_key, region -appid = '1242338703' # 替换为用户的appid +# 设置用户属性, 包括secret_id, secret_key, region secret_id = 'AKID15IsskiBQACGbAo6WhgcQbVls7HmuG00' # 替换为用户的secret_id secret_key = 'csivKvxxrMvSvQpMWHuIz12pThQQlWRW' # 替换为用户的secret_key region = 'ap-beijing-1' # 替换为用户的region token = '' # 使用临时秘钥需要传入Token,默认为空,可不填 -config = CosConfig(Appid=appid, Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) # 获取配置对象 +config = CosConfig(Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) # 获取配置对象 client = CosS3Client(config) # 文件流 简单上传 -fp = open('test.txt', 'rb') file_name = 'test.txt' -response = client.put_object( - Bucket='test04', - Body=fp, - Key=file_name, - StorageClass='STANDARD', - CacheControl='no-cache', - ContentDisposition='download.txt' -) -fp.close() -print response['ETag'] +with open('test.txt', 'rb') as fp: + response = client.put_object( + Bucket='test04-123456789', + Body=fp, + Key=file_name, + StorageClass='STANDARD', + CacheControl='no-cache', + ContentDisposition='download.txt' + ) + print response['ETag'] # 字节流 简单上传 response = client.put_object( - Bucket='test04', + Bucket='test04-123456789', Body='abcdefg', Key=file_name, CacheControl='no-cache', @@ -45,14 +43,14 @@ # 文件下载 获取文件到本地 response = client.get_object( - Bucket='test04', + Bucket='test04-123456789', Key=file_name, ) response['Body'].get_stream_to_file('output.txt') # 文件下载 获取文件流 response = client.get_object( - Bucket='test04', + Bucket='test04-123456789', Key=file_name, ) fp = response['Body'].get_raw_stream() @@ -61,7 +59,7 @@ # 文件下载 捕获异常 try: response = client.get_object( - Bucket='test04', + Bucket='test04-123456789', Key='not_exist.txt', ) fp = response['Body'].get_raw_stream() diff --git a/qcloud_cos/test.py b/qcloud_cos/test.py index ade34355..f509e524 100644 --- a/qcloud_cos/test.py +++ b/qcloud_cos/test.py @@ -1,13 +1,31 @@ # -*- coding=utf-8 import random import sys +import time +import hashlib import os +import requests from cos_client import CosS3Client from cos_client import CosConfig from cos_exception import CosServiceError ACCESS_ID = os.environ["ACCESS_ID"] ACCESS_KEY = os.environ["ACCESS_KEY"] +test_bucket = "test01-1252448703" +test_object = "test.txt" +special_file_name = "中文" + "→↓←→↖↗↙↘! \"#$%&'()*+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~" +conf = CosConfig( + Region="ap-beijing-1", + Access_id=ACCESS_ID, + Access_key=ACCESS_KEY +) +client = CosS3Client(conf) + + +def get_raw_md5(data): + m2 = hashlib.md5(data) + etag = '"' + str(m2.hexdigest()) + '"' + return etag def gen_file(path, size): @@ -29,137 +47,61 @@ def print_error_msg(e): def setUp(): - print "start test" + print "start test..." def tearDown(): print "function teardown" -def Test(): - conf = CosConfig( - Appid="1252448703", - Region="ap-beijing-1", - Access_id=ACCESS_ID, - Access_key=ACCESS_KEY - ) - client = CosS3Client(conf) - - test_bucket = 'test01' - file_size = 2 # 方便CI通过 +def test_put_get_delete_object_10MB(): + """简单上传下载删除10MB小文件""" + file_size = 5 file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000)) file_name = "tmp" + file_id + "_" + str(file_size) + "MB" - - print "test put bucket cors " + test_bucket - cors_config = { - 'CORSRule': [ - { - 'ID': '1234', - 'AllowedOrigin': ['http://www.qq.com'], - 'AllowedMethod': ['GET', 'PUT'], - 'AllowedHeader': ['x-cos-meta-test'], - 'ExposeHeader': ['x-cos-meta-test1'], - 'MaxAgeSeconds': 500 - }] - } - response = client.put_bucket_cors( - Bucket=test_bucket, - CORSConfiguration=cors_config - ) - - print "test get bucket cors " + test_bucket - response = client.get_bucket_cors( - Bucket=test_bucket - ) - print response - - print "test delete bucket cors " + test_bucket - response = client.delete_bucket_cors( - Bucket=test_bucket - ) - - print "test put bucket lifecycle " + test_bucket - life_config = { - 'Rule': [ - { - 'Expiration': {'Days': 100}, - 'ID': '123', - 'Filter': {'Prefix': '456'}, - 'Status': 'Enabled', - } - ] - } - response = client.put_bucket_lifecycle( - Bucket=test_bucket, - LifecycleConfiguration=life_config - ) - - print "test get bucket lifecycle " + test_bucket - response = client.get_bucket_lifecycle( - Bucket=test_bucket - ) - print response - - print "test delete bucket lifecycle " + test_bucket - response = client.delete_bucket_lifecycle( - Bucket=test_bucket - ) - - print "test put bucket versioning " + test_bucket - response = client.put_bucket_versioning( - Bucket=test_bucket, - Status='Enabled' - ) - - print "test get bucket versioning " + test_bucket - response = client.get_bucket_versioning( - Bucket=test_bucket - ) - print response - - print "test get bucket location " + test_bucket - response = client.get_bucket_location( - Bucket=test_bucket - ) - print response - - print "test head bucket " + test_bucket - response = client.head_bucket( - Bucket=test_bucket - ) - - print "Test Get Presigned Download URL " - url = client.get_presigned_download_url( - Bucket=test_bucket, - Key='中文.txt' - ) - print url - - print "Test List Buckets" - response = client.list_buckets() - - copy_source = {'Appid': '1252448703', 'Bucket': 'test01', 'Key': '/test.txt', 'Region': 'ap-beijing-1'} - print "Test Copy Object From Other Object" - response = client.copy_object( - Bucket='test04', - Key='test.txt', - CopySource=copy_source - ) - - print "Test Put Object That Bucket Not Exist " + file_name + gen_file(file_name, 10) + with open(file_name, 'rb') as f: + etag = get_raw_md5(f.read()) try: - response = client.put_object( - Bucket='test0xx', - Body='T'*1024*1024, + # put object + with open(file_name, 'rb') as fp: + put_response = client.put_object( + Bucket=test_bucket, + Body=fp, + Key=file_name, + CacheControl='no-cache', + ContentDisposition='download.txt' + ) + assert etag == put_response['ETag'] + # head object + head_response = client.get_object( + Bucket=test_bucket, + Key=file_name + ) + assert etag == head_response['ETag'] + # get object + get_response = client.get_object( + Bucket=test_bucket, Key=file_name, - CacheControl='no-cache', - ContentDisposition='download.txt' + ResponseCacheControl='private' + ) + assert etag == get_response['ETag'] + assert 'private' == get_response['Cache-Control'] + download_fp = get_response['Body'].get_raw_stream() + assert download_fp + # delete object + delete_response = client.delete_object( + Bucket=test_bucket, + Key=file_name ) except CosServiceError as e: print_error_msg(e) + if os.path.exists(file_name): + os.remove(file_name) + - special_file_name = "中文" + "→↓←→↖↗↙↘! \"#$%&'()*+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~" - print "Test Put Object Contains Special Characters " + special_file_name +def test_put_object_speacil_names(): + """特殊字符文件上传""" response = client.put_object( Bucket=test_bucket, Body='S'*1024*1024, @@ -167,142 +109,135 @@ def Test(): CacheControl='no-cache', ContentDisposition='download.txt' ) + assert response - print "Test Get Object Contains Special Characters " + special_file_name + +def test_get_object_special_names(): + """特殊字符文件下载""" response = client.get_object( Bucket=test_bucket, - Key=special_file_name, + Key=special_file_name ) + assert response - print "Test Delete Object Contains Special Characters " + special_file_name + +def test_delete_object_special_names(): + """特殊字符文件删除""" response = client.delete_object( Bucket=test_bucket, Key=special_file_name ) - print "Test Put Object " + file_name - gen_file(file_name, file_size) - fp = open(file_name, 'rb') - response = client.put_object( - Bucket=test_bucket, - Body=fp, - Key=file_name, - CacheControl='no-cache', - ContentDisposition='download.txt', - Metadata={ - "x-cos-meta-tiedu": "value1" - } - ) - fp.close() - os.remove(file_name) - - print "Test Get Object " + file_name - response = client.get_object( - Bucket=test_bucket, - Key=file_name, - ) - # 返回一个raw stream - # fp = response['Body'].get_raw_stream() - # 返回一个generator - # stream_generator = response['Body'].get_stream(stream_size=1024*512) - response['Body'].get_stream_to_file('cos.txt') - if os.path.exists('cos.txt'): - os.remove('cos.txt') - - print "Test Head Object " + file_name - response = client.head_object( - Bucket=test_bucket, - Key=file_name - ) - print "Test Head Object " + file_name + "123" +def test_put_object_non_exist_bucket(): + """文件上传至不存在bucket""" try: - response = client.head_object( - Bucket=test_bucket, - Key=file_name+"123" + response = client.put_object( + Bucket='test0xx-1252448703', + Body='T'*1024*1024, + Key=test_object, + CacheControl='no-cache', + ContentDisposition='download.txt' ) except CosServiceError as e: print_error_msg(e) - print "Test Put Object ACL " + file_name + +def test_put_object_acl(): + """设置object acl""" response = client.put_object_acl( Bucket=test_bucket, - Key=file_name, + Key=test_object, ACL='public-read-write' ) - print "Test Get Object ACL" + file_name - response = client.get_object_acl( - Bucket=test_bucket, - Key=file_name - ) - print "Test Delete Object " + file_name - response = client.delete_object( +def test_get_object_acl(): + """获取object acl""" + response = client.get_object_acl( Bucket=test_bucket, - Key=file_name + Key=test_object ) + assert response - print "Test List Objects" - response = client.list_objects( - Bucket=test_bucket - ) - print "Test Create Bucket" - response = client.create_bucket( - Bucket='test'+file_id, - ACL='public-read' +def test_copy_object_diff_bucket(): + """从另外的bucket拷贝object""" + copy_source = {'Bucket': 'test04-1252448703', 'Key': '/test.txt', 'Region': 'ap-beijing-1'} + response = client.copy_object( + Bucket=test_bucket, + Key='test.txt', + CopySource=copy_source ) + assert response - print "Test PUT Bucket ACL" - try: - response = client.put_bucket_acl( - Bucket='test'+file_id, - ACL='public-read-writea' - ) - except CosServiceError as e: - print_error_msg(e) - print "Test GET Bucket ACL" - response = client.get_bucket_acl( - Bucket='test'+file_id, +def test_create_abort_multipart_upload(): + """创建一个分块上传,然后终止它""" + # create + response = client.create_multipart_upload( + Bucket=test_bucket, + Key='multipartfile.txt', ) - - print "Test Delete Bucket" - response = client.delete_bucket( - Bucket='test'+file_id + assert response + uploadid = response['UploadId'] + # abort + response = client.abort_multipart_upload( + Bucket=test_bucket, + Key='multipartfile.txt', + UploadId=uploadid ) - print "Test Head Bucket" - try: - response = client.head_bucket( - Bucket='test'+file_id - ) - except CosServiceError as e: - print_error_msg(e) - print "Test Create MultipartUpload" +def test_create_complete_multipart_upload(): + """创建一个分块上传,上传分块,列出分块,完成分块上传""" + # create response = client.create_multipart_upload( Bucket=test_bucket, Key='multipartfile.txt', ) uploadid = response['UploadId'] + # upload part + response = client.upload_part( + Bucket=test_bucket, + Key='multipartfile.txt', + UploadId=uploadid, + PartNumber=1, + Body='A'*1024*1024*2 + ) - print "Test Abort MultipartUpload" - response = client.abort_multipart_upload( + response = client.upload_part( + Bucket=test_bucket, + Key='multipartfile.txt', + UploadId=uploadid, + PartNumber=2, + Body='B'*1024*1024*2 + ) + # list parts + response = client.list_parts( Bucket=test_bucket, Key='multipartfile.txt', UploadId=uploadid ) + lst = response['Part'] + # complete + response = client.complete_multipart_upload( + Bucket=test_bucket, + Key='multipartfile.txt', + UploadId=uploadid, + MultipartUpload={'Part': lst} + ) + - print "Test Create MultipartUpload" +def test_upload_part_copy(): + """创建一个分块上传,上传分块拷贝,列出分块,完成分块上传""" + # create response = client.create_multipart_upload( Bucket=test_bucket, Key='multipartfile.txt', ) uploadid = response['UploadId'] - - print "Test Upload Part1" + # upload part response = client.upload_part( Bucket=test_bucket, Key='multipartfile.txt', @@ -311,7 +246,6 @@ def Test(): Body='A'*1024*1024*2 ) - print "Test Upload Part2" response = client.upload_part( Bucket=test_bucket, Key='multipartfile.txt', @@ -320,15 +254,23 @@ def Test(): Body='B'*1024*1024*2 ) - print "List Upload Parts" + # upload part copy + copy_source = {'Bucket': 'test04-1252448703', 'Key': '/test.txt', 'Region': 'ap-beijing-1'} + response = client.upload_part_copy( + Bucket=test_bucket, + Key='multipartfile.txt', + UploadId=uploadid, + PartNumber=3, + CopySource=copy_source + ) + # list parts response = client.list_parts( Bucket=test_bucket, Key='multipartfile.txt', UploadId=uploadid ) lst = response['Part'] - - print "Test Complete MultipartUpload" + # complete response = client.complete_multipart_upload( Bucket=test_bucket, Key='multipartfile.txt', @@ -336,6 +278,347 @@ def Test(): MultipartUpload={'Part': lst} ) + +def test_delete_multiple_objects(): + """批量删除文件""" + file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000)) + file_name1 = "tmp" + file_id + "_delete1" + file_name2 = "tmp" + file_id + "_delete2" + response1 = client.put_object( + Bucket=test_bucket, + Key=file_name1, + Body='A'*1024*1024 + ) + assert response1 + response2 = client.put_object( + Bucket=test_bucket, + Key=file_name2, + Body='B'*1024*1024*2 + ) + assert response2 + objects = { + "Quite": "true", + "Object": [ + { + "Key": file_name1 + }, + { + "Key": file_name2 + } + ] + } + response = client.delete_objects( + Bucket=test_bucket, + Delete=objects + ) + assert response + + +def test_create_head_delete_bucket(): + """创建一个bucket,head它是否存在,最后删除一个空bucket""" + bucket_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000)) + bucket_name = 'buckettest' + bucket_id + '-1252448703' + response = client.create_bucket( + Bucket=bucket_name, + ACL='public-read' + ) + response = client.head_bucket( + Bucket=bucket_name + ) + response = client.delete_bucket( + Bucket=bucket_name + ) + + +def test_put_bucket_acl_illegal(): + """设置非法的ACL""" + try: + response = client.put_bucket_acl( + Bucket=test_bucket, + ACL='public-read-writ' + ) + except CosServiceError as e: + print_error_msg(e) + + +def test_get_bucket_acl_normal(): + """正常获取bucket ACL""" + response = client.get_bucket_acl( + Bucket=test_bucket + ) + assert response + + +def test_list_objects(): + """列出bucket下的objects""" + response = client.list_objects( + Bucket=test_bucket, + MaxKeys=100 + ) + assert response + + +def test_list_objects_versions(): + """列出bucket下的带版本信息的objects""" + response = client.list_objects_versions( + Bucket=test_bucket, + MaxKeys=50 + ) + assert response + + +def test_get_presigned_url(): + """生成预签名的url下载地址""" + url = client.get_presigned_download_url( + Bucket=test_bucket, + Key='中文.txt' + ) + assert url + print url + + +def test_get_bucket_location(): + """获取bucket的地域信息""" + response = client.get_bucket_location( + Bucket=test_bucket + ) + assert response['LocationConstraint'] == "ap-beijing-1" + + +def test_get_service(): + """列出账号下所有的bucket信息""" + response = client.list_buckets() + assert response + + +def test_put_get_delete_cors(): + """设置、获取、删除跨域配置""" + cors_config = { + 'CORSRule': [ + { + 'ID': '1234', + 'AllowedOrigin': ['http://www.qq.com'], + 'AllowedMethod': ['GET', 'PUT'], + 'AllowedHeader': ['x-cos-meta-test'], + 'ExposeHeader': ['x-cos-meta-test1'], + 'MaxAgeSeconds': 500 + } + ] + } + # put cors + response = client.put_bucket_cors( + Bucket=test_bucket, + CORSConfiguration=cors_config + ) + # wait for sync + # get cors + time.sleep(4) + response = client.get_bucket_cors( + Bucket=test_bucket + ) + assert response + # delete cors + response = client.get_bucket_cors( + Bucket=test_bucket + ) + + +def test_put_get_delete_lifecycle(): + """设置、获取、删除生命周期配置""" + lifecycle_config = { + 'Rule': [ + { + 'Expiration': {'Days': 100}, + 'ID': '123', + 'Filter': {'Prefix': ''}, + 'Status': 'Enabled', + } + ] + } + # put lifecycle + response = client.put_bucket_lifecycle( + Bucket=test_bucket, + LifecycleConfiguration=lifecycle_config + ) + # wait for sync + # get lifecycle + time.sleep(4) + response = client.get_bucket_lifecycle( + Bucket=test_bucket + ) + assert response + # delete lifecycle + response = client.delete_bucket_lifecycle( + Bucket=test_bucket + ) + + +def test_put_get_versioning(): + """设置、获取版本控制""" + # put versioning + response = client.put_bucket_versioning( + Bucket=test_bucket, + Status='Enabled' + ) + # wait for sync + # get versioning + time.sleep(4) + response = client.get_bucket_versioning( + Bucket=test_bucket + ) + assert response['Status'] == 'Enabled' + + +def test_put_get_delete_replication(): + """设置、获取、删除跨园区复制配置""" + replication_config = { + 'Role': 'qcs::cam::uin/735905558:uin/735905558', + 'Rule': [ + { + 'ID': '123', + 'Status': 'Enabled', + 'Prefix': 'replication', + 'Destination': { + 'Bucket': 'qcs:id/0:cos:cn-south:appid/1252448703:replicationsouth' + } + } + ] + } + # source dest bucket must enable versioning + # put replication + response = client.put_bucket_replication( + Bucket=test_bucket, + ReplicationConfiguration=replication_config + ) + # wait for sync + # get replication + time.sleep(4) + response = client.get_bucket_replication( + Bucket=test_bucket + ) + assert response + # delete lifecycle + response = client.delete_bucket_replication( + Bucket=test_bucket + ) + + +def test_list_multipart_uploads(): + """获取所有正在进行的分块上传""" + response = client.list_multipart_uploads( + Bucket=test_bucket, + Prefix="multipart", + MaxUploads=100 + ) + # abort make sure delete all uploads + if 'Upload' in response.keys(): + for data in response['Upload']: + response = client.abort_multipart_upload( + Bucket=test_bucket, + Key=data['Key'], + UploadId=data['UploadId'] + ) + # create a new upload + response = client.create_multipart_upload( + Bucket=test_bucket, + Key='multipartfile.txt', + ) + assert response + uploadid = response['UploadId'] + # list again + response = client.list_multipart_uploads( + Bucket=test_bucket, + Prefix="multipart", + MaxUploads=100 + ) + assert response['Upload'][0]['Key'] == "multipartfile.txt" + assert response['Upload'][0]['UploadId'] == uploadid + # abort again make sure delete all uploads + for data in response['Upload']: + response = client.abort_multipart_upload( + Bucket=test_bucket, + Key=data['Key'], + UploadId=data['UploadId'] + ) + + +def test_upload_file_multithreading(): + """根据文件大小自动选择分块大小,多线程并发上传提高上传速度""" + file_name = "thread_1GB" + gen_file(file_name, 5) # set 5MB beacuse travis too slow + st = time.time() # 记录开始时间 + response = client.upload_file( + Bucket=test_bucket, + Key=file_name, + LocalFilePath=file_name, + MAXThread=10, + CacheControl='no-cache', + ContentDisposition='download.txt' + ) + ed = time.time() # 记录结束时间 + if os.path.exists(file_name): + os.remove(file_name) + print ed - st + + +def test_copy_file_automatically(): + """根据拷贝源文件的大小自动选择拷贝策略,不同园区,小于5G直接copy_object,大于5G分块拷贝""" + copy_source = {'Bucket': 'testtiedu-1252448703', 'Key': '/thread_1MB', 'Region': 'ap-guangzhou'} + response = client.copy( + Bucket=test_bucket, + Key='copy_10G.txt', + CopySource=copy_source, + MAXThread=10 + ) + + +def test_upload_empty_file(): + """上传一个空文件,不能返回411错误""" + file_name = "empty.txt" + with open(file_name, 'wb') as f: + pass + with open(file_name, 'rb') as fp: + response = client.put_object( + Bucket=test_bucket, + Body=fp, + Key=file_name, + CacheControl='no-cache', + ContentDisposition='download.txt' + ) + + +def test_copy_10G_file_in_same_region(): + """同园区的拷贝,应该直接用copy_object接口,可以直接秒传""" + copy_source = {'Bucket': 'test01-1252448703', 'Key': '/10G.txt', 'Region': 'ap-beijing-1'} + response = client.copy( + Bucket='test04-1252448703', + Key='10G.txt', + CopySource=copy_source, + MAXThread=10 + ) + + +def test_use_get_auth(): + """测试利用get_auth方法直接生产签名,然后访问COS""" + auth = client.get_auth( + Method='GET', + Bucket=test_bucket, + Key='test.txt', + Params={'acl': '', 'unsed': '123'} + ) + response = requests.get('http://test01-1252448703.cos.ap-beijing-1.myqcloud.com/test.txt?acl&unsed=123', headers={'Authorization': auth}) + assert response.status_code == 200 + + if __name__ == "__main__": setUp() - Test() + test_upload_empty_file() + test_put_get_delete_object_10MB() + test_put_get_versioning() + test_put_get_delete_replication() + test_upload_part_copy() + test_upload_file_multithreading() + test_copy_file_automatically() + test_copy_10G_file_in_same_region() + test_use_get_auth() + tearDown()