In [18]:
import os
from tqdm.notebook import tqdm
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

from collections import defaultdict



# Hook tqdm into pandas (per the tqdm docs this adds `progress_apply`-style methods).
# NOTE(review): none of the visible cells call a progress_* method -- confirm this is still needed.
tqdm.pandas() 



In [19]:
# Column names of the geographic position signals.
geo_data = [
    'latitude_degree',
    'longitude_degree',
]

# Model inputs: the signals fed to the network as one window per sample.
independent_signals = [
    'accelerator_pedal',
    'accelerator_pedal_gradient_sign',
    'brake_pressure',
    'steering_angle_calculated',
    'steering_angle_calculated_sign',
]

# Model targets: the signals the network is trained to predict.
dependent_signals = [
    'vehicle_speed',
    'roll_angle',
    'pitch_angle',
]


Splitting Data

In [28]:
class DataPreprocessing():
    """Load CSV recordings, split/scale them and cut them into fixed-length windows.

    Every ``*.csv`` file found in ``directory`` is read with a ``timestamp``
    column as index, split chronologically into a train and a test part,
    scaled to ``[-1, 1]`` with a ``MinMaxScaler`` fitted on the train parts
    only, and finally sliced into overlapping windows of ``sequence_length``
    rows. Each window is stored as a ``(sequence, label)`` tuple of DataFrames
    where ``sequence`` holds the independent and ``label`` the dependent
    signals of the *same* rows.

    Args:
        directory: Folder that contains the CSV files.
        independent_signals: Column names used as model input.
        dependent_signals: Column names used as model target.
        sequence_length: Number of rows per window.
        train_size: Fraction of each file (from the start) used for training.
        do_train_test_split: Stored on the instance; no method of this class
            currently reads it.
    """

    def __init__(self, directory, independent_signals, dependent_signals, sequence_length, train_size=.9, do_train_test_split = True):
        self.directory = directory
        self.do_train_test_split = do_train_test_split
        self.list_of_path = DataPreprocessing.list_csv_files(directory)
        self.train_size = train_size
        self.independent_signals = independent_signals
        self.dependent_signals = dependent_signals
        self.sequence_length = sequence_length

        self.train_df_batch = []
        self.test_df_batch = []
        self.dataset_batch = []
        self.train_sequences = []
        self.test_sequences = []
        # Filled by `transform`; initialised here so the attribute always exists.
        # (The misspelled name is kept because it is visible on the instance.)
        self.datset_sequences = []

        self.scaler = MinMaxScaler(feature_range=(-1,1))

    @staticmethod
    def split_dataset(data, train_size):
        """Split ``data`` chronologically into ``(train, test)``.

        The first ``int(len(data) * train_size)`` rows form the train part and
        *all* remaining rows form the test part (no row is discarded).
        """
        split_at = int(len(data) * train_size)
        return data[:split_at], data[split_at:]

    @staticmethod
    def list_csv_files(directory):
        """Return the names of all ``.csv`` files in ``directory``, sorted.

        Sorting makes the processing order independent of the file system,
        because ``os.listdir`` returns entries in arbitrary order.
        """
        return sorted(
            filename for filename in os.listdir(directory)
            if filename.endswith(".csv")
        )

    @staticmethod
    def load_data_from_a_specific_path(file_path):
        """Read one CSV file and return it indexed by its parsed ``timestamp`` column."""
        dataset = pd.read_csv(file_path)
        dataset.timestamp = pd.to_datetime(dataset.timestamp)
        dataset = dataset.set_index('timestamp')
        return dataset

    def scaling_fit(self):
        """Fit the scaler on the concatenation of all train splits."""
        self.scaler.fit(pd.concat(self.train_df_batch))

    def scaling_transform(self, dataset):
        """Return a scaled copy of ``dataset`` that keeps its index and column names."""
        return pd.DataFrame(self.scaler.transform(dataset),
                            index=dataset.index,
                            columns=dataset.columns)

    def data_loader(self, file_path):
        """Load the file named ``file_path`` from ``self.directory``.

        The resolved path is printed so the notebook shows which files were read.
        """
        # os.path.join also works when `directory` has no trailing separator.
        path = os.path.join(self.directory, file_path)
        print(path)
        return DataPreprocessing.load_data_from_a_specific_path(path)

    def fit_transform(self):
        """Load every CSV file, fit the scaler and build train/test windows.

        The method starts from a clean state, so running it again (for example
        by re-executing the notebook cell) does not duplicate data.

        Raises:
            ValueError: If ``directory`` contained no CSV file.
        """
        if not self.list_of_path:
            raise ValueError('No .csv files found in {!r}'.format(self.directory))

        # Reset all accumulators so repeated calls stay idempotent.
        self.dataset_batch = []
        self.train_df_batch = []
        self.test_df_batch = []
        self.train_sequences = []
        self.test_sequences = []

        for each_file_path in self.list_of_path:
            dataset = self.data_loader(each_file_path)
            self.dataset_batch.append(dataset)

            train_df, test_df = DataPreprocessing.split_dataset(dataset, self.train_size)
            self.train_df_batch.append(train_df)
            self.test_df_batch.append(test_df)

        # Fit on train data only, then apply the same scaling to both splits.
        self.scaling_fit()
        self.train_df_batch = [self.scaling_transform(df) for df in self.train_df_batch]
        self.test_df_batch = [self.scaling_transform(df) for df in self.test_df_batch]

        # For train data
        self.create_sequences(self.sequence_length)
        # For test data
        self.create_sequences(self.sequence_length, get_sequences_for_train = False)

    def transform(self, path):
        """Scale the file at ``path`` with the fitted scaler and window it.

        Returns:
            A list of ``(sequence, None)`` tuples (no labels are produced).
        """
        dataset = DataPreprocessing.load_data_from_a_specific_path(path)
        self.datset_sequences = []
        scaled_dataset = self.scaling_transform(dataset)
        self.create_sequences_for_a_batch(scaled_dataset, self.sequence_length, None)
        return self.datset_sequences

    def create_sequences_for_a_batch(self, input_data, sequence_length, get_sequences_for_train):
        """Append every full window of ``input_data`` to the matching list.

        Args:
            input_data: DataFrame to slice into windows.
            sequence_length: Number of rows per window.
            get_sequences_for_train: Truthy -> ``train_sequences`` (with
                labels); ``None`` -> ``datset_sequences`` (label is ``None``);
                any other falsy value -> ``test_sequences`` (with labels).
        """
        if get_sequences_for_train is None:
            target, with_labels = self.datset_sequences, False
        elif get_sequences_for_train:
            target, with_labels = self.train_sequences, True
        else:
            target, with_labels = self.test_sequences, True

        # `+ 1` keeps the last complete window: labels come from the same rows
        # as the inputs, so no extra look-ahead row has to be reserved.
        for start in range(len(input_data) - sequence_length + 1):
            window = input_data.iloc[start:start + sequence_length]
            sequence = window[self.independent_signals]
            label = window[self.dependent_signals] if with_labels else None
            target.append((sequence, label))

    def create_sequences(self, sequence_length, get_sequences_for_train=True):
        """Window all train splits (default) or all test splits."""
        batches = self.train_df_batch if get_sequences_for_train else self.test_df_batch
        for each_batch in batches:
            self.create_sequences_for_a_batch(each_batch, sequence_length, get_sequences_for_train)

    # Getter method
    @property
    def get_train_df(self):
        """List of train DataFrames, one per file."""
        return self.train_df_batch

    @property
    def get_test_df(self):
        """List of test DataFrames, one per file."""
        return self.test_df_batch

    @property
    def get_sequences_train(self):
        """List of ``(sequence, label)`` windows built from the train splits."""
        return self.train_sequences

    @property
    def get_sequences_test(self):
        """List of ``(sequence, label)`` windows built from the test splits."""
        return self.test_sequences
    

