Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import copy
import json
import threading
import xml.dom.minidom
import xml.etree.ElementTree
from requests import Request, Session
Expand Down Expand Up @@ -2830,7 +2831,7 @@ def list_buckets(self, **kwargs):
return data

# Advanced interface
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts, enable_md5, traffic_limit):
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts, enable_md5, traffic_limit, progress_callback=None):
"""从本地文件中读取分块, 上传单个分块,将结果记录在md5——list中

:param bucket(string): 存储桶名称.
Expand All @@ -2855,6 +2856,8 @@ def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid
data = fp.read(size)
rt = self.upload_part(bucket, key, data, part_num, uploadid, enable_md5, TrafficLimit=traffic_limit)
md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']})
if progress_callback:
progress_callback.report(size)
return None

def _get_resumable_uploadid(self, bucket, key):
Expand Down Expand Up @@ -2967,7 +2970,7 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAZThread=5, Ena
downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAZThread, EnableCRC, **Kwargs)
downloader.start()

def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, **kwargs):
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, progress_callback=None, **kwargs):
"""小于等于20MB的文件简单上传,大于20MB的文件使用分块上传

:param Bucket(string): 存储桶名称.
Expand Down Expand Up @@ -3037,12 +3040,14 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, Enabl
offset = 0 # 记录文件偏移量
lst = list() # 记录分块信息
pool = SimpleThreadPool(MAXThread)

callback = None
if progress_callback:
callback = ProgressCallback(file_size, progress_callback)
for i in range(1, parts_num+1):
if i == parts_num: # 最后一块
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit)
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit, callback)
else:
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit)
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit, callback)
offset += part_size

pool.wait_completion()
Expand Down
14 changes: 14 additions & 0 deletions qcloud_cos/cos_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io
import re
import sys
import threading
import xml.dom.minidom
import xml.etree.ElementTree
from datetime import datetime
Expand Down Expand Up @@ -420,3 +421,16 @@ class CiDetectType():
TERRORIST = 2
POLITICS = 4
ADS = 8


class ProgressCallback():
def __init__(self, file_size, progress_callback):
self.__lock = threading.Lock()
self.__finished_size = 0
self.__file_size = file_size
self.__progress_callback = progress_callback

def report(self, size):
with self.__lock:
self.__finished_size += size
self.__progress_callback(self.__finished_size, self.__file_size)
30 changes: 30 additions & 0 deletions ut/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ def print_error_msg(e):
print (e.get_trace_id())
print (e.get_request_id())

def percentage(consumed_bytes, total_bytes):
"""进度条回调函数,计算当前完成的百分比

:param consumed_bytes: 已经上传/下载的数据量
:param total_bytes: 总数据量
"""
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate))
sys.stdout.flush()

def setUp():
print ("start test...")
Expand Down Expand Up @@ -707,6 +717,25 @@ def test_upload_file_multithreading():
print (ed - st)


def test_upload_file_with_progress_callback():
"""带有进度条功能的并发上传"""
file_name = "test_progress_callback"
file_size = 1024
if TRAVIS_FLAG == 'true':
file_size = 5 # set 5MB beacuse travis too slow
gen_file(file_name, file_size)
response = client.upload_file(
Bucket=test_bucket,
Key=file_name,
LocalFilePath=file_name,
MAXThread=5,
EnableMD5=True,
progress_callback=percentage
)
if os.path.exists(file_name):
os.remove(file_name)


def test_copy_file_automatically():
"""根据拷贝源文件的大小自动选择拷贝策略,不同园区,小于5G直接copy_object,大于5G分块拷贝"""
copy_source = {'Bucket': copy_test_bucket, 'Key': 'test.txt', 'Region': REGION}
Expand Down Expand Up @@ -1223,6 +1252,7 @@ def test_download_file():
test_put_get_delete_replication()
test_upload_part_copy()
test_upload_file_multithreading()
test_upload_file_with_progress_callback()
test_copy_file_automatically()
test_copy_10G_file_in_same_region()
test_list_objects()
Expand Down