-
Notifications
You must be signed in to change notification settings - Fork 4
/
vnchaince.py
143 lines (112 loc) · 4.05 KB
/
vnchaince.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# encoding: UTF-8
from __future__ import print_function
import requests
from queue import Queue, Empty
from multiprocessing.dummy import Pool
from time import time
from jwt import PyJWS
from .edalgorithm import EDAlgorithm
# Base URL that ChainceRestApi.processReq prepends to every request path.
REST_HOST = 'https://api.hoo.com'
class ChainceRestApi(object):
    """Queue-driven REST client for the service at ``REST_HOST``.

    Requests are enqueued with :meth:`addReq` and executed by the worker
    threads created in :meth:`start`. Every request carries an
    ``Authorization: Bearer <token>`` header built by
    :meth:`generateSignature`. A response with HTTP status 200 is handed to
    the request's callback; any other status, and any exception raised while
    processing, is reported through :meth:`onError`.
    """

    # One JWS encoder shared by all instances; __init__ registers the
    # Ed25519 algorithm on it.
    jws = PyJWS()

    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        try:
            self.jws.register_algorithm('Ed25519', EDAlgorithm())
        except ValueError:
            # ``jws`` is a class attribute, so an earlier instance has
            # already registered this algorithm name. PyJWT raises
            # ValueError on a duplicate registration; the existing handler
            # is kept.
            pass

        self.apiKey = ''
        self.secretKey = ''

        self.active = False         # True while the workers should keep running
        self.reqid = 0              # id of the most recently queued request
        self.queue = Queue()        # pending request tuples
        self.pool = None            # thread pool created by start()
        self.sessionDict = {}       # worker index -> requests session

    #----------------------------------------------------------------------
    def init(self, apiKey, secretKey):
        """Store the credentials used to sign requests.

        Both values are converted with ``str()`` before being stored.
        """
        self.apiKey = str(apiKey)
        self.secretKey = str(secretKey)

    #----------------------------------------------------------------------
    def start(self, n=10):
        """Start ``n`` worker threads; does nothing if already started."""
        if self.active:
            return

        self.active = True
        self.pool = Pool(n)
        # Each worker receives its own index, which keys its session in
        # sessionDict.
        self.pool.map_async(self.run, range(n))

    #----------------------------------------------------------------------
    def close(self):
        """Signal the workers to stop and wait for the pool to finish."""
        self.active = False

        if self.pool:
            self.pool.close()
            self.pool.join()

    #----------------------------------------------------------------------
    def addReq(self, method, path, callback, params=None, postdict=None):
        """Queue a request and return its id.

        Args:
            method: HTTP method name, e.g. ``'GET'``.
            path: path appended to ``REST_HOST``.
            callback: called as ``callback(data, req)`` on an HTTP 200 reply.
            params: optional query-string parameters.
            postdict: optional body, sent as JSON.

        Returns:
            The integer id assigned to this request.
        """
        self.reqid += 1
        req = (method, path, callback, params, postdict, self.reqid)
        self.queue.put(req)
        return self.reqid

    #----------------------------------------------------------------------
    def processReq(self, req, i):
        """Send one queued request and dispatch its outcome.

        Args:
            req: tuple produced by :meth:`addReq`.
            i: index of the calling worker, used to look up its session.

        Any exception (signing, network, JSON decoding, or one raised by the
        callback) is reported via :meth:`onError` rather than propagated, so
        a single bad request cannot terminate the worker thread.
        """
        method, path, callback, params, postdict, _ = req
        url = REST_HOST + path

        try:
            # Signing is inside the try block so that a signing failure is
            # reported through onError instead of killing the worker.
            header = {'Authorization': 'Bearer %s' % self.generateSignature()}

            # Reuse the worker's persistent session; the original author
            # noted roughly 20% less latency than with short-lived
            # connections.
            resp = self.sessionDict[i].request(method, url, headers=header,
                                               params=params, json=postdict)
            code = resp.status_code
            d = resp.json()

            if code == 200:
                callback(d, req)
            else:
                self.onError(code, str(d))
        except Exception as e:
            # str(e) instead of e.message: exceptions have no ``message``
            # attribute on Python 3.
            self.onError(type(e), str(e))

    #----------------------------------------------------------------------
    def run(self, i):
        """Worker loop: process queued requests until ``active`` is False.

        Args:
            i: worker index; the worker's session is stored under this key.
        """
        s = requests.session()
        # NOTE(review): kept from the original. requests' Session does not
        # document a ``keep_alive`` attribute, so this assignment probably
        # has no effect - confirm before removing.
        s.keep_alive = False
        self.sessionDict[i] = s

        try:
            while self.active:
                try:
                    # The 1 second timeout lets the loop re-check ``active``.
                    req = self.queue.get(timeout=1)
                except Empty:
                    continue
                self.processReq(req, i)
        finally:
            # Release the session's pooled connections when the worker ends.
            s.close()

    #----------------------------------------------------------------------
    def generateSignature(self):
        """Build the bearer token for a request.

        The payload is a JSON object holding the API key and the current
        Unix time in seconds, signed with ``secretKey`` using the 'Ed25519'
        algorithm registered in the constructor.

        Returns:
            The token as a native ``str``.
        """
        payload = '{"key": "%s", "iat": %s}' % (self.apiKey, int(time()))

        try:
            # py2: a native str payload is accepted as-is
            token = self.jws.encode(payload, self.secretKey,
                                    algorithm='Ed25519')
        except TypeError:
            # py3: the payload has to be bytes
            token = self.jws.encode(payload.encode('utf-8'), self.secretKey,
                                    algorithm='Ed25519')

        # Decode only when the library returned bytes that are not already
        # the native str type (the py3 fallback path in the original code).
        if not isinstance(token, str):
            token = token.decode('utf-8')
        return token

    #----------------------------------------------------------------------
    def onError(self, code, error):
        """Error callback: print the status code or exception type and the detail."""
        print('on error')
        print(code, error)

    #----------------------------------------------------------------------
    def onData(self, data, req):
        """Generic data callback: print the request path, the data and the request id."""
        print('on %s' % req[1])
        print(data, req[5])
if __name__ == '__main__':
    from six.moves import input

    # Fill in real credentials before running this manual test.
    apiKey = ''
    secretKey = ''

    # REST test: start one worker, queue a single GET and print the result
    # through the default onData / onError callbacks.
    rest = ChainceRestApi()
    rest.init(apiKey, secretKey)
    rest.start(1)

    try:
        rest.addReq('GET', '/orders/ceteos/today', rest.onData)

        # Block until Enter is pressed so the worker has time to respond.
        input()
    finally:
        # Stop the worker and wait for the pool instead of abandoning it
        # at interpreter exit.
        rest.close()