In [1]:
from pathlib import Path
import os
import numpy as np
import pandas as pd
import random
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from keras import models
from keras import layers
from sklearn import metrics

import seaborn as sns
import matplotlib.pyplot as plt

mids_dir = Path("D:\\MIDS-W207")
data = mids_dir/"datasets/soccertrack_square"
project = mids_dir/"MIDS-W207-Spring24-Soccer-Detection"
analysis = project/"analysis"
drive_dir = "/content/drive/MyDrive/School/Graduate/Datasci 207/"
colab_images = "/content/drive/MyDrive/School/Graduate/Datasci 207/soccertrack_square/"

# Author: Timothy Majidzadeh
# Date Created: April 9, 2024
# Date Updated: April 17, 2024
# Description: Try CNN for classification on the soccertrack_square dataset.
# Re-uses some code from T. Majidzadeh's Homework 10 submission for MIDS W207 Spring 2024.
# Notes: [v1] Created program.
#        [v2] Tried using Google Colab to manage compute resources.
# Inputs: Frame-by-frame labels saved as separate text files.
# Outputs: A CNN classifier for the soccertrack_square dataset.




In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

In [3]:
# 1. Load the labels data and map to classes.
class_dict = {
    0: "No Objects",
    1: "Team 0 Only",
    2: "Team 1 Only",
    3: "Ball Only",
    4: "Team 0 and Team 1",
    5: "Team 0 and Ball",
    6: "Team 1 and Ball",
    7: "All Classes"
}

# Because certain class combinations are rare, it may be more effective to try each class individually.
# Remap to a value of 1 if one class is present, 0 otherwise.
ball_remap = {
    0: 0,
    1: 0,
    2: 0,
    3: 1,
    4: 0,
    5: 1,
    6: 1,
    7: 1
}

team_0_remap = {
    0: 0,
    1: 1,
    2: 0,
    3: 0,
    4: 1,
    5: 1,
    6: 0,
    7: 1
}

team_1_remap = {
    0: 0,
    1: 0,
    2: 1,
    3: 0,
    4: 1,
    5: 0,
    6: 1,
    7: 1
}
objects_per_image = pd.read_pickle(data/"objects_per_image_oversampled.pkl")

In [4]:
def class_mapping(ball_count, team_0_count, team_1_count):
    '''
    Assigns each row in the objects_per_image DataFrame to a class based on object counts.
    '''
    if (ball_count == 0) & (team_0_count == 0) & (team_1_count == 0):
        return 0
    elif (ball_count == 0) & (team_0_count > 0) & (team_1_count == 0):
        return 1
    elif (ball_count == 0) & (team_0_count == 0) & (team_1_count > 0):
        return 2
    elif (ball_count > 0) & (team_0_count == 0) & (team_1_count == 0):
        return 3
    elif (ball_count == 0) & (team_0_count > 0) & (team_1_count > 0):
        return 4
    elif (ball_count > 0) & (team_0_count > 0) & (team_1_count == 0):
        return 5
    elif (ball_count > 0) & (team_0_count == 0) & (team_1_count > 0):
        return 6
    else:
        return 7

In [5]:
objects_per_image['class'] = objects_per_image[
    ['img_ball_count', 'img_team_0_count', 'img_team_1_count']
].apply(lambda x: class_mapping(x.img_ball_count, x.img_team_0_count, x.img_team_1_count), axis=1)

objects_per_image['multiclass'] = objects_per_image['class']

objects_per_image.head()

class,image_name,img_ball_count,img_team_0_count,img_team_1_count,class.1,multiclass
23905,top_view_31511.png,1,4,4,7,7
17885,top_view_26094.png,1,2,2,7,7
84829,wide_view_38446.png,1,5,2,7,7
57622,wide_view_1396.png,1,9,9,7,7
88127,wide_view_41413.png,0,0,1,2,2


In [6]:
# Adapted T. Majidzadeh's HW 10 submission, with some edits for the use case.