class TorchDatasetTS(Dataset):
    """Torch ``Dataset`` over a list of ``(sequence, label)`` DataFrame pairs.

    ``label`` may be ``None`` (as produced for unlabeled data); such items are
    returned without a ``'label'`` entry instead of raising.
    """

    def __init__(self, sequences):
        #super().__init__()
        self.sequences = sequences

    def __len__(self):
        """Number of stored windows."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return the window at ``idx`` as a dict of float32 tensors.

        Returns:
            ``{'sequence': tensor}`` with the input window as a 2-D tensor and,
            when a label exists, ``'label'`` flattened row by row into a
            column vector of shape ``(rows * columns, 1)``.
        """
        sequence, label = self.sequences[idx]

        item = dict(
            sequence=torch.tensor(sequence.to_numpy(), dtype=torch.float32)
        )
        if label is not None:
            item['label'] = torch.tensor(label.to_numpy(), dtype=torch.float32).reshape(-1, 1)
        return item
  

class TorchDataModule(pl.LightningDataModule):
    """Lightning data module that wraps pre-built train and test windows.

    Args:
        train_sequences: List of ``(sequence, label)`` pairs for training.
        test_sequences: List of ``(sequence, label)`` pairs used for both
            validation and testing.
        batch_size: Batch size of the training loader.
    """

    def __init__(self, train_sequences, test_sequences, batch_size):

        super().__init__()
        self.train_sequences = train_sequences
        self.test_sequences = test_sequences
        self.batch_size = batch_size

    def setup(self, stage=None):
        """Create the train and test datasets.

        ``stage`` is accepted (and ignored) because the Lightning Trainer
        calls ``setup(stage=...)``; calling ``setup()`` by hand still works.
        """
        self.train_datset = TorchDatasetTS(self.train_sequences)
        self.test_datset = TorchDatasetTS(self.test_sequences)

    def train_dataloader(self):
        """Loader over the training windows, in stored order."""
        return DataLoader(
            self.train_datset,
            batch_size = self.batch_size,
            shuffle = False
        )

    def val_dataloader(self):   # TODO make separate data for validation
        """Loader over the test windows, one window per batch."""
        return DataLoader(
            self.test_datset,
            batch_size = 1,
            shuffle = False
        )

    def test_dataloader(self):
        """Loader over the test windows, one window per batch."""
        return DataLoader(
            self.test_datset,
            batch_size = 1,
            shuffle = False
        )


