.gitignore (2 additions, 0 deletions)
@@ -44,3 +44,5 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
/.idea
/.venv
qiniu/config.py (5 additions, 2 deletions)
@@ -7,7 +7,7 @@
API_HOST = 'http://api.qiniu.com'  # host for data-processing operations
UC_HOST = 'https://uc.qbox.me'  # host for querying bucket information

_BLOCK_SIZE = 1024 * 1024 * 4  # block size for resumable upload; set by the API spec, not currently configurable

_config = {
'default_zone': zone.Zone(),
@@ -18,6 +18,7 @@
    'connection_timeout': 30,  # connection timeout: 30 s
    'connection_retries': 3,  # number of connection retries: 3
    'connection_pool': 10,  # connection pool size: 10
    'default_upload_threshold': 2 * _BLOCK_SIZE  # default file-size threshold at which put_file switches to resumable upload
}


@@ -28,7 +29,7 @@ def get_default(key):
def set_default(
default_zone=None, connection_retries=None, connection_pool=None,
connection_timeout=None, default_rs_host=None, default_uc_host=None,
-        default_rsf_host=None, default_api_host=None):
+        default_rsf_host=None, default_api_host=None, default_upload_threshold=None):
if default_zone:
_config['default_zone'] = default_zone
if default_rs_host:
@@ -45,3 +46,5 @@ def set_default(
_config['connection_pool'] = connection_pool
if connection_timeout:
_config['connection_timeout'] = connection_timeout
if default_upload_threshold:
_config['default_upload_threshold'] = default_upload_threshold
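For reference, a minimal sketch (not part of this diff) of how a caller might tune the new threshold; the 8 MB value here is only an illustration:

from qiniu import config

# Files larger than this threshold take the resumable (multipart) path in
# put_file; smaller files use a single form upload.
config.set_default(default_upload_threshold=8 * 1024 * 1024)
assert config.get_default('default_upload_threshold') == 8 * 1024 * 1024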
qiniu/http.py (8 additions, 0 deletions)
@@ -104,6 +104,10 @@ def _post_with_token(url, data, token):
return _post(url, data, None, _TokenAuth(token))


def _post_with_token_and_headers(url, data, token, headers):
return _post(url, data, None, _TokenAuth(token), headers)


def _post_file(url, data, files):
return _post(url, data, files, None)

@@ -132,6 +136,10 @@ def _put_with_auth(url, data, auth):
return _put(url, data, None, qiniu.auth.RequestsAuth(auth))


def _put_with_token_and_headers(url, data, auth, headers):
return _put(url, data, None, _TokenAuth(auth), headers)


def _put_with_auth_and_headers(url, data, auth, headers):
return _put(url, data, None, qiniu.auth.RequestsAuth(auth), headers)

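Both helpers are module-internal; for orientation, a hypothetical call mirroring what the new uploader code does for a v2 part upload (the URL, token, and payload below are placeholders):

from qiniu import http

headers = {'Content-Type': 'application/octet-stream'}
ret, info = http._put_with_token_and_headers(
    '<up-host>/buckets/<bucket>/objects/<key>/uploads/<uploadId>/1',
    b'<part bytes>', '<upload token>', headers)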
qiniu/services/storage/uploader.py (150 additions, 33 deletions)
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-

import hashlib
import json
import os
import time

@@ -45,7 +47,8 @@ def put_data(

def put_file(up_token, key, file_path, params=None,
mime_type='application/octet-stream', check_crc=False,
-             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None):
+             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,
+             part_size=None, version=None, bucket_name=None):
    """Upload a file to Qiniu.

Args:
@@ -58,22 +61,25 @@ def put_file(up_token, key, file_path, params=None,
        progress_handler: upload-progress callback
        upload_progress_recorder: records upload progress, used to resume interrupted uploads
        hostscache_dir: directory where the host-request cache file is stored
        version: multipart-upload version, currently 'v1' or 'v2'; defaults to 'v1'
        part_size: required for multipart upload v2; defaults to 4 MB, allowed range 1 MB - 1 GB
        bucket_name: required for multipart upload v2; name of the target bucket

Returns:
        a dict such as {"hash": "<Hash string>", "key": "<Key string>"}
        a ResponseInfo object
"""
ret = {}
size = os.stat(file_path).st_size
# fname = os.path.basename(file_path)
with open(file_path, 'rb') as input_stream:
file_name = os.path.basename(file_path)
modify_time = int(os.path.getmtime(file_path))
-        if size > config._BLOCK_SIZE * 2:
+        if size > config.get_default('default_upload_threshold'):
ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,
mime_type, progress_handler,
upload_progress_recorder=upload_progress_recorder,
-                                   modify_time=modify_time, keep_last_modified=keep_last_modified)
+                                   modify_time=modify_time, keep_last_modified=keep_last_modified,
+                                   part_size=part_size, version=version, bucket_name=bucket_name)
else:
crc = file_crc32(file_path)
ret, info = _form_put(up_token, key, input_stream, params, mime_type,
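With these parameters in place, a hypothetical caller can opt into multipart upload v2 through put_file (keys, bucket, and path below are placeholders):

from qiniu import Auth, put_file

q = Auth('<access_key>', '<secret_key>')
token = q.upload_token('<bucket>', 'big.bin')
# part_size may range from 1 MB to 1 GB; 8 MB here is arbitrary.
ret, info = put_file(token, 'big.bin', '/path/to/big.bin',
                     version='v2', bucket_name='<bucket>',
                     part_size=8 * 1024 * 1024)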
@@ -129,9 +135,11 @@ def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None,

def put_stream(up_token, key, input_stream, file_name, data_size, hostscache_dir=None, params=None,
mime_type=None, progress_handler=None,
-               upload_progress_recorder=None, modify_time=None, keep_last_modified=False):
+               upload_progress_recorder=None, modify_time=None, keep_last_modified=False,
+               part_size=None, version=None, bucket_name=None):
task = _Resume(up_token, key, input_stream, file_name, data_size, hostscache_dir, params, mime_type,
-                   progress_handler, upload_progress_recorder, modify_time, keep_last_modified)
+                   progress_handler, upload_progress_recorder, modify_time, keep_last_modified,
+                   part_size, version, bucket_name)
return task.upload()


@@ -153,10 +161,14 @@ class _Resume(object):
        upload_progress_recorder: records upload progress, used to resume interrupted uploads
        modify_time: last-modified time of the file being uploaded
        hostscache_dir: directory where the host-request cache file is stored
        version: multipart-upload version, currently 'v1' or 'v2'; defaults to 'v1'
        part_size: required for multipart upload v2; allowed range 1 MB - 1 GB
        bucket_name: required for multipart upload v2; name of the target bucket
"""

def __init__(self, up_token, key, input_stream, file_name, data_size, hostscache_dir, params, mime_type,
-                 progress_handler, upload_progress_recorder, modify_time, keep_last_modified):
+                 progress_handler, upload_progress_recorder, modify_time, keep_last_modified, part_size=None,
+                 version=None, bucket_name=None):
        """Initialize the resumable upload."""
self.up_token = up_token
self.key = key
@@ -170,46 +182,87 @@ def __init__(self, up_token, key, input_stream, file_name, data_size, hostscache
self.upload_progress_recorder = upload_progress_recorder or UploadProgressRecorder()
self.modify_time = modify_time or time.time()
self.keep_last_modified = keep_last_modified
# print(self.modify_time)
# print(modify_time)
self.version = version or 'v1'
self.part_size = part_size or config._BLOCK_SIZE
self.bucket_name = bucket_name

def record_upload_progress(self, offset):
        record_data = {
            'size': self.size,
            'offset': offset,
-            'contexts': [block['ctx'] for block in self.blockStatus]
        }
+        if self.version == 'v1':
+            record_data['contexts'] = [block['ctx'] for block in self.blockStatus]
+        elif self.version == 'v2':
+            record_data['etags'] = self.blockStatus
+            record_data['expired_at'] = self.expiredAt
+            record_data['upload_id'] = self.uploadId
if self.modify_time:
record_data['modify_time'] = self.modify_time
# print(record_data)
self.upload_progress_recorder.set_upload_record(self.file_name, self.key, record_data)

def recovery_from_record(self):
record = self.upload_progress_recorder.get_upload_record(self.file_name, self.key)
        if not record:
-            return 0
+            if self.version == 'v1':
+                return 0
+            elif self.version == 'v2':
+                return 0, None, None
        try:
            if not record['modify_time'] or record['size'] != self.size or \
                    record['modify_time'] != self.modify_time:
-                return 0
+                if self.version == 'v1':
+                    return 0
+                elif self.version == 'v2':
+                    return 0, None, None
        except KeyError:
-            return 0
-        self.blockStatus = [{'ctx': ctx} for ctx in record['contexts']]
-        return record['offset']
+            if self.version == 'v1':
+                return 0
+            elif self.version == 'v2':
+                return 0, None, None
+        if self.version == 'v1':
+            if not record.__contains__('contexts') or len(record['contexts']) == 0:
+                return 0
+            self.blockStatus = [{'ctx': ctx} for ctx in record['contexts']]
+            return record['offset']
+        elif self.version == 'v2':
+            if not record.__contains__('etags') or len(record['etags']) == 0 or \
+                    not record.__contains__('expired_at') or float(record['expired_at']) < time.time() or \
+                    not record.__contains__('upload_id'):
+                return 0, None, None
+            self.blockStatus = record['etags']
+            return record['offset'], record['upload_id'], record['expired_at']
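As read and written by the two methods above, the persisted progress record takes one of two shapes; a sketch with illustrative values:

# v1 record: per-block contexts returned by mkblk
v1_record = {
    'size': 10485760,
    'offset': 4194304,
    'contexts': ['<ctx>', '<ctx>'],
    'modify_time': 1600000000,
}

# v2 record: part etags plus the upload session's id and expiry
v2_record = {
    'size': 10485760,
    'offset': 8388608,
    'etags': [{'etag': '<etag>', 'partNumber': 1},
              {'etag': '<etag>', 'partNumber': 2}],
    'expired_at': 1600086400.0,
    'upload_id': '<uploadId>',
    'modify_time': 1600000000,
}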

    def upload(self):
        """Run the upload."""
        self.blockStatus = []
-        if config.get_default('default_zone').up_host:
-            host = config.get_default('default_zone').up_host
-        else:
-            host = config.get_default('default_zone').get_up_host_by_token(self.up_token, self.hostscache_dir)
-        offset = self.recovery_from_record()
-        for block in _file_iter(self.input_stream, config._BLOCK_SIZE, offset):
+        self.recovery_index = 1
+        self.expiredAt = None
+        self.uploadId = None
+        host = self.get_up_host()
+        if self.version == 'v1':
+            offset = self.recovery_from_record()
+            self.part_size = config._BLOCK_SIZE
+        elif self.version == 'v2':
+            offset, self.uploadId, self.expiredAt = self.recovery_from_record()
+            if offset > 0 and self.blockStatus != [] and self.uploadId is not None \
+                    and self.expiredAt is not None:
+                self.recovery_index = self.blockStatus[-1]['partNumber'] + 1
+            else:
+                self.recovery_index = 1
+                init_url = self.block_url_v2(host, self.bucket_name)
+                self.uploadId, self.expiredAt = self.init_upload_task(init_url)
+        else:
+            raise ValueError("version must choose v1 or v2 !")
+        for index, block in enumerate(_file_iter(self.input_stream, self.part_size, offset)):
length = len(block)
-            crc = crc32(block)
-            ret, info = self.make_block(block, length, host)
+            if self.version == 'v1':
+                crc = crc32(block)
+                ret, info = self.make_block(block, length, host)
+            elif self.version == 'v2':
+                index_ = index + self.recovery_index
+                url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index_)
+                ret, info = self.make_block_v2(block, url)
if ret is None and not info.need_retry():
return ret, info
if info.connect_failed():
@@ -218,28 +271,77 @@ def upload(self):
else:
host = config.get_default('default_zone').get_up_host_backup_by_token(self.up_token,
self.hostscache_dir)
-            if info.need_retry() or crc != ret['crc32']:
-                ret, info = self.make_block(block, length, host)
-                if ret is None or crc != ret['crc32']:
-                    return ret, info
+            if self.version == 'v1':
+                if info.need_retry() or crc != ret['crc32']:
+                    ret, info = self.make_block(block, length, host)
+                    if ret is None or crc != ret['crc32']:
+                        return ret, info
+            elif self.version == 'v2':
+                if info.need_retry():
+                    # retry must target the same part number (index_)
+                    url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index_)
+                    ret, info = self.make_block_v2(block, url)
+                    if ret is None:
+                        return ret, info
+                del ret['md5']
+                ret['partNumber'] = index_
            self.blockStatus.append(ret)
offset += length
self.record_upload_progress(offset)
if (callable(self.progress_handler)):
-                self.progress_handler(((len(self.blockStatus) - 1) * config._BLOCK_SIZE) + length, self.size)
+                self.progress_handler(((len(self.blockStatus) - 1) * self.part_size) + len(block), self.size)
-        return self.make_file(host)
+        if self.version == 'v1':
+            return self.make_file(host)
+        elif self.version == 'v2':
+            make_file_url = self.block_url_v2(host, self.bucket_name) + '/%s' % self.uploadId
+            return self.make_file_v2(self.blockStatus, make_file_url, self.file_name,
+                                     self.mime_type, self.params)

def make_file_v2(self, block_status, url, file_name=None, mime_type=None, customVars=None):
"""completeMultipartUpload"""
parts = self.get_parts(block_status)
headers = {
'Content-Type': 'application/json',
}
data = {
'parts': parts,
'fname': file_name,
'mimeType': mime_type,
'customVars': customVars
}
ret, info = self.post_with_headers(url, json.dumps(data), headers=headers)
if ret is not None and ret != {}:
if ret['hash'] and ret['key']:
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
return ret, info
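The parts list is sorted by partNumber via get_parts before being serialized; the resulting request body looks roughly like this (values illustrative; customVars carries the params passed through from put_file, if any):

import json

data = {
    'parts': [{'etag': '<etag-1>', 'partNumber': 1},
              {'etag': '<etag-2>', 'partNumber': 2}],
    'fname': 'big.bin',
    'mimeType': 'application/octet-stream',
    'customVars': None,
}
body = json.dumps(data)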

def get_up_host(self):
if config.get_default('default_zone').up_host:
host = config.get_default('default_zone').up_host
else:
host = config.get_default('default_zone').get_up_host_by_token(self.up_token, self.hostscache_dir)
return host

    def make_block(self, block, block_size, host):
        """Create a block (mkblk)."""
url = self.block_url(host, block_size)
return self.post(url, block)

def make_block_v2(self, block, url):
headers = {
'Content-Type': 'application/octet-stream',
'Content-MD5': hashlib.md5(block).hexdigest(),
}
return self.put(url, block, headers)
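Worth noting: the Content-MD5 value sent here is the hex digest of the part bytes, not the base64 digest the HTTP Content-MD5 header traditionally carries; presumably that is what the v2 endpoint expects. A quick illustration of what gets sent:

import hashlib

print(hashlib.md5(b'<part bytes>').hexdigest())  # 32 hex characters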

def block_url(self, host, size):
return '{0}/mkblk/{1}'.format(host, size)

def block_url_v2(self, host, bucket_name):
encoded_object_name = urlsafe_base64_encode(self.key) if self.key is not None else '~'
return '{0}/buckets/{1}/objects/{2}/uploads'.format(host, bucket_name, encoded_object_name)
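Assuming bucket 'my-bucket' and key 'a/b.txt' (hosts are placeholders), the v2 endpoints built from this base URL come out as:

from qiniu.utils import urlsafe_base64_encode

encoded = urlsafe_base64_encode('a/b.txt')  # 'YS9iLnR4dA=='; a None key becomes '~'
# initiate: <up-host>/buckets/my-bucket/objects/YS9iLnR4dA==/uploads
# part:     <up-host>/buckets/my-bucket/objects/YS9iLnR4dA==/uploads/<uploadId>/<partNumber>
# complete: <up-host>/buckets/my-bucket/objects/YS9iLnR4dA==/uploads/<uploadId>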

def file_url(self, host):
url = ['{0}/mkfile/{1}'.format(host, self.size)]

if self.mime_type:
url.append('mimeType/{0}'.format(urlsafe_base64_encode(self.mime_type)))

@@ -259,7 +361,6 @@ def file_url(self, host):
"x-qn-meta-!Last-Modified/{0}".format(urlsafe_base64_encode(rfc_from_timestamp(self.modify_time))))

url = '/'.join(url)
-        # print url
return url

def make_file(self, host):
@@ -269,5 +370,21 @@ def make_file(self, host):
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
return self.post(url, body)

def init_upload_task(self, url):
body, resp = self.post(url, '')
if body is not None:
return body['uploadId'], body['expireAt']
else:
return None, None

def post(self, url, data):
return http._post_with_token(url, data, self.up_token)

def post_with_headers(self, url, data, headers):
return http._post_with_token_and_headers(url, data, self.up_token, headers)

def put(self, url, data, headers):
return http._put_with_token_and_headers(url, data, self.up_token, headers)

def get_parts(self, block_status):
return sorted(block_status, key=lambda i: i['partNumber'])