In [6]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

# Load the Titanic dataset and preprocess it.
# The free-text / identifier columns (and "Sex") are dropped, rows with any
# missing value are discarded, and "Embarked" is cast to a categorical dtype
# so it can be picked up for one-hot encoding below.
data = (
    pd.read_csv("../data/train.csv")
    .drop(columns=["Name", "Ticket", "Cabin", "PassengerId", "Sex"])
    .dropna()
    .astype({"Embarked": "category"})
)

# One-hot encode every categorical column.
cat_cols = [col for col in data.columns if data[col].dtype.name == "category"]
try:
    # scikit-learn >= 1.2 spells the dense-output switch `sparse_output`;
    # the old `sparse` keyword was removed in 1.4.
    enc = OneHotEncoder(sparse_output=False)
except TypeError:
    # Older scikit-learn releases only know the `sparse` keyword.
    enc = OneHotEncoder(sparse=False)
cat_data = enc.fit_transform(data[cat_cols])

# Final feature matrix: remaining (numeric) columns followed by the one-hot block.
data = data.drop(columns=cat_cols)
data = np.concatenate((data.values, cat_data), axis=1)



In [7]:
# Impute missing values
# Any NaN left in a column is replaced by that column's median.
imputer = SimpleImputer(strategy="median")
imputer.fit(data)
data = imputer.transform(data)

In [9]:
# Normalize the data: standardize every column to zero mean and unit variance.
# The statistics are kept in `data_mean` / `data_std` so that the scaling can
# be inverted later (x_original = x_scaled * data_std + data_mean).
data_mean = np.mean(data, axis=0)
data_std = np.std(data, axis=0)
# A constant column has zero standard deviation; dividing by it would produce
# NaN/inf. Use 1 instead so such a column simply becomes all zeros.
data_std = np.where(data_std == 0, 1.0, data_std)
data = (data - data_mean) / data_std

In [10]:
# Define the generator
class Generator(nn.Module):
    """Fully connected generator: (noise, condition) -> synthetic feature row.

    The noise vector ``z`` and the condition vector ``c`` are concatenated
    along the feature axis and passed through two ReLU hidden layers
    (64 and 128 units) followed by an output layer of ``output_dim`` units.

    Args:
        input_dim: Width of the concatenated ``[z, c]`` input.
        output_dim: Number of features in one generated row.
        output_activation: Optional module applied to the output layer.
            Defaults to ``None`` (linear output), because the training data
            in this notebook is standardized to zero mean / unit variance and
            therefore contains negative values that a sigmoid head, bounded
            to (0, 1), could never produce. Pass ``nn.Sigmoid()`` to get a
            (0, 1)-bounded output for data scaled to that range.
    """

    def __init__(self, input_dim, output_dim, output_activation=None):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, output_dim)
        self.relu = nn.ReLU()
        # nn.Identity keeps forward() branch-free when no activation is given.
        self.output_activation = (
            output_activation if output_activation is not None else nn.Identity()
        )

    def forward(self, z, c):
        """Generate one row per input sample.

        Args:
            z: Noise tensor; concatenated with ``c`` along dim 1.
            c: Condition tensor with the same number of rows as ``z``.

        Returns:
            Tensor with ``output_dim`` columns and one row per input row.
        """
        x = torch.cat((z, c), dim=1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.output_activation(self.fc3(x))

In [11]:
# Define the discriminator
class Discriminator(nn.Module):
    """Fully connected real-vs-fake classifier.

    Two hidden layers (128 and 64 units), each followed by ReLU and dropout
    with p=0.3, feed a single sigmoid unit, so the output lies in (0, 1).

    Args:
        input_dim: Number of features in one input row.
    """

    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.dropout1 = nn.Dropout(0.3)
        self.fc2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one sigmoid score per input row."""
        hidden = x
        hidden_stages = (
            (self.fc1, self.dropout1),
            (self.fc2, self.dropout2),
        )
        # Each hidden stage is linear -> ReLU -> dropout.
        for linear, dropout in hidden_stages:
            hidden = dropout(self.relu(linear(hidden)))
        return self.sigmoid(self.fc3(hidden))

In [12]:
# Define the GAN model
class GAN(nn.Module):
    """Generator and discriminator chained into a single module.

    Args:
        generator: Callable taking ``(z, c)`` and returning generated rows.
        discriminator: Callable scoring the rows produced by ``generator``.
    """

    def __init__(self, generator, discriminator):
        super().__init__()
        self.generator = generator
        self.discriminator = discriminator

    def forward(self, z, c):
        """Generate rows from ``(z, c)`` and return the discriminator's score for them."""
        return self.discriminator(self.generator(z, c))

# Define the loss functions
# Binary cross-entropy on probabilities, averaged over all elements
# ("mean" is the default reduction, spelled out here for clarity).
bce_loss = nn.BCELoss(reduction="mean")

In [13]:
def generator_loss(fake_output):
    """Return the BCE between the discriminator's scores on generated rows and an all-ones target.

    The loss is small when the discriminator scores the generated rows close to 1.
    """
    all_real_target = torch.ones_like(fake_output)
    return bce_loss(fake_output, all_real_target)

def discriminator_loss(real_output, fake_output):
    """Return the summed BCE of the discriminator on a real and a generated batch.

    Real scores are compared against an all-ones target and generated scores
    against an all-zeros target; the two losses are added together.
    """
    ones_target = torch.ones_like(real_output)
    loss_on_real = bce_loss(real_output, ones_target)
    zeros_target = torch.zeros_like(fake_output)
    loss_on_fake = bce_loss(fake_output, zeros_target)
    return loss_on_real + loss_on_fake

In [15]:
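# NOTE: abandoned, unfinished draft of the optimizer and training loop, kept for
# reference only. Every line in this cell is commented out, so nothing here runs.
# # Define the optimizer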
# lr = 1e-4
# beta1 = 0.5
# beta2 = 0.999
# optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(beta1, beta2))
# optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(beta1, beta2))

# # Define the training loop
# def train_gan(data, generator, discriminator, gan, epochs, batch_size, categorical_cols=[]):
#     enc = OneHotEncoder(sparse=False)
#     cat_cols = np.array(categorical_cols)
#     cat_dims = [data[:,col].max()+1 for col in cat_cols]
#     z_dim = 100
#     for epoch in range(epochs):
#         for i in range(len(data) // batch_size):
#             # Train the discriminator
#             real_data = torch.tensor(data[i * batch_size : (i+1) * batch_size]).float()
#             real_data[:,cat_cols] = enc.fit_transform(real_data[:,cat_cols].long())
#             noise = torch.randn(batch_size, z_dim)
#             c = torch.zeros(batch_size, sum(cat_dims)).scatter

In [16]:
# Set the device for training
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Seed the random number generators so that weight initialisation, batch
# shuffling and noise sampling are repeatable across runs.
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# Set the hyperparameters
batch_size = 128
latent_dim = 32
num_categories = cat_data.shape[1]  # width of the one-hot block
num_epochs = 100
# Adam settings. These must be defined here: the only other place they
# appear is a commented-out cell, so a fresh kernel would otherwise raise
# NameError when the optimizers are built.
lr = 1e-4
beta1 = 0.5
beta2 = 0.999

# Define the data loader
data_tensor = torch.tensor(data, dtype=torch.float32)
dataset = torch.utils.data.TensorDataset(data_tensor)
loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Number of batches the loader actually yields per epoch; this includes the
# final partial batch, which a floor division of rows by batch_size would miss.
num_batches = len(loader)

# Initialize the models
generator = Generator(input_dim=latent_dim + num_categories, output_dim=data.shape[1])
discriminator = Discriminator(input_dim=data.shape[1])
gan = GAN(generator, discriminator)

# Move the models to the device (gan wraps the same two modules)
generator.to(device)
discriminator.to(device)
gan.to(device)

# Define the optimizers
g_optimizer = optim.Adam(generator.parameters(), lr=lr, betas=(beta1, beta2))
d_optimizer = optim.Adam(discriminator.parameters(), lr=lr, betas=(beta1, beta2))

In [18]:
# Train the models
# Each batch runs one discriminator update followed by one generator update.
log_every = 10                  # print the losses every `log_every` batches
batches_per_epoch = len(loader)  # includes the final partial batch

for epoch in range(num_epochs):
    for batch_idx, (real_data,) in enumerate(loader):
        real_data = real_data.to(device)
        n_rows = real_data.shape[0]  # the last batch may be smaller than batch_size

        # ---- Discriminator step: real rows vs. detached generated rows ----
        z = torch.randn(n_rows, latent_dim, device=device)
        # NOTE(review): the condition is a vector of independent random bits,
        # not a one-hot vector, and the discriminator never sees it -- confirm
        # this is the intended conditioning scheme.
        c = torch.randint(0, 2, (n_rows, num_categories), device=device).float()
        # detach() keeps the discriminator loss from back-propagating into the generator.
        fake_data = generator(z, c).detach()
        real_output = discriminator(real_data)
        fake_output = discriminator(fake_data)
        d_loss = discriminator_loss(real_output, fake_output)
        # Also clears the gradients the previous generator step left on the discriminator.
        discriminator.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        # ---- Generator step: try to make the discriminator output "real" ----
        z = torch.randn(n_rows, latent_dim, device=device)
        c = torch.randint(0, 2, (n_rows, num_categories), device=device).float()
        fake_data = generator(z, c)
        fake_output = discriminator(fake_data)
        g_loss = generator_loss(fake_output)
        generator.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Print the loss every `log_every` batches and on the last batch of the
        # epoch, so short epochs (fewer than `log_every` batches) still report.
        is_last_batch = batch_idx + 1 == batches_per_epoch
        if (batch_idx + 1) % log_every == 0 or is_last_batch:
            print('Epoch [{}/{}], Batch [{}/{}], Discriminator Loss: {:.4f}, Generator Loss: {:.4f}'
                  .format(epoch+1, num_epochs, batch_idx+1, batches_per_epoch, d_loss.item(), g_loss.item()))

    # Generate 10,000 records after every epoch (each epoch overwrites the
    # previous sample; after training `fake_data` is a NumPy array).
    with torch.no_grad():
        z = torch.randn(10000, latent_dim, device=device)
        c = torch.randint(0, 2, (10000, num_categories), device=device).float()
        fake_data = generator(z, c)
        fake_data = fake_data.cpu().numpy()