diff --git a/demo/ci_media.py b/demo/ci_media.py
index 099a0a59..15c6dd24 100644
--- a/demo/ci_media.py
+++ b/demo/ci_media.py
@@ -929,7 +929,7 @@ def ci_list_media_pic_jobs():
def ci_get_media_pic_jobs():
# 图片处理任务详情
- response = client.ci_get_media_jobs(
+ response = client.ci_get_media_pic_jobs(
Bucket=bucket_name,
JobIDs='c01742xxxxxxxxxxxxxxxxxx7438e39',
ContentType='application/xml'
diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py
index 2273682e..9e1a28e0 100644
--- a/qcloud_cos/cos_client.py
+++ b/qcloud_cos/cos_client.py
@@ -42,7 +42,7 @@ class CosConfig(object):
def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token=None, CredentialInstance=None, Scheme=None, Timeout=None,
Access_id=None, Access_key=None, Secret_id=None, Secret_key=None, Endpoint=None, IP=None, Port=None,
- Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None, PoolConnections=10,
+ Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None, KeepAlive=True, PoolConnections=10,
PoolMaxSize=10, AllowRedirects=False, SignHost=True, EndpointCi=None, EndpointPic=None, EnableOldDomain=True, EnableInternalDomain=True):
"""初始化,保存用户的信息
@@ -65,6 +65,7 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
:param Proxies(dict): 使用代理来访问COS
:param Domain(string): 使用自定义的域名来访问COS
:param ServiceDomain(string): 使用自定义的域名来访问cos service
+ :param KeepAlive(bool): 是否使用长连接
:param PoolConnections(int): 连接池个数
:param PoolMaxSize(int): 连接池中最大连接数
:param AllowRedirects(bool): 是否重定向
@@ -87,6 +88,7 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
self._proxies = Proxies
self._domain = Domain
self._service_domain = ServiceDomain
+ self._keep_alive = KeepAlive
self._pool_connections = PoolConnections
self._pool_maxsize = PoolMaxSize
self._allow_redirects = AllowRedirects
@@ -332,6 +334,8 @@ def send_request(self, method, url, bucket, timeout=30, cos_request=True, **kwar
kwargs['headers']['Host'] = self._conf._domain
elif bucket is not None:
kwargs['headers']['Host'] = self._conf.get_host(bucket)
+ if self._conf._keep_alive == False:
+ kwargs['headers']['Connection'] = 'close'
kwargs['headers'] = format_values(kwargs['headers'])
file_position = None
@@ -1273,7 +1277,7 @@ def select_object_content(self, Bucket, Key, Expression, ExpressionType, InputSe
return data
# s3 bucket interface begin
- def create_bucket(self, Bucket, BucketAZConfig=None, **kwargs):
+ def create_bucket(self, Bucket, BucketAZConfig=None, BucketArchConfig=None, **kwargs):
"""创建一个bucket
:param Bucket(string): 存储桶名称. 存储桶名称不支持大写字母,COS 后端会将用户传入的大写字母自动转换为小写字母用于创建存储桶.
@@ -1297,11 +1301,16 @@ def create_bucket(self, Bucket, BucketAZConfig=None, **kwargs):
"""
headers = mapped(kwargs)
xml_config = None
+ bucket_config = dict()
if BucketAZConfig == 'MAZ':
- bucket_config = {'BucketAZConfig': 'MAZ'}
+ bucket_config.update({'BucketAZConfig': 'MAZ'})
+ if BucketArchConfig == 'OFS':
+ bucket_config.update({'BucketArchConfig': 'OFS'})
+ if len(bucket_config) != 0:
xml_config = format_xml(data=bucket_config, root='CreateBucketConfiguration')
headers['Content-MD5'] = get_md5(xml_config)
headers['Content-Type'] = 'application/xml'
+
url = self._conf.uri(bucket=Bucket)
logger.info("create bucket, url=:{url} ,headers=:{headers}".format(
url=url,
@@ -3531,8 +3540,8 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAXThread=5, Ena
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, progress_callback=None,
**kwargs):
- """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传
+ """
:param Bucket(string): 存储桶名称.
:param key(string): 分块上传路径名.
:param LocalFilePath(string): 本地文件路径名.
@@ -3557,7 +3566,7 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, Enabl
)
"""
file_size = os.path.getsize(LocalFilePath)
- if file_size <= 1024 * 1024 * 20:
+ if file_size <= 1024 * 1024 * PartSize:
with open(LocalFilePath, 'rb') as fp:
rt = self.put_object(Bucket=Bucket, Key=Key, Body=fp, EnableMD5=EnableMD5, **kwargs)
return rt
@@ -7012,9 +7021,9 @@ def ci_list_workflowexecution(self, Bucket, WorkflowId, Name='', StartCreationTi
NextToken=to_unicode('nextToken='+NextToken)
)
if StartCreationTime is not None:
- url = u"{url}&{StartCreationTime}".format(StartCreationTime=to_unicode('startCreationTime='+StartCreationTime))
+ url = u"{url}&{StartCreationTime}".format(url=to_unicode(url), StartCreationTime=quote(to_bytes(to_unicode('startCreationTime='+StartCreationTime)), b'/-_.~='))
if EndCreationTime is not None:
- url = u"{url}&{EndCreationTime}".format(EndCreationTime=to_unicode('endCreationTime='+EndCreationTime))
+ url = u"{url}&{EndCreationTime}".format(url=to_unicode(url), EndCreationTime=quote(to_bytes(to_unicode('endCreationTime='+EndCreationTime)), b'/-_.~='))
logger.info("ci_list_workflowexecution result, url=:{url} ,headers=:{headers}, params=:{params}".format(
url=url,
headers=headers,
@@ -7518,9 +7527,11 @@ def ci_list_doc_jobs(self, Bucket, QueueId, StartCreationTime=None, EndCreationT
NextToken=to_unicode('nextToken='+NextToken)
)
if StartCreationTime is not None:
- url = u"{url}&{StartCreationTime}".format(StartCreationTime=to_unicode('startCreationTime='+StartCreationTime))
+ url = u"{url}&{StartCreationTime}".format(url=to_unicode(url),
+ StartCreationTime=quote(to_bytes(to_unicode('startCreationTime='+StartCreationTime)), b'/-_.~='))
if EndCreationTime is not None:
- url = u"{url}&{EndCreationTime}".format(EndCreationTime=to_unicode('endCreationTime='+EndCreationTime))
+ url = u"{url}&{EndCreationTime}".format(url=to_unicode(url),
+ EndCreationTime=quote(to_bytes(to_unicode('endCreationTime='+EndCreationTime)), b'/-_.~='))
logger.info("list_doc_jobs result, url=:{url} ,headers=:{headers}, params=:{params}".format(
url=url,
headers=headers,
@@ -8130,9 +8141,11 @@ def ci_list_asr_jobs(self, Bucket, QueueId, StartCreationTime=None, EndCreationT
NextToken=to_unicode('nextToken='+NextToken)
)
if StartCreationTime is not None:
- url = u"{url}&{StartCreationTime}".format(StartCreationTime=to_unicode('startCreationTime='+StartCreationTime))
+ url = u"{url}&{StartCreationTime}".format(url=to_unicode(url),
+ StartCreationTime=quote(to_bytes(to_unicode('startCreationTime='+StartCreationTime)), b'/-_.~='))
if EndCreationTime is not None:
- url = u"{url}&{EndCreationTime}".format(EndCreationTime=to_unicode('endCreationTime='+EndCreationTime))
+ url = u"{url}&{EndCreationTime}".format(url=to_unicode(url),
+ EndCreationTime=quote(to_bytes(to_unicode('endCreationTime='+EndCreationTime)), b'/-_.~='))
logger.info("list_asr_jobs result, url=:{url} ,headers=:{headers}, params=:{params}".format(
url=url,
headers=headers,
diff --git a/qcloud_cos/cos_comm.py b/qcloud_cos/cos_comm.py
index c758d4bd..2be7237b 100644
--- a/qcloud_cos/cos_comm.py
+++ b/qcloud_cos/cos_comm.py
@@ -178,13 +178,13 @@ def xml_to_dict(data, origin_str="", replace_str=""):
return xmldict
-def get_id_from_xml(data, name):
- """解析xml中的特定字段"""
- tree = xml.dom.minidom.parseString(data)
- root = tree.documentElement
- result = root.getElementsByTagName(name)
- # use childNodes to get a list, if has no child get itself
- return result[0].childNodes[0].nodeValue
+# def get_id_from_xml(data, name):
+# """解析xml中的特定字段"""
+# tree = xml.dom.minidom.parseString(data)
+# root = tree.documentElement
+# result = root.getElementsByTagName(name)
+# # use childNodes to get a list, if has no child get itself
+# return result[0].childNodes[0].nodeValue
def mapped(headers):
diff --git a/qcloud_cos/cos_threadpool.py b/qcloud_cos/cos_threadpool.py
index dcafecce..6a9548cc 100644
--- a/qcloud_cos/cos_threadpool.py
+++ b/qcloud_cos/cos_threadpool.py
@@ -86,25 +86,26 @@ def get_result(self):
if __name__ == '__main__':
+ pass
- pool = SimpleThreadPool(2)
+ # pool = SimpleThreadPool(2)
- def task_sleep(x):
- from time import sleep
- sleep(x)
- return 'hello, sleep %d seconds' % x
+ # def task_sleep(x):
+ # from time import sleep
+ # sleep(x)
+ # return 'hello, sleep %d seconds' % x
- def raise_exception():
- raise ValueError("Pa! Exception!")
+ # def raise_exception():
+ # raise ValueError("Pa! Exception!")
- for i in range(1000):
- pool.add_task(task_sleep, 0.001)
- print(i)
- pool.add_task(task_sleep, 0)
- pool.add_task(task_sleep, 0)
+ # for i in range(1000):
+ # pool.add_task(task_sleep, 0.001)
+ # print(i)
+ # pool.add_task(task_sleep, 0)
+ # pool.add_task(task_sleep, 0)
# pool.add_task(raise_exception)
# pool.add_task(raise_exception)
- pool.wait_completion()
- print(pool.get_result())
+ # pool.wait_completion()
+ # print(pool.get_result())
# [(1, 0, ['hello, sleep 5 seconds']), (2, 1, ['hello, sleep 2 seconds', 'hello, sleep 3 seconds', ValueError('Pa! Exception!',)])]
diff --git a/qcloud_cos/xml2dict.py b/qcloud_cos/xml2dict.py
index 3496fedf..a9c4ab1d 100644
--- a/qcloud_cos/xml2dict.py
+++ b/qcloud_cos/xml2dict.py
@@ -36,13 +36,14 @@ def updateDict(self, aDict):
if __name__ == "__main__":
- s = """
-
- 10
- 1test1
- 2test2
- 3test3
- """
- root = xml.etree.ElementTree.fromstring(s)
- xmldict = Xml2Dict(root)
- print(xmldict)
+ pass
+ # s = """
+ #
+ # 10
+ # 1test1
+ # 2test2
+ # 3test3
+ # """
+ # root = xml.etree.ElementTree.fromstring(s)
+ # xmldict = Xml2Dict(root)
+ # print(xmldict)
diff --git a/ut/test.py b/ut/test.py
index c63f541b..4eab6d62 100644
--- a/ut/test.py
+++ b/ut/test.py
@@ -17,14 +17,14 @@
from qcloud_cos.crypto import RSAProvider
from qcloud_cos.cos_comm import CiDetectType, get_md5, to_bytes
-SECRET_ID = os.environ["SECRET_ID"]
-SECRET_KEY = os.environ["SECRET_KEY"]
+SECRET_ID = os.environ["COS_SECRET_ID"]
+SECRET_KEY = os.environ["COS_SECRET_KEY"]
TRAVIS_FLAG = os.environ["TRAVIS_FLAG"]
-REGION = os.environ["REGION"]
-APPID = '1251668577'
+REGION = os.environ['COS_REGION']
+APPID = os.environ['COS_APPID']
TEST_CI = os.environ["TEST_CI"]
USE_CREDENTIAL_INST = os.environ["USE_CREDENTIAL_INST"]
-test_bucket = 'cos-python-v5-testbkt-' + str(sys.version_info[0]) + '-' + str(
+test_bucket = 'cos-python-v5-test-' + str(sys.version_info[0]) + '-' + str(
sys.version_info[1]) + '-' + REGION + '-' + APPID
copy_test_bucket = 'copy-' + test_bucket
test_object = "test.txt"
@@ -62,8 +62,14 @@ def token(self):
aes_provider = AESProvider()
client_for_aes = CosEncryptionClient(conf, aes_provider)
-ci_bucket_name = 'ci-qta-gz-1251668577'
+ci_bucket_name = 'cos-python-v5-test-ci-' + APPID
ci_region = 'ap-guangzhou'
+ci_test_media = "test.mp4"
+ci_test_m3u8 = "test.m3u8"
+ci_test_image = "test.png"
+ci_test_ocr_image = "ocr.jpeg"
+ci_test_txt = "test.txt"
+ci_test_car_image = "car.jpeg"
def _create_test_bucket(test_bucket, create_region=None):
@@ -85,10 +91,39 @@ def _create_test_bucket(test_bucket, create_region=None):
except Exception as e:
if e.get_error_code() == 'BucketAlreadyOwnedByYou':
print('BucketAlreadyOwnedByYou')
+ elif e.get_error_code() == 'BucketAlreadyExists':
+ print('BucketAlreadyExists')
else:
raise e
return None
+def _clear_and_delete_bucket(_client, _bucket):
+ marker = ""
+ while True:
+ response = _client.list_objects(
+ Bucket=_bucket, Marker=marker)
+ objects = list()
+ if 'Contents' in response:
+ for content in response['Contents']:
+ key = content['Key']
+ objects.append({'Key': key})
+
+ if response['IsTruncated'] == 'false':
+ break
+
+ del_response = _client.delete_objects(
+ Bucket=_bucket,
+ Delete={
+ 'Object': objects
+ }
+ )
+
+ marker = response["NextMarker"]
+
+ response = _client.delete_bucket(
+ Bucket=_bucket
+ )
+ print("bucket deleted: {}".format(_bucket))
def _upload_test_file(test_bucket, test_key):
response = client.put_object(
@@ -98,6 +133,14 @@ def _upload_test_file(test_bucket, test_key):
)
return None
+def _upload_test_file_from_local_file(test_bucket, test_key, file_name):
+ with open(file_name, 'rb') as f:
+ response = client.put_object(
+ Bucket=test_bucket,
+ Key=test_key,
+ Body=f
+ )
+ return None
def get_raw_md5(data):
m2 = hashlib.md5(data)
@@ -111,6 +154,10 @@ def gen_file(path, size):
_file.write('cos')
_file.close()
+def gen_file_small(path, size):
+ _file = open(path, 'w')
+ _file.write('x'*size)
+ _file.close()
def print_error_msg(e):
print(e.get_origin_msg())
@@ -139,19 +186,105 @@ def setUp():
print("start create bucket " + test_bucket)
_create_test_bucket(test_bucket)
_create_test_bucket(copy_test_bucket)
+ _upload_test_file(test_bucket, test_object)
_upload_test_file(copy_test_bucket, test_object)
def tearDown():
print("function teardown")
+def test_cos_comm_format_region():
+ from qcloud_cos.cos_comm import format_region
+ try:
+ r = format_region(10, u'cos.', False, False)
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_region('', u'cos.', False, False)
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_region('ap_beijing', u'cos.', False, False)
+ except Exception as e:
+ print(e)
+
+ r = format_region('cos.ap-beijing', u'cos.', False, False)
+ assert r == 'cos.ap-beijing'
+
+ r = format_region('cn-north', u'cos.', False, False)
+ assert r == 'cn-north'
+
+ r = format_region('cn-south', u'cos.', False, False)
+ assert r == 'cn-south'
+
+ r = format_region('cn-south-2', u'cos.', False, False)
+ assert r == 'cn-south-2'
+
+ r = format_region('cn-southwest', u'cos.', False, False)
+ assert r == 'cn-southwest'
+
+ r = format_region('cn-east', u'cos.', False, False)
+ assert r == 'cn-east'
+
+ r = format_region('sg', u'cos.', False, False)
+ assert r == 'sg'
+
+ r = format_region('ap-beijing', u'cos.', EnableOldDomain=False, EnableInternalDomain=True)
+ assert r == 'cos-internal.ap-beijing'
+
+ regionMap = [
+ ['cossh', 'ap-shanghai'],
+ ['cosgz', 'ap-guangzhou'],
+ ['cosbj', 'ap-beijing'],
+ ['costj', 'ap-beijing-1'],
+ ['coscd', 'ap-chengdu'],
+ ['cossgp', 'ap-singapore'],
+ ['coshk', 'ap-hongkong'],
+ ['cosca', 'na-toronto'],
+ ['cosger', 'eu-frankfurt']
+ ]
+ for data in regionMap:
+ r = format_region(data[0], u'cos.', False, False)
+ assert r == 'cos.' + data[1]
+
+def test_cos_comm_format_bucket():
+ from qcloud_cos.cos_comm import format_bucket
+
+ try:
+ r = format_bucket(10, '1250000000')
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_bucket('', '1250000000')
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_bucket('test_bucket', '1250000000')
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_bucket('test-bucket', 10)
+ except Exception as e:
+ print(e)
+
+ r = format_bucket('test-bucket-1250000000', '1250000000')
+ assert r == 'test-bucket-1250000000'
+
+ r = format_bucket('test-bucket', '1250000000')
+ assert r == 'test-bucket-1250000000'
+
def test_put_get_delete_object_10MB():
"""简单上传下载删除10MB小文件"""
file_size = 10
file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000))
file_name = "tmp" + file_id + "_" + str(file_size) + "MB"
- gen_file(file_name, 1)
+ gen_file(file_name, file_size)
with open(file_name, 'rb') as f:
etag = get_raw_md5(f.read())
try:
@@ -429,13 +562,14 @@ def test_create_head_delete_bucket():
Bucket=bucket_name
)
-def test_create_head_delete_maz_bucket():
- """创建一个多AZ bucket,head它是否存在,最后删除一个空bucket"""
+def test_create_head_delete_maz_ofs_bucket():
+ """创建一个多AZ OFS bucket,head它是否存在,最后删除一个空bucket"""
bucket_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000))
- bucket_name = 'buckettest-maz' + bucket_id + '-' + APPID
+ bucket_name = 'buckettest-maz-ofs' + bucket_id + '-' + APPID
response = client.create_bucket(
Bucket=bucket_name,
BucketAZConfig='MAZ',
+ BucketArchConfig='OFS',
ACL='public-read'
)
response = client.head_bucket(
@@ -445,12 +579,21 @@ def test_create_head_delete_maz_bucket():
Bucket=bucket_name
)
+
+def test_put_bucket_acl():
+ """正常设置bucket ACL"""
+ response = client.put_bucket_acl(
+ Bucket=test_bucket,
+ ACL='private',
+ )
+ print(response)
+
def test_put_bucket_acl_illegal():
"""设置非法的ACL"""
try:
response = client.put_bucket_acl(
Bucket=test_bucket,
- ACL='public-read-writ'
+ ACL='public-read-writ',
)
except CosServiceError as e:
print_error_msg(e)
@@ -474,6 +617,26 @@ def test_list_objects():
)
assert response
+ # EncodingType为'url'和其他非法值
+ response = client.list_objects(
+ Bucket=test_bucket,
+ MaxKeys=100,
+ Prefix='中文',
+ Delimiter='/',
+ EncodingType='url'
+ )
+ assert response
+
+ try:
+ response = client.list_objects(
+ Bucket=test_bucket,
+ MaxKeys=100,
+ Prefix='中文',
+ Delimiter='/',
+ EncodingType="xml"
+ )
+ except Exception as e:
+ print(e)
def test_list_objects_versions():
"""列出bucket下的带版本信息的objects"""
@@ -483,6 +646,27 @@ def test_list_objects_versions():
)
assert response
+ # EncodingType为'url'和其他非法值
+ response = client.list_objects_versions(
+ Bucket=test_bucket,
+ MaxKeys=100,
+ Prefix='中文',
+ Delimiter='/',
+ EncodingType='url'
+ )
+ assert response
+
+ try:
+ response = client.list_objects_versions(
+ Bucket=test_bucket,
+ MaxKeys=100,
+ Prefix='中文',
+ Delimiter='/',
+ EncodingType="xml"
+ )
+ except Exception as e:
+ print(e)
+
def test_get_presigned_url():
"""生成预签名的url下载地址"""
@@ -731,6 +915,27 @@ def test_list_multipart_uploads():
)
assert response['Upload'][0]['Key'] == "multipartfile.txt"
assert response['Upload'][0]['UploadId'] == uploadid
+
+ # using EncodingType
+ response = client.list_multipart_uploads(
+ Bucket=test_bucket,
+ Prefix="multipart",
+ MaxUploads=100,
+ EncodingType='url'
+ )
+ assert response['Upload'][0]['Key'] == "multipartfile.txt"
+ assert response['Upload'][0]['UploadId'] == uploadid
+
+ try:
+ response = client.list_multipart_uploads(
+ Bucket=test_bucket,
+ Prefix="multipart",
+ MaxUploads=100,
+ EncodingType='xml' # 非法, 只能为url
+ )
+ except Exception as e:
+ print(e)
+
# abort again make sure delete all uploads
for data in response['Upload']:
response = client.abort_multipart_upload(
@@ -751,6 +956,64 @@ def test_upload_file_from_buffer():
PartSize=1
)
+def test_upload_small_file():
+ """使用高级上传接口上传小文件"""
+ file_name = "file_1M"
+ gen_file_small(file_name, 1*1024*1024)
+ response = client.upload_file(
+ Bucket=test_bucket,
+ Key=file_name,
+ LocalFilePath=file_name,
+ MAXThread=5,
+ EnableMD5=True
+ )
+ assert response
+ if os.path.exists(file_name):
+ os.remove(file_name)
+
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=file_name
+ )
+ assert response['Content-Length'] == '1048576'
+
+ file_name = "file_10B"
+ gen_file_small(file_name, 10)
+ response = client.upload_file(
+ Bucket=test_bucket,
+ Key=file_name,
+ LocalFilePath=file_name,
+ MAXThread=5,
+ EnableMD5=True
+ )
+ assert response
+ if os.path.exists(file_name):
+ os.remove(file_name)
+
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=file_name
+ )
+ assert response['Content-Length'] == '10'
+
+ file_name = "file_0B"
+ gen_file_small(file_name, 0)
+ response = client.upload_file(
+ Bucket=test_bucket,
+ Key=file_name,
+ LocalFilePath=file_name,
+ MAXThread=5,
+ EnableMD5=True
+ )
+ assert response
+ if os.path.exists(file_name):
+ os.remove(file_name)
+
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=file_name
+ )
+ assert response['Content-Length'] == '0'
def test_upload_file_multithreading():
"""根据文件大小自动选择分块大小,多线程并发上传提高上传速度"""
@@ -879,6 +1142,8 @@ def test_put_get_bucket_logging():
assert response['LoggingEnabled']['TargetBucket'] == logging_bucket
assert response['LoggingEnabled']['TargetPrefix'] == 'test'
+ _clear_and_delete_bucket(logging_client, logging_bucket)
+
def test_put_object_enable_md5():
"""上传文件,SDK计算content-md5头部"""
@@ -1075,22 +1340,8 @@ def test_put_get_delete_bucket_domain():
def test_put_get_delete_bucket_domain_certificate():
"""测试设置获取删除bucket自定义域名证书"""
- """
- 存储桶 bj-1259654469 专门用于测试自定义域名证书
- """
-
- temp_bucket = 'bj-1259654469'
- temp_conf = CosConfig(
- Region='ap-beijing',
- SecretId=SECRET_ID,
- SecretKey=SECRET_KEY
- )
- temp_client = CosS3Client(
- conf=temp_conf,
- retry=3
- )
-
domain = 'testcertificate.coshelper.com'
+ # 这个域名可能被别的SDK测试占用, 后面 put_bucket_domain 要捕获异常
domain_config = {
'DomainRule': [
{
@@ -1102,10 +1353,17 @@ def test_put_get_delete_bucket_domain_certificate():
}
# put domain
- response = temp_client.put_bucket_domain(
- Bucket=temp_bucket,
- DomainConfiguration=domain_config
- )
+ try:
+ response = client.put_bucket_domain(
+ Bucket=test_bucket,
+ DomainConfiguration=domain_config
+ )
+ except CosServiceError as e:
+ if e.get_error_code() == "RecordAlreadyExist":
+ print(e.get_error_code())
+ return
+ else:
+ raise e
with open('./testcertificate.coshelper.com.key', 'rb') as f:
key = f.read().decode('utf-8')
@@ -1128,34 +1386,35 @@ def test_put_get_delete_bucket_domain_certificate():
}
# put domain certificate
- response = temp_client.delete_bucket_domain_certificate(
- Bucket=temp_bucket,
+ response = client.delete_bucket_domain_certificate(
+ Bucket=test_bucket,
DomainName=domain
)
time.sleep(2)
- response = temp_client.put_bucket_domain_certificate(
- Bucket=temp_bucket,
+ response = client.put_bucket_domain_certificate(
+ Bucket=test_bucket,
DomainCertificateConfiguration=domain_cert_config
)
# wait for sync
# get domain certificate
time.sleep(4)
- response = temp_client.get_bucket_domain_certificate(
- Bucket=temp_bucket,
+ response = client.get_bucket_domain_certificate(
+ Bucket=test_bucket,
DomainName=domain
)
assert response["Status"] == "Enabled"
# delete domain certificate
- response = temp_client.delete_bucket_domain_certificate(
- Bucket=temp_bucket,
+ response = client.delete_bucket_domain_certificate(
+ Bucket=test_bucket,
DomainName=domain
)
# delete domain
- response = temp_client.delete_bucket_domain(
- Bucket=temp_bucket,
+ time.sleep(2)
+ response = client.delete_bucket_domain(
+ Bucket=test_bucket,
)
def test_put_get_delete_bucket_inventory():
@@ -1163,7 +1422,7 @@ def test_put_get_delete_bucket_inventory():
inventory_config = {
'Destination': {
'COSBucketDestination': {
- 'AccountId': '2779643970',
+ 'AccountId': '2832742109',
'Bucket': 'qcs::cos:' + REGION + '::' + test_bucket,
'Format': 'CSV',
'Prefix': 'list1',
@@ -1271,9 +1530,25 @@ def test_put_get_delete_object_tagging():
)
-def _test_put_get_delete_bucket_origin():
+def test_put_get_delete_bucket_origin():
"""测试设置获取删除bucket回源域名"""
- origin_config = {}
+ origin_config = {
+ 'OriginRule': {
+ 'RulePriority': 1,
+ 'OriginType': 'Mirror',
+ 'OriginCondition': {
+ 'HTTPStatusCode': '404'
+ },
+ 'OriginParameter': {
+ 'Protocol': 'HTTP'
+ },
+ 'OriginInfo': {
+ 'HostInfo': {
+ 'HostName': 'examplebucket-1250000000.cos.ap-shanghai.myqcloud.com'
+ }
+ }
+ }
+ }
response = client.put_bucket_origin(
Bucket=test_bucket,
OriginConfiguration=origin_config
@@ -1379,17 +1654,64 @@ def test_select_object():
for event in event_stream:
print(event)
+ # test EventStream.get_select_result
+ response = client.select_object_content(
+ Bucket=test_bucket,
+ Key=select_obj,
+ Expression='Select * from COSObject',
+ ExpressionType='SQL',
+ InputSerialization={
+ 'CompressionType': 'NONE',
+ 'JSON': {
+ 'Type': 'LINES'
+ }
+ },
+ OutputSerialization={
+ 'CSV': {
+ 'RecordDelimiter': '\n'
+ }
+ }
+ )
+ event_stream = response['Payload']
+ data = event_stream.get_select_result()
+ print(data)
+
+ # test EventStream.get_select_result_to_file
+ response = client.select_object_content(
+ Bucket=test_bucket,
+ Key=select_obj,
+ Expression='Select * from COSObject',
+ ExpressionType='SQL',
+ InputSerialization={
+ 'CompressionType': 'NONE',
+ 'JSON': {
+ 'Type': 'LINES'
+ }
+ },
+ OutputSerialization={
+ 'CSV': {
+ 'RecordDelimiter': '\n'
+ }
+ }
+ )
+ event_stream = response['Payload']
+ file_name = 'select.tmp'
+ event_stream.get_select_result_to_file(file_name)
+ if os.path.exists(file_name):
+ os.remove(file_name)
+
-def _test_get_object_sensitive_content_recognition():
+def test_get_object_sensitive_content_recognition():
"""测试ci文件内容识别的接口"""
- print(CiDetectType)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.get_object_sensitive_content_recognition(
- Bucket=test_bucket,
- Key=test_object,
+ Bucket=ci_bucket_name,
+ Key=ci_test_ocr_image,
Interval=3,
MaxFrames=20,
# BizType='xxxx',
- DetectType=(CiDetectType.PORN | CiDetectType.TERRORIST | CiDetectType.POLITICS | CiDetectType.ADS)
+ DetectType=(CiDetectType.PORN | CiDetectType.TERRORIST | CiDetectType.POLITICS | CiDetectType.ADS),
+ **kwargs
)
print(response)
assert response
@@ -1402,6 +1724,9 @@ def test_download_file():
if os.path.exists('test_download_file.local'):
os.remove('test_download_file.local')
+ # 重置内置线程池大小
+ client.set_built_in_connection_pool_max_size(10, 5)
+
# 测试限速下载
client.download_file(copy_test_bucket, test_object, 'test_download_traffic_limit.local', TrafficLimit='819200')
if os.path.exists('test_download_traffic_limit.local'):
@@ -1444,20 +1769,27 @@ def test_download_file():
os.remove(file_name)
-def _test_put_get_bucket_intelligenttiering():
+def test_put_get_bucket_intelligenttiering():
"""测试设置获取智能分层"""
- intelligent_tiering_conf = {
- 'Status': 'Enabled',
- 'Transition': {
- 'Days': '30',
- 'RequestFrequent': '1'
+ try:
+ intelligent_tiering_conf = {
+ 'Status': 'Enabled',
+ 'Transition': {
+ 'Days': '30',
+ 'RequestFrequent': '1'
+ }
}
- }
- response = client.put_bucket_intelligenttiering(
- Bucket=test_bucket,
- IntelligentTieringConfiguration=intelligent_tiering_conf
- )
- time.sleep(2)
+ response = client.put_bucket_intelligenttiering(
+ Bucket=test_bucket,
+ IntelligentTieringConfiguration=intelligent_tiering_conf
+ )
+ time.sleep(2)
+ except CosServiceError as e:
+ if e.get_error_msg() == 'Not support modify intelligent tiering configuration':
+ print(e.get_error_msg())
+ else:
+ raise e
+
response = client.get_bucket_intelligenttiering(
Bucket=test_bucket,
)
@@ -1535,6 +1867,29 @@ def test_aes_client():
client_for_rsa.delete_object(test_bucket, 'test_multi_upload')
+def test_aes_client2():
+ """测试aes加密客户端的上传下载操作"""
+ aes_dir = os.path.expanduser('~/.cos_local_aes')
+ key_path = os.path.join(aes_dir, '.aes_key.pem')
+ aes_provider = AESProvider(aes_key_path=key_path)
+ client_for_aes = CosEncryptionClient(conf, aes_provider)
+
+ content = '123456' * 1024 + '1'
+ client_for_aes.delete_object(test_bucket, 'test_for_aes')
+ client_for_aes.put_object(test_bucket, content, 'test_for_aes')
+
+ # 测试整个文件的md5
+ response = client_for_aes.get_object(test_bucket, 'test_for_aes')
+ response['Body'].get_stream_to_file('test_for_aes_local')
+ local_file_md5 = None
+ content_md5 = None
+ with open('test_for_aes_local', 'rb') as f:
+ local_file_md5 = get_raw_md5(f.read())
+ content_md5 = get_raw_md5(content.encode("utf-8"))
+ assert local_file_md5 and content_md5 and local_file_md5 == content_md5
+ if os.path.exists('test_for_aes_local'):
+ os.remove('test_for_aes_local')
+
def test_rsa_client():
"""测试rsa加密客户端的上传下载操作"""
@@ -1585,6 +1940,33 @@ def test_rsa_client():
client_for_rsa.delete_object(test_bucket, 'test_multi_upload')
+def test_rsa_client2():
+ """测试rsa加密客户端的上传下载操作"""
+ rsa_dir = os.path.expanduser('~/.cos_local_rsa')
+ public_key_path = os.path.join(rsa_dir, '.public_key.pem')
+ private_key_path = os.path.join(rsa_dir, '.private_key.pem')
+ rsa_provider = RSAProvider(key_pair_info=RSAProvider.get_rsa_key_pair_path(public_key_path, private_key_path))
+ client_for_rsa = CosEncryptionClient(conf, rsa_provider)
+
+ with open('test_rsa_file', 'w') as f:
+ f.write('123456' * 1024 + '1')
+ with open('test_rsa_file', 'rb') as f:
+ client_for_rsa.delete_object(test_bucket, 'test_for_rsa')
+ client_for_rsa.put_object(test_bucket, f, 'test_for_rsa')
+ # 测试整个文件的md5
+ response = client_for_rsa.get_object(test_bucket, 'test_for_rsa')
+ response['Body'].get_stream_to_file('test_for_rsa_local')
+ local_file_md5 = None
+ content_md5 = None
+ with open('test_for_rsa_local', 'rb') as f:
+ local_file_md5 = get_raw_md5(f.read())
+ with open('test_rsa_file', 'rb') as f:
+ content_md5 = get_raw_md5(f.read())
+ assert local_file_md5 and content_md5 and local_file_md5 == content_md5
+ if os.path.exists('test_for_rsa_local'):
+ os.remove('test_for_rsa_local')
+ if os.path.exists('test_rsa_file'):
+ os.remove('test_rsa_file')
def test_live_channel():
if TEST_CI != 'true':
@@ -1709,9 +2091,17 @@ def test_live_channel():
Key=channel_name + '/test.m3u8')
assert (response)
- print("delete live channel...")
- response = client.delete_live_channel(Bucket=test_bucket, ChannelName=channel_name)
- assert (response)
+ from datetime import datetime
+ response = client.get_vod_playlist(
+ Bucket=test_bucket,
+ ChannelName=channel_name,
+ StartTime=int(datetime.now().timestamp()-100000),
+ EndTime=int(datetime.now().timestamp())
+ )
+
+ print("delete live channel...")
+ response = client.delete_live_channel(Bucket=test_bucket, ChannelName=channel_name)
+ assert (response)
def test_get_object_url():
@@ -1723,14 +2113,14 @@ def test_get_object_url():
print(response)
-def _test_qrcode():
+def test_qrcode():
"""二维码图片上传时识别"""
- file_name = 'test_object_sdk_qrcode.file'
+ file_name = 'format.png'
with open(file_name, 'rb') as fp:
# fp验证
- opts = '{"is_pic_info":1,"rules":[{"fileid":"format.jpg","rule":"QRcode/cover/1"}]}'
+ opts = '{"is_pic_info":1,"rules":[{"fileid":"format.png","rule":"QRcode/cover/1"}]}'
response, data = client.ci_put_object_from_local_file_and_get_qrcode(
- Bucket=test_bucket,
+ Bucket=ci_bucket_name,
LocalFilePath=file_name,
Key=file_name,
EnableMD5=False,
@@ -1739,10 +2129,12 @@ def _test_qrcode():
print(response, data)
"""二维码图片下载时识别"""
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response, data = client.ci_get_object_qrcode(
- Bucket=test_bucket,
+ Bucket=ci_bucket_name,
Key=file_name,
- Cover=0
+ Cover=0,
+ **kwargs
)
print(response, data)
@@ -1757,26 +2149,26 @@ def test_ci_put_image_style():
'StyleBody': 'imageMogr2/thumbnail/!50px',
}
response = client.ci_put_image_style(
- Bucket=test_bucket,
+ Bucket=ci_bucket_name,
Request=body,
)
- print(response)
-
-
-def test_ci_get_image_style():
- if TEST_CI != 'true':
- return
-
- """获取图片样式接口"""
+ assert response
body = {
'StyleName': 'style_name',
}
response, data = client.ci_get_image_style(
- Bucket=test_bucket,
+ Bucket=ci_bucket_name,
Request=body,
)
- print(response['x-cos-request-id'])
- print(data)
+ assert response
+ body = {
+ 'StyleName': 'style_name',
+ }
+ response = client.ci_delete_image_style(
+ Bucket=ci_bucket_name,
+ Request=body,
+ )
+ assert response
def test_ci_get_image_info():
@@ -1785,11 +2177,10 @@ def test_ci_get_image_info():
"""ci获取图片基本信息接口"""
response, data = client.ci_get_image_info(
- Bucket=test_bucket,
- Key='format.png',
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
)
- print(response['x-cos-request-id'])
- print(data)
+ assert response
def test_ci_get_image_exif_info():
@@ -1798,11 +2189,10 @@ def test_ci_get_image_exif_info():
"""获取图片exif信息接口"""
response, data = client.ci_get_image_exif_info(
- Bucket=test_bucket,
- Key='format.png',
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
)
- print(response['x-cos-request-id'])
- print(data)
+ assert response
def test_ci_get_image_ave_info():
@@ -1810,12 +2200,13 @@ def test_ci_get_image_ave_info():
return
"""获取图片主色调接口"""
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response, data = client.ci_get_image_info(
- Bucket=test_bucket,
- Key='format.png',
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
+ **kwargs
)
- print(response['x-cos-request-id'])
- print(data)
+ assert response
def test_ci_image_assess_quality():
@@ -1824,10 +2215,10 @@ def test_ci_image_assess_quality():
"""图片质量评估接口"""
response = client.ci_image_assess_quality(
- Bucket=test_bucket,
- Key='format.png',
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
)
- print(response)
+ assert response
def test_ci_qrcode_generate():
@@ -1835,15 +2226,14 @@ def test_ci_qrcode_generate():
return
"""二维码生成接口"""
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_qrcode_generate(
- Bucket=test_bucket,
+ Bucket=ci_bucket_name,
QrcodeContent='https://www.example.com',
- Width=200
+ Width=200,
+ **kwargs
)
- qrCodeImage = base64.b64decode(response['ResultImage'])
- with open('/result.png', 'wb') as f:
- f.write(qrCodeImage)
- print(response)
+ assert response['ResultImage']
def test_ci_ocr_process():
@@ -1851,11 +2241,13 @@ def test_ci_ocr_process():
return
"""通用文字识别"""
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_ocr_process(
- Bucket=test_bucket,
- Key='ocr.jpeg',
+ Bucket=ci_bucket_name,
+ Key=ci_test_ocr_image,
+ **kwargs
)
- print(response)
+ assert response
def test_ci_get_media_queue():
@@ -1863,11 +2255,12 @@ def test_ci_get_media_queue():
return
# 查询媒体队列信息
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_get_media_queue(
Bucket=ci_bucket_name,
State="Active",
+ **kwargs
)
- print(response)
assert (response['QueueList'])
@@ -1880,7 +2273,6 @@ def test_ci_get_media_pic_queue():
Bucket=ci_bucket_name,
State="Active",
)
- print(response)
assert (response['QueueList'])
@@ -1958,7 +2350,6 @@ def test_ci_create_media_transcode_watermark_jobs():
Lst=lst,
ContentType='application/xml'
)
- print(response)
assert (response['JobsDetail'])
@@ -1987,13 +2378,13 @@ def test_ci_create_media_transcode_jobs():
'TemplateId': 't02db40900dc1c43ad9bdbd8acec6075c5'
}
}
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache", "ContentType": 'application/xml'}
response = client.ci_create_media_jobs(
Bucket=ci_bucket_name,
Jobs=body,
Lst={},
- ContentType='application/xml'
+ **kwargs
)
- print(response)
assert (response['JobsDetail'])
@@ -2025,11 +2416,12 @@ def test_ci_create_media_pic_jobs():
},
}
}
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache", "ContentType": 'application/xml'}
response = client.ci_create_media_pic_jobs(
Bucket=ci_bucket_name,
Jobs=body,
Lst={},
- ContentType='application/xml'
+ **kwargs
)
print(response)
assert (response['JobsDetail'])
@@ -2053,8 +2445,7 @@ def test_ci_list_media_pic_jobs():
ContentType='application/xml',
States='Success'
)
- print(response)
- assert (response['JobsDetail'])
+ assert response
def test_ci_list_media_transcode_jobs():
@@ -2067,41 +2458,47 @@ def test_ci_list_media_transcode_jobs():
State="Active",
)
QueueId = response['QueueList'][0]['QueueId']
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ now_time = time.time()
response = client.ci_list_media_jobs(
Bucket=ci_bucket_name,
QueueId=QueueId,
Tag='Transcode',
+ StartCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time - 5)),
+ EndCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time)),
Size=2,
- ContentType='application/xml'
+ **kwargs
)
- print(response)
- assert (response['JobsDetail'])
+ assert (response)
def test_get_media_info():
if TEST_CI != 'true':
return
# 获取媒体信息
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.get_media_info(
Bucket=ci_bucket_name,
- Key='workflow/input/video/test1.mp4'
+ Key=ci_test_media,
+ **kwargs
)
- print(response)
- assert (response)
+ assert response
def test_get_snapshot():
if TEST_CI != 'true':
return
# 产生同步截图
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.get_snapshot(
Bucket=ci_bucket_name,
- Key='workflow/input/video/test1.mp4',
+ Key=ci_test_media,
Time='1.5',
Width='480',
- Format='png'
+ Height='480',
+ Format='png',
+ **kwargs
)
- print(response)
assert (response)
@@ -2109,12 +2506,13 @@ def test_get_pm3u8():
if TEST_CI != 'true':
return
# 获取私有 M3U8 ts 资源的下载授权
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.get_pm3u8(
Bucket=ci_bucket_name,
- Key='/data/media/m3u8_no_end.m3u8',
- Expires='3600'
+ Key=ci_test_m3u8,
+ Expires='3600',
+ **kwargs
)
- print(response)
assert (response)
@@ -2122,26 +2520,28 @@ def test_ci_get_media_bucket():
if TEST_CI != 'true':
return
# 获取私有 M3U8 ts 资源的下载授权
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_get_media_bucket(
Regions=ci_region,
BucketNames=ci_bucket_name,
BucketName=ci_bucket_name,
PageNumber='1',
- PageSize='2'
+ PageSize='2',
+ **kwargs
)
- print(response)
assert (response)
def test_ci_create_doc_transcode_jobs():
if TEST_CI != 'true':
return
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_get_doc_queue(
- Bucket=ci_bucket_name
- )
- print(response)
+ Bucket=ci_bucket_name,
+ **kwargs)
assert (response['QueueList'][0]['QueueId'])
queueId = response['QueueList'][0]['QueueId']
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_create_doc_job(
Bucket=ci_bucket_name,
QueueId=queueId,
@@ -2149,20 +2549,32 @@ def test_ci_create_doc_transcode_jobs():
OutputBucket=ci_bucket_name,
OutputRegion='ap-guangzhou',
OutputObject='/test_doc/normal/abc_${Number}.jpg',
- # DocPassword='123',
+ SrcType='pptx',
+ TgtType='jpg',
+ StartPage=1,
+ EndPage=-1,
+ SheetId=0,
+ PaperDirection=0,
+ PaperSize=0,
+ DocPassword='123',
+ Comments=0,
Quality=109,
+ Zoom=100,
+ ImageDpi=96,
+ PicPagination=1,
PageRanges='1,3',
- )
- print(response)
+ **kwargs
+ )
assert (response['JobsDetail']['JobId'])
# 测试转码查询任务
JobID = response['JobsDetail']['JobId']
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_get_doc_job(
Bucket=ci_bucket_name,
JobID=JobID,
+ **kwargs
)
- print(response)
assert (response['JobsDetail'])
@@ -2173,16 +2585,19 @@ def test_ci_list_doc_transcode_jobs():
response = client.ci_get_doc_queue(
Bucket=ci_bucket_name
)
- print(response)
assert (response['QueueList'][0]['QueueId'])
queueId = response['QueueList'][0]['QueueId']
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ now_time = time.time()
response = client.ci_list_doc_jobs(
Bucket=ci_bucket_name,
QueueId=queueId,
+ StartCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time - 5)),
+ EndCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time)),
Size=10,
- )
- print(response)
- assert (response['JobsDetail'])
+ **kwargs
+ )
+ assert response
def test_ci_live_video_auditing():
@@ -2203,16 +2618,18 @@ def test_ci_live_video_auditing():
'IP': 'IP-test',
'Type': 'Type-test',
},
- BizType="44f32597a627d013962c54d459a9ab6e",
+ BizType='d0292362d07428b4f6982a31bf97c246',
+ CallbackType=1
)
assert (response['JobsDetail']['JobId'])
jobId = response['JobsDetail']['JobId']
time.sleep(5)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
response = client.ci_auditing_live_video_cancle(
Bucket=ci_bucket_name,
JobID=jobId,
+ **kwargs
)
- print(response)
assert (response['JobsDetail'])
@@ -2299,10 +2716,822 @@ def test_sse_c_file():
CopySourceSSECustomerAlgorithm='AES256', CopySourceSSECustomerKey=ssec_key, CopySourceSSECustomerKeyMD5=ssec_key_md5)
assert(response['x-cos-server-side-encryption-customer-algorithm'] == 'AES256')
+def test_short_connection_put_get_object():
+ """使用短连接上传下载对象"""
+
+ my_conf = CosConfig(
+ Region=REGION,
+ SecretId=SECRET_ID,
+ SecretKey=SECRET_KEY,
+ KeepAlive=False)
+ my_client = CosS3Client(my_conf)
+
+ response = my_client.put_object(
+ Bucket=test_bucket,
+ Key=test_object,
+ Body='test'
+ )
+ assert response['Connection'] == 'close'
+
+ response = my_client.get_object(
+ Bucket=test_bucket,
+ Key=test_object,
+ )
+ assert response['Connection'] == 'close'
+
+def test_config_invalid_scheme():
+ """初始化Scheme为非法值"""
+ try:
+ my_conf = CosConfig(
+ Region=REGION,
+ SecretId=SECRET_ID,
+ SecretKey=SECRET_KEY,
+ Scheme="ftp")
+ except Exception as e:
+ print(e)
+
+def test_config_invalid_aksk():
+ """aksk首尾包含空格"""
+ try:
+ my_conf = CosConfig(
+ Region=REGION,
+ SecretId=SECRET_ID + ' ',
+ SecretKey=' ' + SECRET_KEY)
+ except Exception as e:
+ print(e)
+
+def test_config_credential_inst():
+ """使用CredentialInstance初始化"""
+ try:
+ my_conf = CosConfig(
+ Region=REGION,
+ CredentialInstance=CredentialDemo(),
+ )
+ except Exception as e:
+ raise e
+
+def test_config_anoymous():
+ """匿名访问配置"""
+ try:
+ my_conf = CosConfig(
+ Region=REGION,
+ Anonymous=True
+ )
+ except Exception as e:
+ raise e
+
+def test_config_none_aksk():
+ """缺少aksk"""
+ try:
+ my_conf = CosConfig(
+ Region=REGION,
+ )
+ except Exception as e:
+ print(e)
+
+def test_head_bucket_object_not_exist():
+ """HEAD不存在的桶和对象"""
+ try:
+ response = client.head_bucket(
+ Bucket="nosuchbucket-" + APPID
+ )
+ except CosServiceError as e:
+ if e.get_error_code() == "NoSuchResource":
+ print(e.get_error_code())
+ else:
+ raise e
+
+ try:
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key="nosuchkey"
+ )
+ except CosServiceError as e:
+ if e.get_error_code() == "NoSuchResource":
+ print(e.get_error_code())
+ else:
+ raise e
+
+def test_append_object():
+ """APPEND上传对象"""
+ test_append_object = "test_append_object"
+ response = client.delete_object(
+ Bucket=test_bucket,
+ Key=test_append_object
+ )
+ response = client.append_object(
+ Bucket=test_bucket,
+ Key=test_append_object,
+ Position=0,
+ Data='test'
+ )
+ assert response
+
+
+def test_ci_delete_asr_template():
+ # 删除指定语音识别模板
+ response = client.ci_delete_asr_template(
+ Bucket=ci_bucket_name,
+ TemplateId='t1bdxxxxxxxxxxxxxxxxx94a9',
+ test='1'
+ )
+ assert response
+
+
+def test_ci_get_asr_template():
+ # 获取语音识别模板
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_asr_template(
+ Bucket=ci_bucket_name,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_create_asr_template():
+ # 创建语音识别模板
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_create_asr_template(
+ Bucket=ci_bucket_name,
+ Name='test_asr_template',
+ EngineModelType='16k_zh',
+ ChannelNum=1,
+ ResTextFormat=2,
+ FlashAsr=True,
+ Format='mp3',
+ **kwargs
+ )
+ print(response)
+ assert response
+ templateId = response["Template"]["TemplateId"]
+ print(templateId)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_update_asr_template(
+ Bucket=ci_bucket_name,
+ TemplateId=templateId,
+ Name='update_asr_template',
+ EngineModelType='16k_zh',
+ ChannelNum=1,
+ ResTextFormat=1,
+ Format='mp3',
+ **kwargs
+ )
+ assert response
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ # 删除指定语音识别模板
+ response = client.ci_delete_asr_template(
+ Bucket=ci_bucket_name,
+ TemplateId=templateId,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_list_asr_jobs():
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_asr_queue(
+ Bucket=ci_bucket_name,
+ **kwargs
+ )
+ queueId = response["QueueList"][0]["QueueId"]
+ # 获取语音识别任务信息列表
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ now_time = time.time()
+ response = client.ci_list_asr_jobs(
+ Bucket=ci_bucket_name,
+ QueueId=queueId,
+ StartCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time - 5)),
+ EndCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time)),
+ Size=10,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_get_asr_jobs():
+ # 获取语音识别任务信息
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_asr_job(
+ Bucket=ci_bucket_name,
+ JobID='s0980xxxxxxxxxxxxxxxxff12',
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_create_asr_jobs():
+ response = client.ci_get_asr_queue(
+ Bucket=ci_bucket_name,
+ ContentType='application/xml'
+ )
+ queueId = response["QueueList"][0]["QueueId"]
+ # 创建语音识别异步任务
+ body = {
+ 'EngineModelType': '16k_zh',
+ 'ChannelNum': '1',
+ 'ResTextFormat': '1',
+ # 'FlashAsr': 'true',
+ # 'Format': 'mp3'
+ }
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_create_asr_job(
+ Bucket=ci_bucket_name,
+ QueueId=queueId,
+ # TemplateId='t1ada6f282d29742db83244e085e920b08',
+ InputObject='normal.mp4',
+ Url='',
+ TemplateId='',
+ OutputBucket=ci_bucket_name,
+ OutputRegion='ap-guangzhou',
+ OutputObject='result.txt',
+ SpeechRecognition=body,
+ CallBack="http://www.demo.com",
+ CallBackFormat='XML',
+ CallBackType='Url',
+ **kwargs
+ )
+ print(response)
+ return response
+
+
+def test_ci_put_asr_queue():
+ response = client.ci_get_asr_queue(
+ Bucket=ci_bucket_name,
+ )
+ queueId = response["QueueList"][0]["QueueId"]
+ # 更新语音识别队列信息
+ body = {
+ 'Name': 'asr-queue',
+ 'QueueID': queueId,
+ 'State': 'Active',
+ 'NotifyConfig': {
+ 'Type': 'Url',
+ 'Url': 'http://www.demo.com',
+ 'Event': 'TaskFinish',
+ 'State': 'On',
+ 'ResultFormat': 'JSON'
+ }
+ }
+ response = client.ci_update_asr_queue(
+ Bucket=ci_bucket_name,
+ QueueId=queueId,
+ Request=body,
+ ContentType='application/xml'
+ )
+ assert response
+
+
+def test_ci_get_asr_queue():
+ # 查询语音识别队列信息
+ response = client.ci_get_asr_queue(
+ Bucket=ci_bucket_name,
+ )
+ assert response
+
+
+def test_ci_get_asr_bucket():
+ # 查询语音识别开通状态
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_asr_bucket(
+ Regions=REGION,
+ BucketName=ci_bucket_name,
+ PageSize="10",
+ PageNumber="1",
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_get_doc_bucket():
+ # 查询文档预览开通状态
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_doc_bucket(
+ Regions=REGION,
+ # BucketName='demo',
+ BucketNames=ci_bucket_name,
+ PageSize=1,
+ PageNumber=1,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_doc_preview_to_html_process():
+ # 文档预览同步接口(生成html)
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_doc_preview_html_process(
+ Bucket=ci_bucket_name,
+ Key=ci_test_txt,
+ SrcType='txt',
+ Copyable='0',
+ DstType='html',
+ HtmlParams='',
+ HtmlWaterword='',
+ HtmlFillStyle='',
+ HtmlFront='',
+ HtmlRotate='315',
+ HtmlHorizontal='50',
+ HtmlVertical='100',
+ **kwargs
+ )
+ assert response
+ response['Body'].get_stream_to_file('result.html')
+
+
+def test_ci_doc_preview_process():
+ # 文档预览同步接口
+ kwargs = {"ContentType": "application/xml", "ResponseCacheControl": "no-cache"}
+ response = client.ci_doc_preview_process(
+ Bucket=ci_bucket_name,
+ Key=ci_test_txt,
+ SrcType='txt',
+ Page=1,
+ DstType='jpg',
+ **kwargs
+ )
+ assert response
+ response['Body'].get_stream_to_file('result.png')
+
+
+def test_ci_put_doc_queue():
+ response = client.ci_get_doc_queue(
+ Bucket=ci_bucket_name,
+ )
+ queueId = response["QueueList"][0]["QueueId"]
+ # 更新文档预览队列信息
+ body = {
+ 'Name': 'doc-queue',
+ 'QueueID': queueId,
+ 'State': 'Active',
+ 'NotifyConfig': {
+ 'Type': 'Url',
+ 'Url': 'http://www.demo.com',
+ 'Event': 'TaskFinish',
+ 'State': 'On',
+ 'ResultFormat': 'JSON'
+ }
+ }
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache", "ContentType": 'application/xml'}
+ response = client.ci_update_doc_queue(
+ Bucket=ci_bucket_name,
+ QueueId=queueId,
+ Request=body,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_list_workflowexecution():
+ # 查询工作流实例接口
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ now_time = time.time()
+ response = client.ci_list_workflowexecution(
+ Bucket=ci_bucket_name,
+ WorkflowId='w5307ee7a60d6489383c3921c715dd1c5',
+ StartCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time - 5)),
+ EndCreationTime=time.strftime("%Y-%m-%dT%H:%m:%S%z", time.localtime(now_time)),
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_trigger_workflow():
+ # 触发工作流接口
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_trigger_workflow(
+ Bucket=ci_bucket_name,
+ WorkflowId='w5307ee7a60d6489383c3921c715dd1c5',
+ Key=ci_test_image,
+ **kwargs
+ )
+ assert response
+ print(response)
+ instance_id = response['InstanceId']
+ # 查询工作流实例接口
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_workflowexecution(
+ Bucket=ci_bucket_name,
+ RunId=instance_id,
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_get_media_transcode_jobs():
+ # 转码任务详情
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_media_jobs(
+ Bucket=ci_bucket_name,
+ JobIDs='jc46435e40bcc11ed83d6e19dd89b02cc',
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_get_media_pic_jobs():
+ # 图片处理任务详情
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_get_media_pic_jobs(
+ Bucket=ci_bucket_name,
+ JobIDs='c01742xxxxxxxxxxxxxxxxxx7438e39',
+ **kwargs
+ )
+ assert response
+
+
+def test_ci_put_media_pic_queue():
+ response = client.ci_get_media_pic_queue(
+ Bucket=ci_bucket_name,
+ )
+ queueId = response["QueueList"][0]["QueueId"]
+ # 更新图片处理队列信息
+ body = {
+ 'Name': 'media-pic-queue',
+ 'QueueID': queueId,
+ 'State': 'Active',
+ 'NotifyConfig': {
+ 'Type': 'Url',
+ 'Url': 'http://www.demo.com',
+ 'Event': 'TaskFinish',
+ 'State': 'On',
+ 'ResultFormat': 'JSON'
+ }
+ }
+ response = client.ci_update_media_pic_queue(
+ Bucket=ci_bucket_name,
+ QueueId=queueId,
+ Request=body,
+ ContentType='application/xml'
+ )
+ assert response
+
+
+def test_ci_compress_image():
+ # HEIF 压缩
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_download_compress_image(
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
+ DestImagePath='sample.heif',
+ CompressType='heif',
+ **kwargs
+ )
+ assert os.path.exists('sample.heif')
+
+
+def test_ci_image_detect_label():
+ # 图片标签
+ response = client.ci_image_detect_label(
+ Bucket=ci_bucket_name,
+ Key=ci_test_car_image,
+ )
+ assert response
+
+
+def test_ci_image_detect_car():
+ # 车辆车牌检测
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_image_detect_car(
+ Bucket=ci_bucket_name,
+ Key=ci_test_car_image,
+ **kwargs
+ )
+ assert response
+
+def test_pic_process_when_download_object():
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ rule = 'imageMogr2/format/jpg/interlace/1'
+ response = client.ci_get_object(
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
+ DestImagePath='format.png',
+ # pic operation json struct
+ Rule=rule,
+ **kwargs
+ )
+ print(response['x-cos-request-id'])
+
+
+def test_pic_process_when_put_object():
+ operations = '{"is_pic_info":1,"rules":[{"fileid": "format.png",' \
+ '"rule": "imageMogr2/quality/60" }]}'
+ response, data = client.ci_put_object_from_local_file(
+ Bucket=ci_bucket_name,
+ LocalFilePath='format.png',
+ Key=ci_test_image,
+ # pic operation json struct
+ PicOperations=operations,
+ EnableMD5=True,
+ ContentType='application/xml'
+ )
+ print(response['x-cos-request-id'])
+ print(data)
+
+
+def test_process_on_cloud():
+ operations = '{"is_pic_info":1,"rules":[{"fileid": "format.png",' \
+ '"rule": "imageMogr2/quality/60" }]}'
+ response, data = client.ci_image_process(
+ Bucket=ci_bucket_name,
+ Key=ci_test_image,
+ # pic operation json struct
+ PicOperations=operations,
+ ContentType='application/xml'
+ )
+ print(response['x-cos-request-id'])
+ print(data)
+
+
+def test_ci_auditing_video_submit():
+ response = client.ci_auditing_video_submit(Bucket=ci_bucket_name,
+ Key=ci_test_media,
+ Callback="http://www.demo.com",
+ CallbackVersion='Simple',
+ DetectContent=1,
+ Mode='Interval',
+ Count=1,
+ TimeInterval=1)
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_auditing_video_query(Bucket=ci_bucket_name, JobID=jobId, **kwargs)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_audio_submit():
+ response = client.ci_auditing_audio_submit(Bucket=ci_bucket_name,
+ Key=ci_test_media,
+ Callback="http://www.demo.com",
+ CallbackVersion='Simple')
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ response = client.ci_auditing_audio_query(Bucket=ci_bucket_name, JobID=jobId)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_text_submit():
+ response = client.ci_auditing_text_submit(Bucket=ci_bucket_name,
+ Key=ci_test_txt,
+ Callback="http://www.demo.com")
+
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ response = client.ci_auditing_text_query(Bucket=ci_bucket_name, JobID=jobId)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_document_submit():
+ response = client.ci_auditing_document_submit(Bucket=ci_bucket_name,
+ Url='https://cos-python-v5-test-ci-1253960454.cos.ap-guangzhou.myqcloud.com/test.txt',
+ Key=ci_test_txt,
+ Type='txt',
+ Callback="http://www.demo.com")
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_auditing_document_query(Bucket=ci_bucket_name, JobID=jobId, **kwargs)
+ print(response['JobsDetail']['State'])
+ print(str(response))
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_html_submit():
+ response = client.ci_auditing_html_submit(Bucket=ci_bucket_name,
+ Url="https://cloud.tencent.com/product/ci",
+ ReturnHighlightHtml=False,
+ Callback="http://www.demo.com")
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ response = client.ci_auditing_html_query(Bucket=ci_bucket_name, JobID=jobId)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_image_batch():
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_auditing_image_batch(Bucket=ci_bucket_name,
+ Input=[{
+ 'Object': 'ocr.jpeg'}],
+ **kwargs)
+ jobId = response['JobsDetail'][0]['JobId']
+ while True:
+ time.sleep(5)
+ response = client.ci_auditing_image_query(Bucket=ci_bucket_name, JobID=jobId)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_virus_submit():
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_auditing_virus_submit(Bucket=ci_bucket_name,
+ Key=ci_test_image,
+ Callback="http://www.demo.com",
+ **kwargs)
+ jobId = response['JobsDetail']['JobId']
+ while True:
+ time.sleep(5)
+ kwargs = {"CacheControl": "no-cache", "ResponseCacheControl": "no-cache"}
+ response = client.ci_auditing_virus_query(Bucket=ci_bucket_name, JobID=jobId, **kwargs)
+ print(response['JobsDetail']['State'])
+ if response['JobsDetail']['State'] == 'Success':
+ print(str(response))
+ break
+ assert response
+
+
+def test_ci_auditing_detect_type():
+ detect_type = CiDetectType.get_detect_type_str(127)
+ assert detect_type
+
+def test_put_get_async_fetch_task():
+ from copy import deepcopy
+ tmp_conf = deepcopy(conf)
+ tmp_conf._scheme = 'http'
+ tmp_client = CosS3Client(tmp_conf)
+ url = "{}://{}/{}".format(tmp_conf._scheme, tmp_conf.get_host(test_bucket), test_object)
+ response = tmp_client.put_async_fetch_task(
+ Bucket=test_bucket,
+ FetchTaskConfiguration={
+ 'Url': url,
+ 'Key': test_object,
+ },
+ )
+ time.sleep(3)
+ response2 = tmp_client.get_async_fetch_task(
+ Bucket=test_bucket,
+ TaskId=response['data']['taskid'],
+ )
+ assert response2['message'] == 'SUCCESS'
+
+
+def test_get_rtmp_signed_url():
+ response = client.get_rtmp_signed_url(
+ Bucket=test_bucket,
+ ChannelName='ch1'
+ )
+ assert response
+
+def test_change_object_storage_class():
+ response = client.change_object_storage_class(
+ Bucket=test_bucket,
+ Key=test_object,
+ StorageClass='STANDARD_IA'
+ )
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=test_object
+ )
+ assert response['x-cos-storage-class'] == 'STANDARD_IA'
+
+ response = client.change_object_storage_class(
+ Bucket=test_bucket,
+ Key=test_object,
+ StorageClass='STANDARD'
+ )
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=test_object
+ )
+ assert 'x-cos-storage-class' not in response
+
+def test_update_object_meta():
+ response = client.update_object_meta(
+ Bucket=test_bucket,
+ Key=test_object,
+ Metadata={
+ 'x-cos-meta-key1': 'value1',
+ 'x-cos-meta-key2': 'value2'
+ }
+ )
+ response = client.head_object(
+ Bucket=test_bucket,
+ Key=test_object
+ )
+ assert response['x-cos-meta-key1'] == 'value1'
+ assert response['x-cos-meta-key2'] == 'value2'
+
+def test_cos_comm_misc():
+ from qcloud_cos.cos_comm import format_dict_or_list, get_date, get_raw_md5, client_can_retry, format_path
+ data = [
+ {'aaa': '111'},
+ {'bbb': '222'},
+ ]
+ data = format_dict_or_list(data, ['aaa', 'bbb'])
+ print(data)
+
+ r = get_date(2022, 5, 30)
+ assert r == '2022-05-30T00:00:00+08:00'
+
+ r = get_raw_md5(b'12345'*1024)
+ assert r
+
+ with open("tmp_test", 'w') as f:
+ r = client_can_retry(0, data=f)
+ assert r
+ if os.path.exists("tmp_test"):
+ os.remove("tmp_test")
+
+ try:
+ r = format_path('')
+ except Exception as e:
+ print(e)
+
+ try:
+ r = format_path(0)
+ except Exception as e:
+ print(e)
+
+ r = format_path('/test/path/to')
+ assert r == 'test/path/to'
+
+def test_cos_exception_unknow():
+ msg = ''
+ e = CosServiceError('GET', msg, '400')
+ assert e.get_error_code() == 'Unknown'
+ assert e.get_error_msg() == 'Unknown'
+ assert e.get_resource_location() == 'Unknown'
+ assert e.get_trace_id() == 'Unknown'
+ assert e.get_request_id() == 'Unknown'
+
+def test_check_multipart_upload():
+ test_key = 'test_key'
+ test_data = 'x'*1024*1024
+ with open(test_key, 'w') as f:
+ f.write(test_data)
+ response = client.create_multipart_upload(
+ Bucket=test_bucket,
+ Key=test_key,
+ )
+ uploadId = response['UploadId']
+ response = client.upload_part(
+ Bucket=test_bucket,
+ Key=test_key,
+ Body=test_data,
+ PartNumber=1,
+ UploadId=uploadId
+ )
+ response = client.list_parts(
+ Bucket=test_bucket,
+ Key=test_key,
+ UploadId=uploadId
+ )
+
+ tmp_file = 'tmp_file'
+ record = {'bucket': test_bucket, 'key': test_key, 'tmp_filename': tmp_file,
+ 'mtime': response['Part'][0]['LastModified'], 'etag': response['Part'][0]['ETag'],
+ 'file_size': response['Part'][0]['Size'], 'part_size': response['Part'][0]['Size'], 'parts': []}
+ with open(tmp_file, 'w') as f:
+ json.dump(record, f)
+ r = client._check_all_upload_parts(
+ bucket=test_bucket,
+ key=test_key,
+ uploadid=uploadId,
+ local_path=test_key,
+ parts_num=1,
+ part_size=int(response['Part'][0]['Size']),
+ last_size=int(response['Part'][0]['Size']),
+ already_exist_parts={}
+ )
+ assert r
+ response = client.abort_multipart_upload(
+ Bucket=test_bucket,
+ Key=test_key,
+ UploadId=uploadId
+ )
if __name__ == "__main__":
setUp()
"""
+ test_config_invalid_scheme()
+ test_config_credential_inst()
+ test_config_anoymous()
+ test_config_none_aksk()
test_put_get_delete_object_10MB()
test_put_object_speacil_names()
test_get_object_special_names()
@@ -2352,6 +3581,8 @@ def test_sse_c_file():
test_put_get_delete_bucket_tagging()
test_put_get_delete_object_tagging()
test_put_get_delete_bucket_referer()
+ test_put_get_bucket_intelligenttiering()
+ test_put_get_delete_bucket_domain_certificate()
test_put_get_traffic_limit()
test_select_object()
test_download_file()
@@ -2382,6 +3613,43 @@ def test_sse_c_file():
test_ci_create_doc_transcode_jobs()
test_ci_list_doc_transcode_jobs()
test_ci_live_video_auditing()
+ test_ci_delete_asr_template()
+ test_ci_get_asr_template()
+ test_ci_update_asr_template()
+ test_ci_create_asr_template()
+ test_ci_list_asr_jobs()
+ test_ci_get_asr_jobs()
+ test_ci_create_asr_jobs()
+ test_ci_put_asr_queue()
+ test_ci_get_asr_queue()
+ test_ci_get_asr_bucket()
+ test_ci_get_doc_bucket()
+ test_ci_doc_preview_to_html_process()
+ test_ci_doc_preview_process()
+ test_qrcode()
+ test_ci_put_doc_queue()
+ test_ci_list_workflowexecution()
+ test_ci_get_workflowexecution()
+ test_ci_trigger_workflow()
+ test_ci_get_media_transcode_jobs()
+ test_ci_get_media_pic_jobs()
+ test_ci_put_media_pic_queue()
+ test_ci_compress_image()
+ test_pic_process_when_download_object()
+ test_ci_image_detect_label()
+ test_ci_image_detect_car()
+ test_ci_get_image_ave_info()
+ test_ci_delete_image_style()
+ test_pic_process_when_put_object()
+ test_process_on_cloud()
+ test_ci_auditing_video_submit()
+ test_ci_auditing_audio_submit()
+ test_ci_auditing_text_submit()
+ test_get_object_sensitive_content_recognition()
+ test_ci_auditing_document_submit()
+ test_ci_auditing_html_submit()
+ test_ci_auditing_image_batch()
+ test_ci_auditing_virus_submit()
test_sse_c_file()
"""
tearDown()