In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import findspark
findspark.init()
#from pyspark import SparkContext, SparkConf, sql
import pyspark as spark
import datetime

In [2]:
import pyspark.sql.functions as spark_functions

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


In [4]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import RobustScaler

In [5]:
import pdb
from pdb import set_trace as bp

In [6]:
conf = spark.SparkConf().setAppName("Anomaly").setMaster("local[2]")
sc = spark.SparkContext(conf = conf)

In [7]:
sql_context = spark.SQLContext(sc)

In [8]:
df = sql_context.read.csv(path='/home/l7/Downloads/UPM.HE.csv')
df_keys = list(df.)

In [43]:
df_filtered = df.filter((df._c0 != 'Date'))

In [44]:
df_new = sql_context.createDataFrame(data=df_filtered.collect(),
                                    schema=df.head()[:],
                                    )
df_new.head()

Row(Date='2000-01-03', Open='20.575001', High='22.000000', Low='20.500000', Close='21.100000', Adj Close='6.877053', Volume='601912')

### Overwriting column types

In [45]:
df_new = df_new.withColumn('Date', df_new['Date'].cast(spark.sql.types.DateType()))
for column in df_new.columns[1:]:
    df_new = df_new.withColumn(column, df_new[column].cast(spark.sql.types.FloatType()))
df_new = df_new.dropna()
df_new.dtypes

[('Date', 'date'),
 ('Open', 'float'),
 ('High', 'float'),
 ('Low', 'float'),
 ('Close', 'float'),
 ('Adj Close', 'float'),
 ('Volume', 'float')]

In [46]:
df_new = df_new.withColumn('h_l_diff', spark_functions.abs(df_new['High'] - df_new['Low']))

In [47]:
df_new.show(2)

+----------+------+-----+----+-----+---------+---------+---------+
|      Date|  Open| High| Low|Close|Adj Close|   Volume| h_l_diff|
+----------+------+-----+----+-----+---------+---------+---------+
|2000-01-03|20.575| 22.0|20.5| 21.1| 6.877053| 601912.0|      1.5|
|2000-01-04|  21.1|21.55|20.1|21.55|  7.02372|1412912.0|1.4499989|
+----------+------+-----+----+-----+---------+---------+---------+
only showing top 2 rows



In [48]:
df_new.describe(['Open', 'Adj Close']).show()

+-------+------------------+------------------+
|summary|              Open|         Adj Close|
+-------+------------------+------------------+
|  count|              4937|              4937|
|   mean|15.573755305330595|10.106034078118963|
| stddev| 5.505172848734595| 6.197554401683743|
|    min|              4.44|          2.682938|
|    max|             34.42|          32.96963|
+-------+------------------+------------------+



In [49]:
window_spec = spark.sql.window.Window.orderBy('Date')
df_lag = df_new.withColumn('prev_day_price',
                          spark_functions.lag(df_new['Adj Close'],count=1)
                           .over(window_spec))

In [50]:
def calculate_returns(present, past):
    return (present - past) / past

In [51]:
df_r = df_lag.withColumn('daily_returns',
                         calculate_returns(df_lag['Adj Close'], df_lag['prev_day_price']))
df_r.show(5)

+----------+------+-----+----+-----+---------+---------+---------+--------------+--------------------+
|      Date|  Open| High| Low|Close|Adj Close|   Volume| h_l_diff|prev_day_price|       daily_returns|
+----------+------+-----+----+-----+---------+---------+---------+--------------+--------------------+
|2000-01-03|20.575| 22.0|20.5| 21.1| 6.877053| 601912.0|      1.5|          null|                null|
|2000-01-04|  21.1|21.55|20.1|21.55|  7.02372|1412912.0|1.4499989|      6.877053| 0.02132701438231618|
|2000-01-05|  21.5| 21.5|20.0| 21.1| 6.877053| 670824.0|      1.5|       7.02372|-0.02088167069115...|
|2000-01-06|  21.1| 21.1|21.1| 21.1| 6.877053|      0.0|      0.0|      6.877053|                 0.0|
|2000-01-07|  21.5|22.45|20.6| 20.9| 6.811868|1988914.0|1.8500004|      6.877053|-0.00947856520058...|
+----------+------+-----+----+-----+---------+---------+---------+--------------+--------------------+
only showing top 5 rows



In [52]:
upm_df = df_r.dropna()
upm_df = upm_df.orderBy('Date')

In [53]:
upm_df.select('daily_returns').show(5)

+--------------------+
|       daily_returns|
+--------------------+
| 0.02132701438231618|
|-0.02088167069115...|
|                 0.0|
|-0.00947856520058...|
|0.019138746267812527|
+--------------------+
only showing top 5 rows



