Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions qcloud_cos/cos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,43 @@ def __call__(self, r):
logger.debug("request headers: " + str(r.headers))
return r

class CosRtmpAuth(AuthBase):
def __init__(self, conf, bucket=None, channel=None, params={}, expire=10000):
self._secret_id = conf._secret_id
self._secret_key = conf._secret_key
self._token = conf._token
self._anonymous = conf._anonymous
self._expire = expire
self._params = params
if self._token:
self._params['q-token'] = self._token
self._path = u'/' + bucket + u'/' + channel

def get_rtmp_sign(self):
# get rtmp string
canonicalized_param = ''
for k, v in self._params.iteritems():
canonicalized_param += '{key}={value}&'.format(key=k, value=v)
canonicalized_param = canonicalized_param.rstrip('&')
rtmp_str = u"{path}\n{params}\n".format(path=self._path, params=canonicalized_param)
logger.debug("rtmp str: " + rtmp_str)

sha1 = hashlib.sha1()
sha1.update(to_bytes(rtmp_str))
# get time
sign_time = int(time.time())
sign_time_str = "{start_time};{end_time}".format(start_time = sign_time - 60, end_time = sign_time + self._expire)
str_to_sign = "sha1\n{time}\n{sha1}\n".format(time = sign_time_str, sha1 = sha1.hexdigest())
logger.debug('str_to_sign: ' + str(str_to_sign))
# get sinature
signature = hmac.new(to_bytes(self._secret_key), to_bytes(str_to_sign), hashlib.sha1).hexdigest()
logger.debug('signature: ' + str(signature))
rtmp_sign = "q-sign-algorithm=sha1&q-ak={ak}&q-sign-time={sign_time}&q-key-time={key_time}&q-signature={sign}".format(
ak=self._secret_id, sign_time=sign_time_str, key_time=sign_time_str, sign=signature)
if canonicalized_param != '':
return rtmp_sign + "&{params}".format(params=canonicalized_param)
else:
return rtmp_sign

if __name__ == "__main__":
pass
322 changes: 322 additions & 0 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .streambody import StreamBody
from .xml2dict import Xml2Dict
from .cos_auth import CosS3Auth
from .cos_auth import CosRtmpAuth
from .cos_comm import *
from .cos_threadpool import SimpleThreadPool
from .cos_exception import CosClientError
Expand Down Expand Up @@ -3625,6 +3626,327 @@ def get_async_fetch_task(self, Bucket, TaskId, **kwargs):
data = rt.json()
return data

