In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
import torchvision.transforms.functional as F

In [2]:
# unnormalize and display an image
def imshow(img):
    """Show a channel-first image tensor with matplotlib.

    The tensor is mapped back with ``img / 2 + 0.5`` (the inverse of a
    mean-0.5 / std-0.5 normalization), moved to the CPU, converted to NumPy
    and reordered from (C, H, W) to (H, W, C) before plotting.

    Args:
        img: Image tensor laid out as (channels, height, width).
    """
    unnormalized = img / 2 + 0.5  # inverse of (x - 0.5) / 0.5
    channels_last = unnormalized.cpu().numpy().transpose((1, 2, 0))
    plt.imshow(channels_last)
    plt.show()

# Define the display_top_predictions function
def display_top_predictions(dataloader, model, feature_extractor, device, top_k=5):
    """Show example test images per class, split into correct and incorrect predictions.

    Each batch is passed through ``feature_extractor`` and then ``model``; the
    arg-max of the output is the predicted class index. For every true class the
    first ``top_k`` correctly classified and the first ``top_k`` misclassified
    images encountered in ``dataloader`` are kept (they are not ranked by
    confidence) and then displayed with the module-level ``imshow``.

    Relies on the module-level ``classes`` list to turn label indices into names.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` batches.
        model: Classifier applied to the extracted features.
        feature_extractor: Module applied to the raw images.
        device: Device the batches are moved to before the forward pass.
        top_k: Maximum number of images shown per class and per outcome.
    """
    model.eval()
    feature_extractor.eval()
    # true class name -> list of (image tensor on CPU, predicted class name)
    correct_pred = {classname: [] for classname in classes}
    incorrect_pred = {classname: [] for classname in classes}

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(feature_extractor(images))
            preds = outputs.argmax(dim=1)
            for image, label, pred in zip(images.cpu(), labels.tolist(), preds.tolist()):
                bucket = correct_pred if pred == label else incorrect_pred
                examples = bucket[classes[label]]
                # Only top_k images per bucket are ever shown, so keep no more than
                # that; clone() lets the rest of the batch be freed.
                if len(examples) < top_k:
                    examples.append((image.clone(), classes[pred]))

    for classname, examples in correct_pred.items():
        print(f'\nTop {top_k} correct predictions for class {classname}:')
        for i, (image, predicted_name) in enumerate(examples):
            # imshow expects the normalized tensor itself (it unnormalizes and
            # converts to NumPy), not a PIL image.
            imshow(image)
            print(f'Image {i+1}: Correctly predicted as {predicted_name}')

    for classname, examples in incorrect_pred.items():
        print(f'\nTop {top_k} incorrect predictions for class {classname}:')
        for i, (image, predicted_name) in enumerate(examples):
            imshow(image)
            # Report the class the model chose, not the true class.
            print(f'Image {i+1}: Incorrectly predicted as {predicted_name}')


In [3]:
# CIFAR-10 dataset loading
# Every image is converted to a tensor and then normalized per channel with
# mean 0.5 and std 0.5.
NORMALIZE_MEAN = (0.5, 0.5, 0.5)
NORMALIZE_STD = (0.5, 0.5, 0.5)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(NORMALIZE_MEAN, NORMALIZE_STD),
])

# Options shared by the train and test splits.
cifar_kwargs = dict(root='./data', download=True, transform=transform)
trainset = torchvision.datasets.CIFAR10(train=True, **cifar_kwargs)
testset = torchvision.datasets.CIFAR10(train=False, **cifar_kwargs)

# Only the training loader shuffles.
loader_kwargs = dict(batch_size=64, num_workers=4)
trainloader = DataLoader(trainset, shuffle=True, **loader_kwargs)
testloader = DataLoader(testset, shuffle=False, **loader_kwargs)

# Model definitions
# Pretrained ResNet18 whose final fully connected layer is swapped for an
# identity, so the network returns the features that used to feed that layer.
resnet = resnet18(pretrained=True)
resnet.fc = nn.Identity()

