diff --git a/demo/ci_media.py b/demo/ci_media.py index 9acd1479..608e1e1d 100644 --- a/demo/ci_media.py +++ b/demo/ci_media.py @@ -52,6 +52,16 @@ def ci_get_media_queue(): return response +def ci_open_pic_bucket(): + # 图片处理异步服务开通 + response, data = client.ci_open_pic_bucket( + Bucket=bucket_name, + ) + print(response) + print(data) + return response, data + + def ci_get_pic_bucket(): # 查询图片处理异步服务开通状态 response = client.ci_get_pic_bucket( @@ -65,6 +75,16 @@ def ci_get_pic_bucket(): return response +def ci_close_pic_bucket(): + # 图片处理异步服务关闭 + response, data = client.ci_close_pic_bucket( + Bucket=bucket_name, + ) + print(response) + print(data) + return response, data + + def ci_get_ai_bucket(): # 查询ai处理异步服务开通状态 response = client.ci_get_ai_bucket( @@ -1970,7 +1990,9 @@ def ci_update_hls_play_key(): # ci_create_image_inspect_jobs() # ci_create_sound_hound_jobs() # ci_list_inventory_trigger_jobs() + # ci_open_pic_bucket() # ci_get_pic_bucket() + # ci_close_pic_bucket() # ci_get_inventory_trigger_jobs() # ci_get_ai_bucket() # ci_get_ai_queue() diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index aa1cecd6..0b0a0b82 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -7316,6 +7316,63 @@ def ci_update_media_queue(self, Bucket, QueueId, Request={}, UrlPath="/queue/", format_dict(data, ['Queue']) return data + def ci_open_pic_bucket(self, Bucket, **kwargs): + """ 开通图片处理异步服务 https://cloud.tencent.com/document/product/460/95749 + + :param Bucket(string) 存储桶名称. + :param kwargs:(dict) 设置上传的headers. + :return(dict): response header. + :return(dict): 请求成功返回的结果, dict类型. + + . code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 开通图片处理异步服务 + response, data = client.ci_open_pic_bucket( + Bucket='bucket') + print data + print response + """ + headers = mapped(kwargs) + final_headers = {} + params = {} + for key in headers: + if key.startswith("response"): + params[key] = headers[key] + else: + final_headers[key] = headers[key] + headers = final_headers + + params = format_values(params) + + path = "/" + "picbucket" + url = self._conf.uri(bucket=Bucket, path=path, endpoint=self._conf._endpoint_ci) + + logger.info("ci_open_pic_bucket result, url=:{url} ,headers=:{headers}, params=:{params}".format( + url=url, + headers=headers, + params=params)) + rt = self.send_request( + method='POST', + url=url, + auth=CosS3Auth(self._conf, path, params=params), + params=params, + headers=headers, + ci_request=True) + + data = rt.content + response = dict(**rt.headers) + if 'Content-Type' in response: + if response['Content-Type'] == 'application/xml' and 'Content-Length' in response and \ + response['Content-Length'] != 0: + data = xml_to_dict(rt.content) + format_dict(data, ['Response']) + elif response['Content-Type'].startswith('application/json'): + data = rt.json() + + return response, data + def ci_get_pic_bucket(self, Regions='', BucketName='', BucketNames='', PageNumber='', PageSize='', **kwargs): """查询图片异步处理开通状态接口 @@ -7345,6 +7402,63 @@ def ci_get_pic_bucket(self, Regions='', BucketName='', BucketNames='', PageNumbe BucketNames=BucketNames, PageNumber=PageNumber, PageSize=PageSize, Path='/picbucket', **kwargs) + def ci_close_pic_bucket(self, Bucket, **kwargs): + """ 关闭图片处理异步服务 https://cloud.tencent.com/document/product/460/95751 + + :param Bucket(string) 存储桶名称. + :param kwargs:(dict) 设置上传的headers. + :return(dict): response header. + :return(dict): 请求成功返回的结果, dict类型. + + . code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 关闭图片处理异步服务 + response, data = client.ci_close_pic_bucket( + Bucket='bucket') + print data + print response + """ + headers = mapped(kwargs) + final_headers = {} + params = {} + for key in headers: + if key.startswith("response"): + params[key] = headers[key] + else: + final_headers[key] = headers[key] + headers = final_headers + + params = format_values(params) + + path = "/" + "picbucket" + url = self._conf.uri(bucket=Bucket, path=path, endpoint=self._conf._endpoint_ci) + + logger.info("ci_close_pic_bucket result, url=:{url} ,headers=:{headers}, params=:{params}".format( + url=url, + headers=headers, + params=params)) + rt = self.send_request( + method='DELETE', + url=url, + auth=CosS3Auth(self._conf, path, params=params), + params=params, + headers=headers, + ci_request=True) + + data = rt.content + response = dict(**rt.headers) + if 'Content-Type' in response: + if response['Content-Type'] == 'application/xml' and 'Content-Length' in response and \ + response['Content-Length'] != 0: + data = xml_to_dict(rt.content) + format_dict(data, ['Response']) + elif response['Content-Type'].startswith('application/json'): + data = rt.json() + + return response, data + def ci_update_media_pic_queue(self, Bucket, QueueId, Request={}, **kwargs): """ 更新图片处理队列接口 diff --git a/ut/test.py b/ut/test.py index 18a1855c..975f77ae 100644 --- a/ut/test.py +++ b/ut/test.py @@ -3623,7 +3623,7 @@ def test_append_object(): def test_ci_delete_asr_template(): - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} client.ci_open_asr_bucket( @@ -3640,7 +3640,7 @@ def test_ci_delete_asr_template(): def test_ci_get_asr_template(): # 获取语音识别模板 - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} client.ci_open_asr_bucket( @@ -3658,7 +3658,7 @@ def test_ci_get_asr_template(): def test_ci_create_asr_template(): # 创建语音识别模板 - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3702,7 +3702,7 @@ def test_ci_create_asr_template(): def test_ci_list_asr_jobs(): - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3732,7 +3732,7 @@ def test_ci_list_asr_jobs(): def test_ci_get_asr_jobs(): # 获取语音识别任务信息 - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3747,7 +3747,7 @@ def test_ci_get_asr_jobs(): def test_ci_create_asr_jobs(): - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3787,7 +3787,7 @@ def test_ci_create_asr_jobs(): def test_ci_put_asr_queue(): - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3820,7 +3820,7 @@ def test_ci_put_asr_queue(): def test_ci_get_asr_queue(): # 查询语音识别队列信息 - response = client.ci_get_asr_bucket(BucketName=ci_bucket_name) + response = client.ci_get_asr_bucket(BucketName=ci_bucket_name, PageSize="1") if response['TotalCount'] == '0': client.ci_open_asr_bucket( Bucket=ci_bucket_name, @@ -3946,28 +3946,6 @@ def test_ci_list_workflowexecution(): assert response -def test_ci_trigger_workflow(): - # 触发工作流接口 - kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} - response = client.ci_trigger_workflow( - Bucket=ci_bucket_name, - WorkflowId='w5307ee7a60d6489383c3921c715dd1c5', - Key=ci_test_image, - **kwargs - ) - assert response - print(response) - instance_id = response['InstanceId'] - # 查询工作流实例接口 - kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} - response = client.ci_get_workflowexecution( - Bucket=ci_bucket_name, - RunId=instance_id, - **kwargs - ) - assert response - - def test_ci_get_media_transcode_jobs(): # 转码任务详情 kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} @@ -4592,6 +4570,7 @@ def test_ci_get_ai_bucket(): kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} response = client.ci_get_ai_bucket( BucketName=ci_bucket_name, + PageSize = "1", **kwargs ) print(response) @@ -4769,6 +4748,8 @@ def test_ci_workflow(): ) assert response['MediaWorkflow']['State'] == 'Paused' + ci_trigger_workflow(workflow_id) + response = client.ci_delete_workflow( Bucket=ci_bucket_name, # 桶名称 WorkflowId=workflow_id, # 需要删除的工作流ID @@ -4777,6 +4758,27 @@ def test_ci_workflow(): print(response) assert response['WorkflowId'] == workflow_id +def ci_trigger_workflow(workflow_id): + # 触发工作流接口 + kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} + response = client.ci_trigger_workflow( + Bucket=ci_bucket_name, + WorkflowId=workflow_id, + Key=ci_test_image, + **kwargs + ) + assert response + print(response) + instance_id = response['InstanceId'] + # 查询工作流实例接口 + kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"} + response = client.ci_get_workflowexecution( + Bucket=ci_bucket_name, + RunId=instance_id, + **kwargs + ) + assert response + def test_ci_auditing_report_badcase(): if TEST_CI != 'true': @@ -5905,6 +5907,31 @@ def test_ci_ai_bucket(): assert data['AiBucket']['Name'] == ci_bucket_name +def test_ci_pic_bucket(): + # 关闭图片处理异步服务 + response, data = ai_recognition_client.ci_close_pic_bucket( + Bucket=ci_bucket_name + ) + assert data['BucketName'] == ci_bucket_name + # 开通图片处理异步服务 + response, data = ai_recognition_client.ci_open_pic_bucket( + Bucket=ci_bucket_name + ) + assert data['PicBucket']['Name'] == ci_bucket_name + + response, data = ai_recognition_client.ci_close_pic_bucket( + Bucket=ci_bucket_name, + Accept="application/json" + ) + assert data['BucketName'] == ci_bucket_name + # 开通AI内容识别服务 + response, data = ai_recognition_client.ci_open_pic_bucket( + Bucket=ci_bucket_name, + Accept="application/json" + ) + assert data['PicBucket']['Name'] == ci_bucket_name + + def test_ci_hls_play_key(): kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}