-
Notifications
You must be signed in to change notification settings - Fork 7
/
sign.py
166 lines (142 loc) · 6.08 KB
/
sign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# coding:utf-8
"""
Copyright (year) Beijing Volcano Engine Technology Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import datetime
import hashlib
import hmac
from urllib.parse import quote
import requests
# The following parameters vary from service to service; within a single
# service they are usually the same for every call.
Service = "iam"
Version = "2018-01-01"
Region = "cn-north-1"
Host = "iam.volcengineapi.com"
ContentType = "application/x-www-form-urlencoded"
# Request credentials, obtained from the IAM or STS service.
# NOTE(review): these are masked placeholders; replace them before running.
AK = "AKExample***********"
SK = "QwERrtyasdf*********"
# When using temporary credentials, the SessionToken must be passed in the
# request headers and included in the SignedHeaders computation; add the
# X-Security-Token header to the `header` argument yourself.
# SessionToken = ""
def norm_query(params):
    """Build the canonical (normalized) query string used for signing.

    Keys are emitted in sorted order. Keys and values are percent-encoded
    with only the unreserved characters ``-_.~`` (plus ASCII letters and
    digits) left as-is, so a space becomes ``%20`` and ``+`` becomes ``%2B``.

    Args:
        params: Mapping of query parameter names to values. A ``list`` or
            ``tuple`` value yields one ``key=value`` pair per element, in
            element order. Values that are neither ``str`` nor ``bytes``
            (for example ``int``) are converted with ``str()``. ``None``
            values are skipped, which mirrors how ``requests`` omits them
            from the URL it actually sends.

    Returns:
        The ``&``-joined ``key=value`` pairs, or ``""`` when there is
        nothing to encode.
    """
    pairs = []
    for key in sorted(params):
        value = params[key]
        # isinstance() also accepts list subclasses, unlike ``type(x) == list``.
        values = value if isinstance(value, (list, tuple)) else [value]
        encoded_key = quote(key, safe="-_.~")
        for item in values:
            if item is None:
                continue
            if not isinstance(item, (str, bytes)):
                # quote() only accepts str/bytes; numbers would raise TypeError.
                item = str(item)
            pairs.append(encoded_key + "=" + quote(item, safe="-_.~"))
    # quote() never emits "+", so no "+" -> "%20" post-processing is needed.
    return "&".join(pairs)
# Step 1: helper functions.
# HMAC-SHA256 keyed hash (a symmetric message authentication code, not asymmetric encryption)
def hmac_sha256(key: bytes, content: str):
    """Return the raw HMAC-SHA256 digest of *content* keyed with *key*.

    *content* is encoded as UTF-8 before being authenticated; the result is
    the digest as ``bytes`` (not hex), so it can be fed back in as the key
    of a subsequent call.
    """
    message = content.encode("utf-8")
    mac = hmac.new(key, message, digestmod=hashlib.sha256)
    return mac.digest()
# SHA-256 hash, returned as a hex string
def hash_sha256(content: "str | bytes"):
    """Return the lowercase hexadecimal SHA-256 digest of *content*.

    Args:
        content: Text or binary payload. A ``str`` is encoded as UTF-8 first;
            bytes-like input (``bytes``, ``bytearray``, ``memoryview``) is
            hashed as-is, so binary request bodies can be hashed too.

    Returns:
        The 64-character hex digest.
    """
    data = content.encode("utf-8") if isinstance(content, str) else content
    return hashlib.sha256(data).hexdigest()
# Step 2: the request-signing function
def request(method, date, query, header, ak, sk, action, body, timeout=None):
    """Sign an API call with HMAC-SHA256 and send it over HTTPS.

    The service name, region, host, API version and content type come from
    the module-level constants. The canonical request, its hash and the
    string-to-sign are printed so they can be compared while debugging.

    Args:
        method: HTTP method name; it is upper-cased in the canonical request.
        date: ``datetime.datetime`` used as the signing time. A naive value
            is formatted as given and labelled ``Z``, so it must already be
            in UTC; a timezone-aware value is converted to UTC first.
        query: Extra query parameters. They are merged over ``Action`` and
            ``Version``, so matching keys override them. ``None`` is treated
            as empty.
        header: Extra request headers. The signing headers (``Host``,
            ``X-Date``, ``X-Content-Sha256``, ``Content-Type``,
            ``Authorization``) override matching keys. ``None`` is treated
            as empty.
        ak: Access key ID, placed in the ``Credential`` field.
        sk: Secret access key, the seed of the signing-key derivation.
        action: Value of the ``Action`` query parameter.
        body: Request body; ``None`` is signed and sent as ``""``.
        timeout: Forwarded to ``requests.request``. The default ``None``
            keeps the previous behaviour of not setting a timeout.

    Returns:
        The response body as decoded by ``Response.json()``.
    """
    # None means "nothing extra"; normalise so the ** merges below cannot fail.
    query = {} if query is None else query
    header = {} if header is None else header

    # X-Date is formatted with a hard-coded "Z" suffix, so an aware datetime
    # must be moved to UTC first or the timestamp (and signature) is wrong.
    if getattr(date, "tzinfo", None) is not None and date.utcoffset() is not None:
        date = date.astimezone(datetime.timezone.utc)

    # Step 3: build the credential. Service and Region are fixed; ak and sk
    # are the AccessKeyID and SecretAccessKey.
    credential = {
        "access_key_id": ak,
        "secret_access_key": sk,
        "service": Service,
        "region": Region,
    }
    # Everything the signature is computed from.
    request_param = {
        "body": "" if body is None else body,
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": ContentType,
        "date": date,
        "query": {"Action": action, "Version": Version, **query},
    }

    # Step 4: prepare the values shared by the signature and the headers.
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]  # YYYYMMDD part, used in the credential scope
    x_content_sha256 = hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }

    # Step 5: compute the signature.
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    # With temporary credentials the token header must be signed as well:
    # signed_headers_str = signed_headers_str + ";x-security-token"
    canonical_headers = "\n".join(
        [
            "content-type:" + request_param["content_type"],
            "host:" + request_param["host"],
            "x-content-sha256:" + x_content_sha256,
            "x-date:" + x_date,
        ]
    )
    canonical_request_str = "\n".join(
        [
            request_param["method"].upper(),
            request_param["path"],
            norm_query(request_param["query"]),
            canonical_headers,
            "",  # blank line that terminates the canonical header list
            signed_headers_str,
            x_content_sha256,
        ]
    )
    # Print the canonical request for debugging comparison.
    print(canonical_request_str)
    hashed_canonical_request = hash_sha256(canonical_request_str)
    # Print its hash for debugging comparison.
    print(hashed_canonical_request)

    credential_scope = "/".join(
        [short_x_date, credential["region"], credential["service"], "request"]
    )
    string_to_sign = "\n".join(
        ["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]
    )
    # Print the final string-to-sign for debugging comparison.
    print(string_to_sign)

    # Derive the signing key by chaining HMACs: secret -> date -> region ->
    # service -> "request".
    k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
    k_region = hmac_sha256(k_date, credential["region"])
    k_service = hmac_sha256(k_region, credential["service"])
    k_signing = hmac_sha256(k_service, "request")
    signature = hmac_sha256(k_signing, string_to_sign).hex()

    sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential["access_key_id"] + "/" + credential_scope,
        signed_headers_str,
        signature,
    )
    # Signing headers win over caller-supplied ones with the same name.
    header = {**header, **sign_result}
    # header = {**header, **{"X-Security-Token": SessionToken}}

    # Step 6: put the signature into the HTTP headers and send the request.
    r = requests.request(
        method=method,
        url="https://{}{}".format(request_param["host"], request_param["path"]),
        headers=header,
        params=request_param["query"],
        data=request_param["body"],
        timeout=timeout,
    )
    return r.json()
if __name__ == "__main__":
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated
    # since Python 3.12. strftime() yields the same X-Date text either way.
    now = datetime.datetime.now(datetime.timezone.utc)

    # The simplest call passes an empty query and no body:
    #   request("GET", now, {}, {}, AK, SK, "ListUsers", None)

    # The body format must match Content-Type; check the official docs of the
    # API you call for the type it expects (e.g. a JSON API needs
    # json.dumps(obj)).
    response_body = request("GET", now, {"Limit": "2"}, {}, AK, SK, "ListUsers", None)
    print(response_body)

    response_body = request("POST", now, {"Limit": "10"}, {}, AK, SK, "ListUsers", "UnUseParam=ASDF")
    print(response_body)