In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np

In [2]:
# Source CSVs; each is expected to provide a "Date and time" column and an
# "External temperature (Celsius degree)" column.
f_l = ["datasets/ext_1.csv", "datasets/ext_2.csv", "datasets/ext_3.csv", "datasets/ext_4.csv", "datasets/ext_5.csv"]

# A complete day holds 96 readings (one every 15 minutes).
samples_per_day = 96

timeseries = []
for f in f_l:
    # Read the CSV file
    df = pd.read_csv(f)

    # Convert the "Date and time" column to datetime
    df['Date and time'] = pd.to_datetime(df['Date and time'])

    # Calendar date of every reading, used as the grouping key
    df['Date'] = df['Date and time'].dt.date

    # Keep only the days that have the full set of readings; partial days
    # cannot be stacked into a fixed-width matrix.
    valid_days = [
        group['External temperature (Celsius degree)'].values
        for _, group in df.groupby('Date')
        if len(group) == samples_per_day
    ]

    # Cast to float32: pandas/NumPy default to float64, while nn.Linear
    # parameters are float32, and mixing the two raises a dtype error in the
    # forward pass. The explicit reshape keeps the result 2-D even when a
    # file contributes no complete day (shape (0, 96) instead of (0,)).
    day_matrix = torch.tensor(np.array(valid_days), dtype=torch.float32)
    timeseries.append(day_matrix.reshape(len(valid_days), samples_per_day))

In [5]:
# Stack the per-file day matrices along the day axis into a single tensor
all_timeseries = torch.cat(timeseries, 0)

In [6]:
# Sanity check: rows are complete days, columns are the 96 readings of each day
all_timeseries.shape

torch.Size([1521, 96])

In [8]:
# Mean temperature of each day; keepdim leaves a trailing axis of size 1 so
# the result stays 2-D (one row per day)
daily_avg_temp = all_timeseries.mean(dim=1, keepdim=True)

In [9]:
# Sanity check: one averaged value per day, kept as a column
daily_avg_temp.shape

torch.Size([1521, 1])

In [10]:
# Hyperparameters
noise_dim = 100  # Latent vector size
hidden_dim = 256  # Size of hidden layers
output_dim = 96  # Number of 15-minute intervals in a day

# Learning rates and training parameters
lr = 1e-3
batch_size = 32
num_epochs = 100

# Reproducibility: seed PyTorch's global RNG before anything random happens
# (train/validation split, weight initialisation, DataLoader shuffling and
# noise sampling all draw from it), so a fresh "Run All" repeats the same run.
seed = 42
torch.manual_seed(seed)

In [11]:
# Each sample pairs a day's mean temperature (the condition) with that day's
# full temperature profile (the target).
dataset = TensorDataset(daily_avg_temp, all_timeseries)

# 95% of the days (rounded down) go to training, the remainder to validation
n_days = len(dataset)
train_size = int(0.95 * n_days)
val_size = n_days - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Training batches are reshuffled every epoch; validation order stays fixed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class Generator(nn.Module):
    """Conditional generator: maps (noise, average temperature) to a series.

    The scalar condition is appended to the noise vector, and the result is
    passed through a four-layer MLP with ReLU activations between layers.
    The last layer is linear, so outputs are unbounded.

    Args:
        noise_dim: Length of the latent noise vector.
        hidden_dim: Width of the first and third hidden layers; the middle
            layer is twice as wide.
        output_dim: Number of values in the generated series.
    """

    def __init__(self, noise_dim, hidden_dim, output_dim):
        super().__init__()
        conditioned_dim = noise_dim + 1  # one extra input for avg_temp
        wide_dim = hidden_dim * 2
        layers = [
            nn.Linear(conditioned_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, wide_dim),
            nn.ReLU(),
            nn.Linear(wide_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),  # one output per series value
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, noise, avg_temp):
        """Generate one series per row of ``noise``.

        Args:
            noise: Tensor of shape (batch, noise_dim).
            avg_temp: One condition value per sample; any shape that can be
                viewed as (batch, 1).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        condition = avg_temp.view(-1, 1)  # force a column vector
        conditioned_noise = torch.cat([noise, condition], dim=1)
        return self.model(conditioned_noise)

class Discriminator(nn.Module):
    """Conditional discriminator: scores a series given its average temperature.

    The scalar condition is appended to the series, and the result is passed
    through a four-layer MLP with LeakyReLU(0.2) activations. A final sigmoid
    squashes the output into (0, 1) so it can be used as a probability.

    Args:
        input_dim: Number of values in the input series.
        hidden_dim: Width of the first and third hidden layers; the middle
            layer is twice as wide.
        output_dim: Number of output scores per sample.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        conditioned_dim = input_dim + 1  # one extra input for avg_temp
        wide_dim = hidden_dim * 2
        negative_slope = 0.2
        layers = [
            nn.Linear(conditioned_dim, hidden_dim),
            nn.LeakyReLU(negative_slope),
            nn.Linear(hidden_dim, wide_dim),
            nn.LeakyReLU(negative_slope),
            nn.Linear(wide_dim, hidden_dim),
            nn.LeakyReLU(negative_slope),
            nn.Linear(hidden_dim, output_dim),
            nn.Sigmoid(),  # squash to a probability
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, timeseries, avg_temp):
        """Score each series in the batch.

        Args:
            timeseries: Tensor of shape (batch, input_dim).
            avg_temp: One condition value per sample; any shape that can be
                viewed as (batch, 1).

        Returns:
            Tensor of shape (batch, output_dim) with values in (0, 1).
        """
        condition = avg_temp.view(-1, 1)  # force a column vector
        conditioned_series = torch.cat([timeseries, condition], dim=1)
        return self.model(conditioned_series)

