-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_lines.py
119 lines (98 loc) · 4.53 KB
/
log_lines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import datetime
import re
import pytz
from tzlocal import get_localzone
class LogLine:
def __init__(self, timestamp: datetime.datetime, category: str, content: str):
self.content = content
self.category = category
self.timestamp = timestamp
@staticmethod
def _deduce_timestamp(time: datetime.datetime, read_on: datetime.datetime):
read_on_local = read_on.astimezone(get_localzone())
current_day = time.replace(year=read_on_local.year, month=read_on_local.month,
day=read_on_local.day)
day_before = current_day - datetime.timedelta(days=1)
day_after = current_day + datetime.timedelta(days=1)
candidates = [current_day, day_before, day_after]
best_candidate = None
best_candidate_diff = datetime.timedelta.max
for candidate in candidates:
diff = abs(read_on_local - candidate)
if best_candidate_diff >= diff:
best_candidate = candidate
best_candidate_diff = diff
return best_candidate.astimezone(pytz.utc)
@staticmethod
def parse_log_line(line: str, read_on: datetime.datetime):
# Logs are in the form [HH:MM:SS] [<category>]: <content>. Time is in military format.
time_string = ''
category_string = ''
content_string = ''
state = 'start_time'
for c in line:
if state == 'start_time':
if c != '[':
raise ValueError('A log line must start with "["')
state = 'time'
elif state == 'time':
if c == ']':
state = 'start_category'
else:
time_string += c
elif state == 'start_category':
if c == '[':
state = 'category'
elif state == 'category':
if c == ']':
state = 'start_content'
else:
category_string += c
elif state == 'start_content':
if c == ' ':
state = 'content'
elif state == 'content':
content_string += c
else:
raise RuntimeError('Invalid state: ' + state)
if state != 'content':
raise ValueError('The line ' + line + ' does not have the required, 3-part format')
timestamp = LogLine._deduce_timestamp(LogLine.parse_time_string(time_string),
read_on)
return LogLine(timestamp, category_string, content_string)._specialized()
@staticmethod
def parse_time_string(time_string):
return get_localzone().localize(datetime.datetime.strptime(time_string, '%H:%M:%S'),
is_dst=None)
def _specialized(self):
if self.category == 'Server thread/INFO':
if re.fullmatch('<.*> .*', self.content):
name_end_index = self.content.find('>')
player_name = self.content[1:name_end_index]
player_message = self.content[name_end_index + 2:]
return PlayerMessageLine(self, player_name, player_message)
elif re.fullmatch('\[Server\] .*', self.content):
message = self.content[self.content.find(']') + 2:]
return ServerMessage(self, message)
elif self.content.endswith(' joined the game'):
return PlayerJoinedLine(self, self.content[:-len(' joined the game')])
elif self.content.endswith(' left the game'):
return PlayerLeftLine(self, self.content[:-len(' left the game')])
return self
class ServerMessage(LogLine):
def __init__(self, base: LogLine, message_text: str):
LogLine.__init__(self, base.timestamp, base.category, base.content)
self.message_text = message_text
class PlayerMessageLine(LogLine):
def __init__(self, base: LogLine, player_name: str, message_text: str):
LogLine.__init__(self, base.timestamp, base.category, base.content)
self.message_text = message_text
self.player_name = player_name
class PlayerJoinedLine(LogLine):
def __init__(self, base: LogLine, player_name: str):
LogLine.__init__(self, base.timestamp, base.category, base.content)
self.player_name = player_name
class PlayerLeftLine(LogLine):
def __init__(self, base: LogLine, player_name: str):
LogLine.__init__(self, base.timestamp, base.category, base.content)
self.player_name = player_name