-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtick_resamplers.py
165 lines (142 loc) · 6.87 KB
/
tick_resamplers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import apache_beam as beam
from apache_beam.transforms import window
from fxbeam.combiners.tick_to_ohlcv import TickToOHLCVCombiner
from fxbeam.utils.datetime_utils import TimestampToTimeGroup, TimeGroupToTimestamp, AddWindowStartTimestampToElement
class TickByTimeGroupResampler:
    """Resample ticks into OHLCV values by grouping on a time-group key.

    A ``time_group_key`` is added to every element from its timestamp,
    elements sharing that key (and, optionally, the same instrument) are
    combined with ``TickToOHLCVCombiner``, and the key is turned back into
    a timestamp afterwards.

    ``process`` is the entry point to be used.
    """

    def __init__(self, window_size, instrument_column=None):
        self.instrument_column = instrument_column
        self.window_size = window_size

    def process(self, data):
        """Run the full resampling chain on a PCollection of ticks.

        The steps are: add the group key, combine into OHLCV, then
        re-insert the timestamp value.

        :param data: PCollection being processed
        :return: PCollection of OHLCV data including timestamp key
        """
        grouped = self.timestamp_to_timegroup(data)
        ohlcv = self.resample(grouped)
        return self.timegroup_to_timestamp(ohlcv)

    def timestamp_to_timegroup(self, data):
        """Add a timegroup key derived from each element's timestamp.

        :param data: PCollection being processed
        :return: PCollection with timegroup keys
        """
        add_group_key = beam.ParDo(
            TimestampToTimeGroup(),
            window_size=self.window_size,
            time_group_key='time_group_key',
        )
        return data | 'Resampler - Create group from timestamp' >> add_group_key

    def timegroup_to_timestamp(self, data):
        """Convert the timegroup key of each element back into a timestamp.

        :param data: PCollection being processed
        :return: PCollection with timestamps
        """
        restore_timestamp = beam.ParDo(
            TimeGroupToTimestamp(),
            window_size=self.window_size,
            time_group_key='time_group_key',
        )
        return data | 'Resampler - Add timestamp to elements' >> restore_timestamp

    def map_elements(self, data):
        """Turn elements into (key, element) pairs for CombinePerKey.

        The key is the ``time_group_key`` value alone, or the pair
        ``(time_group_key, instrument)`` when ``instrument_column`` is set.

        :param data: PCollection being processed with time_group_key column and
            instrument_column if set to use.
        :return: PCollection with mapped data
        """
        label = 'Resampler - Map data'

        if not self.instrument_column:
            return data | label >> beam.Map(
                lambda row: (row['time_group_key'], row)
            )

        def key_by_group_and_instrument(row):
            composite_key = (row['time_group_key'], row[self.instrument_column])
            return (composite_key, row)

        return data | label >> beam.Map(key_by_group_and_instrument)

    def demap_elements(self, data):
        """Flatten (key, value) pairs back into single dict elements.

        The key parts are written first, then the combined value is merged
        on top of them.

        :param data: PCollection being processed with (key: value) or
            (key1, instrumentKey: value) format.
        :return: PCollection with de-mapped data with keys added back into elements
        """
        label = 'Resampler - De-Map data'

        if not self.instrument_column:
            return data | label >> beam.Map(
                lambda pair: {'time_group_key': pair[0], **pair[1]}
            )

        def restore_group_and_instrument(pair):
            key = pair[0]
            return {
                'time_group_key': key[0],
                self.instrument_column: key[1],
                **pair[1],
            }

        return data | label >> beam.Map(restore_group_and_instrument)

    def resample(self, data):
        """Combine the ticks of every key with TickToOHLCVCombiner.

        Elements are mapped to key: value pairs first and de-mapped
        afterwards, because CombinePerKey works on keyed elements.

        :param data: PCollection being processed
        :return: PCollection with OHLCV data but without timestamp
        """
        keyed = self.map_elements(data)
        combine_ohlcv = beam.CombinePerKey(TickToOHLCVCombiner())
        combined = keyed | 'Resampler - Perform OHLCV' >> combine_ohlcv
        return self.demap_elements(combined)
class TickByWindowResampler:
    """Resampler to combine ticks into OHLCV values.

    This creates windows to be used in the aggregation process: the data is
    divided into fixed windows of ``window_size``, combined per window with
    ``TickToOHLCVCombiner`` (per instrument when ``instrument_column`` is
    set), and the start of the window is then added to each result.

    The process function is the entry point to be used.
    """

    def __init__(self, window_size, instrument_column=None):
        # Passed unchanged to window.FixedWindows as the window size.
        self.window_size = window_size
        # Optional element key holding the instrument; when set, the
        # aggregation is done per instrument within each window.
        self.instrument_column = instrument_column

    def process(self, data):
        """Entry point to the resampler.

        This function applies the window to the data, resamples using the
        TickToOHLCVCombiner and reinserts the timestamp value of the start
        of the window before returning the OHLCV data.

        :param data: PCollection being processed
        :return: PCollection of OHLCV data including timestamp key
        """
        data = self.apply_window(data)
        data = self.resample(data)
        data = self.extract_window_time(data)
        return data

    def apply_window(self, data):
        """Function used to apply the window to the data.

        Currently this is FIXED since OHLCV data is calculated using a
        fixed window.

        :param data: PCollection being processed
        :return: PCollection with applied window depending on window size.
        """
        return data | 'Resampler - Divide data to windows' >> beam.WindowInto(
            window.FixedWindows(self.window_size)
        )

    @classmethod
    def extract_window_time(cls, data):
        """Function to assign the start window time to the element.

        :param data: PCollection being processed
        :return: PCollection with timestamps
        """
        return data | 'Resampler - Add timestamp to elements' >> beam.ParDo(
            AddWindowStartTimestampToElement()
        )

    def map_elements(self, data):
        """Map function to create key:value pairs to run CombinePerKey function.

        Only meaningful when ``instrument_column`` is set, since that
        column provides the key.

        :param data: PCollection being processed with instrument_column if set to use.
        :return: PCollection with mapped data
        """
        return data | 'Resampler - Map data' >> beam.Map(
            lambda x: (x[self.instrument_column], x)
        )

    def demap_elements(self, data):
        """De-Map function to unmap key:value pairs after CombinePerKey function.

        Only meaningful when ``instrument_column`` is set, since the key is
        written back under that column name.

        :param data: PCollection being processed with (instrumentKey: value)
        :return: PCollection with de-mapped data with keys added back into elements
        """
        return data | 'Resampler - De-Map data' >> beam.Map(
            lambda x: {self.instrument_column: x[0], **x[1]}
        )

    def resample(self, data):
        """Resample function which runs the TickToOHLCVCombiner.

        With an instrument column the data is keyed by instrument and
        combined with CombinePerKey. Without one, CombineGlobally is used
        since there are no keys; it still aggregates per window when the
        input has been windowed in the previous step.

        :param data: PCollection being processed
        :return: PCollection with OHLCV data but without timestamp
        """
        action_name = 'Resampler - Perform OHLCV'
        if self.instrument_column:
            data = self.map_elements(data)
            data = data | action_name >> beam.CombinePerKey(TickToOHLCVCombiner())
            return self.demap_elements(data)
        # Beam refuses to build CombineGlobally with default values on a
        # PCollection that is not in the global window (it raises ValueError
        # at pipeline construction), which is always the case after
        # apply_window. without_defaults() lifts that restriction; windows
        # with no input then simply produce no output element.
        return data | action_name >> beam.CombineGlobally(
            TickToOHLCVCombiner()
        ).without_defaults()