def load_data(image_paths, labels, splits):
    """ Load and split data into train, validation and test sets

    Params:
    -------
    image_paths (np.ndarray): Paths to load images.
    labels (np.ndarray): Labels of shape (N,)
    splits (tuple): 3 values summing to 1 defining split of train, validation and test sets

    Returns:
    --------
    X_train (np.ndarray): Train images of shape (N_train, 100, 100)
    y_train (np.ndarray): Train labels of shape (N_train,)
    X_val (np.ndarray): Val images of shape (N_val, 100, 100)
    y_val (np.ndarray): Val labels of shape (N_val,)
    X_test (np.ndarray): Test images of shape (N_test, 100, 100)
    y_test (np.ndarray): Test labels of shape (N_test,)

    """

    # Data is already shuffled on input.
    # create data splits (training, val, and test sets)
    train_val_index = int(np.floor(len(labels) * splits[0]))
    val_test_index = int(np.floor(len(labels) * (splits[0] + splits[1])))

    X_train_paths, y_train = image_paths[0:train_val_index], labels[0:train_val_index]
    X_val_paths, y_val = image_paths[train_val_index:val_test_index], labels[train_val_index:val_test_index]
    X_test_paths, y_test = image_paths[val_test_index:], labels[val_test_index:]

    print("Loading train...")
    X_train = [img_to_array(load_img(path, target_size=(100,100))) for path in X_train_paths]
    print("Loading val...")
    X_val = [img_to_array(load_img(path, target_size=(100,100))) for path in X_val_paths]
    print("Loading test...")
    X_test = [img_to_array(load_img(path, target_size=(100,100))) for path in X_test_paths]
    return X_train, y_train, X_val, y_val, X_test, y_test

def preprocess_data(X_train, y_train, X_val, y_val, X_test, y_test):
    """
    Applies augmentations

    Params:
    -------
    X_train (np.ndarray): Train images of shape (N_train, 100, 100)
    y_train (np.ndarray): Train labels of shape (N_train,)
    X_val (np.ndarray): Val images of shape (N_val, 100, 100)
    y_val (np.ndarray): Val labels of shape (N_val,)
    X_test (np.ndarray): Test images of shape (N_test, 100, 100)
    y_test (np.ndarray): Test labels of shape (N_test,)

    Returns:
    --------
    Augmented & Rescaled Values for X_train
    """
    # image augmentation (random flip) on training data
    print("Augmenting: Left-right flip")
    X_train = tf.image.flip_left_right(X_train)
    print("Augmenting: Hue")
    X_train = tf.image.stateless_random_hue(X_train, .0075, (298996, 815818))
    print("Augmenting: Saturation")
    X_train = tf.image.stateless_random_saturation(X_train, 0, .7, (857174, 135799))
    print("Augmenting: Brightness")
    X_train = tf.image.stateless_random_brightness(X_train, .4, (153696, 274697))

    # shuffle X_train and y_train, i.e., shuffle two tensors in the same order
    print("Shuffling...")
    shuffle = tf.random.shuffle(tf.range(tf.shape(X_train)[0], dtype=tf.int32))
    X_train = tf.gather(X_train, shuffle).numpy() # transform X back to numpy array instead of tensor
    y_train = tf.gather(y_train, shuffle).numpy() # transform y back to numpy array instead of tensor

    # rescale training, val, and test images by dividing each pixel by 255.0
    print("Rescaling and Squeezing...")
    X_train, X_val, X_test = np.squeeze(np.array(X_train)), np.squeeze(np.array(X_val)), np.squeeze(np.array(X_test))
    X_train, X_val, X_test = X_train / 255, X_val / 255, X_test / 255

    print("Done!")
    return X_train, y_train, X_val, y_val, X_test, y_test

In [7]:
# 2. Load the images and match to labels. Format as NumPy arrays.
paths = np.array([data/"images"/name for name in objects_per_image['image_name']])
labels = np.array(objects_per_image['multiclass'])
splits = (0.8, 0.1, 0.1)
X_train, multiclass_train, X_val, multiclass_val, X_test, multiclass_test = load_data(paths, labels, splits)


