# Verschiedene (absolut unnötige!) Experimente mit MLPs


In [None]:
# now build a pytorch pipeline to predict sales based on only weather data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# define a dataset class
class SalesDataset(Dataset):
    """Dataset of standardized weather features and standardized beverage sales.

    Each item is a ``(features, target)`` pair of NumPy arrays: the five
    weather columns of one row and the matching ``Getränke_sales`` value with
    shape ``(1,)``.  Both are z-scored with the mean and standard deviation of
    the frame that is passed in.

    Args:
        df: Frame holding the five weather columns and ``Getränke_sales``.

    Attributes:
        X: Standardized feature matrix, one row per sample.
        y: Standardized target column, shape ``(n, 1)``.
        y_mean: Target mean that was subtracted.
        y_std: Target scale that was divided by (std, or 1 where std is 0);
            ``prediction * y_std + y_mean`` maps a prediction back to sales.
    """

    def __init__(self, df):
        self.df = df
        features = df[["temperature_mean", "temperature_min", "temperature_max", "precipitation", "sunshine_duration"]].values
        self.y = df["Getränke_sales"].values.reshape(-1, 1)

        # normalize the data; a constant column has std 0 and would turn into
        # NaN through 0/0, so such columns are only centered
        feature_scale = self._nonzero_scale(features.std(axis=0))
        self.X = (features - features.mean(axis=0)) / feature_scale
        mu = self.y.mean(axis=0)
        sigma = self.y.std(axis=0)
        target_scale = self._nonzero_scale(sigma)
        self.y = (self.y - mu) / target_scale
        print("Mean: {}, Std: {}".format(mu, sigma))

        # keep the statistics so predictions can be de-normalized later
        self.y_mean = mu
        self.y_std = target_scale

    @staticmethod
    def _nonzero_scale(std):
        """Return a copy of ``std`` with zero entries replaced by 1."""
        scale = std.copy()
        scale[scale == 0] = 1
        return scale

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# split the data into train and test set
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# create the dataset objects with normalized data
# NOTE(review): every SalesDataset is normalized with the statistics of the
# frame it receives, so train and test targets are on slightly different scales.
print("Train data")
train_dataset = SalesDataset(train_df)
print("Test data")
test_dataset = SalesDataset(test_df)

# create the dataloader objects
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# keep the evaluation order fixed: shuffling adds nothing at test time and
# makes the batch composition (and thus the reported loss) vary between runs
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# define the model parameters (read by SalesModel when it is instantiated)
input_size = 5
hidden_size = 8
output_size = 1

