In [25]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error



In [43]:
data = pd.read_pickle("data/ready_dataset.pickle")
print(len(data.merged[0]))
test_data = data.iloc[:40]
train_data = data.iloc[40:]

1664


In [30]:
class MyDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = torch.tensor(np.array(data["merged"][idx], dtype=np.float32), dtype=torch.float32)       
        y = torch.tensor(data["target_value"][idx][0], dtype=torch.float32)
        return x, y


In [31]:
my_dataset = MyDataset(train_data)

In [32]:
my_dataset.__getitem__(0)

(tensor([-0.0003, -0.0003, -0.0002,  ...,  0.0000,  0.0000,  0.0000]),
 tensor(31.9400))

In [67]:

class MV_LSTM(torch.nn.Module):
    def __init__(self,n_features,seq_length):
        super(MV_LSTM, self).__init__()
        self.n_features = n_features
        self.seq_len = seq_length
        self.n_hidden = 20 # number of hidden states
        self.n_layers = 1 # number of LSTM layers (stacked)
    
        self.l_lstm = torch.nn.LSTM(input_size = n_features, 
                                 hidden_size = self.n_hidden,
                                 num_layers = self.n_layers, 
                                 batch_first = True)
        # according to pytorch docs LSTM output is 
        # (batch_size,seq_len, num_directions * hidden_size)
        # when considering batch_first = True
        self.l_linear = torch.nn.Linear(n_features, 1)
        
    
    def init_hidden(self, batch_size):
        # even with batch_first = True this remains same as docs
        hidden_state = torch.zeros(self.n_layers,self.n_hidden)
        cell_state = torch.zeros(self.n_layers,self.n_hidden)
        self.hidden = (hidden_state, cell_state)
    
    
    def forward(self, x):
        self.init_hidden(1)
        batch_size, seq_len = x.size()
        lstm_out, self.hidden = self.l_lstm(x,self.hidden)
        # lstm_out(with batch_first = True) is 
        # (batch_size,seq_len,num_directions * hidden_size)
        # for following linear layer we want to keep batch_size dimension and merge rest       
        # .contiguous() -> solves tensor compatibility error
        #x = lstm_out.contiguous().view(batch_size,-1)
        return self.l_linear(x)


# konwersja danych na tensor i załadowanie do DataLoader


def collate_fn(batch):
    x = [item[0] for item in batch]
    y = [item[1] for item in batch]
    x = torch.nn.utils.rnn.pad_sequence(x, batch_first=True)
    y = torch.stack(y)
    return x, y

def collate_fn_lstm(batch):
    """
    Funkcja collate dla sekwencji wejściowych dla sieci LSTM.
    """
    # Sortujemy batch względem długości sekwencji wejściowych
    batch = sorted(batch, key=lambda x: x[0].shape[0], reverse=True)
    
    # Tworzymy mini-batche z wyrównaniem długości sekwencji wejściowych
    inputs = [item[0] for item in batch]
    inputs = np.array(inputs, dtype=np.float32)
    inputs = torch.from_numpy(inputs)
    # Tworzymy mini-batche z etykietami
    targets = [item[1] for item in batch]
    targets = torch.FloatTensor(targets)
    #targets = torch.nn.utils.rnn.pad_sequence(targets, batch_first=True)
    #print(inputs)
    return inputs, targets


batch_size = 10
my_dataloader = DataLoader(my_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn_lstm)

# inicjalizacja modelu i uruchomienie treningu
model = MV_LSTM(n_features=1664, seq_length=4)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10000):
    for batch_x, batch_y in my_dataloader:
        y_pred = model(batch_x)
        loss = criterion(y_pred, batch_y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()


  inputs = np.array(inputs, dtype=np.float32)


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (10,) + inhomogeneous part.

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # Inicjalizacja stanu ukrytego LSTM
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Przekazanie danych wejściowych przez warstwę LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # Przekazanie ostatniego kroku czasowego przez warstwę Fully Connected
        out = self.fc(out[:, -1, :])
        return out

# Parametry modelu
input_size = 1600  # Liczba parametrów w jednym kroku czasowym
hidden_size = 64  # Liczba jednostek w warstwie ukrytej LSTM
num_layers = 2  # Liczba warstw LSTM
output_size = 1  # Liczba wyjściowych neuronów w warstwie Fully Connected

# Inicjalizacja modelu
model = LSTMModel(input_size, hidden_size, num_layers, output_size)

# Wydrukowanie architektury modelu
print(model)

In [59]:
test_dataset = MyDataset(test_data)

In [61]:
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)

y_true = []
y_pred = []
model.eval()
with torch.no_grad():
    for batch_x, batch_y in test_dataloader:
        #print(model(batch_x))
        y_true += batch_y.tolist()
        y_pred += model(batch_x).tolist()

mse = mean_squared_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
mae = mean_absolute_error(y_true, y_pred)

print(f"MSE: {mse:.4f}")
print(f"R^2: {r2:.4f}")
print(f"MAE: {mae:.4f}")
print(y_pred[5], y_true[5])

MSE: 154549.7966
R^2: -9249360.6296
MAE: 89.8794
[-45.83506393432617] 32.01499938964844


