-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathcpdbench_utils.py
198 lines (157 loc) · 5.12 KB
/
cpdbench_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Utility functions for CPDBench.
Author: Gertjan van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.
"""
import copy
import hashlib
import json
import numpy as np
import socket
import sys
def md5sum(filename):
    """Return the hexadecimal MD5 digest of the file at ``filename``.

    The file is read in fixed-size binary chunks so arbitrarily large
    files can be hashed without loading them fully into memory.
    """
    chunk_size = 65536
    digest = hashlib.md5()
    with open(filename, "rb") as handle:
        # iter() with a b"" sentinel stops exactly when read() returns empty.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def load_dataset(filename):
    """Load a CPDBench dataset from a JSON file.

    Returns a tuple ``(data, mat)`` where ``data`` is the parsed JSON
    dictionary and ``mat`` is an ``(n_obs, n_dim)`` array holding the
    raw series, standardized per dimension (sample std, ``ddof=1``) to
    limit numerical error downstream.

    Raises
    ------
    NotImplementedError
        When the time axis is not the consecutive range 0..n_obs-1.
    """
    with open(filename, "r") as handle:
        data = json.load(handle)

    expected_index = list(range(data["n_obs"]))
    if data["time"]["index"] != expected_index:
        raise NotImplementedError(
            "Time series with non-consecutive time axis are not yet supported."
        )

    mat = np.zeros((data["n_obs"], data["n_dim"]))
    for dim, series in enumerate(data["series"]):
        mat[:, dim] = series["raw"]

    # Standardize each column; the nan-aware reductions let missing
    # values pass through without poisoning the statistics.
    mat = (mat - np.nanmean(mat, axis=0)) / np.sqrt(np.nanvar(mat, axis=0, ddof=1))
    return data, mat
def prepare_result(
    data,
    data_filename,
    status,
    error,
    params,
    locations,
    runtime,
    script_filename,
):
    """Assemble the experiment output as a single dictionary.

    Parameters
    ----------
    data : dict
        The CPDBench dataset object.
    data_filename : str
        Absolute path to the dataset file; hashed for provenance.
    status : str
        Outcome of the experiment. Conventional codes: SUCCESS for a
        successful run, SKIP when the method received improper
        parameters, FAIL when the method failed for any reason, and
        TIMEOUT when it ran too long.
    error : str
        Description of the error, if one occurred.
    params : dict
        All parameters given to the method, including defaults, so the
        run can be reproduced exactly.
    locations : list
        Detected change point locations, 0-based indices of time
        points on the interval [0, T-1] (both endpoints included).
    runtime : float
        Runtime of the method, measured as accurately as possible and
        excluding method-specific setup code.
    script_filename :
        Path to the method's script; hashed as rough versioning.
    """
    return {
        # Invocation provenance: full command line plus script path and hash.
        "command": " ".join(sys.argv),
        "script": script_filename,
        "script_md5": md5sum(script_filename),
        # Machine the experiment ran on.
        "hostname": socket.gethostname(),
        # Dataset identity and content hash.
        "dataset": data["name"],
        "dataset_md5": md5sum(data_filename),
        # Outcome of the detection and any error message.
        "status": status,
        "error": error,
        # Parameter set used, for reproducibility.
        "parameters": params,
        # The detection result itself.
        "result": {"cplocations": locations, "runtime": runtime},
    }
def dump_output(output, filename=None):
    """Serialize ``output`` as JSON to ``filename``, or stdout when None.

    Keys are sorted and the document is tab-indented so repeated runs
    produce byte-identical files.
    """
    text = json.dumps(output, sort_keys=True, indent="\t")
    if filename is None:
        print(text)
    else:
        with open(filename, "w") as handle:
            handle.write(text)
def make_param_dict(args, defaults):
    """Combine parsed CLI arguments with method defaults into one dict.

    The ``input`` entry (and ``output``, when present) are removed
    because they are I/O paths rather than method parameters.  Values
    in ``defaults`` overwrite same-named CLI entries, matching the
    original merge order.
    """
    params = copy.deepcopy(vars(args))
    # pop() without a fallback raises KeyError just like del would.
    params.pop("input")
    params.pop("output", None)
    params.update(defaults)
    return params
def exit_with_error(data, args, parameters, error, script_filename):
    """Write a 'FAIL' result record and terminate the process.

    The record goes to ``args.output`` (or stdout when that is None),
    after which SystemExit is raised so the method script stops.
    """
    result = prepare_result(
        data=data,
        data_filename=args.input,
        status="FAIL",
        error=error,
        params=parameters,
        locations=None,
        runtime=None,
        script_filename=script_filename,
    )
    dump_output(result, args.output)
    raise SystemExit
def exit_with_timeout(data, args, parameters, runtime, script_filename):
    """Write a 'TIMEOUT' result record and terminate the process.

    The (partial) runtime is recorded but no change point locations
    are; SystemExit is raised after the record is written.
    """
    result = prepare_result(
        data=data,
        data_filename=args.input,
        status="TIMEOUT",
        error=None,
        params=parameters,
        locations=None,
        runtime=runtime,
        script_filename=script_filename,
    )
    dump_output(result, args.output)
    raise SystemExit
def exit_success(data, args, parameters, locations, runtime, script_filename):
    """Write a 'SUCCESS' result record with the detected locations.

    Unlike the FAIL/TIMEOUT helpers this does not raise SystemExit;
    control returns to the caller after the record is written.
    """
    result = prepare_result(
        data=data,
        data_filename=args.input,
        status="SUCCESS",
        error=None,
        params=parameters,
        locations=locations,
        runtime=runtime,
        script_filename=script_filename,
    )
    dump_output(result, args.output)