-
-
Notifications
You must be signed in to change notification settings - Fork 44
/
momentum_indicators.py
367 lines (330 loc) · 9.39 KB
/
momentum_indicators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
""""""
# Import Built-Ins:
# Import Third-Party:
# Import Homebrew:
import jhtalib as jhta
def ADX(df, n):
"""
Average Directional Movement Index
"""
def ADXR(df, n):
"""
Average Directional Movement Index Rating
"""
def APO(df, n_fast, n_slow, price='Close'):
"""
Absolute Price Oscillator
Returns: list of floats = jhta.APO(df, n_fast, n_slow, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=PriceOscillator.htm
"""
apo_list = []
for i in range(len(df[price])):
if i + 1 < n_slow:
apo = float('NaN')
else:
start_fast = i + 1 - n_fast
end = i + 1
sma_fast = sum(df[price][start_fast:end]) / n_fast
start_slow = i + 1 - n_slow
end = i + 1
sma_slow = sum(df[price][start_slow:end]) / n_slow
apo = sma_slow - sma_fast
# apo *= -1
apo_list.append(apo)
return apo_list
def AROON(df, n):
"""
Aroon
"""
def AROONOSC(df, n):
"""
Aroon Oscillator
"""
def BOP(df):
"""
Balance Of Power
"""
def CCI(df, n):
"""
Commodity Channel Index
"""
def CMO(df, n, price='Close'):
"""
Chande Momentum Oscillator
"""
def DX(df, n):
"""
Directional Movement Index
"""
def MACD(df, price='Close'):
"""
Moving Average Convergence/Divergence
"""
def MACDEXT(df, price='Close'):
"""
MACD with controllable MA type
"""
def MACDFIX(df, n, price='Close'):
"""
Moving Average Convergence/Divergence Fix 12/26
"""
def MFI(df, n, high='High', low='Low', close='Close', volume='Volume'):
"""
Money Flow Index
Returns: list of floats = jhta.MFI(df, n, high='High', low='Low', close='Close', volume='Volume')
Source: https://www.fmlabs.com/reference/default.htm?url=MoneyFlowIndex.htm
"""
mfi_list = []
typprice_list = jhta.TYPPRICE(df, high, low, close)
mf_pos_list = []
mf_neg_list = []
for i in range(len(df[low])):
mf = typprice_list[i] * df[volume][i]
if i + 1 < n:
mfi = float('NaN')
mf_pos_list.append(float('NaN'))
mf_neg_list.append(float('NaN'))
else:
start = i + 1 - n
end = i + 1
if typprice_list[i] > typprice_list[i - 1]:
mf_pos_list.append(mf)
mf_neg_list.append(.0)
else:
mf_pos_list.append(.0)
mf_neg_list.append(mf)
x = sum(mf_pos_list[start:end])
# FIX ZeroDivisionError: float division by zero:
y = sum(mf_neg_list[start:end]) or .00000001
mr = x / y
mfi = 100 - (100 / (1 + mr))
mfi_list.append(mfi)
return mfi_list
def MINUS_DI(df, n):
"""
Minus Directional Indicator
"""
def MINUS_DM(df, n):
"""
Minus Directional Movement
"""
def MOM(df, n, price='Close'):
"""
Momentum
Returns: list of floats = jhta.MOM(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=Momentum.htm
"""
mom_list = []
for i in range(len(df[price])):
if i + 1 < n:
mom = float('NaN')
else:
mom = df[price][i] - df[price][i - n]
mom_list.append(mom)
return mom_list
def PLUS_DI(df, n):
"""
Plus Directional Indicator
"""
def PLUS_DM(df, n):
"""
Plus Directional Movement
"""
def PMOM(df, n, price='Close'):
"""
%Momentum
Returns: list of floats = jhta.PMOM(df, n, price='Close')
"""
pmom_list = []
mom_list = jhta.MOM(df, n, price)
for i in range(len(df[price])):
if i + 1 < n:
pmom = float('NaN')
else:
pmom = mom_list[i] / df[price][i - n]
pmom_list.append(pmom)
return pmom_list
def PPO(df, price='Close'):
"""
Percentage Price Oscillator
"""
def RMI(df, n, price='Close'):
"""
Relative Momentum Index
Returns: list of floats = jhta.RMI(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=RMI.htm
"""
rmi_list = []
upavg = .0
dnavg = .0
for i in range(len(df[price])):
if i + 1 < n:
rmi = float('NaN')
else:
if df[price][i] > df[price][i - n]:
up = df[price][i] - df[price][i - n]
dn = 0
else:
up = 0
dn = df[price][i - n] - df[price][i]
upavg = (upavg * (n - 1) + up) / n
dnavg = (dnavg * (n - 1) + dn) / n
if (upavg + dnavg) == 0:
rmi = float('NaN')
else:
rmi = 100 * upavg / (upavg + dnavg)
rmi_list.append(rmi)
return rmi_list
def ROC(df, n, price='Close'):
"""
Rate of change : ((price/prevPrice)-1)*100
Returns: list of floats = jhta.ROC(df, n, price='Close')
"""
roc_list = []
for i in range(len(df[price])):
if i + 1 < n:
roc = float('NaN')
else:
roc = ((df[price][i] / df[price][i - n]) - 1) * 100
roc_list.append(roc)
return roc_list
def ROCP(df, n, price='Close'):
"""
Rate of change Percentage: (price-prevPrice)/prevPrice
Returns: list of floats = jhta.ROCP(df, n, price='Close')
"""
rocp_list = []
for i in range(len(df[price])):
if i + 1 < n:
rocp = float('NaN')
else:
rocp = (df[price][i] - df[price][i - n]) / df[price][i - n]
rocp_list.append(rocp)
return rocp_list
def ROCR(df, n, price='Close'):
"""
Rate of change ratio: (price/prevPrice)
Returns: list of floats = jhta.ROCR(df, n, price='Close')
"""
rocr_list = []
for i in range(len(df[price])):
if i + 1 < n:
rocr = float('NaN')
else:
rocr = df[price][i] / df[price][i - n]
rocr_list.append(rocr)
return rocr_list
def ROCR100(df, n, price='Close'):
"""
Rate of change ratio 100 scale: (price/prevPrice)*100
Returns: list of floats = jhta.ROCR100(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=RateOfChange.htm
"""
rocr100_list = []
for i in range(len(df[price])):
if i + 1 < n:
rocr100 = float('NaN')
else:
rocr100 = (df[price][i] / df[price][i - n]) * 100
rocr100_list.append(rocr100)
return rocr100_list
def RSI(df, n, price='Close'):
"""
Relative Strength Index
Returns: list of floats = jhta.RSI(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=rsi.htm
"""
rsi_list = []
upavg = .0
dnavg = .0
for i in range(len(df[price])):
if i + 1 < n:
rsi = float('NaN')
else:
if df[price][i] > df[price][i - 1]:
up = df[price][i] - df[price][i - 1]
dn = 0
else:
up = 0
dn = df[price][i - 1] - df[price][i]
upavg = (upavg * (n - 1) + up) / n
dnavg = (dnavg * (n - 1) + dn) / n
rsi = 100 * upavg / (upavg + dnavg)
rsi_list.append(rsi)
return rsi_list
def STOCH(df, n, price='Close'):
"""
Stochastic
Returns: list of floats = jhta.STOCH(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=Stochastic.htm
"""
stoch_list = []
for i in range(len(df[price])):
if i + 1 < n:
stoch = float('NaN')
else:
start = i + 1 - n
end = i + 1
lowest = min(df[price][start:end])
highest = max(df[price][start:end])
stoch = (df[price][i] - lowest) / (highest - lowest)
stoch_list.append(stoch)
return stoch_list
def STOCHF(df):
"""
Stochastic Fast
"""
def STOCHRSI(df, n, price='Close'):
"""
Stochastic Relative Strength Index
"""
def TRIX(df, n, price='Close'):
"""
1-day Rate-Of-Change (ROC) of a Triple Smooth EMA
"""
def ULTOSC(df):
"""
Ultimate Oscillator
"""
def VHF(df, n, price='Close'):
"""
Vertical Horizontal Filter
Returns: list of floats = jhta.VHF(df, n, price='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=VHF.htm
"""
vhf_list = []
c_list = []
for i in range(len(df[price])):
if i + 1 < n:
vhf = float('NaN')
c_list.append(float('NaN'))
else:
start = i + 1 - n
end = i + 1
highest = max(df[price][start:end])
lowest = min(df[price][start:end])
c0 = df[price][i]
c1 = df[price][i - 1]
c = (c0 - c1) / c1
c_list.append(c)
c_sum = sum(c_list[start:end])
vhf = (highest - lowest) / c_sum
vhf_list.append(vhf)
return vhf_list
def WILLR(df, n, high='High', low='Low', close='Close'):
"""
Williams' %R
Returns: list of floats = jhta.WILLR(df, n, high='High', low='Low', close='Close')
Source: https://www.fmlabs.com/reference/default.htm?url=WilliamsR.htm
"""
willr_list = []
for i in range(len(df[close])):
if i + 1 < n:
willr = float('NaN')
else:
start = i + 1 - n
end = i + 1
willr = (max(df[high][start:end]) - df[close][i]) / (max(df[high][start:end]) - min(df[low][start:end])) * 100
# willr *= -1
willr_list.append(willr)
return willr_list