In [54]:
start_date = upm_df.agg(spark_functions.min(upm_df.Date)).collect()
end_date = upm_df.agg(spark_functions.max(upm_df.Date)).collect()
start_date = start_date[0][0]
end_date = end_date[0][0]

In [55]:
window_length = 10
current_end_date = start_date + datetime.timedelta(days=window_length)
current_end_date

datetime.date(2000, 1, 14)

In [56]:
r1 = upm_df[(upm_df['Date'] >= start_date ) &
      (upm_df['Date'] <= datetime.datetime(2000, 2,1) )][['Date','daily_returns']]
r2 = upm_df[(upm_df['Date'] >= start_date ) &
      (upm_df['Date'] <= datetime.datetime(2000, 2,1) )]['daily_returns']

In [60]:
start_date = upm_df.agg(spark_functions.min(upm_df.Date)).collect()
end_date = upm_df.agg(spark_functions.max(upm_df.Date)).collect()
start_date = start_date[0][0]
end_date = end_date[0][0]
batch_container = []
batch_container_length = 1000


In [61]:
current_end_date = start_date + datetime.timedelta(days=batch_container_length)
while current_end_date <= end_date:
    #bp()
    r = upm_df[(upm_df['Date'] >= start_date) &
      (upm_df['Date'] <= current_end_date )][['Date','daily_returns']]
    batch_container.append(r.toPandas())
    start_date += datetime.timedelta(days=batch_container_length)
    current_end_date += datetime.timedelta(days=batch_container_length)

## Notes regarding following steps with training: 
At this point, in order to pursue training and testing, it will be necessary to translate the parts of the resulting work to batches, and load to pandas dataframes, so it becomes easy to manipulate the work, and use it to train. 

Since the current dataset is small enough to work with here, we will just translate it directly to pandas. 

In [62]:
upm_pd = upm_df.toPandas()
upm_pd[:5]

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,h_l_diff,prev_day_price,daily_returns
0,2000-01-04,21.1,21.549999,20.1,21.549999,7.02372,1412912.0,1.449999,6.877053,0.021327
1,2000-01-05,21.5,21.5,20.0,21.1,6.877053,670824.0,1.5,7.02372,-0.020882
2,2000-01-06,21.1,21.1,21.1,21.1,6.877053,0.0,0.0,6.877053,0.0
3,2000-01-07,21.5,22.450001,20.6,20.9,6.811868,1988914.0,1.85,6.877053,-0.009479
4,2000-01-10,21.5,21.950001,21.200001,21.299999,6.942239,1800878.0,0.75,6.811868,0.019139


In [63]:
upm_pd = upm_pd.dropna()
upm_pd[:5]

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,h_l_diff,prev_day_price,daily_returns
0,2000-01-04,21.1,21.549999,20.1,21.549999,7.02372,1412912.0,1.449999,6.877053,0.021327
1,2000-01-05,21.5,21.5,20.0,21.1,6.877053,670824.0,1.5,7.02372,-0.020882
2,2000-01-06,21.1,21.1,21.1,21.1,6.877053,0.0,0.0,6.877053,0.0
3,2000-01-07,21.5,22.450001,20.6,20.9,6.811868,1988914.0,1.85,6.877053,-0.009479
4,2000-01-10,21.5,21.950001,21.200001,21.299999,6.942239,1800878.0,0.75,6.811868,0.019139


In [97]:
daily_returns = upm_pd['daily_returns'].values
window_step = 2
window_length = 10
observations = int(daily_returns.shape[0]/window_step)
training_set = np.zeros([observations, window_length])
print('observations: {}'.format(observations))

observations: 2468


In [98]:
row = 0
start_point = 0
while row < observations:
    current_observations = daily_returns[start_point:start_point + window_length]
    if len(current_observations) < window_length:
        pass
    else:
        training_set[row,:] = current_observations
    row += 1
    start_point += window_step
training_set = training_set * 100
training_set[0:5]

array([[ 2.13270144, -2.08816707,  0.        , -0.94785652,  1.91387463,
        -4.22531738, -4.90199601,  3.06701701,  2.52563485,  2.195093  ],
       [ 0.        , -0.94785652,  1.91387463, -4.22531738, -4.90199601,
         3.06701701,  2.52563485,  2.195093  , -0.83527611, -6.37787642],
       [ 1.91387463, -4.22531738, -4.90199601,  3.06701701,  2.52563485,
         2.195093  , -0.83527611, -6.37787642, -4.37016334,  1.61290005],
       [-4.90199601,  3.06701701,  2.52563485,  2.195093  , -0.83527611,
        -6.37787642, -4.37016334,  1.61290005, -0.79365313, -2.64002031],
       [ 2.52563485,  2.195093  , -0.83527611, -6.37787642, -4.37016334,
         1.61290005, -0.79365313, -2.64002031, -1.94467406,  5.02793264]])