# define the model
class SalesModel(nn.Module):
    """MLP with three ReLU hidden layers and a linear output layer.

    Args:
        n_inputs: Number of input features; defaults to the global
            ``input_size``.
        n_hidden: Width of every hidden layer; defaults to the global
            ``hidden_size``.
        n_outputs: Number of outputs; defaults to the global ``output_size``.

    Passing the sizes explicitly makes the architecture independent of the
    notebook globals, which can be reassigned after this class is defined.
    """

    def __init__(self, n_inputs=None, n_hidden=None, n_outputs=None):
        super().__init__()
        # fall back to the notebook globals only for sizes not given explicitly
        n_inputs = input_size if n_inputs is None else n_inputs
        n_hidden = hidden_size if n_hidden is None else n_hidden
        n_outputs = output_size if n_outputs is None else n_outputs

        self.fc1 = nn.Linear(n_inputs, n_hidden)
        self.fc2 = nn.Linear(n_hidden, n_hidden)
        self.fc3 = nn.Linear(n_hidden, n_hidden)
        self.fc4 = nn.Linear(n_hidden, n_outputs)

    def forward(self, x):
        """Map a ``(batch, n_inputs)`` float tensor to ``(batch, n_outputs)``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # no activation on the output layer
        x = self.fc4(x)
        return x

# create the model object
model = SalesModel()

# define the loss function and the optimizer
epochs = 100
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# one cosine half-period over the whole run (scheduler is stepped once per epoch)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# train the model and visualize the loss
train_losses = []
test_losses = []
best_loss = float("inf")

for epoch in range(epochs):
    train_loss = 0.0
    test_loss = 0.0
    model.train()
    for X, y in train_loader:
        optimizer.zero_grad()
        y_pred = model(X.float())
        loss = criterion(y_pred, y.float())
        loss.backward()
        optimizer.step()
        # criterion yields the batch mean; weight it by the batch size so the
        # shorter final batch does not count as much as a full one
        train_loss += loss.item() * len(y)

    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # test the model
    model.eval()
    with torch.no_grad():
        for X, y in test_loader:
            y_pred = model(X.float())
            loss = criterion(y_pred, y.float())
            test_loss += loss.item() * len(y)
        test_loss /= len(test_loader.dataset)
        test_losses.append(test_loss)

    # only the best value is tracked; no checkpoint of the weights is stored
    if test_loss < best_loss:
        best_loss = test_loss
        print("Best loss: {:.2f}".format(best_loss))

    if epoch % 10 == 0:
        print("Epoch: {}, Train Loss: {:.2f}, Test Loss: {:.2f}".format(epoch, train_loss, test_loss))

    lr_scheduler.step()

print("Best loss: {:.2f}".format(best_loss))
# plot the loss
# NOTE(review): plt is not imported in this cell; presumably matplotlib.pyplot
# from an earlier cell.
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.legend()
plt.show()
# keep a handle on the last-epoch model before `model` is rebound
model1 = model

In [None]:
# define a second dataset class to predict sales with the day of the week
class SalesDataset2(Dataset):
    """Dataset of standardized weather features, one-hot weekday and sales.

    Each item is a ``(weather, date, target)`` triple of NumPy arrays: the
    five z-scored weather columns, the one-hot encoding of ``day_of_week`` and
    the z-scored ``Getränke_sales`` value with shape ``(1,)``.

    Args:
        df: Frame holding the weather columns, ``day_of_week`` and
            ``Getränke_sales``.

    Attributes:
        X: Dict with the ``"weather"`` and ``"date"`` feature matrices.
        y: Standardized target column, shape ``(n, 1)``.
        y_mean: Target mean that was subtracted.
        y_std: Target scale that was divided by (std, or 1 where std is 0).
    """

    def __init__(self, df):
        self.df = df
        # NOTE(review): get_dummies creates one column per weekday present in
        # `df`, so a frame missing a weekday yields fewer columns — confirm
        # every split contains all weekdays.
        self.X = {"weather": df[["temperature_mean", "temperature_min", "temperature_max", "precipitation", "sunshine_duration"]].values,
                  "date": pd.get_dummies(df["day_of_week"]).values}
        self.y = df["Getränke_sales"].values.reshape(-1, 1)

        # normalize the data (X is a dictionary; only the weather part is
        # scaled, the one-hot part stays as it is). A constant column has
        # std 0 and would become NaN through 0/0, so it is only centered.
        weather = self.X["weather"]
        weather_scale = self._nonzero_scale(weather.std(axis=0))
        self.X["weather"] = (weather - weather.mean(axis=0)) / weather_scale
        mu = self.y.mean(axis=0)
        sigma = self.y.std(axis=0)
        target_scale = self._nonzero_scale(sigma)
        self.y = (self.y - mu) / target_scale
        print("Mean: {}, Std: {}".format(mu, sigma))

        # keep the statistics so predictions can be de-normalized later
        self.y_mean = mu
        self.y_std = target_scale

    @staticmethod
    def _nonzero_scale(std):
        """Return a copy of ``std`` with zero entries replaced by 1."""
        scale = std.copy()
        scale[scale == 0] = 1
        return scale

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return self.X["weather"][idx], self.X["date"][idx], self.y[idx]

# split the data into train and test set
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# create the dataset objects with normalized data
# NOTE(review): every SalesDataset2 is normalized with the statistics of the
# frame it receives, so train and test targets are on slightly different scales.
print("Train data")
train_dataset = SalesDataset2(train_df)
print("Test data")
test_dataset = SalesDataset2(test_df)

# create the dataloader objects
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# keep the evaluation order fixed: shuffling adds nothing at test time and
# makes the batch composition (and thus the reported loss) vary between runs
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# define the model parameters (read by SalesModel2 when it is instantiated)
input_size_weather = 5
input_size_date = 7
hidden_size = 8
output_size = 1

# define the model
class SalesModel2(nn.Module):
    """Two-branch MLP: weather and weekday are embedded separately, then merged.

    Args:
        n_weather: Number of weather features; defaults to the global
            ``input_size_weather``.
        n_date: Width of the one-hot weekday input; defaults to the global
            ``input_size_date``.
        n_hidden: Width of the hidden layers; defaults to the global
            ``hidden_size``.
        n_outputs: Number of outputs; defaults to the global ``output_size``.

    Passing the sizes explicitly makes the architecture independent of the
    notebook globals, which can be reassigned after this class is defined.
    """

    def __init__(self, n_weather=None, n_date=None, n_hidden=None, n_outputs=None):
        super().__init__()
        # fall back to the notebook globals only for sizes not given explicitly
        n_weather = input_size_weather if n_weather is None else n_weather
        n_date = input_size_date if n_date is None else n_date
        n_hidden = hidden_size if n_hidden is None else n_hidden
        n_outputs = output_size if n_outputs is None else n_outputs

        self.fc1 = nn.Linear(n_weather, n_hidden)
        self.fc2 = nn.Linear(n_date, n_hidden)
        # fc3 consumes the concatenation of both branch outputs
        self.fc3 = nn.Linear(n_hidden * 2, n_hidden)
        self.fc4 = nn.Linear(n_hidden, n_hidden)
        self.fc5 = nn.Linear(n_hidden, n_outputs)

    def forward(self, x_weather, x_date):
        """Return predictions for a batch of weather and weekday inputs."""
        x_weather = F.relu(self.fc1(x_weather))
        x_date = F.relu(self.fc2(x_date))
        # join the two branches along the feature dimension
        x = torch.cat((x_weather, x_date), dim=1)
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        # no activation on the output layer
        x = self.fc5(x)

        return x

# create the model object
model = SalesModel2()

# define the loss function and the optimizer
epochs = 100
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# one cosine half-period over the whole run (scheduler is stepped once per epoch)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# train the model and visualize the loss
train_losses = []
test_losses = []
best_loss = float("inf")

for epoch in range(epochs):
    train_loss = 0.0
    test_loss = 0.0
    model.train()
    for X_weather, X_date, y in train_loader:
        optimizer.zero_grad()
        y_pred = model(X_weather.float(), X_date.float())
        loss = criterion(y_pred, y.float())
        loss.backward()
        optimizer.step()
        # criterion yields the batch mean; weight it by the batch size so the
        # shorter final batch does not count as much as a full one
        train_loss += loss.item() * len(y)

    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # test the model
    model.eval()
    with torch.no_grad():
        for X_weather, X_date, y in test_loader:
            y_pred = model(X_weather.float(), X_date.float())
            loss = criterion(y_pred, y.float())
            test_loss += loss.item() * len(y)
        test_loss /= len(test_loader.dataset)
        test_losses.append(test_loss)

    # only the best value is tracked; no checkpoint of the weights is stored
    if test_loss < best_loss:
        best_loss = test_loss
        print("Best loss: {:.2f}".format(best_loss))

    if epoch % 10 == 0:
        print("Epoch: {}, Train Loss: {:.2f}, Test Loss: {:.2f}".format(epoch, train_loss, test_loss))

    lr_scheduler.step()

print("Best loss: {:.2f}".format(best_loss))
# plot the loss
# NOTE(review): plt is not imported in this cell; presumably matplotlib.pyplot
# from an earlier cell.
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.legend()
plt.show()
# keep a handle on the last-epoch model before `model` is rebound
model2 = model

In [None]:
# define a third dataset class to predict sales with only the day of the week
class SalesDataset3(Dataset):
    """Dataset of one-hot weekday features and standardized beverage sales.

    Each item is a ``(date, target)`` pair of NumPy arrays: the one-hot
    encoding of ``day_of_week`` and the z-scored ``Getränke_sales`` value with
    shape ``(1,)``.

    Args:
        df: Frame holding ``day_of_week`` and ``Getränke_sales``.

    Attributes:
        X: One-hot weekday matrix, one row per sample.
        y: Standardized target column, shape ``(n, 1)``.
        y_mean: Target mean that was subtracted.
        y_std: Target scale that was divided by (std, or 1 where std is 0).
    """

    def __init__(self, df):
        self.df = df
        # NOTE(review): get_dummies creates one column per weekday present in
        # `df`, so a frame missing a weekday yields fewer columns — confirm
        # every split contains all weekdays.
        self.X = pd.get_dummies(df["day_of_week"]).values
        self.y = df["Getränke_sales"].values.reshape(-1, 1)

        # normalize the data; a constant target has std 0 and would become
        # NaN through 0/0, so it is only centered in that case
        mu = self.y.mean(axis=0)
        sigma = self.y.std(axis=0)
        target_scale = self._nonzero_scale(sigma)
        self.y = (self.y - mu) / target_scale
        print("Mean: {}, Std: {}".format(mu, sigma))

        # keep the statistics so predictions can be de-normalized later
        self.y_mean = mu
        self.y_std = target_scale

    @staticmethod
    def _nonzero_scale(std):
        """Return a copy of ``std`` with zero entries replaced by 1."""
        scale = std.copy()
        scale[scale == 0] = 1
        return scale

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# split the data into train and test set
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# create the dataset objects with normalized data
# NOTE(review): every SalesDataset3 is normalized with the statistics of the
# frame it receives, so train and test targets are on slightly different scales.
print("Train data")
train_dataset = SalesDataset3(train_df)
print("Test data")
test_dataset = SalesDataset3(test_df)

# create the dataloader objects
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# keep the evaluation order fixed: shuffling adds nothing at test time and
# makes the batch composition (and thus the reported loss) vary between runs
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# define the model parameters (read by SalesModel3 when it is instantiated)
input_size = 7
hidden_size = 32
output_size = 1

# define the model
class SalesModel3(nn.Module):
    """MLP with three ReLU hidden layers and a linear output layer.

    Args:
        n_inputs: Number of input features; defaults to the global
            ``input_size``.
        n_hidden: Width of every hidden layer; defaults to the global
            ``hidden_size``.
        n_outputs: Number of outputs; defaults to the global ``output_size``.

    Passing the sizes explicitly makes the architecture independent of the
    notebook globals, which can be reassigned after this class is defined.
    """

    def __init__(self, n_inputs=None, n_hidden=None, n_outputs=None):
        super().__init__()
        # fall back to the notebook globals only for sizes not given explicitly
        n_inputs = input_size if n_inputs is None else n_inputs
        n_hidden = hidden_size if n_hidden is None else n_hidden
        n_outputs = output_size if n_outputs is None else n_outputs

        self.fc1 = nn.Linear(n_inputs, n_hidden)
        self.fc2 = nn.Linear(n_hidden, n_hidden)
        self.fc3 = nn.Linear(n_hidden, n_hidden)
        self.fc4 = nn.Linear(n_hidden, n_outputs)

    def forward(self, x):
        """Map a ``(batch, n_inputs)`` float tensor to ``(batch, n_outputs)``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # no activation on the output layer
        x = self.fc4(x)
        return x

