From 61b110d4bda14cf1c8c82a15f9884172161d11d3 Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 7 Apr 2021 10:47:33 +0800 Subject: [PATCH 1/4] add put/get bucket intelligenttiering --- qcloud_cos/cos_client.py | 70 ++++++++++++++++++++++++++++++++++++++++ ut/test.py | 19 +++++++++++ 2 files changed, 89 insertions(+) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 735ec221..33efa5e2 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2797,6 +2797,76 @@ def get_bucket_referer(self, Bucket, **kwargs): format_dict(data['DomainList'], ['Domain']) return data + def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration={}, **kwargs): + """设置存储桶智能分层配置 + + :param Bucket(string): 存储桶名称. + :param IntelligentTieringConfiguration(dict): 只能分层配置 + :param kwargs(dict): 设置请求headers. + :return: None. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + + intelligent_tiering_conf = { + 'Status': 'Enable', + 'Transition': { + 'Days': '30|60|90', + 'RequestFrequent': '1' + } + } + client.put_bucket_intelligenttiering(Bucket="bucket", IntelligentTieringConfiguration=intelligent_tiering_conf) + """ + + xml_config = format_xml(data=IntelligentTieringConfiguration, root='IntelligentTieringConfiguration') + headers = mapped(kwargs) + headers['Content-Type'] = 'application/xml' + params = {'intelligenttiering': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("put bucket intelligenttiering, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='PUT', + url=url, + bucket=Bucket, + data=xml_config, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + return None + + def get_bucket_intelligenttiering(self, Bucket, **kwargs): + """获取存储桶智能分层配置 + :param Bucket(string): 存储桶名称. + :param IntelligentTieringConfiguration(dict): 只能分层配置 + :param kwargs(dict): 设置请求headers. + :return(dict): 智能分层配置. + + .. code-block:: python + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + client.get_bucket_intelligenttiering(Bucket='bucket') + """ + + headers = mapped(kwargs) + params = {'intelligenttiering': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("get bucket intelligenttiering, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + return data + # service interface begin def list_buckets(self, **kwargs): """列出所有bucket diff --git a/ut/test.py b/ut/test.py index 7f54ede0..cf77736f 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1241,6 +1241,24 @@ def test_download_file(): if os.path.exists(file_name): os.remove(file_name) +def test_put_get_bucket_intelligenttiering(): + """测试设置获取智能分层""" + intelligent_tiering_conf = { + 'Status': 'Enable', + 'Transition': { + 'Days': '30', + 'RequestFrequent': '1' + } + } + response = client.put_bucket_intelligenttiering( + Bucket=test_bucket, + IntelligentTieringConfiguration=intelligent_tiering_conf + ) + time.sleep(2) + response = client.get_bucket_intelligenttiering( + Bucket=test_bucket, + ) + if __name__ == "__main__": setUp() """ @@ -1268,6 +1286,7 @@ def test_download_file(): test_select_object() _test_get_object_sensitive_content_recognition() test_download_file() + test_put_get_bucket_intelligenttiering() """ tearDown() From cd5207d1692d00e843cab6c033dbeaf438f7132b Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 7 Apr 2021 10:52:12 +0800 Subject: [PATCH 2/4] add put/get bucket intelligenttiering --- qcloud_cos/cos_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 33efa5e2..6e88c4e5 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2809,7 +2809,7 @@ def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration= config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 client = CosS3Client(config) - + intelligent_tiering_conf = { 'Status': 'Enable', 'Transition': { From 9350823949589651fe3a705132c65efee8c78893 Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 7 Apr 2021 11:10:41 +0800 Subject: [PATCH 3/4] add put/get bucket intelligenttiering --- qcloud_cos/cos_client.py | 594 ++++++++++++++++++++++++++++++++++++++- ut/test.py | 282 ++++++++++++++++++- 2 files changed, 857 insertions(+), 19 deletions(-) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 6e88c4e5..28a270c7 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -12,14 +12,16 @@ import threading import xml.dom.minidom import xml.etree.ElementTree -from requests import Request, Session +from requests import Request, Session, ConnectionError, Timeout from datetime import datetime from six.moves.urllib.parse import quote, unquote, urlencode +from six import text_type, binary_type from hashlib import md5 from dicttoxml import dicttoxml from .streambody import StreamBody from .xml2dict import Xml2Dict from .cos_auth import CosS3Auth +from .cos_auth import CosRtmpAuth from .cos_comm import * from .cos_threadpool import SimpleThreadPool from .cos_exception import CosClientError @@ -222,7 +224,7 @@ def get_auth(self, Method, Bucket, Key, Expired=300, Headers={}, Params={}): auth = CosS3Auth(self._conf, Key, Params, Expired) return auth(r).headers['Authorization'] - def send_request(self, method, url, bucket, timeout=30, **kwargs): + def send_request(self, method, url, bucket, timeout=30, cos_request=True, **kwargs): """封装request库发起http请求""" if self._conf._timeout is not None: # 用户自定义超时时间 timeout = self._conf._timeout @@ -238,7 +240,12 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs): elif bucket is not None: kwargs['headers']['Host'] = self._conf.get_host(bucket) kwargs['headers'] = format_values(kwargs['headers']) + + file_position = None if 'data' in kwargs: + body = kwargs['data'] + if hasattr(body, 'tell') and hasattr(body, 'seek') and hasattr(body, 'read'): + file_position = body.tell() # 记录文件当前位置 kwargs['data'] = to_bytes(kwargs['data']) if self._conf._ip is not None and self._conf._scheme == 'https': kwargs['verify'] = False @@ -260,12 +267,20 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs): return res elif res.status_code < 500: # 4xx 不重试 break + else: + if j < self._retry and client_can_retry(file_position, **kwargs): + continue + else: + break except Exception as e: # 捕获requests抛出的如timeout等客户端错误,转化为客户端错误 logger.exception('url:%s, retry_time:%d exception:%s' % (url, j, str(e))) - if j < self._retry: - continue + if j < self._retry and (isinstance(e, ConnectionError) or isinstance(e, Timeout)): # 只重试网络错误 + if client_can_retry(file_position, **kwargs): + continue raise CosClientError(str(e)) + if not cos_request: + return res if res.status_code >= 400: # 所有的4XX,5XX都认为是COSServiceError if method == 'HEAD' and res.status_code == 404: # Head 需要处理 info = dict() @@ -276,7 +291,7 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs): info['requestid'] = res.headers['x-cos-request-id'] if 'x-cos-trace-id' in res.headers: info['traceid'] = res.headers['x-cos-trace-id'] - logger.error(info) + logger.warn(info) raise CosServiceError(method, info, res.status_code) else: msg = res.text @@ -1027,6 +1042,7 @@ def get_object_acl(self, Bucket, Key, **kwargs): lst = [] lst.append(data['AccessControlList']['Grant']) data['AccessControlList']['Grant'] = lst + data['CannedACL'] = parse_object_canned_acl(data, rt.headers) return data def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs): @@ -1477,6 +1493,7 @@ def get_bucket_acl(self, Bucket, **kwargs): lst = [] lst.append(data['AccessControlList']['Grant']) data['AccessControlList']['Grant'] = lst + data['CannedACL'] = parse_bucket_canned_acl(data) return data def put_bucket_cors(self, Bucket, CORSConfiguration={}, **kwargs): @@ -2247,6 +2264,37 @@ def get_bucket_policy(self, Bucket, **kwargs): data = {'Policy': json.dumps(rt.json())} return data + def delete_bucket_policy(self, Bucket, **kwargs): + """删除bucket policy + + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求headers. + :return: None. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 删除bucket policy服务配置 + response = client.delete_bucket_policy( + Bucket=bucket + ) + """ + headers = mapped(kwargs) + params = {'policy': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("delete bucket policy, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='DELETE', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + return None + def put_bucket_domain(self, Bucket, DomainConfiguration={}, **kwargs): """设置bucket的自定义域名 @@ -2797,7 +2845,37 @@ def get_bucket_referer(self, Bucket, **kwargs): format_dict(data['DomainList'], ['Domain']) return data - def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration={}, **kwargs): + def delete_bucket_referer(self, Bucket, **kwargs): + """删除bucket防盗链规则 + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求headers. + :return(dict): None. + .. code-block:: python + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 获取bucket标签 + response = client.delete_bucket_referer( + Bucket='bucket' + ) + """ + + xml_config = '' + headers = mapped(kwargs) + headers['Content-MD5'] = get_md5(xml_config) + headers['Content-Type'] = 'application/xml' + params = {'referer': ''} + url = self._conf.uri(bucket=Bucket) + rt = self.send_request( + method='PUT', + url=url, + bucket=Bucket, + data=xml_config, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + return None + + def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration=None, **kwargs): """设置存储桶智能分层配置 :param Bucket(string): 存储桶名称. @@ -2820,6 +2898,8 @@ def put_bucket_intelligenttiering(self, Bucket, IntelligentTieringConfiguration= client.put_bucket_intelligenttiering(Bucket="bucket", IntelligentTieringConfiguration=intelligent_tiering_conf) """ + if IntelligentTieringConfiguration is None: + IntelligentTieringConfiguration = {} xml_config = format_xml(data=IntelligentTieringConfiguration, root='IntelligentTieringConfiguration') headers = mapped(kwargs) headers['Content-Type'] = 'application/xml' @@ -2878,9 +2958,7 @@ def list_buckets(self, **kwargs): config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 client = CosS3Client(config) # 获取账户下所有存储桶信息 - response = logging_client.list_buckets( - Bucket='bucket' - ) + response = client.list_buckets() """ headers = mapped(kwargs) url = '{scheme}://service.cos.myqcloud.com/'.format(scheme=self._conf._scheme) @@ -3016,7 +3094,7 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, already_exist_parts[part_num] = part['ETag'] return True - def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, EnableCRC=False, **Kwargs): + def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAXThread=5, EnableCRC=False, **Kwargs): """小于等于20MB的文件简单下载,大于20MB的文件使用续传下载 :param Bucket(string): 存储桶名称. @@ -3028,16 +3106,16 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, Ena :param kwargs(dict): 设置请求headers. """ logger.debug("Start to download file, bucket: {0}, key: {1}, dest_filename: {2}, part_size: {3}MB,\ - max_thread: {4}".format(Bucket, Key, DestFilePath, PartSize, MAZThread)) + max_thread: {4}".format(Bucket, Key, DestFilePath, PartSize, MAXThread)) object_info = self.head_object(Bucket, Key) - file_size = object_info['Content-Length'] + file_size = int(object_info['Content-Length']) if file_size <= 1024*1024*20: response = self.get_object(Bucket, Key, **Kwargs) response['Body'].get_stream_to_file(DestFilePath) return - downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAZThread, EnableCRC, **Kwargs) + downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAXThread, EnableCRC, **Kwargs) downloader.start() def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, progress_callback=None, **kwargs): @@ -3136,7 +3214,7 @@ def _inner_head_object(self, CopySource): params = {} if versionid != '': params['versionId'] = versionid - url = u"{scheme}://{bucket}.{endpoint}/{path}".format(scheme=self._conf._scheme, bucket=bucket, endpoint=endpoint, path=path) + url = u"{scheme}://{bucket}.{endpoint}/{path}".format(scheme=self._conf._scheme, bucket=bucket, endpoint=endpoint, path=quote(to_bytes(path), '/-_.~')) rt = self.send_request( method='HEAD', url=url, @@ -3500,6 +3578,494 @@ def update_object_meta(self, Bucket, Key, **kwargs): ) return response + def put_bucket_encryption(self, Bucket, ServerSideEncryptionConfiguration={}, **kwargs): + """设置执行存储桶下的默认加密配置 + + :param Bucket(string): 存储桶名称. + :param ServerSideEncryptionConfiguration(dict): 设置Bucket的加密规则 + :param kwargs(dict): 设置请求的headers. + :return: None. + """ + # 类型为list的标签 + lst = [ + '', + '' + ] + xml_config = format_xml(data=ServerSideEncryptionConfiguration, root='ServerSideEncryptionConfiguration', lst=lst) + headers = mapped(kwargs) + params = {'encryption': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("put bucket encryption, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='PUT', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + data=xml_config, + headers=headers, + params=params) + + return None + + def get_bucket_encryption(self, Bucket, **kwargs): + """获取存储桶下的默认加密配置 + + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求的headers. + :return(dict): 返回bucket的加密规则. + """ + headers = mapped(kwargs) + params = {'encryption': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("get bucket encryption, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + + data = xml_to_dict(rt.content) + format_dict(data, ['Rule']) + return data + + def delete_bucket_encryption(self, Bucket, **kwargs): + """用于删除指定存储桶下的默认加密配置 + + :param Bucket(string): 存储桶名称. + :param kwargs(dict): 设置请求的headers. + :return: None. + """ + headers = mapped(kwargs) + params = {'encryption': ''} + url = self._conf.uri(bucket=Bucket) + logger.info("delete bucket encryption, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + rt = self.send_request( + method='DELETE', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + + return None + + def put_async_fetch_task(self, Bucket, FetchTaskConfiguration={}, **kwargs): + """发起异步拉取对象到COS的任务 + + :param Bucket(string): 存储桶名称. + :param FetchTaskConfiguration(dict): 异步拉取任务的配置. + :kwargs(dict): 扩展参数. + :return(dict): 异步任务成功返回的结果,包含Taskid等信息. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 发起异步拉取任务 + response = client.put_async_fetch_task( + Bucket='bucket', + FetchTaskConfiguration={ + 'Url': + 'Key': + 'MD5': + 'SuccessCallbackUrl': + 'FailureCallbackUrl': + } + ) + """ + url = '{scheme}://{region}.migration.myqcloud.com/{bucket}/'.format(scheme=self._conf._scheme, region=self._conf._region, bucket=Bucket) + if self._conf._domain is not None: + url = '{scheme}://{domain}/{bucket}/'.format(scheme=self._conf._scheme, domain=self._conf._domain, bucket=Bucket) + headers = {'Content-Type': 'application/json'} + signed_key = Bucket + '/' + rt = self.send_request( + method='POST', + url=url, + bucket=None, + data=json.dumps(FetchTaskConfiguration), + headers=headers, + auth=CosS3Auth(self._conf, signed_key), + cos_request=False + ) + data = rt.json() + return data + + def get_async_fetch_task(self, Bucket, TaskId, **kwargs): + """获取异步拉取对象到COS的任务状态 + + :param Bucket(string): 存储桶名称. + :param TaskId(string): 异步拉取任务查询的唯一标识. + :kwargs(dict): 扩展参数. + :return(dict): 异步任务的状态 + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 获取异步拉取任务 + response = client.get_async_fetch_task( + Bucket='bucket', + TaskId='string' + ) + """ + url = '{scheme}://{region}.migration.myqcloud.com/{bucket}/{task_id}'.format(scheme=self._conf._scheme, region=self._conf._region, bucket=Bucket, task_id=TaskId) + if self._conf._domain is not None: + url = '{scheme}://{domain}/{bucket}/{task_id}'.format(scheme=self._conf._scheme, domain=self._conf._domain, bucket=Bucket, task_id=TaskId) + headers = {'Content-Type': 'application/json'} + signed_key = '{bucket}/{task_id}'.format(bucket=Bucket, task_id=TaskId) + rt = self.send_request( + method='GET', + url=url, + bucket=None, + headers=headers, + auth=CosS3Auth(self._conf, signed_key), + cos_request=False + ) + data = rt.json() + return data + + def put_live_channel(self, Bucket, ChannelName, Expire=3600, LiveChannelConfiguration={}, **kwargs): + """创建直播通道 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param Expire(int): 推流url签名过期时间. + :param LiveChannelConfiguration(dict): 直播通道配置. + :param kwargs(dict): 设置请求headers. + :return(dict): publish url and playurl. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 设置直播通道配置 + livechannel_config = { + 'Description': 'channel description', + 'Switch': 'Enabled', + 'Target': { + 'Type': 'HLS', + 'FragDuration': '3', + 'FragCount': '5', + } + } + response = client.put_live_channel(Bucket='bucket', ChannelName='ch1', LiveChannelConfiguration=livechannel_config) + """ + xml_config = format_xml(data=LiveChannelConfiguration, root='LiveChannelConfiguration') + headers = mapped(kwargs) + headers['Content-MD5'] = get_md5(xml_config) + headers['Content-Type'] = 'application/xml' + params = {'live': ''} + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("put live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='PUT', + url=url, + bucket=Bucket, + data=xml_config, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + if data['PublishUrls']['Url'] is not None: + rtmpSign = CosRtmpAuth(self._conf, bucket=Bucket, channel=ChannelName, expire=Expire) + url = data['PublishUrls']['Url'] + url += '?' + rtmpSign.get_rtmp_sign() + data['PublishUrls']['Url'] = url + return data + + def get_rtmp_signed_url(self, Bucket, ChannelName, Expire=3600, Params={}): + """获取直播通道带签名的推流url + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :return: dict. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.get_rtmp_signed_url(Bucket='bucket', ChannelName='ch1') + """ + rtmp_signed_url = 'rtmp://{bucket}.cos.{region}.myqcloud.com/live/{channel}'.format(bucket=Bucket, + region=self._conf._region, + channel=ChannelName) + rtmpAuth = CosRtmpAuth(self._conf, bucket=Bucket, channel=ChannelName, params=Params, expire=Expire) + return rtmp_signed_url + '?' + rtmpAuth.get_rtmp_sign() + + def get_live_channel_info(self, Bucket, ChannelName, **kwargs): + """获取直播通道配置信息 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param kwargs(dict): 设置请求headers. + :return: dict. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.get_live_channel_info(Bucket='bucket', ChannelName='ch1') + """ + params = {'live': ''} + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("get live channel info, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + return data + + def put_live_channel_switch(self, Bucket, ChannelName, Switch, **kwargs): + """禁用或者开启直播通道 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param Switch(string): 'enabled'或'disabled'. + :param kwargs(dict): 设置请求headers. + :return(None). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + client.put_live_channel_switch(Bucket='bucket', ChannelName='ch1', Switch='enabled') + """ + params = {'live': ''} + if Switch in ['enabled', 'disabled']: + params['switch'] = Switch + else: + raise CosClientError('switch must be enabled or disabled') + + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("put live channel switch, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + self.send_request( + method='PUT', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + return None + + def get_live_channel_history(self, Bucket, ChannelName, **kwargs): + """获取直播通道推流历史 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param kwargs(dict): 设置请求headers. + :return(dict). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.get_live_channel_history(Bucket='bucket', ChannelName='ch1') + """ + params = {'live': '', 'comp': 'history'} + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("get live channel history, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + format_dict(data, ['LiveRecord']) + return data + + def get_live_channel_status(self, Bucket, ChannelName, **kwargs): + """获取直播通道推流状态 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param kwargs(dict): 设置请求headers. + :return(dict). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.get_live_channel_status(Bucket='bucket', ChannelName='ch1') + """ + params = {'live': '', 'comp': 'status'} + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("get live channel status, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + return data + + def delete_live_channel(self, Bucket, ChannelName, **kwargs): + """删除直播通道 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param kwargs(dict): 设置请求headers. + :return(dict). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + client.delete_live_channel(Bucket='bucket', ChannelName='ch1') + """ + params = {'live': ''} + url = self._conf.uri(bucket=Bucket, path=ChannelName) + headers = mapped(kwargs) + logger.info("delete live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='DELETE', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + data = dict(**rt.headers) + return data + + def get_vod_playlist(self, Bucket, ChannelName, StartTime=0, EndTime=0, **kwargs): + """查询指定时间段播放列表文件 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param StartTime(int): 播放列表ts文件的起始时间,格式为unix时间戳. + :param EndTime(int): 播放列表ts文件的结束时间,格式为unix时间戳. + :param kwargs(dict): 设置请求headers. + :return(string). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.get_vod_playlist(Bucket='bucket', ChannelName='ch1', StartTime=1611218201, EndTime=1611218300) + """ + if StartTime <= 0 or EndTime <= 0: + raise CosClientError('invalid timestamp') + if StartTime >= EndTime: + raise CosClientError('StartTime must be less than EndTime') + + params = {'vod': '', 'starttime': StartTime, 'endtime': EndTime} + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=ChannelName) + logger.info("get vod playlist, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=ChannelName), + headers=headers, + params=params) + return rt.content + + def post_vod_playlist(self, Bucket, ChannelName, PlaylistName, StartTime=0, EndTime=0, **kwargs): + """生成点播播放列表文件 + + :param Bucket(string): 存储桶名称. + :param ChannelName(string): 直播通道名称. + :param PlaylistName(string): 播放列表文件名称. + :param StartTime(int): 播放列表ts文件的起始时间,格式为unix时间戳. + :param EndTime(int): 播放列表ts文件的结束时间,格式为unix时间戳. + :param kwargs(dict): 设置请求headers. + :return(None). + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.post_vod_playlist(Bucket='bucket', ChannelName='ch1', PlaylistName='test.m3u8', StartTime=1611218201, EndTime=1611218300) + """ + if StartTime <= 0 or EndTime <= 0: + raise CosClientError('invalid timestamp') + if StartTime >= EndTime: + raise CosClientError('StartTime must be less than EndTime') + if not PlaylistName.endswith('.m3u8'): + raise CosClientError('PlaylistName must be end with .m3u8') + + params = {'vod': '', 'starttime': StartTime, 'endtime': EndTime} + headers = mapped(kwargs) + file_path = ChannelName + '/' + PlaylistName + url = self._conf.uri(bucket=Bucket, path=file_path) + logger.info("post vod playlist, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='POST', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params, key=file_path), + headers=headers, + params=params) + return None + + def list_live_channel(self, Bucket, MaxKeys=100, Prefix='', Marker='', **kwargs): + """获取直播通道列表 + + :param Bucket(string): 存储桶名称. + :param MaxKeys(int): 每页可以列出通道数量的最大值,有效值范围为[1, 1000],默认值:100. + :param Prefix(string): 限定返回的 LiveChannel 必须以 prefix 作为前缀. + :param Marker(string): 从 marker 之后按字母排序的第一个开始返回. + :param kwargs(dict): 设置请求headers. + :return: string. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + resp = client.list_channel(Bucket='bucket', MaxKeys=100) + """ + params = {'live': ''} + if MaxKeys >= 1: + params['max-keys'] = MaxKeys + if Prefix != '': + params['prefix'] = Prefix + if Marker != '': + params['marker'] = Marker + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket) + logger.info("list live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers)) + rt = self.send_request( + method='GET', + url=url, + bucket=Bucket, + auth=CosS3Auth(self._conf, params=params), + headers=headers, + params=params) + data = xml_to_dict(rt.content) + format_dict(data, ['LiveChannel']) + decode_result( + data, + [ + 'Prefix', + 'Marker', + 'MaxKeys', + 'IsTruncated', + 'NextMarker' + ], + [ + ['LiveChannel', 'Name'], + ]) + return data + if __name__ == "__main__": pass diff --git a/ut/test.py b/ut/test.py index cf77736f..43847a69 100644 --- a/ut/test.py +++ b/ut/test.py @@ -10,6 +10,9 @@ from qcloud_cos import CosConfig from qcloud_cos import CosServiceError from qcloud_cos import get_date +from qcloud_cos.cos_encryption_client import CosEncryptionClient +from qcloud_cos.crypto import AESProvider +from qcloud_cos.crypto import RSAProvider from qcloud_cos.cos_comm import CiDetectType SECRET_ID = os.environ["SECRET_ID"] @@ -27,6 +30,10 @@ SecretKey=SECRET_KEY, ) client = CosS3Client(conf, retry=3) +rsa_provider = RSAProvider() +client_for_rsa = CosEncryptionClient(conf, rsa_provider) +aes_provider = AESProvider() +client_for_aes = CosEncryptionClient(conf, aes_provider) def _create_test_bucket(test_bucket, create_region=None): @@ -884,8 +891,8 @@ def test_bucket_exists(): assert status is True -def test_put_get_bucket_policy(): - """设置获取bucket的policy配置""" +def test_put_get_delete_bucket_policy(): + """设置获取删除bucket的policy配置""" resource = "qcs::cos:" + REGION + ":uid/" + APPID + ":" + test_bucket + "/*" resource_list = [resource] policy = { @@ -913,6 +920,9 @@ def test_put_get_bucket_policy(): response = client.get_bucket_policy( Bucket=test_bucket, ) + response = client.delete_bucket_policy( + Bucket=test_bucket, + ) def test_put_file_like_object(): @@ -969,6 +979,12 @@ def test_put_get_delete_bucket_domain(): }, ] } + + response = client.delete_bucket_domain( + Bucket=test_bucket + ) + + time.sleep(2) response = client.put_bucket_domain( Bucket=test_bucket, DomainConfiguration=domain_config @@ -1100,8 +1116,8 @@ def _test_put_get_delete_bucket_origin(): ) -def test_put_get_bucket_referer(): - """测试设置获取bucket防盗链规则""" +def test_put_get_delete_bucket_referer(): + """测试设置获取删除bucket防盗链规则""" referer_config = { 'Status': 'Enabled', 'RefererType': 'White-List', @@ -1121,6 +1137,14 @@ def test_put_get_bucket_referer(): response = client.get_bucket_referer( Bucket=test_bucket, ) + response = client.delete_bucket_referer( + Bucket=test_bucket, + ) + time.sleep(4) + response = client.get_bucket_referer( + Bucket=test_bucket, + ) + assert len(response)==0 def test_put_get_traffic_limit(): @@ -1259,6 +1283,252 @@ def test_put_get_bucket_intelligenttiering(): Bucket=test_bucket, ) +def test_bucket_encryption(): + """测试存储桶默认加密配置""" + # 测试设置存储桶的默认加密配置 + config_dict = { + 'Rule': [ + { + 'ApplySideEncryptionConfiguration': { + 'SSEAlgorithm': 'AES256', + } + }, + ] + } + client.put_bucket_encryption(test_bucket, config_dict) + + # 测试获取存储桶默认加密配置 + ret = client.get_bucket_encryption(test_bucket) + sse_algorithm = ret['Rule'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'] + assert(sse_algorithm == 'AES256') + + # 删除存储桶默认加密配置 + client.delete_bucket_encryption(test_bucket) + +def test_aes_client(): + """测试aes加密客户端的上传下载操作""" + content = '123456' * 1024 + '1' + client_for_aes.delete_object(test_bucket, 'test_for_aes') + client_for_aes.put_object(test_bucket, content, 'test_for_aes') + # 测试整个文件的md5 + response = client_for_aes.get_object(test_bucket, 'test_for_aes') + response['Body'].get_stream_to_file('test_for_aes_local') + local_file_md5 = None + content_md5 = None + with open('test_for_aes_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content.encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_aes_local'): + os.remove('test_for_aes_local') + + # 测试读取部分数据的md5 + response = client_for_aes.get_object(test_bucket, 'test_for_aes', Range='bytes=5-3000') + response['Body'].get_stream_to_file('test_for_aes_local') + with open('test_for_aes_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content[5:3001].encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_aes_local'): + os.remove('test_for_aes_local') + + client_for_aes.delete_object(test_bucket, 'test_for_aes') + + content = '1' * 1024 * 1024 + # 测试分片上传 + client_for_rsa.delete_object(test_bucket, 'test_multi_upload') + response = client_for_aes.create_multipart_upload(test_bucket, 'test_multi_upload') + uploadid = response['UploadId'] + client_for_aes.upload_part(test_bucket, 'test_multi_upload', content, 1, uploadid) + client_for_aes.upload_part(test_bucket,'test_multi_upload', content, 2, uploadid) + response = client_for_aes.list_parts(test_bucket,'test_multi_upload', uploadid) + client_for_aes.complete_multipart_upload(test_bucket, 'test_multi_upload', uploadid, {'Part':response['Part']}) + response = client_for_aes.get_object(test_bucket, 'test_multi_upload') + response['Body'].get_stream_to_file('test_multi_upload_local') + with open('test_multi_upload_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5((content+content).encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_multi_upload_local'): + os.remove('test_multi_upload_local') + + client_for_rsa.delete_object(test_bucket, 'test_multi_upload') + +def test_rsa_client(): + """测试rsa加密客户端的上传下载操作""" + content = '123456' * 1024 + '1' + client_for_rsa.delete_object(test_bucket, 'test_for_rsa') + client_for_rsa.put_object(test_bucket, content, 'test_for_rsa') + # 测试整个文件的md5 + response = client_for_rsa.get_object(test_bucket, 'test_for_rsa') + response['Body'].get_stream_to_file('test_for_rsa_local') + local_file_md5 = None + content_md5 = None + with open('test_for_rsa_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content.encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_rsa_local'): + os.remove('test_for_rsa_local') + + # 测试读取部分数据的md5 + response = client_for_rsa.get_object(test_bucket, 'test_for_rsa', Range='bytes=5-3000') + response['Body'].get_stream_to_file('test_for_rsa_local') + with open('test_for_rsa_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content[5:3001].encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_rsa_local'): + os.remove('test_for_rsa_local') + + client_for_rsa.delete_object(test_bucket, 'test_for_rsa') + + content = '1' * 1024 * 1024 + # 测试分片上传 + client_for_rsa.delete_object(test_bucket, 'test_multi_upload') + response = client_for_rsa.create_multipart_upload(test_bucket, 'test_multi_upload') + uploadid = response['UploadId'] + client_for_rsa.upload_part(test_bucket, 'test_multi_upload', content, 1, uploadid) + client_for_rsa.upload_part(test_bucket,'test_multi_upload', content, 2, uploadid) + response = client_for_rsa.list_parts(test_bucket,'test_multi_upload', uploadid) + client_for_rsa.complete_multipart_upload(test_bucket, 'test_multi_upload', uploadid, {'Part':response['Part']}) + response = client_for_rsa.get_object(test_bucket, 'test_multi_upload') + response['Body'].get_stream_to_file('test_multi_upload_local') + with open('test_multi_upload_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5((content+content).encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_multi_upload_local'): + os.remove('test_multi_upload_local') + + client_for_rsa.delete_object(test_bucket, 'test_multi_upload') + + +def test_live_channel(): + """测试rtmp推流功能""" + livechannel_config = { + 'Description': 'cos python sdk test', + 'Switch': 'Enabled', + 'Target': { + 'Type': 'HLS', + 'FragDuration': '3', + 'FragCount': '5', + } + } + channel_name = 'cos-python-sdk-uttest-ch1' + + try: + response = client.put_live_channel( + Bucket=test_bucket, + ChannelName=channel_name, + LiveChannelConfiguration=livechannel_config) + assert (response) + except Exception as e: + if e.get_error_code() != 'ChannelStillLive': + return + + print("get live channel info...") + response = client.get_live_channel_info( + Bucket=test_bucket, + ChannelName=channel_name) + print(response) + assert (response['Switch'] == 'Enabled') + assert (response['Description'] == 'cos python sdk test') + assert (response['Target']['Type'] == 'HLS') + assert (response['Target']['FragDuration'] == '3') + assert (response['Target']['FragCount'] == '5') + assert (response['Target']['PlaylistName'] == 'playlist.m3u8') + + print("put live channel switch...") + client.put_live_channel_switch( + Bucket=test_bucket, + ChannelName=channel_name, + Switch='disabled') + response = client.get_live_channel_info( + Bucket=test_bucket, + ChannelName=channel_name) + assert (response['Switch'] == 'Disabled') + client.put_live_channel_switch( + Bucket=test_bucket, + ChannelName=channel_name, + Switch='enabled') + response = client.get_live_channel_info( + Bucket=test_bucket, + ChannelName=channel_name) + assert (response['Switch'] == 'Enabled') + + print("get live channel history...") + response = client.get_live_channel_history( + Bucket=test_bucket, + ChannelName=channel_name) + print(response) + + print("get live channel status...") + response = client.get_live_channel_status( + Bucket=test_bucket, + ChannelName=channel_name) + print(response) + assert (response['Status'] == 'Idle' or response['Status'] == 'Live') + + print("list channel...") + create_chan_num = 20 + for i in range(1, create_chan_num): + ch_name = 'test-list-channel-' + str(i) + client.put_live_channel( + Bucket=test_bucket, + ChannelName=ch_name, + LiveChannelConfiguration=livechannel_config) + response = client.list_live_channel(Bucket=test_bucket, MaxKeys=10) + print(response) + assert (response['MaxKeys'] == '10') + assert (response['IsTruncated'] == 'true') + response = client.list_live_channel(Bucket=test_bucket, MaxKeys=5, Marker=response['NextMarker']) + print(response) + assert (response['MaxKeys'] == '5') + assert (response['IsTruncated'] == 'true') + + for i in range(1, create_chan_num): + ch_name = 'test-list-channel-' + str(i) + client.delete_live_channel(Bucket=test_bucket, ChannelName=ch_name) + + print("post vod playlist") + '''playlist不以.m3u8结尾''' + try: + client.post_vod_playlist( + Bucket=test_bucket, + ChannelName=channel_name, + PlaylistName='test', + StartTime=int(time.time()) - 10000, + EndTime=int(time.time())) + except Exception as e: + pass + + '''starttime大于endtimne''' + try: + client.post_vod_playlist( + Bucket=test_bucket, + ChannelName=channel_name, + PlaylistName='test.m3u8', + StartTime=10, + EndTime=9) + except Exception as e: + pass + + client.post_vod_playlist( + Bucket=test_bucket, + ChannelName=channel_name, + PlaylistName='test.m3u8', + StartTime=int(time.time()) - 10000, + EndTime=int(time.time())) + response = client.head_object( + Bucket=test_bucket, + Key=channel_name + '/test.m3u8') + assert (response) + + print("delete live channel...") + response = client.delete_live_channel(Bucket=test_bucket, ChannelName=channel_name) + assert (response) + if __name__ == "__main__": setUp() """ @@ -1285,8 +1555,10 @@ def test_put_get_bucket_intelligenttiering(): test_put_get_delete_bucket_domain() test_select_object() _test_get_object_sensitive_content_recognition() + test_live_channel() test_download_file() test_put_get_bucket_intelligenttiering() + test_aes_client() + test_rsa_client() """ - tearDown() From a9d70159a7b69cba5638e786e9877ddf6cb48b0a Mon Sep 17 00:00:00 2001 From: jayzhenghan Date: Wed, 7 Apr 2021 11:29:31 +0800 Subject: [PATCH 4/4] add put/get bucket intelligenttiering --- qcloud_cos/cos_client.py | 1 + ut/test.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index 20911826..ddfe5309 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -2847,6 +2847,7 @@ def get_bucket_referer(self, Bucket, **kwargs): def delete_bucket_referer(self, Bucket, **kwargs): """删除bucket防盗链规则 + :param Bucket(string): 存储桶名称. :param kwargs(dict): 设置请求headers. :return(dict): None. diff --git a/ut/test.py b/ut/test.py index 53bdf87c..69d45595 100644 --- a/ut/test.py +++ b/ut/test.py @@ -1356,6 +1356,35 @@ def test_aes_client(): if os.path.exists('test_multi_upload_local'): os.remove('test_multi_upload_local') + client_for_rsa.delete_object(test_bucket, 'test_multi_upload') + +def test_rsa_client(): + """测试rsa加密客户端的上传下载操作""" + content = '123456' * 1024 + '1' + client_for_rsa.delete_object(test_bucket, 'test_for_rsa') + client_for_rsa.put_object(test_bucket, content, 'test_for_rsa') + # 测试整个文件的md5 + response = client_for_rsa.get_object(test_bucket, 'test_for_rsa') + response['Body'].get_stream_to_file('test_for_rsa_local') + local_file_md5 = None + content_md5 = None + with open('test_for_rsa_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content.encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_rsa_local'): + os.remove('test_for_rsa_local') + + # 测试读取部分数据的md5 + response = client_for_rsa.get_object(test_bucket, 'test_for_rsa', Range='bytes=5-3000') + response['Body'].get_stream_to_file('test_for_rsa_local') + with open('test_for_rsa_local', 'rb') as f: + local_file_md5 = get_raw_md5(f.read()) + content_md5 = get_raw_md5(content[5:3001].encode("utf-8")) + assert local_file_md5 and content_md5 and local_file_md5 == content_md5 + if os.path.exists('test_for_rsa_local'): + os.remove('test_for_rsa_local') + client_for_rsa.delete_object(test_bucket, 'test_for_rsa') content = '1' * 1024 * 1024 @@ -1375,6 +1404,7 @@ def test_aes_client(): assert local_file_md5 and content_md5 and local_file_md5 == content_md5 if os.path.exists('test_multi_upload_local'): os.remove('test_multi_upload_local') + client_for_rsa.delete_object(test_bucket, 'test_multi_upload')