-
Notifications
You must be signed in to change notification settings - Fork 0
/
liqi_new.py
161 lines (146 loc) · 5.51 KB
/
liqi_new.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Capture WebSocket data and parse Mahjong Soul (雀魂) "action" semantics into JSON
import json
from struct import unpack
import base64
from enum import Enum
from typing import List, Dict
from google.protobuf.json_format import MessageToDict
from proto import liqi_pb2, basic_pb2
class MsgType(Enum):
    """Frame type carried in the first byte of every WebSocket message."""

    # Frame without a message id (parsed directly after the type byte).
    Notify = 1
    # Request frame; its message id is remembered until the response arrives.
    Req = 2
    # Response frame; matched to a pending request through its message id.
    Res = 3
class LiqiProto:
    """Stateful decoder for the WebSocket frames of one Mahjong Soul session.

    Frames must be passed to :meth:`parse` in the order they were captured:
    a request frame registers its response message class under the frame's
    message id, and the matching response frame later consumes that entry.
    """

    def __init__(self):
        # Number of frames parsed so far; also used as the id of notify frames.
        self.tot = 0
        # Pending requests: msg_id -> (method_name, response message class).
        self.res_type = {}
        # RPC schema used to look up request/response type names per method.
        # The context manager closes the file handle instead of leaking it,
        # and the explicit encoding avoids depending on the locale default.
        with open('./proto/liqi.json', 'r', encoding='utf-8') as schema_file:
            self.jsonProto = json.load(schema_file)

    @staticmethod
    def _to_dict(proto_obj):
        """Convert a protobuf message to a dict, keeping proto field names
        and emitting fields that still hold their default value."""
        # NOTE(review): newer protobuf releases appear to rename
        # `including_default_value_fields` to
        # `always_print_fields_with_no_presence` - confirm against the
        # protobuf version this project pins.
        return MessageToDict(
            proto_obj,
            preserving_proto_field_name=True,
            including_default_value_fields=True)

    def parse(self, flow_msg):
        """Parse one WebSocket frame; frames must be parsed in order.

        Args:
            flow_msg: object whose ``content`` attribute holds the raw frame
                bytes (first byte = frame type, see ``MsgType``).

        Returns:
            dict with keys ``'id'`` (message id, or the running frame counter
            for notify frames), ``'type'`` (``MsgType`` member), ``'method'``
            (method name string) and ``'data'`` (decoded message as a dict).

        Raises:
            ValueError: if the first byte is not a valid ``MsgType`` value.
            AssertionError: if a request reuses a still-pending id, or a
                response arrives with a method name or without a matching
                pending request.
        """
        buf = flow_msg.content
        msg_block = basic_pb2.BaseMessage()
        msg_type = MsgType(buf[0])  # first byte selects the frame type

        if msg_type == MsgType.Notify:
            # Notify frames have no message id: the wrapper message follows
            # the type byte directly.
            msg_block.ParseFromString(buf[1:])
            method_name = msg_block.method_name
            # Only the last of the three dot-separated parts (the message
            # class name) is needed here.
            _, _, message_name = method_name.split('.')
            notify_cls = getattr(liqi_pb2, message_name)
            dict_obj = self._to_dict(notify_cls.FromString(msg_block.data))
            if 'data' in dict_obj:
                # Nested payload: base64-decode it, unmask the bytes with
                # decode(), then parse them as the message class named by
                # the 'name' field.
                raw = base64.b64decode(dict_obj['data'])
                action_cls = getattr(liqi_pb2, dict_obj['name'])
                dict_obj['data'] = self._to_dict(
                    action_cls.FromString(decode(raw)))
            msg_id = self.tot
        else:
            # Request/response frames: an unsigned 16-bit little-endian
            # message id follows the type byte, then the wrapper message.
            msg_id = unpack('<H', buf[1:3])[0]
            msg_block.ParseFromString(buf[3:])
            if msg_type == MsgType.Req:
                assert msg_id not in self.res_type, \
                    'request id is still waiting for its response'
                method_name = msg_block.method_name
                _, lq, service, rpc = method_name.split('.')
                proto_domain = (
                    self.jsonProto['nested'][lq]['nested'][service]['methods'][rpc])
                req_cls = getattr(liqi_pb2, proto_domain['requestType'])
                dict_obj = self._to_dict(req_cls.FromString(msg_block.data))
                # Remember how to decode the response carrying this id.
                self.res_type[msg_id] = (
                    method_name, getattr(liqi_pb2, proto_domain['responseType']))
            elif msg_type == MsgType.Res:
                assert len(msg_block.method_name) == 0, \
                    'response frame unexpectedly carries a method name'
                assert msg_id in self.res_type, \
                    'response id has no pending request'
                method_name, res_cls = self.res_type.pop(msg_id)
                dict_obj = self._to_dict(res_cls.FromString(msg_block.data))

        result = {'id': msg_id, 'type': msg_type,
                  'method': method_name, 'data': dict_obj}
        self.tot += 1
        return result
