In [4]:
# Define model
import torch
import torch.nn as nn
from torch.utils.data import DataLoader


# Model dimensions handed to CausalModel when the model is instantiated below.
# NOTE(review): both are 0 here, which looks like a placeholder -- presumably
# they should be set from the loaded dataset before building the model; confirm.
NUM_FEATURES: int = 0
NUM_QUALITY_SCORES: int = 0


# Network that takes in all the input features and outputs a vector of qualities
class QualitiesPredictor(nn.Module):
    """Multi-layer perceptron mapping input features to a vector of qualities.

    Layer widths are ``input_size -> hidden_layer_sizes[0] -> ... ->
    hidden_layer_sizes[-1] -> num_quality_scores``. ReLU follows every layer
    except the output layer, which stays linear.

    Args:
        input_size: Number of features fed to the first linear layer.
        num_quality_scores: Width of the output layer.
        hidden_layer_sizes: Sequence of hidden widths; it is indexed at ``[0]``
            and ``[-1]``, so it must contain at least one entry.
    """

    def __init__(self, input_size, num_quality_scores, hidden_layer_sizes):
        super().__init__()

        # First projection: raw features -> first hidden width.
        first_width = hidden_layer_sizes[0]
        self.input_layer = nn.Linear(input_size, first_width)

        # One linear layer per consecutive pair of hidden widths.
        width_pairs = zip(hidden_layer_sizes[:-1], hidden_layer_sizes[1:])
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(fan_in, fan_out) for fan_in, fan_out in width_pairs]
        )

        # Final projection: last hidden width -> one value per quality score.
        last_width = hidden_layer_sizes[-1]
        self.output_layer = nn.Linear(last_width, num_quality_scores)

    def forward(self, x):
        """Return the output-layer activations (no final non-linearity) for ``x``."""
        for layer in (self.input_layer, *self.hidden_layers):
            x = torch.relu(layer(x))
        return self.output_layer(x)


# Network that takes a quality and the price and predicts the satisfaction
class SatisfactionPredictor(nn.Module):
    """Multi-layer perceptron predicting one satisfaction value from (q, p).

    The two inputs are concatenated along dimension 1 and fed to a first
    layer of width 2, so together they must contribute exactly two columns.
    ReLU follows every layer except the single-unit output layer.

    Args:
        hidden_layer_sizes: Sequence of hidden widths; it is indexed at ``[0]``
            and ``[-1]``, so it must contain at least one entry.
    """

    def __init__(self, hidden_layer_sizes):
        super().__init__()

        # Two input columns: the quality value and the price.
        self.input_layer = nn.Linear(2, hidden_layer_sizes[0])

        # Chain the remaining hidden widths pairwise.
        stacked = nn.ModuleList()
        previous_width = hidden_layer_sizes[0]
        for width in hidden_layer_sizes[1:]:
            stacked.append(nn.Linear(previous_width, width))
            previous_width = width
        self.hidden_layers = stacked

        self.output_layer = nn.Linear(hidden_layer_sizes[-1], 1)

    def forward(self, q, p):
        """Concatenate ``q`` and ``p`` column-wise and return the network output."""
        features = torch.cat((q, p), dim=1)
        activations = torch.relu(self.input_layer(features))
        for layer in self.hidden_layers:
            activations = torch.relu(layer(activations))
        return self.output_layer(activations)


# Takes all the quality scores and predicts a total quality variable
class TotalQualityPredictor(nn.Module):
    """Multi-layer perceptron collapsing all quality scores into one value.

    Layer widths are ``num_quality_scores -> hidden_layer_sizes[0] -> ... ->
    hidden_layer_sizes[-1] -> 1``, with ReLU after every layer but the last.

    Args:
        num_quality_scores: Number of quality scores per input row.
        hidden_layer_sizes: Sequence of hidden widths; it is indexed at ``[0]``
            and ``[-1]``, so it must contain at least one entry.
    """

    def __init__(self, num_quality_scores, hidden_layer_sizes):
        super().__init__()

        self.input_layer = nn.Linear(num_quality_scores, hidden_layer_sizes[0])

        # Layer k maps hidden width k-1 to hidden width k.
        self.hidden_layers = nn.ModuleList(
            [
                nn.Linear(hidden_layer_sizes[k - 1], hidden_layer_sizes[k])
                for k in range(1, len(hidden_layer_sizes))
            ]
        )

        self.output_layer = nn.Linear(hidden_layer_sizes[-1], 1)

    def forward(self, x):
        """Return the single-column output for a batch of quality vectors."""
        hidden = torch.relu(self.input_layer(x))
        for layer in self.hidden_layers:
            hidden = torch.relu(layer(hidden))
        return self.output_layer(hidden)