# create the model object
model = SalesModel3()

# define the loss function and the optimizer
epochs = 100
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# one cosine half-period over the whole run (scheduler is stepped once per epoch)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# train the model and visualize the loss
train_losses = []
test_losses = []
best_loss = float("inf")

for epoch in range(epochs):
    train_loss = 0.0
    test_loss = 0.0
    model.train()
    for X, y in train_loader:
        optimizer.zero_grad()
        y_pred = model(X.float())
        loss = criterion(y_pred, y.float())
        loss.backward()
        optimizer.step()
        # criterion yields the batch mean; weight it by the batch size so the
        # shorter final batch does not count as much as a full one
        train_loss += loss.item() * len(y)

    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # test the model
    model.eval()
    with torch.no_grad():
        for X, y in test_loader:
            y_pred = model(X.float())
            loss = criterion(y_pred, y.float())
            test_loss += loss.item() * len(y)
        test_loss /= len(test_loader.dataset)
        test_losses.append(test_loss)

    # only the best value is tracked; no checkpoint of the weights is stored
    if test_loss < best_loss:
        best_loss = test_loss
        print("Best loss: {:.2f}".format(best_loss))

    if epoch % 10 == 0:
        print("Epoch: {}, Train Loss: {:.2f}, Test Loss: {:.2f}".format(epoch, train_loss, test_loss))

    lr_scheduler.step()