def fromProtobuf(buf) -> List[Dict]:
    """Dump the top-level field structure of a serialized protobuf message.

    Intended for inspecting message layout. Only wire type 0 (varint) and
    wire type 2 (length-delimited) are understood.

    Args:
        buf: serialized protobuf bytes.

    Returns:
        One dict per field, in wire order, with keys ``'id'`` (field number),
        ``'type'`` (``'varint'`` or ``'string'``), ``'data'`` (int for
        varints, a slice of ``buf`` for strings) and ``'begin'`` (offset of
        the field's tag in ``buf``).

    Raises:
        ValueError: if a field uses any other wire type.
    """
    p = 0
    result = []
    while p < len(buf):
        block_begin = p
        # The tag is itself a varint: field numbers >= 16 need more than one
        # byte, so reading only buf[p] would mis-parse them.
        tag, p = parseVarint(buf, p)
        wire_type = tag & 7
        block_id = tag >> 3
        if wire_type == 0:
            block_type = 'varint'
            data, p = parseVarint(buf, p)
        elif wire_type == 2:
            # Length-delimited: a varint length followed by that many bytes.
            block_type = 'string'
            s_len, p = parseVarint(buf, p)
            data = buf[p:p + s_len]
            p += s_len
        else:
            raise ValueError(
                f'unknown wire type {wire_type} at offset {p}')
        result.append({'id': block_id, 'type': block_type,
                       'data': data, 'begin': block_begin})
    return result
def toVarint(x: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint.

    Each output byte carries 7 payload bits, least-significant group first;
    the high bit is set on every byte except the last.

    Args:
        x: the value to encode; must be >= 0.

    Returns:
        The varint bytes (``b'\\x00'`` for zero).

    Raises:
        ValueError: if ``x`` is negative. Such input previously produced an
            empty byte string, silently corrupting the surrounding message.
    """
    if x < 0:
        raise ValueError(f'cannot encode negative value {x} as a varint')
    encoded = bytearray()
    while True:
        low_bits = x & 0x7F
        x >>= 7
        if x:
            # More groups follow: set the continuation bit.
            encoded.append(low_bits | 0x80)
        else:
            encoded.append(low_bits)
            return bytes(encoded)
def toProtobuf(data: List[Dict]) -> bytes:
    """Serialize a field list back to protobuf bytes.

    Inverse operation of ``fromProtobuf``.

    Args:
        data: list of dicts with keys ``'id'`` (field number), ``'type'``
            (``'varint'`` or ``'string'``) and ``'data'`` (int for varints,
            bytes-like for strings).

    Returns:
        The concatenated wire encoding of all fields, in list order.

    Raises:
        OverflowError: if a field id is negative.
        NotImplementedError: if a field has any other ``'type'``.
    """
    parts = []
    for d in data:
        field_id = d['id']
        if field_id < 0:
            # Same exception type the single-byte tag encoding used to raise.
            raise OverflowError(f'field id must be non-negative, got {field_id}')
        kind = d['type']
        # The tag (field number << 3 | wire type) is written as a varint so
        # that field numbers >= 16 are encoded correctly instead of
        # producing an invalid single byte or overflowing.
        if kind == 'varint':
            parts.append(toVarint(field_id << 3))  # wire type 0
            parts.append(toVarint(d['data']))
        elif kind == 'string':
            parts.append(toVarint((field_id << 3) | 2))  # wire type 2
            parts.append(toVarint(len(d['data'])))
            parts.append(d['data'])
        else:
            raise NotImplementedError(f'unsupported field type: {kind!r}')
    # Join once instead of growing an immutable bytes object per field.
    return b''.join(parts)
def parseVarint(buf, p):
    """Read one protobuf varint from ``buf`` starting at offset ``p``.

    Returns a ``(value, next_offset)`` tuple. Reading stops after the first
    byte whose high bit is clear, or when the buffer is exhausted.
    """
    value = 0
    shift = 0
    while p < len(buf):
        current = buf[p]
        p += 1
        # The low 7 bits are payload, least-significant group first.
        value += (current & 127) << shift
        shift += 7
        if current >> 7 == 0:
            break
    return (value, p)
def decode(data: bytes):
    """XOR-unmask ``data`` and return the result as ``bytes``.

    Each byte is XORed with a mask derived from the total length, the byte's
    index and a repeating 9-entry key table. Applying the function twice
    restores the input.
    """
    key_table = (0x84, 0x5e, 0x4e, 0x42, 0x39, 0xa2, 0x1f, 0x60, 0x1c)
    buffer = bytearray(data)
    # Length-dependent term shared by every byte's mask.
    length_term = 23 ^ len(buffer)
    return bytes(
        value ^ ((length_term + 5 * index + key_table[index % len(key_table)]) & 255)
        for index, value in enumerate(buffer)
    )