class CausalModel(nn.Module):
    """Predict per-quality and total satisfaction from features and price.

    Pipeline:
        1. ``qualities_predictor_net`` maps the features to one value per
           quality score; ``activation_on_quality`` is applied to the result.
        2. One ``SatisfactionPredictor`` per quality score maps
           (that quality column, price) to a satisfaction value.
        3. ``total_quality_predictor`` collapses all qualities into a single
           total quality, again passed through ``activation_on_quality``.
        4. ``total_satisfaction_predictor`` maps (total quality, price) to the
           total satisfaction.

    Args:
        input_size: Number of input features per row.
        num_quality_scores: Number of quality scores to predict.
        activation_on_quality: Callable applied to quality tensors (for
            example ``torch.sigmoid`` or ``nn.Sigmoid()``). An ``nn.Module``
            subclass such as ``nn.Sigmoid`` is also accepted and instantiated
            with no arguments.
        qualities_predictor_hidden_layer_sizes: Hidden widths of the
            qualities network.
        satisfaction_predictor_hidden_layer_sizes: Hidden widths shared by
            every satisfaction network (per-quality and total).
        total_quality_predictor_hidden_layer_sizes: Hidden widths of the
            total-quality network.
    """

    def __init__(self, input_size, num_quality_scores, activation_on_quality,
                 qualities_predictor_hidden_layer_sizes,
                 satisfaction_predictor_hidden_layer_sizes,
                 total_quality_predictor_hidden_layer_sizes):
        super().__init__()

        self.num_quality_scores = num_quality_scores

        self.qualities_predictor_net = QualitiesPredictor(input_size, num_quality_scores,
                                                          qualities_predictor_hidden_layer_sizes)

        self.activation_on_quality = self._instantiate_activation(activation_on_quality)

        # nn.ModuleList (not a plain list) so the per-quality networks are
        # registered as sub-modules: their weights then show up in
        # parameters(), state_dict() and follow .to(device).
        self.satisfaction_predictors = nn.ModuleList(
            [
                SatisfactionPredictor(satisfaction_predictor_hidden_layer_sizes)
                for _ in range(num_quality_scores)
            ]
        )

        self.total_quality_predictor = TotalQualityPredictor(num_quality_scores,
                                                             total_quality_predictor_hidden_layer_sizes)

        self.total_satisfaction_predictor = SatisfactionPredictor(satisfaction_predictor_hidden_layer_sizes)

    @staticmethod
    def _instantiate_activation(activation):
        """Return a callable activation, instantiating ``nn.Module`` classes."""
        if isinstance(activation, type) and issubclass(activation, nn.Module):
            return activation()
        return activation

    def forward(self, x, p):
        """Run the full pipeline on a batch.

        Args:
            x: Feature tensor of shape ``(batch, input_size)``.
            p: Price tensor of shape ``(batch, 1)``; it is concatenated
                column-wise with a single quality column inside each
                satisfaction network.

        Returns:
            Tuple ``(satisfactions, total_satisfaction, total_quality)`` with
            shapes ``(batch, num_quality_scores)``, ``(batch, 1)`` and
            ``(batch, 1)``.
        """
        qualities = self.activation_on_quality(self.qualities_predictor_net(x))

        # Slice with i:i + 1 to keep a (batch, 1) column for each quality, as
        # required by the two-column input of the satisfaction networks.
        per_quality = [
            predictor(qualities[:, i:i + 1], p)
            for i, predictor in enumerate(self.satisfaction_predictors)
        ]
        if per_quality:
            satisfactions = torch.cat(per_quality, dim=1)
        else:
            # No quality scores configured: return an empty (batch, 0) tensor
            # on the same device/dtype instead of failing inside torch.cat.
            satisfactions = qualities.new_zeros((qualities.shape[0], 0))

        total_quality = self.activation_on_quality(self.total_quality_predictor(qualities))

        # The total satisfaction depends on the aggregated quality and price.
        total_satisfaction = self.total_satisfaction_predictor(total_quality, p)

        return satisfactions, total_satisfaction, total_quality

    def loss_function(self, satisfactions, total_satisfaction, satisfactions_targets, total_satisfaction_target):
        """Return the unweighted sum of the two mean-squared-error terms.

        Args:
            satisfactions: Predicted per-quality satisfactions.
            total_satisfaction: Predicted total satisfaction.
            satisfactions_targets: Targets matching ``satisfactions``.
            total_satisfaction_target: Targets matching ``total_satisfaction``.
        """
        criterion = nn.MSELoss()
        loss_satisfactions = criterion(satisfactions, satisfactions_targets)
        loss_total_satisfaction = criterion(total_satisfaction, total_satisfaction_target)
        # Both terms currently carry equal weight.
        return loss_satisfactions + loss_total_satisfaction

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset

