-
Notifications
You must be signed in to change notification settings - Fork 0
/
extractor.py
160 lines (114 loc) · 3.29 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Pranav Minasandra
# pminasandra.github.io
# Nov 29, 2023
"""
Extract features from ACC data
"""
import os.path
import pandas as pd
import accreading
import config
import utilities
if not config.SUPPRESS_INFORMATIVE_PRINT:
old_print = print
print = utilities.sprint
ALL_FEATURES = {}
def feature(f):
"""
DECORATOR!!
Adds functions to a global list.
Function f must run on a pandas frame view with
columns ['Timestamp', 'X', 'Y', 'Z'] and must return
a float.
"""
global ALL_FEATURES
assert f.__name__.startswith('_')
ALL_FEATURES[f.__name__[1:]] = f
return f
def each_second_of_data(acc_df):
"""
GENERATOR!!
"""
accreading.validate_acc_file(acc_df)
acc_df['TimestampRounded'] = acc_df['Timestamp'].dt.round('1s')
for time, frame in acc_df.groupby("TimestampRounded"):
yield time, frame
##### FEATURES TO BE USED START HERE
# NOTE: Start all these functions with '_'
@feature
def _xmean(frame):
return frame.X.mean()
@feature
def _ymean(frame):
return frame.Y.mean()
@feature
def _zmean(frame):
return frame.Z.mean()
@feature
def _xvar(frame):
return frame.X.var()
@feature
def _yvar(frame):
return frame.Y.var()
@feature
def _zvar(frame):
return frame.Z.var()
@feature
def _xmin(frame):
return frame.X.min()
@feature
def _ymin(frame):
return frame.Y.min()
@feature
def _zmin(frame):
return frame.Z.min()
@feature
def _xmax(frame):
return frame.X.max()
@feature
def _ymax(frame):
return frame.Y.max()
@feature
def _zmax(frame):
return frame.Z.max()
@feature
def _mean_vedba(frame):
dx = frame.X - frame.X.mean()
dy = frame.Y - frame.Y.mean()
dz = frame.Z - frame.Z.mean()
return ((dx**2 + dy**2 + dz**2 )**0.5).mean()
##### FEATURES END HERE
def make_features_dir():
"""
Ensures that there exists a features dir
"""
os.makedirs(os.path.join(config.DATA, "Features"), exist_ok=True)
for dplment in config.DEPLOYMENTS:
os.makedirs(os.path.join(config.DATA, "Features", dplment), exist_ok=True)
def extract_all_features(accfile_generator):
"""
Extracts features from all available data
Args:
accfile_generator: a generator object, typically output
from each_second_of_data(...)
"""
for dplment, filename, df in accfile_generator:
feature_df = {} # Not a df right now, but becomes one later. Minor hack to save mem
feature_df["Timestamp"] = [] # timestamps will be stored here
for fname in ALL_FEATURES:
feature_df[fname] = [] #features will be stored here
print(f"now working on {filename}.")
secondwise_data_generator = each_second_of_data(df)
for time, frame in secondwise_data_generator:
feature_df['Timestamp'].append(time)
for fname, ffunc in ALL_FEATURES.items():
fval = ffunc(frame)
feature_df[fname].append(fval)
tgtfilename = filename + "_extracted_features.csv"
tgtfilepath = os.path.join(config.DATA, "Features", dplment, tgtfilename)
feature_df = pd.DataFrame(feature_df)
feature_df.to_csv(tgtfilepath, index=False)
if __name__ == "__main__":
make_features_dir()
accfilegen = accreading.load_acc_files()
extract_all_features(accfilegen)