def put_live_channel(self, Bucket, ChannelName, Expire=3600, LiveChannelConfiguration={}, **kwargs):
"""创建直播通道

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param Expire(int): 推流url签名过期时间.
:param LiveChannelConfiguration(dict): 直播通道配置.
:param kwargs(dict): 设置请求headers.
:return(dict): publish url and playurl.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
# 设置直播通道配置
livechannel_config = {
'Description': 'channel description',
'Switch': 'Enabled',
'Target': {
'Type': 'HLS',
'FragDuration': '3',
'FragCount': '5',
}
}
response = client.put_live_channel(Bucket='bucket', ChannelName='ch1', LiveChannelConfiguration=livechannel_config)
"""
xml_config = format_xml(data=LiveChannelConfiguration, root='LiveChannelConfiguration')
headers = mapped(kwargs)
headers['Content-MD5'] = get_md5(xml_config)
headers['Content-Type'] = 'application/xml'
params = {'live': ''}
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("put live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='PUT',
url=url,
bucket=Bucket,
data=xml_config,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
if data['PublishUrls']['Url'] is not None:
rtmpSign = CosRtmpAuth(self._conf, bucket=Bucket, channel=ChannelName, expire=Expire)
url = data['PublishUrls']['Url']
url += '?' + rtmpSign.get_rtmp_sign()
data['PublishUrls']['Url'] = url
return data

def get_rtmp_signed_url(self, Bucket, ChannelName, Expire=3600, Params={}):
"""获取直播通道带签名的推流url
:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:return: dict.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.get_rtmp_signed_url(Bucket='bucket', ChannelName='ch1')
"""
rtmp_signed_url = 'rtmp://{bucket}.cos.{region}.myqcloud.com/live/{channel}'.format(bucket=Bucket, region=self._conf._region, channel=ChannelName)
rtmpAuth = CosRtmpAuth(self._conf, bucket=Bucket, channel=ChannelName, params = Params, expire=Expire)
return rtmp_signed_url + '?' + rtmpAuth.get_rtmp_sign()

def get_live_channel_info(self, Bucket, ChannelName, **kwargs):
"""获取直播通道配置信息

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param kwargs(dict): 设置请求headers.
:return: dict.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.get_live_channel_info(Bucket='bucket', ChannelName='ch1')
"""
params = {'live': ''}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("get live channel info, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
return data

def put_live_channel_switch(self, Bucket, ChannelName, Switch, **kwargs):
"""禁用或者开启直播通道

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param Switch(string): 'enabled'或'disabled'.
:param kwargs(dict): 设置请求headers.
:return(None).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.put_live_channel_switch(Bucket='bucket', ChannelName='ch1', Switch='enabled')
"""
params = {'live': ''}
if Switch in ['enabled', 'disabled']:
params['switch'] = Switch
else:
raise CosClientError('switch must be enabled or disabled')

headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("put live channel switch, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
self.send_request(
method='PUT',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
return None

def get_live_channel_history(self, Bucket, ChannelName, **kwargs):
"""获取直播通道推流历史

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param kwargs(dict): 设置请求headers.
:return(dict).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.get_live_channel_history(Bucket='bucket', ChannelName='ch1')
"""
params = {'live': '', 'comp' : 'history'}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("get live channel history, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
format_dict(data, ['LiveRecord'])
return data

def get_live_channel_status(self, Bucket, ChannelName, **kwargs):
"""获取直播通道推流状态

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param kwargs(dict): 设置请求headers.
:return(dict).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.get_live_channel_status(Bucket='bucket', ChannelName='ch1')
"""
params = {'live': '', 'comp' : 'status'}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("get live channel status, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
return data

def delete_live_channel(self, Bucket, ChannelName, **kwargs):
"""删除直播通道

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param kwargs(dict): 设置请求headers.
:return(dict).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
client.delete_live_channel(Bucket='bucket', ChannelName='ch1')
"""
params = {'live': ''}
url = self._conf.uri(bucket=Bucket, path=ChannelName)
headers = mapped(kwargs)
logger.info("delete live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='DELETE',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
data = dict(**rt.headers)
return data

def get_vod_playlist(self, Bucket, ChannelName, StartTime = 0, EndTime = 0, **kwargs):
"""查询指定时间段播放列表文件

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param StartTime(int): 播放列表ts文件的起始时间,格式为unix时间戳.
:param EndTime(int): 播放列表ts文件的结束时间,格式为unix时间戳.
:param kwargs(dict): 设置请求headers.
:return(string).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.get_vod_playlist(Bucket='bucket', ChannelName='ch1', StartTime=1611218201, EndTime=1611218300)
"""
if StartTime <= 0 or EndTime <= 0:
raise CosClientError('invalid timestamp')
if StartTime >= EndTime:
raise CosClientError('StartTime must be less than EndTime')

params = {'vod': '', 'starttime' : StartTime, 'endtime' : EndTime}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=ChannelName)
logger.info("get vod playlist, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=ChannelName),
headers=headers,
params=params)
return rt.content

def post_vod_playlist(self, Bucket, ChannelName, PlaylistName, StartTime = 0, EndTime = 0, **kwargs):
"""生成点播播放列表文件

:param Bucket(string): 存储桶名称.
:param ChannelName(string): 直播通道名称.
:param PlaylistName(string): 播放列表文件名称.
:param StartTime(int): 播放列表ts文件的起始时间,格式为unix时间戳.
:param EndTime(int): 播放列表ts文件的结束时间,格式为unix时间戳.
:param kwargs(dict): 设置请求headers.
:return(None).

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.post_vod_playlist(Bucket='bucket', ChannelName='ch1', PlaylistName='test.m3u8', StartTime=1611218201, EndTime=1611218300)
"""
if StartTime <= 0 or EndTime <= 0:
raise CosClientError('invalid timestamp')
if StartTime >= EndTime:
raise CosClientError('StartTime must be less than EndTime')
if not PlaylistName.endswith('.m3u8'):
raise CosClientError('PlaylistName must be end with .m3u8')

params = {'vod': '', 'starttime' : StartTime, 'endtime' : EndTime}
headers = mapped(kwargs)
file_path = ChannelName + '/' + PlaylistName
url = self._conf.uri(bucket=Bucket, path=file_path)
logger.info("post vod playlist, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='POST',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params, key=file_path),
headers=headers,
params=params)
return None

def list_live_channel(self, Bucket, MaxKeys = 100, Prefix = '', Marker = '', **kwargs):
"""获取直播通道列表

:param Bucket(string): 存储桶名称.
:param MaxKeys(int): 每页可以列出通道数量的最大值,有效值范围为[1, 1000],默认值:100.
:param Prefix(string): 限定返回的 LiveChannel 必须以 prefix 作为前缀.
:param Marker(string): 从 marker 之后按字母排序的第一个开始返回.
:param kwargs(dict): 设置请求headers.
:return: string.

.. code-block:: python
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象
client = CosS3Client(config)
resp = client.list_channel(Bucket='bucket', MaxKeys=100)
"""
params = {'live' : ''}
if MaxKeys >= 1:
params['max-keys'] = MaxKeys
if Prefix != '':
params['prefix'] = Prefix
if Marker != '':
params['marker'] = Marker
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket)
logger.info("list live channel, url=:{url} ,headers=:{headers}".format(url=url, headers=headers))
rt = self.send_request(
method='GET',
url=url,
bucket=Bucket,
auth=CosS3Auth(self._conf, params=params),
headers=headers,
params=params)
data = xml_to_dict(rt.content)
format_dict(data, ['LiveChannel'])
decode_result(
data,
[
'Prefix',
'Marker',
'MaxKeys',
'IsTruncated',
'NextMarker'
],
[
['LiveChannel', 'Name'],
])
return data

if __name__ == "__main__":
pass
Loading