class CausalDataloader(Dataset):
    """Map-style dataset over the rows of ``<path_to_data_dir>/data.csv``.

    Each item is ``(data, label)`` where ``data`` is one float32 feature row
    (``stars``, ``reviews``, ``rating``, the six category columns, then one
    one-hot column per distinct ``city``) and ``label`` is that row's
    ``price``.

    NOTE(review): the earlier version stored undefined ``combined_data`` /
    ``combined_labels``; the feature/price pair it had just computed is used
    here instead. Confirm that ``price`` is the intended label.

    Args:
        path_to_data_dir: Directory containing ``data.csv``. A falsy value
            falls back to the current working directory.
        transform: Optional callable applied to each feature row.
        target_transform: Optional callable applied to each label.
    """

    def __init__(self, path_to_data_dir, transform=None, target_transform=None):
        import os  # local import keeps this class self-contained in its cell

        self.data_dir = path_to_data_dir
        self.transform = transform
        self.target_transform = target_transform

        CATEGORIES = ["staff", "facilities", "cleanliness", "comfort", "valueForMoney", "location"]
        df = pd.read_csv(os.path.join(path_to_data_dir or "", "data.csv"))

        # The one-hot city columns do not exist in df itself, so build them
        # separately and append them to the numeric columns.
        numeric = df[["stars", "reviews", "rating"] + CATEGORIES]
        city_one_hot = pd.get_dummies(df["city"])
        features = pd.concat([numeric, city_one_hot], axis=1)

        # Plain arrays so that __getitem__ can index rows positionally.
        self.data = features.astype("float32").to_numpy()
        self.labels = df["price"].astype("float32").to_numpy()

    def process_data(self, data):
        """Unimplemented placeholder: ignores ``data`` and returns ``([], 0)``."""
        # make sure to add city into the processed data
        processed_data = []
        label = 0
        return processed_data, label

    def __len__(self):
        """Return the number of rows in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the (optionally transformed) feature row and label at ``idx``."""
        data = self.data[idx, :]
        label = self.labels[idx]
        if self.transform is not None:
            # Keep the transformed row; it used to be assigned to an unused name.
            data = self.transform(data)
        if self.target_transform is not None:
            label = self.target_transform(label)
        return data, label
# Dataloader class

In [None]:
import numpy as np

# Step 1: Load and preprocess your dataset
# Assume the dataset is loaded and preprocessed, and stored in the variables below
# (all still empty placeholders here).

data = []
s = []
total_s = []
price = []


# Step 2: Split the dataset into train, test, and validation sets
# Define the ratio for train, test, and validation sets
train_ratio = 0.8
test_ratio = 0.1
val_ratio = 0.1  # informational only: validation receives whatever remains

# A Python list cannot be indexed with an array of indices, so convert first.
data = np.asarray(data)

# Get the number of samples in the dataset
num_samples = len(data)

# Calculate the number of samples for train, test, and validation sets
num_train = int(train_ratio * num_samples)
num_test = int(test_ratio * num_samples)
num_val = num_samples - num_train - num_test

# Shuffle the dataset
indices = np.arange(num_samples)
np.random.shuffle(indices)

# Keep the index arrays so that s, total_s and price can later be split with
# exactly the same assignment of samples to subsets.
train_idx = indices[:num_train]
test_idx = indices[num_train:num_train + num_test]
val_idx = indices[num_train + num_test:]

# Split the dataset into train, test, and validation sets
train_data = data[train_idx]
test_data = data[test_idx]
val_data = data[val_idx]



In [12]:
# Pass an activation instance so it can be applied directly to tensors.
# NOTE(review): the predictor constructors read hidden_layer_sizes[0], so the
# empty lists below raise IndexError -- replace them with real layer widths.
model = CausalModel(NUM_FEATURES, NUM_QUALITY_SCORES, nn.Sigmoid(), [], [], [])

In [None]:
# Plain stochastic gradient descent over the model's registered parameters.
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=0.01,
)


#