In [66]:
#scaler = MinMaxScaler(feature_range=(0,1))
#scaler.fit(training_set)
#training_set = scaler.transform(training_set)
#training_set[:5]

In [99]:
np.random.shuffle(training_set)
print(training_set.shape)

(2468, 10)


In [100]:
training_set_tensor = torch.tensor(training_set, dtype=torch.float64)

In [101]:
class GeneralDataset(torch.utils.data.Dataset):
    def __init__(self, input_data):
        self.output_tensor = torch.tensor(input_data, 
                                          dtype=torch.float64)
        self.output_tensor = self.output_tensor.reshape([self.output_tensor.shape[0],1,
                  self.output_tensor.shape[1]])
    
    def __len__(self):
        return self.output_tensor.shape[0]
    
    def __getitem__(self, index):
        return self.output_tensor[index, :, :]
    
    def get_range(self, start_index, end_index):
        return self.output_tensor[start_index:end_index,:,:]

In [102]:
training_dataset = GeneralDataset(training_set[501:])
test_dataset = GeneralDataset(training_set[:500])

print(len(training_dataset))
len(test_dataset)

1967


500

In [103]:
training_loader = torch.utils.data.DataLoader(training_dataset,
                                             batch_size=50,
                                             shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset,
                                             batch_size=50,
                                             shuffle=True)

In [72]:
class EncoderLinear(nn.Module):
    def __init__(self, input_size, 
                 mid_layer_size):
        super(EncoderLinear, self).__init__()
        self.fc1 = nn.Linear(input_size, mid_layer_size)
    
    def forward(self, x):
        x = x.float()
        x = x.view(-1, self.fc1.in_features)
        x = self.fc1(x)
        return x

In [73]:
class EncoderLSTM(nn.Module):
    def __init__(self, input_size, hidden_layer_size, 
                 mid_layer_size):
        super(EncoderLSTM, self).__init__()
        self.lstm_1 = nn.LSTM(input_size, hidden_layer_size, 1)
        self.fc1 = nn.Linear(hidden_layer_size, mid_layer_size)
    
    def forward(self, x):
        #bp()
        x = x.float()
        lstm_out, (h,c) = self.lstm_1(x)
        x = lstm_out.view(lstm_out.shape[0], lstm_out.shape[1] * lstm_out.shape[2])
        x = self.fc1(x)
        return x

In [74]:
class DecoderLinear(nn.Module):
    def __init__(self, mid_layer_size,
                 input_size):
        super(DecoderLinear, self).__init__()
        self.fc1 = nn.Linear(mid_layer_size, input_size)
        
    def forward(self, x):
        x = x.float()
        x = self.fc1(x)
        x = x.view(x.shape[0], 1, x.shape[1])
        return x
        

In [75]:
class DecoderLSTM(nn.Module):
    def __init__(self, mid_layer_size, hidden_layer_size,
                 input_size):
        super(DecoderLSTM, self).__init__()
        self.lstm_1 = nn.LSTM(mid_layer_size, hidden_layer_size, 1)
        self.fc1 = nn.Linear(hidden_layer_size, input_size)
        
    def forward(self, x):
        #bp()
        x = x.float()
        lstm_out, (h,c) = self.lstm_1(x.reshape([x.shape[0],1,x.shape[1]]))
        x = lstm_out.view(lstm_out.shape[0], lstm_out.shape[1] * lstm_out.shape[2])
        x = self.fc1(x)
        x = x.reshape(x.shape[0], 1, x.shape[1])
        return x
        

In [104]:
x = torch.randn(1,1, window_length)
mid_layer_size = 3
hidden_layer_size = 7
encoder_test = EncoderLSTM(window_length, hidden_layer_size, mid_layer_size)
y = encoder_test(x)
#print(y.shape)
assert y.shape == torch.Size([1,mid_layer_size]), "bad shape, y: {}".format(y.shape)
print("shape is ok")

shape is ok


In [105]:
#x = torch.randn(1, 5)
decoder_test = DecoderLSTM(mid_layer_size, hidden_layer_size, window_length)
#print(y.shape)
y = decoder_test(y)
assert y.shape == torch.Size([1,1, window_length]), "bad shape, y: {}".format(y.shape)
print("shape is ok")