In [None]:
# Build the two networks. The discriminator consumes full-length series
# (output_dim values) and emits a single score per sample.
generator = Generator(noise_dim=noise_dim, hidden_dim=hidden_dim, output_dim=output_dim)
discriminator = Discriminator(input_dim=output_dim, hidden_dim=hidden_dim, output_dim=1)

# One Adam optimizer per network, both using the shared learning rate
optimizer_g = optim.Adam(generator.parameters(), lr=lr)
optimizer_d = optim.Adam(discriminator.parameters(), lr=lr)

# Binary cross-entropy on probabilities (the discriminator ends in a sigmoid)
criterion = nn.BCELoss()

In [None]:
# Training loop
# Each batch performs one discriminator update followed by one generator
# update. The losses printed per epoch are those of the epoch's last batch.
for epoch in range(num_epochs):
    # The validation cell switches the models to eval mode; switch back so
    # this cell can be re-run safely after validating.
    generator.train()
    discriminator.train()

    for avg_temp, real_timeseries in train_loader:
        # Match the float32 parameters of the networks (no-op if the data is
        # already float32); float64 inputs would raise a dtype error.
        avg_temp = avg_temp.float()
        real_timeseries = real_timeseries.float()

        # Size of this batch (the last one may be smaller). A separate name is
        # used so the global `batch_size` hyperparameter is not overwritten.
        n_samples = avg_temp.size(0)

        # Labels
        real_labels = torch.ones(n_samples, 1)
        fake_labels = torch.zeros(n_samples, 1)

        # Train Discriminator
        optimizer_d.zero_grad()

        # Real timeseries
        real_output = discriminator(real_timeseries, avg_temp)
        d_loss_real = criterion(real_output, real_labels)

        # Fake timeseries; detached so the discriminator's backward pass does
        # not also compute (unused) gradients for the generator.
        noise = torch.randn(n_samples, noise_dim)
        fake_timeseries = generator(noise, avg_temp).detach()
        fake_output = discriminator(fake_timeseries, avg_temp)
        d_loss_fake = criterion(fake_output, fake_labels)

        # Total discriminator loss
        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()
        optimizer_d.step()

        # Train Generator
        optimizer_g.zero_grad()

        # Fresh fakes, scored by the just-updated discriminator
        noise = torch.randn(n_samples, noise_dim)
        fake_timeseries = generator(noise, avg_temp)
        fake_output = discriminator(fake_timeseries, avg_temp)

        # Generator loss: the generator wants its fakes to be labelled real
        g_loss = criterion(fake_output, real_labels)
        g_loss.backward()
        optimizer_g.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], d_loss: {d_loss.item()}, g_loss: {g_loss.item()}')



In [None]:
# Validation phase
# Evaluates both networks on the held-out set without updating any weights.
# Runs once, as a standalone cell, after training has finished. The reported
# losses are the mean of the per-batch losses.
generator.eval()
discriminator.eval()
val_d_loss = 0.0
val_g_loss = 0.0
num_val_batches = len(val_loader)

with torch.no_grad():
    for avg_temp, real_timeseries in val_loader:
        # Match the float32 parameters of the networks (no-op if the data is
        # already float32).
        avg_temp = avg_temp.float()
        real_timeseries = real_timeseries.float()

        # Separate name so the global `batch_size` hyperparameter is kept
        n_samples = avg_temp.size(0)
        real_labels = torch.ones(n_samples, 1)
        fake_labels = torch.zeros(n_samples, 1)

        # Real timeseries
        real_output = discriminator(real_timeseries, avg_temp)
        val_d_loss_real = criterion(real_output, real_labels)

        # Fake timeseries
        noise = torch.randn(n_samples, noise_dim)
        fake_timeseries = generator(noise, avg_temp)
        fake_output = discriminator(fake_timeseries, avg_temp)
        val_d_loss_fake = criterion(fake_output, fake_labels)

        # Total discriminator loss
        val_d_loss += (val_d_loss_real + val_d_loss_fake).item()

        # Generator loss: same discriminator scores, judged against "real"
        # labels (no second forward pass is needed under no_grad).
        val_g_loss += criterion(fake_output, real_labels).item()

# Average losses (guarded so an empty validation set does not divide by zero)
if num_val_batches > 0:
    val_d_loss /= num_val_batches
    val_g_loss /= num_val_batches

print(f'Validation d_loss: {val_d_loss}, g_loss: {val_g_loss}')

In [None]:
# Generate new timeseries
# Target daily mean temperature, in degrees Celsius, that the generated day
# is conditioned on. Example value: change it to the mean you want.
your_avg_temp = 15.0

generator.eval()
with torch.no_grad():
    noise = torch.randn(1, noise_dim)
    # Explicit float32 so an integer input (e.g. 15) still matches the
    # dtype of the noise and of the generator's parameters.
    avg_temp = torch.tensor([[your_avg_temp]], dtype=torch.float32)
    generated_timeseries = generator(noise, avg_temp)
    print(generated_timeseries)