-
Notifications
You must be signed in to change notification settings - Fork 6
/
StormReplayParser.py
160 lines (142 loc) · 6.63 KB
/
StormReplayParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from s2protocol.mpyq import mpyq
from s2protocol import protocol15405
from s2protocol.decoders import *
import os
class StormReplayParser:
    """Lazy, caching reader for a Storm replay (MPQ) archive.

    The archive is opened in the constructor; every ``get*`` accessor decodes
    its part of the replay on first use and stores the result on the instance,
    so repeated calls return the same object. Results are only cached once
    they have been built completely, so a decoding error never leaves a
    half-populated cache behind.
    """

    # Maximum number of events collected by getReplayGameEventsDebug().
    _DEBUG_EVENT_LIMIT = 25000

    def __init__(self, replayFile):
        """Open the archive and load the protocol module matching its build.

        replayFile -- either the name of a file or any object that has a
                      'read()' method.

        Raises Exception if no ``s2protocol.protocol<build>`` module can be
        imported for the replay's base build.
        """
        self.mpq = mpyq.MPQArchive(replayFile)
        # The header is decoded with a fixed protocol version; it tells us
        # which build-specific protocol is needed for everything else.
        header = protocol15405.decode_replay_header(self.mpq.header['user_data_header']['content'])
        self.buildStormReplay = header['m_version']['m_baseBuild']
        try:
            # A non-empty fromlist makes __import__ return the submodule
            # itself instead of the top-level 's2protocol' package.
            self.protocol = __import__('s2protocol' + '.protocol%s' % self.buildStormReplay, fromlist=['protocol2'])
        except ImportError:
            raise Exception('Unsupported build number: %i' % self.buildStormReplay)

    # Returns a unique string that is shared among all of the 10+ replays involved in this match
    def getUniqueMatchId(self):
        """Return the match identifier (currently the placeholder "todo")."""
        try:
            return self.matchId
        except AttributeError:
            self.matchId = "todo"
            return self.matchId

    def getReplayInitData(self):
        """Return the decoded contents of 'replay.initData' (cached)."""
        try:
            return self.replayInitData
        except AttributeError:
            pass
        self.replayInitData = self.protocol.decode_replay_initdata(self.mpq.read_file('replay.initData'))
        return self.replayInitData

    def getReplayDetails(self):
        """Return the decoded contents of 'replay.details' (cached)."""
        try:
            return self.replayDetails
        except AttributeError:
            pass
        self.replayDetails = self.protocol.decode_replay_details(self.mpq.read_file('replay.details'))
        return self.replayDetails

    # returns array indexed by user ID
    def getReplayPlayers(self):
        """Return the player entries from the replay details (cached).

        Each entry of ``m_playerList`` is annotated in place with:
          'toon_id'           -- "<region>-<programId>-<realm>-<id>" built
                                 from the entry's 'm_toon' fields
          'm_controlPlayerId' -- list position + 1, the value used to
                                 reference this player in the tracker events

        The returned list has at least 10 slots; unused slots are None.
        """
        try:
            return self.replayPlayers
        except AttributeError:
            pass
        playerList = self.getReplayDetails()['m_playerList']
        # Keep the historical minimum of 10 slots, but never overflow when
        # the replay lists more players than that.
        players = [None] * max(10, len(playerList))
        for i, player in enumerate(playerList):
            #TODO: confirm that m_workingSetSlotId == i always
            toon = player['m_toon']
            player['toon_id'] = "%i-%s-%i-%i" % (toon['m_region'], toon['m_programId'], toon['m_realm'], toon['m_id'])
            # The m_controlPlayerId is the field value to reference this player in the tracker events
            player['m_controlPlayerId'] = i + 1
            players[i] = player
        self.replayPlayers = players
        # Older revisions stored the result under this name; keep the alias.
        self.players = players
        return players

    # returns array indexed by user ID
    def getPlayerSpawnInfo(self):
        """Return, per user ID, the first unit born for that player (cached).

        Each populated slot is ``{'hero': <m_unitTypeName>, 'unit_tag':
        <m_unitTag>}`` taken from the first SUnitBornEvent whose
        'm_controlPlayerId' matches a player announced by an earlier
        SPlayerSetupEvent. The list has at least 10 slots; slots without a
        spawn are None.
        """
        try:
            return self.playerSpawnInfo
        except AttributeError:
            pass
        spawnInfo = [None] * 10
        # Players announced by a setup event that have not spawned yet.
        playerIdToUserId = {}
        for event in self.getReplayTrackerEvents():
            if event['_event'] == 'NNet.Replay.Tracker.SPlayerSetupEvent':
                playerIdToUserId[event['m_playerId']] = event['m_userId']
            elif event['_event'] == 'NNet.Replay.Tracker.SUnitBornEvent':
                playerId = event['m_controlPlayerId']
                if playerId not in playerIdToUserId:
                    continue
                playerIndex = playerIdToUserId[playerId] # always playerId-1 so far, but this is safer
                if playerIndex >= len(spawnInfo):
                    # Grow rather than fail on a user ID beyond slot 9.
                    spawnInfo.extend([None] * (playerIndex + 1 - len(spawnInfo)))
                spawnInfo[playerIndex] = {
                    'hero': event['m_unitTypeName'],
                    'unit_tag': event['m_unitTag']
                }
                del playerIdToUserId[playerId]
                if not playerIdToUserId:
                    # Every announced player has spawned; stop scanning.
                    break
        self.playerSpawnInfo = spawnInfo
        return spawnInfo

    def getReplayMessageEvents(self):
        """Return the decoded 'replay.message.events' as a list (cached)."""
        try:
            return self.replayMessageEvents
        except AttributeError:
            pass
        messageGenerator = self.protocol.decode_replay_message_events(self.mpq.read_file('replay.message.events'))
        self.replayMessageEvents = list(messageGenerator)
        return self.replayMessageEvents

    def getMapName(self):
        """Return the 'm_title' field of the replay details (cached)."""
        try:
            return self.mapName
        except AttributeError:
            pass
        self.mapName = self.getReplayDetails()['m_title']
        return self.mapName

    def getMatchUTCTimestamp(self):
        """Return the match start derived from 'm_timeUTC' as an integer.

        The value is ``m_timeUTC // 10000000 - 11644473600``.
        NOTE(review): this looks like a conversion from 100ns ticks since
        1601-01-01 to Unix seconds -- confirm against the replay format.
        """
        try:
            return self.utcTimestamp
        except AttributeError:
            pass
        # Floor division keeps the result integral on Python 3 as well
        # (plain '/' on ints already floored on Python 2).
        self.utcTimestamp = (self.getReplayDetails()['m_timeUTC'] // 10000000) - 11644473600
        return self.utcTimestamp

    def getChat(self):
        """Return the chat messages of the match (cached).

        Each entry is ``{'t': <timestamp>, 'user': <m_userId>, 'msg':
        <m_string>}`` for every 'NNet.Game.SChatMessage' message event, where
        't' is the match timestamp plus ``_gameloop // 16``.
        """
        try:
            return self.chat
        except AttributeError:
            pass
        matchStart = self.getMatchUTCTimestamp()
        chat = []
        for messageEvent in self.getReplayMessageEvents():
            if messageEvent['_event'] != 'NNet.Game.SChatMessage':
                continue
            chat.append({
                # Integer division: see getMatchUTCTimestamp().
                't': matchStart + messageEvent['_gameloop'] // 16,
                'user': messageEvent['_userid']['m_userId'],
                'msg': messageEvent['m_string'],
            })
        self.chat = chat
        return chat

    def getReplayGameEvents(self):
        """Return the decoded 'replay.game.events' as a list (cached)."""
        try:
            return self.replayGameEvents
        except AttributeError:
            pass
        generator = self.protocol.decode_replay_game_events(self.mpq.read_file('replay.game.events'))
        self.replayGameEvents = list(generator)
        return self.replayGameEvents

    def getReplayGameEventsDebug(self):
        """Return up to 25000 debug-decoded game events (cached).

        Every event gets an 'index' key holding its position in the stream.
        If decoding raises CorruptedError, the events decoded so far are kept
        and a final ``{'error': <message>}`` entry is appended.

        The result is stored separately from getReplayGameEvents(), so
        calling one never affects the other.
        """
        try:
            return self.replayGameEventsDebug
        except AttributeError:
            pass
        generator = self.protocol.decode_replay_game_events_debug(self.mpq.read_file('replay.game.events'))
        events = []
        try:
            for i, event in enumerate(generator):
                if i >= self._DEBUG_EVENT_LIMIT:
                    break
                event['index'] = i
                events.append(event)
        except CorruptedError as e:
            # CorruptedError is presumably provided by the star import of
            # s2protocol.decoders at the top of the file.
            events.append({'error': str(e)})
        self.replayGameEventsDebug = events
        return events

    def getReplayTrackerEvents(self):
        """Return the decoded 'replay.tracker.events' as a list (cached).

        Events carrying both 'm_unitTagIndex' and 'm_unitTagRecycle' are
        given an extra 'm_unitTag' key computed with the protocol's
        ``unit_tag`` helper.
        """
        try:
            return self.replayTrackerEvents
        except AttributeError:
            pass
        generator = self.protocol.decode_replay_tracker_events(self.mpq.read_file('replay.tracker.events'))
        events = []
        for event in generator:
            if 'm_unitTagIndex' in event and 'm_unitTagRecycle' in event:
                event['m_unitTag'] = self.protocol.unit_tag(event['m_unitTagIndex'], event['m_unitTagRecycle'])
            events.append(event)
        self.replayTrackerEvents = events
        return events