class LogisticRegression(nn.Module):
    """Single linear layer mapping a feature vector to class logits.

    No softmax is applied in ``forward``; the raw linear output is returned.

    Args:
        in_features: Size of each input feature vector. Defaults to 512, the
            ResNet18 feature size used in this notebook.
        num_classes: Number of output logits. Defaults to 10 (CIFAR-10).
    """

    def __init__(self, in_features=512, num_classes=10):
        super().__init__()
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the logits ``self.fc(x)`` for a batch of feature vectors."""
        return self.fc(x)

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Place both the backbone and the linear head on the chosen device.
resnet = resnet.to(device)
logistic_regression_model = LogisticRegression().to(device)

# Loss function and optimizer
# The optimizer is given only the linear head's parameters.
criterion = nn.CrossEntropyLoss()
optimizer = Adam(params=logistic_regression_model.parameters(), lr=0.001)

# Training function for logistic regression
def train_logistic_regression_model(model, feature_extractor, train_loader, criterion, optimizer, num_epochs=5):
    """Train ``model`` on features produced by a fixed ``feature_extractor``.

    Features are computed under ``torch.no_grad()``, so no gradient reaches the
    extractor; only what ``optimizer`` manages is updated. Batches are moved to
    the module-level ``device``. The loss of every 200th batch is printed.

    Args:
        model: Module mapping extracted features to class scores.
        feature_extractor: Module mapping raw inputs to features.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of passes over ``train_loader``.
    """
    feature_extractor.eval()  # Feature extractor is in eval mode
    # Put the trained head in train mode explicitly: evaluate_model() leaves it
    # in eval mode, which matters for heads with dropout or batch norm.
    model.train()
    for epoch in range(num_epochs):
        for i, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            with torch.no_grad():
                features = feature_extractor(inputs)
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            if i % 200 == 199:
                print(f'Epoch {epoch + 1}, Batch {i + 1}, Loss: {loss.item()}')

# Training function for finetuning ResNet model
def train_finetuned_model(model, train_loader, criterion, optimizer, num_epochs=5):
    """Train ``model`` end to end for ``num_epochs`` passes over ``train_loader``.

    The model is switched to train mode once up front. Each batch is moved to
    the module-level ``device``, run through ``model``, scored with
    ``criterion`` and used for one optimizer step. The loss of every 200th
    batch is printed.

    Args:
        model: Module whose output is passed directly to ``criterion``.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of passes over ``train_loader``.
    """
    model.train()
    for epoch_index in range(num_epochs):
        epoch_number = epoch_index + 1
        for batch_number, (inputs, labels) in enumerate(train_loader, start=1):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            # Report on batches 200, 400, ...
            if batch_number % 200 == 0:
                print(f'Epoch {epoch_number}, Batch {batch_number}, Loss: {loss.item()}')

# Evaluation function
def evaluate_model(model, feature_extractor, test_loader):
    """Print and return the top-1 accuracy of ``model(feature_extractor(x))``.

    Both modules are put in eval mode and the pass runs under
    ``torch.no_grad()``. Batches are moved to the module-level ``device``.

    Args:
        model: Module mapping extracted features to class scores.
        feature_extractor: Module mapping raw inputs to features.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Accuracy in percent as a float, or ``nan`` when ``test_loader`` yields
        no samples.
    """
    model.eval()
    feature_extractor.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(feature_extractor(inputs))
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    # An empty loader would otherwise raise ZeroDivisionError.
    accuracy = 100 * correct / total if total else float('nan')
    print(f'Accuracy: {accuracy}%')
    return accuracy

# Assuming 'classes' is a list of class names in the order they are indexed by the model
classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

# Stage 1: train and evaluate the logistic regression head on the fixed backbone
print("Training logistic regression model...")
train_logistic_regression_model(logistic_regression_model, resnet, trainloader, criterion, optimizer, num_epochs=5)
evaluate_model(logistic_regression_model, resnet, testloader)

# Show the stage-1 predictions now, before fine-tuning modifies the backbone
# weights this model depends on.
print("Displaying top predictions for the logistic regression model...")
display_top_predictions(testloader, logistic_regression_model, resnet, device, top_k=5)

# Stage 2: fine-tune the entire ResNet18 model together with the head.
# train_finetuned_model() feeds model(inputs) straight into the loss, so the
# model it receives must end in the 10-way head; the bare backbone ends in
# nn.Identity and would have its features treated as class scores.
for param in resnet.parameters():
    param.requires_grad = True
finetuned_model = nn.Sequential(resnet, logistic_regression_model)
optimizer_ft = Adam(finetuned_model.parameters(), lr=0.001)

print("Finetuning the ResNet18 model...")
train_finetuned_model(finetuned_model, trainloader, criterion, optimizer_ft, num_epochs=5)
evaluate_model(logistic_regression_model, resnet, testloader)

# The helpers apply feature_extractor first and model second, so pass the
# backbone and the head separately rather than the backbone twice.
print("Displaying top predictions for the finetuned ResNet18 model...")
display_top_predictions(testloader, logistic_regression_model, resnet, device, top_k=5)

Files already downloaded and verified
Files already downloaded and verified




Training logistic regression model...


In [None]:
import pandas as pd
import numpy as np
from torch.autograd import Variable
from torch.optim import SGD
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity

# Load the MovieLens dataset
# NOTE: machine-specific absolute location; change DATA_DIR to run elsewhere.
DATA_DIR = '/Users/student/Desktop/SPRING 24/IDS 576/ml-latest-small'
ratings_path = f'{DATA_DIR}/ratings.csv'
movies_path = f'{DATA_DIR}/movies.csv'

ratings = pd.read_csv(ratings_path)
movies = pd.read_csv(movies_path)

# Convert ratings to a binary preference: 1 when rating >= 3.5, otherwise 0
ratings['like'] = (ratings['rating'] >= 3.5).astype(int)

# Create a pivot table: one row per userId, one column per movieId;
# user/movie pairs without a rating are filled with 0
pivot_table = ratings.pivot(index='userId', columns='movieId', values='like').fillna(0)

# Create a co-occurrence matrix: entry (i, j) counts users who liked both movies.
# The diagonal is zeroed on an explicit writable copy, because the array behind
# DataFrame.values is not guaranteed to be a writable view of the frame.
liked_together = pivot_table.T.dot(pivot_table)
cooccurrence_values = liked_together.to_numpy(copy=True)
np.fill_diagonal(cooccurrence_values, 0)
cooccurrence_matrix = pd.DataFrame(
    cooccurrence_values,
    index=liked_together.index,
    columns=liked_together.columns,
)

# Convert to torch tensors
X_ij = torch.tensor(cooccurrence_values, dtype=torch.float32)

# Number of movies
num_movies = X_ij.shape[0]

# Initialize embeddings: one row per movie, uniform random values in [0, 1).
# A plain tensor with requires_grad=True replaces the deprecated Variable wrapper.
EMBEDDING_DIM = 50
V = torch.rand(num_movies, EMBEDDING_DIM, requires_grad=True)

# Define the optimization function
def cost_function(V, X_ij):
    """Sum of squared differences between ``V @ V.T`` and ``X_ij``.

    Args:
        V: 2-D tensor with one embedding per row.
        X_ij: Square target tensor with one row and column per row of ``V``.

    Returns:
        Scalar tensor holding the summed squared error over all entries.
    """
    residual = torch.matmul(V, V.t()) - X_ij
    return torch.sum(residual.pow(2))

# Optimization setup
# cost_function sums squared errors over every entry of X_ij without
# normalizing, so its gradients are very large; plain SGD with lr=0.01 and
# momentum diverges on that scale. Adam rescales each update by running
# gradient statistics, keeping the step size near the learning rate.
# The learning rate may still need tuning for this dataset.
optimizer = Adam([V], lr=0.01)

# Training loop
epochs = 100
loss_values = []

for epoch in range(epochs):
    optimizer.zero_grad()
    loss = cost_function(V, X_ij)
    # Fail fast instead of silently filling V with inf/nan.
    if not torch.isfinite(loss):
        raise RuntimeError(f'Loss became non-finite at epoch {epoch+1}; lower the learning rate.')
    loss.backward()
    optimizer.step()
    epoch_loss = loss.item()
    loss_values.append(epoch_loss)
    print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}')

# Plot the loss over epochs
# One point per entry of loss_values, drawn on an explicit Axes.
fig, ax = plt.subplots()
ax.plot(loss_values)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Loss over Epochs')
plt.show()

# Function to get movie recommendations
def get_movie_recommendations(movie_title, movie_embeddings, movies_df, k=10):
    """Return the titles of the ``k`` movies most similar to ``movie_title``.

    Similarity is the cosine similarity between rows of ``movie_embeddings``.
    Embedding rows are translated to and from movie ids with the module-level
    ``movie_to_idx`` and ``idx_to_movie`` dictionaries, which must be defined
    before this function is called.

    Args:
        movie_title: Value to match exactly against ``movies_df['title']``. If
            several rows share the title, the first one is used.
        movie_embeddings: 2-D tensor whose row ``i`` belongs to the movie
            ``idx_to_movie[i]``.
        movies_df: DataFrame with ``movieId`` and ``title`` columns.
        k: Number of titles to return.

    Returns:
        NumPy array of at most ``k`` titles, most similar first. The query
        movie itself is never included.

    Raises:
        ValueError: If ``movie_title`` does not occur in ``movies_df``.
        KeyError: If the movie has no entry in ``movie_to_idx``.
    """
    matching_ids = movies_df.loc[movies_df['title'] == movie_title, 'movieId']
    if matching_ids.empty:
        raise ValueError(f'No movie titled {movie_title!r} found in movies_df')
    movie_idx = movie_to_idx[int(matching_ids.iloc[0])]

    # scikit-learn needs plain arrays; a tensor that tracks gradients cannot be
    # converted implicitly, so detach and convert once for query and candidates.
    embeddings = movie_embeddings.detach().cpu().numpy()
    query_embedding = embeddings[movie_idx].reshape(1, -1)
    similarity = cosine_similarity(query_embedding, embeddings)[0]

    # Rank all movies by similarity and drop the query, which would otherwise
    # be returned as its own best match.
    ranked_indices = similarity.argsort()[::-1]
    top_k_indices = ranked_indices[ranked_indices != movie_idx][:k]

    # Embedding rows are not row positions of movies_df, so map each row back
    # to its movieId and look titles up by id.
    top_movie_ids = [idx_to_movie[idx] for idx in top_k_indices.tolist()]
    titles_by_id = movies_df.set_index('movieId')['title']
    recommendations = titles_by_id.loc[top_movie_ids].values
    return recommendations

# Create a mapping from movieId to index and vice versa
# Positions follow the column order of the co-occurrence matrix.
movie_columns = cooccurrence_matrix.columns
movie_to_idx = dict(zip(movie_columns, range(len(movie_columns))))
idx_to_movie = {position: movie_id for movie_id, position in movie_to_idx.items()}

# Get recommendations for a movie
recommended_movies = get_movie_recommendations('Apollo 13 (1995)', V, movies, k=10)
print("Movies recommended based on 'Apollo 13 (1995)':")
for recommended_title in recommended_movies:
    print(recommended_title)
