-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_deal2.py
191 lines (173 loc) · 5.47 KB
/
log_deal2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import sys
'''
output structure:
{
job_1:{
node_1:{
task_1:{
"map":[
begin_time, end_time
],
"shuffle":[...], //if it is lack of properties, its value is `None`
"reduce":[...]
},
task_2:{}, ...
},
node_2:{}, ...
},
job_2:{}, ...
}
internal log structure
{
job_1:{
node_1:{
task_1:{
"map":[
begin_time, end_time
],
"shuffle":[...], //if it is lack of properties, its value is `None`
"reduce":[...]
},
task_2:{}, ...
},
node_2:{}, ...
},
job_2:{}, ...
}
'''
# Shared parser configuration: the phase names we normalize, the node ids
# scanned when loading a whole job, and the default log directory.
log_def = {
    "phase_name": ["map", "reduce", "shuffle"],
    "node_name": ["2", "3", "4", "5", "6", "7"],
    "log_dir": "./output/",
}
class LogDealer:
    """Parse per-node job logs into normalized per-task phase timelines.

    Each log line is expected to end with a space-delimited token of the form
    ``<prefix>-<timestamp>-<task>-<phase>-<status>``; log files are named
    ``<job_id>-<node_id>``.  Output shape:
    ``{job: {node: {task: {phase: [start_s, stop_s] or None}}}}``.
    """

    def __init__(self):
        # logs[job_id][node_id][task_id] -> list of raw records
        # ({"timestamp": int, "phase": str, "status": str}).
        self.logs = {}
        # Cached result of get_output(); invalidated whenever content loads.
        self.output = None
        # Smallest / largest timestamp seen, used to normalize times.
        self.min = sys.maxsize
        self.max = 0

    def load_job(self, job_id, log_dir=None):
        """Load one job's log file from every configured node.

        :param job_id: job identifier (whitespace is stripped)
        :param log_dir: directory holding the ``<job>-<node>`` files;
            defaults to ``log_def["log_dir"]`` (resolved lazily so the
            class has no import-time dependency on the constant)
        """
        if log_dir is None:
            log_dir = log_def["log_dir"]
        for node_id in log_def["node_name"]:
            self.load_job_from_file(node_id, job_id.strip(), log_dir)

    def load_dirs(self, dirs=None):
        """Recursively load every log file found under *dirs*."""
        if dirs is None:
            dirs = log_def["log_dir"]
        for root, _subdirs, files in os.walk(dirs):
            for name in files:
                self.load_file(os.path.join(root, name))

    def load_file(self, filename):
        """Load one log file; the basename must look like ``<job>-<node>``."""
        parts = os.path.basename(filename).split("-")
        job_id, node_id = parts[0], parts[1]
        self.load_content(filename, node_id, job_id)

    def load_job_from_file(self, node_id, job_id, log_dir):
        """Load the log file for (*job_id*, *node_id*) found in *log_dir*."""
        self.load_content(os.path.join(log_dir, job_id + "-" + node_id),
                          node_id, job_id)

    def load_content(self, filename, node_id, job_id):
        """Parse *filename* and merge its records into ``self.logs``.

        Invalidates the cached output.

        :return: number of lines processed
        """
        self.output = None  # cache is stale once new lines arrive
        self.create_job_node(node_id, job_id)
        tasks = self.logs[job_id][node_id]
        cnt = 0
        with open(filename, "r") as f:
            for line in f:  # stream lines instead of readlines()
                task_id, record = self.line_process(line)
                tasks.setdefault(task_id, []).append(record)
                cnt += 1
        return cnt

    def create_job_node(self, node_id, job_id):
        """Ensure ``self.logs[job_id][node_id]`` exists."""
        self.logs.setdefault(job_id, {}).setdefault(node_id, {})

    def get_output(self):
        """Build (and cache) the normalized output structure.

        Loading more content clears the cache, so fetch the output only
        after all files have been loaded.

        :return: {job: {node: {task: {phase: [start_s, stop_s] or None}}}}
        """
        if self.output is not None:
            return self.output
        ans = {
            job: {
                node: {
                    task: self.get_phase(records)
                    for task, records in node_tasks.items()
                }
                for node, node_tasks in job_nodes.items()
            }
            for job, job_nodes in self.logs.items()
        }
        self.output = ans
        return ans

    def get_phase(self, phase):
        """Collapse one task's raw records into per-phase [start, stop] pairs.

        Times are seconds relative to the earliest timestamp seen.  A phase
        lacking a complete start/stop pair maps to ``None``.

        :param phase: list of {"timestamp", "phase", "status"} records
        """
        ans = {name: {} for name in log_def["phase_name"]}
        for rec in phase:
            # setdefault tolerates phases outside log_def["phase_name"]
            # instead of crashing with KeyError (bug fix).
            ans.setdefault(rec["phase"], {})[rec["status"]] = (
                (rec["timestamp"] - self.min) / 1000
            )
        for name in ans:
            # Require exactly a start/stop pair; the old `len(...) != 2`
            # check crashed on two records with unexpected statuses.
            if set(ans[name]) == {"start", "stop"}:
                ans[name] = [ans[name]["start"], ans[name]["stop"]]
            else:
                ans[name] = None
        return ans

    def line_process(self, s):
        """Extract ``(task_id, record)`` from one log line and track the
        global min/max timestamps.

        The line's last space-separated token must look like
        ``<prefix>-<timestamp>-<task>-<phase>-<status>``.
        """
        fields = s.split(" ")[-1].split("-")
        record = {
            "timestamp": int(fields[1].strip()),
            "phase": fields[3].strip(),
            "status": fields[4].strip(),
        }
        self.min = min(self.min, record["timestamp"])
        self.max = max(self.max, record["timestamp"])
        return fields[2], record

    def process(self, job_id):
        """Load *job_id* and return ``(output, total_duration_seconds)``."""
        self.load_job(job_id)
        return self.get_output(), (self.max - self.min) / 1000
if __name__ == '__main__':
    # Load every log file under the default directory and dump the
    # normalized structure to stdout.
    dealer = LogDealer()
    dealer.load_dirs()
    result = dealer.get_output()
    print(result)