In [69]:
class ConvLSTMCell(torch.nn.Module):

    def __init__(self, input_dim, hidden_dim, kernel_size, bias):
        """
        Initialize ConvLSTM cell.
        Parameters
        ----------
        input_dim: int
            Number of channels of input tensor.
        hidden_dim: int
            Number of channels of hidden state.
        kernel_size: (int, int)
            Size of the convolutional kernel.
        bias: bool
            Whether or not to add the bias.
        """

        super(ConvLSTMCell, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        self.kernel_size = kernel_size
        self.padding = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias

        self.conv = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                              out_channels=4 * self.hidden_dim,
                              kernel_size=self.kernel_size,
                              padding=self.padding,
                              bias=self.bias)

    def forward(self, input_tensor, cur_state):
        h_cur, c_cur = cur_state

        combined = torch.cat([input_tensor, h_cur], dim=1)  # concatenate along channel axis

        combined_conv = self.conv(combined)
        cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)
        i = torch.sigmoid(cc_i)
        f = torch.sigmoid(cc_f)
        o = torch.sigmoid(cc_o)
        g = torch.tanh(cc_g)

        c_next = f * c_cur + i * g
        h_next = o * torch.tanh(c_next)

        return h_next, c_next

    def init_hidden(self, batch_size, image_size):
        height, width = image_size
        return (torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device),
                torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device))


class ConvLSTM(torch.nn.Module):

    """
    Parameters:
        input_dim: Number of channels in input
        hidden_dim: Number of hidden channels
        kernel_size: Size of kernel in convolutions
        num_layers: Number of LSTM layers stacked on each other
        batch_first: Whether or not dimension 0 is the batch or not
        bias: Bias or no bias in Convolution
        return_all_layers: Return the list of computations for all layers
        Note: Will do same padding.
    Input:
        A tensor of size B, T, C, H, W or T, B, C, H, W
    Output:
        A tuple of two lists of length num_layers (or length 1 if return_all_layers is False).
            0 - layer_output_list is the list of lists of length T of each output
            1 - last_state_list is the list of last states
                    each element of the list is a tuple (h, c) for hidden state and memory
    Example:
        >> x = torch.rand((32, 10, 64, 128, 128))
        >> convlstm = ConvLSTM(64, 16, 3, 1, True, True, False)
        >> _, last_states = convlstm(x)
        >> h = last_states[0][0]  # 0 for layer index, 0 for h index
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=False, bias=True, return_all_layers=False):
        super(ConvLSTM, self).__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cell_list = []
        for i in range(0, self.num_layers):
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]

            cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
                                          hidden_dim=self.hidden_dim[i],
                                          kernel_size=self.kernel_size[i],
                                          bias=self.bias))

        self.cell_list = nn.ModuleList(cell_list)

    def forward(self, input_tensor, hidden_state=None):
        """
        Parameters
        ----------
        input_tensor: todo
            5-D Tensor either of shape (t, b, c, h, w) or (b, t, c, h, w)
        hidden_state: todo
            None. todo implement stateful
        Returns
        -------
        last_state_list, layer_output
        """
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input_tensor = input_tensor.permute(1, 0, 2, 3, 4)

        b, _, _, h, w = input_tensor.size()

        # Implement stateful ConvLSTM
        if hidden_state is not None:
            raise NotImplementedError()
        else:
            # Since the init is done in forward. Can send image size here
            hidden_state = self._init_hidden(batch_size=b,
                                             image_size=(h, w))

        layer_output_list = []
        last_state_list = []

        seq_len = input_tensor.size(1)
        cur_layer_input = input_tensor

        for layer_idx in range(self.num_layers):

            h, c = hidden_state[layer_idx]
            output_inner = []
            for t in range(seq_len):
                h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, t, :, :, :],
                                                 cur_state=[h, c])
                output_inner.append(h)

            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append([h, c])

        if not self.return_all_layers:
            layer_output_list = layer_output_list[-1:]
            last_state_list = last_state_list[-1:]

        return layer_output_list, last_state_list

    def _init_hidden(self, batch_size, image_size):
        init_states = []
        for i in range(self.num_layers):
            init_states.append(self.cell_list[i].init_hidden(batch_size, image_size))
        return init_states

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        if not (isinstance(kernel_size, tuple) or
                (isinstance(kernel_size, list) and all([isinstance(elem, tuple) for elem in kernel_size]))):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        if not isinstance(param, list):
            param = [param] * num_layers
        return param

In [61]:
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)

y_true = []
y_pred = []
model.eval()
with torch.no_grad():
    for batch_x, batch_y in test_dataloader:
        #print(model(batch_x))
        y_true += batch_y.tolist()
        y_pred += model(batch_x).tolist()

mse = mean_squared_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
mae = mean_absolute_error(y_true, y_pred)

print(f"MSE: {mse:.4f}")
print(f"R^2: {r2:.4f}")
print(f"MAE: {mae:.4f}")
print(y_pred[5], y_true[5])

MSE: 154549.7966
R^2: -9249360.6296
MAE: 89.8794
[-45.83506393432617] 32.01499938964844
