In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data_utils
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

def read_data(file_path):
    """
    Read data from a file.

    Args:
        file_path (str): Path to the data file.

    Returns:
        list: List of lines read from the file.
    """
    with open(file_path, 'r') as f:
        data = f.readlines()
    return data

def parse_player_data(data, params):
    """
    Parse player data from the provided list of data lines.

    Args:
        data (list): List of data lines.
        params (tuple): Tuple of parameter names.

    Returns:
        list: List of dictionaries containing player data.
    """
    players = []
    for player_count in range(0, len(data), len(params)):
        param_count = 0
        player = {}
        for param in params:
            player[f'{param}'] = np.array([float(j) for j in data[player_count + param_count].split(',')], dtype=np.float32)
            param_count += 1
        players.append(player)
    return players

def process_player_data(players, params):
    """
    Process inputs and targets for each player.

    Args:
        players (list): List of player dictionaries.
        params (tuple): Tuple of parameter names.

    Returns:
        tuple: Tuple containing X_train, Y_train, X_test, and Y_test tensors.
    """
    X_train, Y_train, X_test, Y_test = [], [], [], []

    for player in players:
        player_inputs = torch.from_numpy(player['season_id'][:-3]).view(1, 47, 1).float()
        for param in params[2:]: # Don't add fantasy scores to inputs. Season ids were already added to inputs.
            player_inputs = torch.cat((player_inputs, torch.from_numpy(player[f'{param}'][:-3]).view(1, 47, 1).float()), dim=2)
        X_train.append(player_inputs)
        Y_train.append(torch.from_numpy(player['fantasy_scores'][:-3]).view(1, 47, 1).float())

    for player in players:
        player_test_inputs = torch.from_numpy(player['season_id'][-3:-2]).view(1, 1, 1).float()
        for param in params[2:]: # Don't add fantasy scores to test inputs. Season ids were already added to test inputs.
            player_test_inputs = torch.cat((player_test_inputs, torch.from_numpy(player[f'{param}'][-3:-2]).view(1, 1, 1).float()), dim=2)
        X_test.append(player_test_inputs)
        Y_test.append(torch.from_numpy(player['fantasy_scores'][-3:-2]).view(1, 1, 1).float())

    X_train = torch.cat(X_train, dim=0).to(device)
    Y_train = torch.cat(Y_train, dim=0).to(device)
    X_test = torch.cat(X_test, dim=0).to(device)
    Y_test = torch.cat(Y_test, dim=0).to(device)
    return X_train, Y_train, X_test, Y_test

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define parameters. 
# NOTE: Fantasy scores must be first parameter and season id must be second parameter.
params = ('fantasy_scores', 'season_id', 'week_id', 'defense_ranks',
          'defense_id', 'skill_score', 'seasons_played', 'player_id', 'three_game_average',
          'four_game_average', 'five_game_average', 'six_game_average', 'seven_game_average',
          'eight_game_average', 'nine_game_average', 'team_id', 'depth_chart', 'position_id',)
# Read data from file
data = read_data('data.csv')
# Parse player data
players = parse_player_data(data, params)
# Process inputs and targets for each player
X_train, Y_train, X_test, Y_test = process_player_data(players, params)

X_train.shape, Y_train.shape, X_test.shape, Y_test.shape

(torch.Size([326, 47, 17]),
 torch.Size([326, 47, 1]),
 torch.Size([326, 1, 17]),
 torch.Size([326, 1, 1]))

In [2]:
# Define the neural network architecture
class RegressionModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(RegressionModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, hidden_size)
        self.fc5 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.relu(out)
        out = self.fc3(out)
        out = self.relu(out)
        out = self.fc4(out)
        out = self.relu(out)
        out = self.fc5(out)
        return out

# Set hyperparameters
initial_learning_rate = 0.002
batch_size = 60 
num_batches = len(X_train) // batch_size  # Only full batches

# Create the model
model = RegressionModel(16, 1000).to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=initial_learning_rate)

train_dataset = data_utils.TensorDataset(X_train, Y_train)
train_loader = data_utils.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Set up a learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=500, gamma=0.5)

# prints the number of parameters in the model
total = 0
for player_count in list(model.parameters()):
    total += len(player_count)
total

8002

In [6]:
# Training loop for each player
epochs = 1200
 
# Training loop
for epoch in range(epochs):
    model.train()  # Set the model in training mode
    total_loss = 0.0

    for batch_X, batch_Y in train_loader:
        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_Y)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Adjust learning rate using the scheduler
    scheduler.step()

    # Print the average loss for the epoch
    if epoch + 1 == epochs:
        print(f"Training Loss: {total_loss/num_batches}")

# Evaluation on the test set
model.eval()  # Set the model in evaluation mode
with torch.no_grad():
    test_outputs = model(X_test)
    test_loss = criterion(test_outputs, Y_test)
    print(f"Test Loss: {test_loss.item()}")

print() # blank line
# Print the predictions and original scores for the player
for player_count in range(len(X_test[0])):
    for j in range(1, 6):
        print(f"Player {j}: Skill Score: {round(X_test[-j][player_count][4].item(), 2)}, Defense Rank: {X_test[-j][player_count][2].item()} " + 
              f"Predicted Fantasy Score: {round(test_outputs[-j][player_count].item(), 2)}, Actual Fantasy Score: {round(Y_test[-j][player_count].item(), 2)}")
    print() # blank line

Epoch 1/1, Average Loss: 124.30219078063965
Test Loss: 43.12538528442383

Player 1: Skill Score: 13.9, Defense Rank: 26.0 Predicted Fantasy Score: 2.86, Actual Fantasy Score: 10.2
Player 2: Skill Score: 11.96, Defense Rank: 14.0 Predicted Fantasy Score: 2.87, Actual Fantasy Score: 19.3
Player 3: Skill Score: 10.01, Defense Rank: 7.0 Predicted Fantasy Score: 2.88, Actual Fantasy Score: 11.2
Player 4: Skill Score: 8.96, Defense Rank: 6.0 Predicted Fantasy Score: 2.86, Actual Fantasy Score: 11.7
Player 5: Skill Score: 13.73, Defense Rank: 0.0 Predicted Fantasy Score: 2.83, Actual Fantasy Score: 0.0



In [None]:
# Flatten the NumPy arrays
test_outputs_cpu = test_outputs.cpu().flatten()
Y_test_cpu = Y_test.cpu().flatten()

# Create a scatter plot
plt.figure(figsize=(8, 6))
plt.scatter(Y_test_cpu, test_outputs_cpu, alpha=0.5)  # alpha parameter controls transparency

# Add labels and title
plt.xlabel('Actual Values (Y_test)')
plt.ylabel('Predicted Values (test_outputs)')
plt.title('Scatter Plot of Predicted vs. Actual Values')

# Add a diagonal reference line (perfect predictions)
plt.plot([min(Y_test_cpu), max(Y_test_cpu)], [min(Y_test_cpu), max(Y_test_cpu)], 
         color='red', linestyle='--', linewidth=2)

# Show the plot
plt.grid(True)
plt.show()