print("Best loss: {:.2f}".format(best_loss))
# plot the loss
# NOTE(review): plt is not imported in this cell; presumably matplotlib.pyplot
# from an earlier cell.
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.legend()
plt.show()
# keep a handle on the last-epoch model under its own name
model3 = model

In [None]:
# build the weekday-aware dataset on the full frame and look at one row
dataset = SalesDataset2(df)
day = 144 + 7
# one-hot weekday, normalized weather and normalized target of that row
for column_view in (dataset.X["date"], dataset.X["weather"], dataset.y):
    print(column_view[day])
# the matching raw row of the frame for comparison
print(df[day:day+1])

In [None]:
# test manually to see if the day of the week makes a difference in the prediction

# one hand-picked sample as a batch of size 1: normalized weather features and
# a one-hot weekday (presumably copied from an earlier printout — TODO confirm)
weather_sample = torch.tensor([1.59759539,  1.34599214,  1.56808058, -0.40700749,  1.53056742]).float().unsqueeze(0)
weekday_sample = torch.tensor([0, 0, 0, 0, 0, 0, 1]).float().unsqueeze(0)

pred1 = model1(weather_sample)
pred2 = model2(weather_sample, weekday_sample)
pred3 = model3(weekday_sample)
for raw_prediction in (pred1, pred2, pred3):
    print(raw_prediction)

# un-normalize the data
# NOTE(review): hard-coded target statistics, presumably taken from a
# "Mean: ..., Std: ..." printout — confirm they match the training data.
mu = 949.91726592
sigma = 404.25934273
pred1, pred2, pred3 = (
    normalized.item() * sigma + mu for normalized in (pred1, pred2, pred3)
)

print("Prediction 1: {:.2f}".format(pred1))
print("Prediction 2: {:.2f}".format(pred2))
print("Prediction 3: {:.2f}".format(pred3))