Loading train...
Loading val...
Loading test...


In [8]:
X_train, multiclass_train, X_val, multiclass_val, X_test, multiclass_test = preprocess_data(X_train, multiclass_train, X_val, multiclass_val, X_test, multiclass_test)

Augmenting: Left-right flip
Augmenting: Hue
Augmenting: Saturation
Augmenting: Brightness
Shuffling...
Rescaling and Squeezing...
Done!


In [9]:
ball_binary_train, ball_binary_val, ball_binary_test = np.vectorize(ball_remap.get)(multiclass_train), np.vectorize(ball_remap.get)(multiclass_val), np.vectorize(ball_remap.get)(multiclass_test)
ball_binary_train.shape

(8320,)

In [10]:
team_0_binary_train, team_0_binary_val, team_0_binary_test = np.vectorize(team_0_remap.get)(multiclass_train), np.vectorize(team_0_remap.get)(multiclass_val), np.vectorize(team_0_remap.get)(multiclass_test)
team_0_binary_train.shape

(8320,)

In [11]:
team_1_binary_train, team_1_binary_val, team_1_binary_test = np.vectorize(team_1_remap.get)(multiclass_train), np.vectorize(team_1_remap.get)(multiclass_val), np.vectorize(team_1_remap.get)(multiclass_test)
team_1_binary_train.shape

(8320,)

In [12]:
for object in [X_train, multiclass_train, X_val, multiclass_val, X_test, multiclass_test]:
    print(object.shape)

(8320, 100, 100, 3)
(8320,)
(1040, 100, 100, 3)
(1040,)
(1040, 100, 100, 3)
(1040,)


In [57]:
tf.keras.backend.clear_session()

In [58]:
# define an instance of the early_stopping class - from HW 10.
early_stopping = tf.keras.callbacks.EarlyStopping(
  monitor='accuracy',
  verbose=1,
  patience=4,
  mode='max',
  restore_best_weights=True
)

In [59]:
tf.random.set_seed(33269)
np.random.seed(33269)

# initialize model
multiclass_model = tf.keras.Sequential()

# add convolutional layer to model1
inputs = tf.keras.layers.Input(shape=(100,100,3))
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (2, 2),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_1',
    activation = 'relu'
)
multiclass_model.add(inputs)
# Adding additiona complexity via more convolutional layers. The last one takes 2 strides instead of 1.
multiclass_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_2',
    activation = 'relu'
)

multiclass_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (4, 4),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_3',
    activation = 'relu'
)
multiclass_model.add(x)

# add max pooling layer to model1
x = tf.keras.layers.MaxPooling2D(
    pool_size = (2, 2)
)
multiclass_model.add(x)

# add dropout layer to model1
x = tf.keras.layers.Dropout(
    rate = .2
)
multiclass_model.add(x)

# add a flattening layer to model1
x = tf.keras.layers.Flatten()
multiclass_model.add(x)

# add the classification layer to model1
x = layers.Dense(units=8, activation='sigmoid')
multiclass_model.add(x)

# build and compile model1
multiclass_model.build(input_shape=(None, 100, 100, 3))
multiclass_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

# print model1 summary
multiclass_model.summary()

# train model1 on (X_train, y_train) data
multiclass_model.fit(
    X_train,
    multiclass_train,
    validation_split=0.1,
    epochs=20,
    callbacks=[early_stopping]
)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_1 (Conv2D)             (None, 50, 50, 64)        6976      
                                                                 
 conv_2 (Conv2D)             (None, 50, 50, 64)        147520    
                                                                 
 conv_3 (Conv2D)             (None, 50, 50, 64)        65600     
                                                                 
 max_pooling2d (MaxPooling2  (None, 25, 25, 64)        0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 25, 25, 64)        0         
                                                                 
 flatten (Flatten)           (None, 40000)             0         
                                                        