In [29]:
# Number of consecutive rows that make up one model input window.
sequence_length = 10

# Read every CSV below 'datasets/', then split, scale and window it.
data_prep = DataPreprocessing(
    directory='datasets/',
    independent_signals=independent_signals,
    dependent_signals=dependent_signals,
    sequence_length=sequence_length,
)
data_prep.fit_transform()

# Lists of (sequence, label) DataFrame pairs for the two splits.
train_sequence, test_sequence = data_prep.get_sequences_train, data_prep.get_sequences_test

datasets/resampled_20180810150607.csv
datasets/resampled_20190401121727.csv
datasets/resampled_20190401145936.csv


In [30]:
# Show the input half (index 0 of the (sequence, label) pair) of the first training window.
train_sequence[0][0]

Unnamed: 0_level_0,accelerator_pedal,accelerator_pedal_gradient_sign,brake_pressure,steering_angle_calculated,steering_angle_calculated_sign
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2018-08-10 13:06:54,0.53665,-0.391304,-0.996302,-0.874614,-1.0
2018-08-10 13:06:55,0.512533,-0.92,-0.997506,-0.958218,-1.0
2018-08-10 13:06:56,0.210929,0.36,-0.996674,-0.954475,-1.0
2018-08-10 13:06:57,-0.400822,0.4,-0.996683,-0.984006,-0.68
2018-08-10 13:06:58,-0.731603,-0.64,-0.997173,-0.942241,1.0
2018-08-10 13:06:59,-0.774321,-1.0,-0.996591,-0.852417,1.0
2018-08-10 13:07:00,-0.774321,-0.96,-0.996258,-0.916477,0.7
2018-08-10 13:07:01,-0.976465,-0.64,-0.996912,-0.902894,-1.0
2018-08-10 13:07:02,-1.0,-1.0,-0.996591,-0.928943,-1.0
2018-08-10 13:07:03,-1.0,-1.0,-0.810177,-0.997826,-1.0