shape is ok


In [78]:
device = torch.device('cuda:0')

In [129]:
#encoder = EncoderLinear(window_length, mid_layer_size)
encoder = EncoderLSTM(window_length, hidden_layer_size, mid_layer_size)

encoder.to(device)

#decoder = DecoderLinear(mid_layer_size, window_length)
decoder = DecoderLSTM(mid_layer_size, hidden_layer_size, window_length)

decoder.to(device)


DecoderLSTM(
  (lstm_1): LSTM(3, 7)
  (fc1): Linear(in_features=7, out_features=10, bias=True)
)

In [130]:
criterion = nn.MSELoss()
parameters = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.Adam(params=parameters,
                      lr=0.01)

In [131]:
print_interval = 30
for epoch in range(10):
    running_loss = 0.0
    for index, batch in enumerate(training_loader):
        #bp()
        batch = batch.float()
        batch = batch.to(device)
        optimizer.zero_grad()
        encoder_output = encoder(batch)
        decoder_output = decoder(encoder_output)
        
        loss = criterion(decoder_output, batch)
        loss.backward()
        
        optimizer.step()
        
        #print information
        
        running_loss += loss.item()
        if index % print_interval == (print_interval - 1):
            print("[{}, {}] loss: {}".format(epoch + 1,
                                            index + 1, 
                                            running_loss / print_interval))
            running_loss = 0.0
print("finished training")

[1, 30] loss: 4.320635851224264
[2, 30] loss: 3.9830880800882977
[3, 30] loss: 3.5230156819025678
[4, 30] loss: 3.4738654216130573
[5, 30] loss: 3.440229304631551
[6, 30] loss: 3.161561107635498
[7, 30] loss: 3.123292064666748
[8, 30] loss: 3.0456082979838053
[9, 30] loss: 2.944238305091858
[10, 30] loss: 3.095765781402588
[11, 30] loss: 3.041571346918742
[12, 30] loss: 2.93480589389801
[13, 30] loss: 2.987698193391164
[14, 30] loss: 3.005099892616272
[15, 30] loss: 2.995634587605794
[16, 30] loss: 2.968323580423991
[17, 30] loss: 2.9729693094889322
[18, 30] loss: 2.9375372330347695
[19, 30] loss: 2.906225339571635
[20, 30] loss: 2.937411340077718
[21, 30] loss: 2.964202968279521
[22, 30] loss: 2.97530517578125
[23, 30] loss: 2.887610101699829
[24, 30] loss: 2.8584443092346192
[25, 30] loss: 2.9433035333951314
[26, 30] loss: 2.909257499376933
[27, 30] loss: 2.943731673558553
[28, 30] loss: 2.932986768086751
[29, 30] loss: 2.9380579312642414
[30, 30] loss: 2.9389785051345827
[31, 30] lo

In [172]:
with torch.no_grad():
    for index, batch in enumerate(test_loader):
        #bp()
        batch = batch.float()
        batch = batch.to(device)
        encoder_output = encoder(batch)
        decoder_output = decoder(encoder_output)
        
        loss = criterion(decoder_output, batch)
        print(loss)
        
        batch_example = torch.zeros([2,5])
        batch_example[0,:] = batch[0][0][:5]
        batch_example[1,:] = decoder_output[0][0][:5]
        #batch_example = torch.log10(batch_example)
        print(batch_example)
        print(F.mse_loss(batch_example[0],batch_example[1]))
        if index >= 0:
            break
        

tensor(3.1851, device='cuda:0')
tensor([[-0.7074, -2.5824, -8.1353, -2.2885, -4.7862],
        [ 0.4031,  2.0561, -0.5560, -0.8107,  0.2442]])
tensor(21.5365)


In [161]:
with torch.no_grad():
    for index, batch in enumerate(training_loader):
        #bp()
        batch = batch.float()
        batch = batch.to(device)
        encoder_output = encoder(batch)
        decoder_output = decoder(encoder_output)
        
        loss = criterion(decoder_output, batch)
        print(loss)
        
        batch_example = torch.zeros([2,5])
        batch_example[0,:] = batch[0][0][:5]
        batch_example[1,:] = decoder_output[0][0][:5]
        #batch_example = torch.log10(batch_example)
        print(batch_example)
        print(F.mse_loss(batch_example[0],batch_example[1]))
        if index >= 0:
            break
        

tensor(2.0343, device='cuda:0')
tensor([[-3.1861, -0.1732,  0.9832, -2.2337, -1.2302],
        [-0.1516, -0.9208,  0.9052,  0.1902, -2.0160]])
tensor(3.2532)