<keras.src.callbacks.History at 0x2665a1003a0>

In [60]:
multiclass_predictions = np.argmax(multiclass_model.predict(X_test), axis=1)



In [61]:
print("Precision: ", metrics.precision_score(multiclass_test, multiclass_predictions, average='weighted'))
print("Recall: ", metrics.recall_score(multiclass_test, multiclass_predictions, average='weighted'))

Precision:  0.20249999999999999
Recall:  0.45


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [62]:
tf.keras.backend.clear_session()

In [63]:
# Try applying the model for binary outputs
# Start with the ball.
tf.random.set_seed(326809)
np.random.seed(326809)

# initialize model
binary_ball_model = tf.keras.Sequential()

# add convolutional layer to model1
inputs = tf.keras.layers.Input(shape=(100,100,3))
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (2, 2),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_1',
    activation = 'relu'
)
binary_ball_model.add(inputs)
# Adding additiona complexity via more convolutional layers. The last one takes 2 strides instead of 1.
binary_ball_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_2',
    activation = 'relu'
)
binary_ball_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (4, 4),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_3',
    activation = 'relu'
)
binary_ball_model.add(x)

# add max pooling layer to model1
x = tf.keras.layers.MaxPooling2D(
    pool_size = (2, 2)
)
binary_ball_model.add(x)

# add dropout layer to model1
x = tf.keras.layers.Dropout(
    rate = .2
)
binary_ball_model.add(x)

# add a flattening layer to model1
x = tf.keras.layers.Flatten()
binary_ball_model.add(x)

# add the classification layer to model1
x = layers.Dense(units=1, activation='sigmoid')
binary_ball_model.add(x)

# build and compile model1
binary_ball_model.build(input_shape=(None, 100, 100, 3))
binary_ball_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

# print model1 summary
binary_ball_model.summary()

# train model1 on (X_train, y_train) data
binary_ball_model.fit(
    X_train,
    ball_binary_train,
    validation_split=0.1,
    epochs=20,
    callbacks=[early_stopping]
)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_1 (Conv2D)             (None, 50, 50, 64)        6976      
                                                                 
 conv_2 (Conv2D)             (None, 50, 50, 64)        147520    
                                                                 
 conv_3 (Conv2D)             (None, 50, 50, 64)        65600     
                                                                 
 max_pooling2d (MaxPooling2  (None, 25, 25, 64)        0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 25, 25, 64)        0         
                                                                 
 flatten (Flatten)           (None, 40000)             0         
                                                        

<keras.src.callbacks.History at 0x2665a355540>

In [64]:
ball_binary_predictions = np.round(binary_ball_model.predict(X_test)) # Class threshold of 0.5



In [65]:
print("Precision: ", metrics.precision_score(ball_binary_test, ball_binary_predictions, average='binary'))
print("Recall: ", metrics.recall_score(ball_binary_test, ball_binary_predictions, average='binary'))

Precision:  0.0
Recall:  0.0


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [66]:
tf.keras.backend.clear_session()

In [67]:
# Team 0.
tf.random.set_seed(326809)
np.random.seed(326809)

# initialize model
team_0_model = tf.keras.Sequential()

# add convolutional layer to model1
inputs = tf.keras.layers.Input(shape=(100,100,3))
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (2, 2),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_1',
    activation = 'relu'
)
team_0_model.add(inputs)
# Adding additiona complexity via more convolutional layers. The last one takes 2 strides instead of 1.
team_0_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_2',
    activation = 'relu'
)
team_0_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (4, 4),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_3',
    activation = 'relu'
)
team_0_model.add(x)

# add max pooling layer to model1
x = tf.keras.layers.MaxPooling2D(
    pool_size = (2, 2)
)
team_0_model.add(x)

# add dropout layer to model1
x = tf.keras.layers.Dropout(
    rate = .2
)
team_0_model.add(x)

# add a flattening layer to model1
x = tf.keras.layers.Flatten()
team_0_model.add(x)