In [31]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer that predicts a whole window.

    Args:
        n_features: Number of input signals per time step.
        sequence_length: Number of time steps to predict.
        number_of_dependent_signals: Number of target signals per time step.
        n_hidden: Hidden size of the LSTM.
        n_layers: Number of stacked LSTM layers.
    """

    def __init__(self, n_features, sequence_length, number_of_dependent_signals, n_hidden=128, n_layers = 2):
        super().__init__()

        self.n_hidden = n_hidden
        self.sequence_length = sequence_length
        self.number_of_dependent_signals = number_of_dependent_signals

        self.lstm = nn.LSTM(
            input_size=n_features,
            hidden_size=n_hidden,
            batch_first=True,
            num_layers=n_layers,
            # Dropout acts between stacked layers only; with a single layer
            # PyTorch would just emit a warning, so it is switched off there.
            dropout=0.2 if n_layers > 1 else 0.0,
        )

        self.regressor = nn.Linear(n_hidden, sequence_length * number_of_dependent_signals)

    def forward(self, x):
        """Map ``x`` of shape ``(batch, time, n_features)`` to predictions.

        Returns:
            Tensor of shape ``(batch, sequence_length * number_of_dependent_signals)``.
        """
        # Optional: compacts the weights in memory, which helps on (multi-)GPU runs.
        self.lstm.flatten_parameters()

        _, (hidden, _) = self.lstm(x)
        # `hidden` is (num_layers, batch, n_hidden); use the top layer's final state.
        out = hidden[-1]

        return self.regressor(out)
    


In [32]:
class Predictor(pl.LightningModule):
    """Lightning wrapper that trains ``LSTMModel`` with a mean-squared-error loss.

    Args:
        n_features: Number of input signals per time step.
        sequence_length: Number of time steps per window.
        number_of_dependent_signals: Number of target signals per time step.
    """

    def __init__(self, n_features, sequence_length, number_of_dependent_signals):
        # Must actually be *called*: submodules cannot be assigned before
        # nn.Module.__init__ has run.
        super().__init__()
        self.model = LSTMModel(n_features, sequence_length, number_of_dependent_signals)
        self.criterion = nn.MSELoss()

    def forward(self, x, labels =None):
        """Run the model and, if ``labels`` is given, compute the loss.

        Returns:
            ``(loss, output)``; ``loss`` is ``0`` when no labels are passed.
        """
        output = self.model(x)
        loss = 0
        if labels is not None:
            # Bring the labels to the output's shape. Comparing tensors of
            # different shapes would silently broadcast inside MSELoss and
            # average over unrelated element pairs.
            loss = self.criterion(output, labels.reshape(output.shape))
        return loss, output

    def _shared_step(self, batch, log_name):
        """Compute the loss for one batch and log it under ``log_name``."""
        loss, _ = self(batch['sequence'], batch['label'])
        self.log(log_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss of ``batch`` (logged as ``train_loss``)."""
        return self._shared_step(batch, 'train_loss')

    def validation_step(self, batch, batch_idx):
        """Return the validation loss of ``batch`` (logged as ``val_loss``)."""
        return self._shared_step(batch, 'val_loss')

    def test_step(self, batch, batch_idx):
        """Return the test loss of ``batch`` (logged as ``test_loss``)."""
        return self._shared_step(batch, 'test_loss')

    def configure_optimizers(self):
        """Use AdamW with a fixed learning rate of 1e-4."""
        return optim.AdamW(self.parameters(), lr = 0.0001)





In [34]:
# Training hyper-parameters.
N_EPOCHS = 100
BATCH_SIZE = 64

# Wrap the prepared windows in a Lightning data module and build its datasets.
data_module = TorchDataModule(
    train_sequences=train_sequence,
    test_sequences=test_sequence,
    batch_size=BATCH_SIZE,
)
data_module.setup()

In [35]:
# Sanity check: print the tensor shapes of the first training batch only.
train_loader = data_module.train_dataloader()
for item in train_loader:
    for key in ('sequence', 'label'):
        print(item[key].shape)
    break

    

torch.Size([64, 10, 5])
torch.Size([64, 30, 1])
