<a href="https://colab.research.google.com/github/janwaelty/Master-Thesis/blob/main/visual_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Neural Networks and the Art Market: A Deep Learning Approach to Valuation**


In [1]:

import os
import shutil
from google.colab import drive
# Make the Drive contents available under /content/drive
drive.mount('/content/drive')

# Current location of the notebook file and the location it should end up at
source_path = "/content/drive/My Drive/Colab Notebooks/visual_model.ipynb"
destination_path = "/content/drive/My Drive/visual_model.ipynb"

# Relocate the notebook only while it is still found at the source location;
# once it has been moved this cell becomes a no-op on re-run.
if os.path.exists(source_path):
    shutil.move(source_path, destination_path)
    print(f"Notebook moved to: {destination_path}")

import zipfile

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from scipy.stats import linregress
from sklearn.metrics import r2_score
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

from PIL import Image
import torch
import torchvision
from torchvision import transforms
from torchvision import datasets, transforms


!cp /content/drive/My\ Drive/data_preprocess.py /content/

from data_preprocess import (
    data_split as data_split,
    fit_visualization as fit_visualization,
    repeated_to_single_transactions as repeated_to_single_transactions,
    one_hot as one_hot,
    standardization as standardization,
    data_filter as data_filter,
    add_prev_avg_price as add_prev_avg_price,
    standardization as standardization,
    load_image_data as load_image_data,
    check_image_existence as check_image_existence,
    filter_data_for_missing_images as filter_data_for_missing_images,
    resnet_transform as resnet_transform
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [16]:
class MixedTransactionDataset(torch.utils.data.Dataset):
    """Dataset yielding ``(numerical_features, image, target)`` triples.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature table. Must contain an ``'image_url'`` column holding a path
        relative to ``base_dir``; every other column is cast to ``float32``.
    y : pandas.Series or pandas.DataFrame
        Targets, row-aligned (by position) with ``X``.
    base_dir : str
        Directory that the ``image_url`` values are joined onto.
    transform : callable or None
        Applied to the PIL image when truthy.
    """

    def __init__(self, X, y, base_dir = "/content/data", transform=resnet_transform()):
        # Rows without an image path are removed from X *and* y together.
        # Dropping NaNs from the path list alone would shift every later image
        # against its features and target.
        has_image = X['image_url'].notna().to_numpy()
        X = X.loc[has_image]
        y = y.loc[has_image]

        # Store the image paths first; 'image_url' is removed before the
        # remaining columns are converted to numbers.
        self.image_paths = [os.path.join(base_dir, path.lstrip('/')) for path in X['image_url']]

        # Drop 'image_url' column, then convert the remaining numerical columns to float32
        X = X.drop(columns=['image_url'])
        self.X = X.reset_index(drop=True).values.astype('float32')
        self.y = y.reset_index(drop=True).values.astype('float32')  # Convert y to numpy array
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(x, image, y)`` for the sample at position ``index``."""
        # Numerical features (x) and target value (y)
        x = torch.tensor(self.X[index], dtype=torch.float32)
        y = torch.tensor(self.y[index], dtype=torch.float32)

        # Open inside a context manager so the file handle is released, and
        # force three channels: grayscale / palette / RGBA files would
        # otherwise reach the transform with a different channel count.
        with Image.open(self.image_paths[index]) as raw_image:
            image = raw_image.convert('RGB')

        # Apply any transformations if provided (resize, normalization, etc.)
        if self.transform:
            image = self.transform(image)

        return x, image, y

    def __len__(self):
        """Number of samples (rows kept from ``X``)."""
        return len(self.X)




def create_dataloaders(X_train, X_test, y_train, y_test, base_dir,  batch_size=128):
    """Build the training and testing ``DataLoader`` objects.

    Parameters
    ----------
    X_train, X_test : pandas.DataFrame
        Feature tables including the ``'image_url'`` column.
    y_train, y_test : pandas.Series or pandas.DataFrame
        Targets matching the feature tables row by row.
    base_dir : str
        Root directory of the image files; forwarded to both datasets.
    batch_size : int
        Batch size used by both loaders.

    Returns
    -------
    tuple
        ``(train_loader, test_loader)``.
    """
    # Training data: shuffled. The incomplete last batch is dropped, which
    # also keeps a single-sample batch away from BatchNorm1d in train mode.
    train_dataset = MixedTransactionDataset(X_train, y_train, base_dir=base_dir,
                                            transform=resnet_transform())
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=2, drop_last=True)

    # Test data: fixed order and *no* dropped samples, so the reported loss
    # and the returned predictions cover the whole test set.
    test_dataset = MixedTransactionDataset(X_test, y_test, base_dir=base_dir,
                                           transform=resnet_transform())
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
                             num_workers=2, drop_last=False)

    return train_loader, test_loader

class Numerical_Model(nn.Module):
    """Fully connected network mapping tabular features to a 10-dim output.

    Three hidden stages (256 -> 128 -> 64 units), each made of a linear
    layer, batch normalisation, LeakyReLU and dropout, followed by a plain
    linear projection to 10 values.
    """

    def __init__(self, in_features):
        super().__init__()
        # Hidden stage 1
        self.fc1 = nn.Linear(in_features, 256)
        self.bn1 = nn.BatchNorm1d(256)
        # Hidden stage 2
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        # Hidden stage 3
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        # Output projection
        self.fc4 = nn.Linear(64, 10)

        # Shared by all hidden stages
        self.dropout = nn.Dropout(0.3)
        self.leaky_relu = nn.LeakyReLU(0.01)

    def forward(self, x):
        """Run ``x`` through the three hidden stages and the output layer."""
        hidden_stages = (
            (self.fc1, self.bn1),
            (self.fc2, self.bn2),
            (self.fc3, self.bn3),
        )
        for linear, norm in hidden_stages:
            x = self.dropout(self.leaky_relu(norm(linear(x))))
        # The last layer is left without an activation
        return self.fc4(x)

class CombinedModel(nn.Module):
    """Two-branch regressor over tabular features and an image.

    The tabular branch is a ``Numerical_Model``; the image branch is a
    torchvision ResNet-50 whose final layer is replaced by a 10-unit linear
    layer. Both 10-dim outputs are concatenated and mapped to one value.
    """

    def __init__(self, in_features):
        super().__init__()
        # Tabular branch
        self.numerical_model = Numerical_Model(in_features)

        # Image branch: ResNet-50 with its head swapped for a 10-unit layer
        backbone = torchvision.models.resnet50(weights="DEFAULT")
        backbone.fc = nn.Linear(backbone.fc.in_features, 10)
        self.image_model = backbone

        # Head over the concatenated branch outputs (10 + 10 = 20 inputs)
        self.fc1 = nn.Linear(20, 1)

    def forward(self, numerical_input, image_input):
        """Return the prediction for a batch of tabular rows and images."""
        branch_outputs = (
            self.numerical_model(numerical_input),
            self.image_model(image_input),
        )
        # Join along the feature axis, then apply the final linear layer
        return self.fc1(torch.cat(branch_outputs, dim=1))


def train(network, trainloader, testloader, epochs=100, eta=0.001):
    """Train ``network`` with Adam and MSE loss, printing losses per epoch.

    Parameters
    ----------
    network : torch.nn.Module
        Called as ``network(num_input, img_input)``.
    trainloader, testloader : iterable
        Each yields ``(num_input, img_input, target)`` tensor batches.
    epochs : int
        Number of passes over ``trainloader``.
    eta : float
        Learning rate passed to Adam.

    Returns
    -------
    tuple
        ``(network, pred, target_vals)``: the trained network (moved to the
        selected device) and flat lists with the predictions and targets of
        one final pass over ``testloader``.

    Notes
    -----
    If a loader yields no batch, the corresponding average loss is reported
    as ``nan`` instead of raising ``ZeroDivisionError``.
    """
    optimizer = torch.optim.Adam(network.parameters(), lr=eta, weight_decay=1e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network = network.to(device)
    loss = torch.nn.MSELoss()

    for epoch in range(epochs):
        # ---- training pass -------------------------------------------------
        J_train = 0
        total_train_samples = 0  # Counter for number of train samples

        # Switch to training mode once per epoch; the evaluation pass below
        # leaves the network in eval mode.
        network.train()
        for num_input, img_input, target in trainloader:
            optimizer.zero_grad()
            num_input, img_input, target = num_input.to(device), img_input.to(device), target.to(device)

            prediction = network(num_input, img_input)
            target = target.view(-1, 1)  # match the (batch, 1) prediction
            J = loss(prediction, target)

            # Weight the batch mean by the batch size so the epoch average
            # is a per-sample average.
            J_train += J.item() * num_input.size(0)
            total_train_samples += num_input.size(0)

            J.backward()
            optimizer.step()

        # Compute average train loss (nan when no batch was produced)
        avg_train_loss = J_train / total_train_samples if total_train_samples else float("nan")

        # ---- evaluation pass -----------------------------------------------
        J_test = 0
        total_test_samples = 0  # Counter for number of test samples
        network.eval()
        with torch.no_grad():
            for num_input, img_input, target in testloader:
                num_input, img_input, target = num_input.to(device), img_input.to(device), target.to(device)

                prediction = network(num_input, img_input)
                target = target.view(-1, 1)
                J_test += loss(prediction, target).item() * target.size(0)
                total_test_samples += target.size(0)

        # Compute the average test loss (nan when no batch was produced)
        avg_test_loss = J_test / total_test_samples if total_test_samples else float("nan")

        # Print train and test loss for each epoch
        print(f'Epoch [{epoch + 1}/{epochs}]',
              f'Train loss: {avg_train_loss:.6f}',
              f'Test loss: {avg_test_loss:.6f}')

    # Save predictions and targets after last epoch
    pred, target_vals = [], []
    network.eval()
    with torch.no_grad():
        for num_input, img_input, target in testloader:
            prediction = network(num_input.to(device), img_input.to(device))
            pred.extend(prediction.view(-1).detach().cpu().numpy())
            target_vals.extend(target.view(-1).detach().cpu().numpy())

    return network, pred, target_vals


In [17]:
if __name__ == '__main__':
    # Drive mounting and the notebook relocation are performed by the first
    # cell of this notebook, so they are not repeated here.

    # Tabular data. Loaded only when it is not already in the kernel, so the
    # cell works after "Restart & Run All" without re-reading the workbook on
    # every re-run.
    artist_data_path = "/content/drive/My Drive/artist_graph_data.xlsx"
    if "artwork_numeric_data" not in globals():
        artwork_numeric_data = pd.read_excel(artist_data_path)

    # image data
    zip_path = "/content/drive/My Drive/pic.zip"
    extract_path = "/content/data"
    # NOTE(review): assumes load_image_data only unpacks zip_path into
    # extract_path, so it can be skipped once that directory exists - confirm.
    if not os.path.isdir(extract_path):
        load_image_data(zip_path, extract_path)

    # Check for missing images
    base_dir = "/content/data"
    image_paths = artwork_numeric_data['image_url'].values
    missing_images = check_image_existence(image_paths, base_dir)
    data_filtered = filter_data_for_missing_images(artwork_numeric_data, missing_images, base_dir)
    # check functioning of image data after filtering
    print(f"After filtering:{check_image_existence(data_filtered['image_url'],base_dir)} ")

    selected_vars = ['category', 'artist', 'transaction_price', 'height',
                     'width', 'medium', 'transaction_house',
                     'transaction_year_semi', 'image_url']
    one_hot_vars = ['category', 'artist', 'medium', 'transaction_house']

    X_train, X_test, y_train, y_test = data_split(data_filtered,
                                                  selected_vars,
                                                  one_hot_vars,
                                                  2020, image_url = True)

    train_loader, test_loader = create_dataloaders(X_train, X_test, y_train,
                                                   y_test, base_dir)

    # Initialize the CombinedModel (subtract 1 for image_url column)
    model = CombinedModel(X_train.shape[1]-1)

    # Train the model. The result gets its own name: binding it to
    # `CombinedModel` would overwrite the class and break any re-run.
    trained_model, prediction_combined_model, target = train(model, train_loader,
                                                             test_loader,
                                                             epochs = 10)

    print(f"Total test batches: {len(test_loader)}")
    print(f"Total test samples: {len(y_test)}")
    print(f"Test data shape : {X_test.shape}")
    print(f"Train data shape : {X_train.shape}")
    print("Std of y_test:", np.std(y_test))
    print("Std of predictions:", np.std(prediction_combined_model))

    # Linear-regression baseline on the tabular features only: the string
    # 'image_url' column cannot be fed to the regressor, so it is removed.
    X_train_numeric = X_train.drop(columns=['image_url'])
    X_test_numeric = X_test.drop(columns=['image_url'])

    linear_model = LinearRegression()
    linear_model.fit(X_train_numeric, y_train)

    # Make predictions on the test set
    y_pred = linear_model.predict(X_test_numeric)








number of nans: 1201
missing images: 86
number of nans: 0
missing images: 0
After filtering:[] 
Epoch [1/10] Train loss: 3.732557 Test loss: 1.661303


KeyboardInterrupt: 