# add the classification layer to model1
x = layers.Dense(units=1, activation='sigmoid')
team_0_model.add(x)

# build and compile model1
team_0_model.build(input_shape=(None, 100, 100, 3))
team_0_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

# print model1 summary
team_0_model.summary()

# train model1 on (X_train, y_train) data
team_0_model.fit(
    X_train,
    team_0_binary_train,
    validation_split=0.1,
    epochs=20,
    callbacks=[early_stopping]
)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_1 (Conv2D)             (None, 50, 50, 64)        6976      
                                                                 
 conv_2 (Conv2D)             (None, 50, 50, 64)        147520    
                                                                 
 conv_3 (Conv2D)             (None, 50, 50, 64)        65600     
                                                                 
 max_pooling2d (MaxPooling2  (None, 25, 25, 64)        0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 25, 25, 64)        0         
                                                                 
 flatten (Flatten)           (None, 40000)             0         
                                                        

<keras.src.callbacks.History at 0x2665a036c20>

In [68]:
team_0_predictions = np.round(team_0_model.predict(X_test)) # Class threshold of 0.5



In [69]:
print("Precision: ", metrics.precision_score(team_0_binary_test, team_0_predictions, average='binary'))
print("Recall: ", metrics.recall_score(team_0_binary_test, team_0_predictions, average='binary'))

Precision:  0.7163461538461539
Recall:  1.0


In [70]:
tf.keras.backend.clear_session()

In [71]:
# Team 1.
tf.random.set_seed(326809)
np.random.seed(326809)

# initialize model
team_1_model = tf.keras.Sequential()

# add convolutional layer to model1
inputs = tf.keras.layers.Input(shape=(100,100,3))
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (2, 2),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_1',
    activation = 'relu'
)
team_1_model.add(inputs)
# Adding additiona complexity via more convolutional layers. The last one takes 2 strides instead of 1.
team_1_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (6, 6),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_2',
    activation = 'relu'
)
team_1_model.add(x)
x = tf.keras.layers.Conv2D(
    filters = 64,
    kernel_size = (4, 4),
    strides = (1, 1),
    padding = 'same',
    data_format = 'channels_last',
    name = 'conv_3',
    activation = 'relu'
)
team_1_model.add(x)

# add max pooling layer to model1
x = tf.keras.layers.MaxPooling2D(
    pool_size = (2, 2)
)
team_1_model.add(x)

# add dropout layer to model1
x = tf.keras.layers.Dropout(
    rate = .2
)
team_1_model.add(x)

# add a flattening layer to model1
x = tf.keras.layers.Flatten()
team_1_model.add(x)

# add the classification layer to model1
x = layers.Dense(units=1, activation='sigmoid')
team_1_model.add(x)

# build and compile model1
team_1_model.build(input_shape=(None, 100, 100, 3))
team_1_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

# print model1 summary
team_1_model.summary()

# train model1 on (X_train, y_train) data
team_1_model.fit(
    X_train,
    team_1_binary_train,
    validation_split=0.1,
    epochs=20,
    callbacks=[early_stopping]
)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_1 (Conv2D)             (None, 50, 50, 64)        6976      
                                                                 
 conv_2 (Conv2D)             (None, 50, 50, 64)        147520    
                                                                 
 conv_3 (Conv2D)             (None, 50, 50, 64)        65600     
                                                                 
 max_pooling2d (MaxPooling2  (None, 25, 25, 64)        0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 25, 25, 64)        0         
                                                                 
 flatten (Flatten)           (None, 40000)             0         
                                                        

<keras.src.callbacks.History at 0x2666e5e9c60>

In [72]:
team_1_predictions = np.round(team_1_model.predict(X_test)) # Class threshold of 0.5



In [73]:
print("Precision: ", metrics.precision_score(team_1_binary_test, team_1_predictions, average='binary'))
print("Recall: ", metrics.recall_score(team_1_binary_test, team_1_predictions, average='binary'))

Precision:  0.7221153846153846
Recall:  1.0
