# Extract Play-by-Play Commentaries

## Load Data

We've manually annotated some commentaries.

In [None]:
import json

# Load the manually annotated commentary segments for one match.
# The encoding is pinned to UTF-8 so decoding does not depend on the platform's
# default locale encoding.
with open(
    "../manually-labeled-data/SoccerNet/2016-09-24 - 19-30 Arsenal 3 - 0 Chelsea/1_224p_annotated.json",
    encoding="utf-8",
) as f:
    commentaries = json.load(f)

# `data` holds (text, label-name) pairs for the supervised experiments; the
# per-class lists seed the K-Means centroids; `unlabeled_comments` collects the
# segments that carry no annotation.
data = []
labeled_play_by_play = []
labeled_analysis = []
unlabeled_comments = []

for seg in commentaries["segments"]:
    text = seg["text"]
    if "is_pbp" not in seg:
        # No annotation on this segment: route it to the unlabeled pool instead
        # of failing with KeyError.
        unlabeled_comments.append(text)
    elif seg["is_pbp"]:
        data.append((text, "play-by-play"))
        labeled_play_by_play.append(text)
    else:
        data.append((text, "analysis"))
        labeled_analysis.append(text)

print(f"len(labeled_play_by_play) = {len(labeled_play_by_play)}")
print(f"len(labeled_analysis) = {len(labeled_analysis)}")
print(f"len(unlabeled_comments) = {len(unlabeled_comments)}")

## K-Means Clustering

We can initialize the centroids with the mean sentence embedding of manually labeled commentaries.

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans

# Load Sentence Transformer model (also used by the classification cells below)
sent_emb_model = SentenceTransformer("all-MiniLM-L6-v2")

# Encode the labeled sentences into L2-normalized embeddings
play_by_play_embeddings = sent_emb_model.encode(
    labeled_play_by_play, normalize_embeddings=True
)
analysis_embeddings = sent_emb_model.encode(labeled_analysis, normalize_embeddings=True)

# Compute one seed centroid per class (mean embedding of its labeled sentences)
play_by_play_centroid = np.mean(play_by_play_embeddings, axis=0)
analysis_centroid = np.mean(analysis_embeddings, axis=0)

# Row order defines the cluster ids used below: 0 = play-by-play, 1 = analysis
initial_centroids = np.vstack([play_by_play_centroid, analysis_centroid])

if unlabeled_comments:
    # Only encode when there is something to cluster
    unlabeled_embeddings = sent_emb_model.encode(
        unlabeled_comments, normalize_embeddings=True
    )

    if len(unlabeled_comments) >= len(initial_centroids):
        # K-Means seeded with the class centroids; n_init=1 because the
        # initialization is fixed, so restarts would be identical.
        kmeans = KMeans(
            n_clusters=2,
            init=initial_centroids,
            n_init=1,
            max_iter=300,
            random_state=42,
        )
        labels = kmeans.fit_predict(unlabeled_embeddings)
    else:
        # KMeans rejects inputs with fewer samples than clusters; with a single
        # comment, assign it to the nearest seed centroid instead.
        distances = np.linalg.norm(
            unlabeled_embeddings[:, None, :] - initial_centroids[None, :, :], axis=2
        )
        labels = distances.argmin(axis=1)

    # Output results
    for comment, label in zip(unlabeled_comments, labels):
        cluster_type = "Play-by-Play" if label == 0 else "Analysis"
        print(f"[{cluster_type}] {comment}")

## Logistic Regression with TF-IDF

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import StratifiedKFold

# Build a two-column frame from the (text, label-name) pairs
df = pd.DataFrame(data, columns=["text", "label"])

# Replace label names with integer ids: analysis -> 0, play-by-play -> 1
label_to_id = {"play-by-play": 1, "analysis": 0}
df["label"] = df["label"].map(label_to_id)

# Stratified 5-fold cross-validation, shuffled with a fixed seed
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Unigram + bigram TF-IDF features; the vocabulary is refit on each training fold
vectorizer = TfidfVectorizer(
    ngram_range=(1, 2), stop_words="english", max_features=5000
)
for train_index, test_index in kf.split(df["text"], df["label"]):
    train_rows = df.iloc[train_index]
    test_rows = df.iloc[test_index]
    train_texts, train_labels = train_rows["text"], train_rows["label"]
    test_texts, test_labels = test_rows["text"], test_rows["label"]

    # Fit the vocabulary on the training fold only, then project the test fold
    X_train = vectorizer.fit_transform(train_texts)
    X_test = vectorizer.transform(test_texts)

    # Class-weighted logistic regression
    clf = LogisticRegression(max_iter=1000, class_weight="balanced")
    clf.fit(X_train, train_labels)

    predictions = clf.predict(X_test)

    # Per-fold metrics
    fold_accuracy = accuracy_score(test_labels, predictions)
    fold_report = classification_report(
        test_labels, predictions, target_names=["Analysis", "Play-by-Play"]
    )
    print(f"Accuracy: {fold_accuracy:.4f}")
    print(fold_report)

## Logistic Regression with Sentence Embeddings

In [None]:
# `sent_emb_model` is only used for inference here, so a sentence's embedding does
# not depend on the fold. Encode every text once and slice per fold instead of
# re-running the encoder inside the loop.
sbert_features = sent_emb_model.encode(df["text"].tolist())

for train_index, test_index in kf.split(df["text"], df["label"]):
    train_labels, test_labels = (
        df["label"].iloc[train_index],
        df["label"].iloc[test_index],
    )

    # Select the precomputed SBERT embeddings for this fold
    X_train = sbert_features[train_index]
    X_test = sbert_features[test_index]

    # Train Logistic Regression model with balanced class weights
    clf = LogisticRegression(max_iter=1000, class_weight="balanced")
    clf.fit(X_train, train_labels)

    # Predictions
    predictions = clf.predict(X_test)

    # Evaluation
    print(f"Accuracy: {accuracy_score(test_labels, predictions):.4f}")
    print(
        classification_report(
            test_labels, predictions, target_names=["Analysis", "Play-by-Play"]
        )
    )

## XGBoost with Sentence Embeddings

In [None]:
from xgboost import XGBClassifier

# `sent_emb_model` is only used for inference here, so a sentence's embedding does
# not depend on the fold. Encode every text once and slice per fold instead of
# re-running the encoder inside the loop.
sbert_features = sent_emb_model.encode(df["text"].tolist())

for train_index, test_index in kf.split(df["text"], df["label"]):
    train_labels, test_labels = (
        df["label"].iloc[train_index],
        df["label"].iloc[test_index],
    )

    # Select the precomputed SBERT embeddings for this fold
    X_train = sbert_features[train_index]
    X_test = sbert_features[test_index]

    # Train XGBoost classifier (a fresh model per fold)
    xgb_clf = XGBClassifier(
        objective="binary:logistic",
        eval_metric="logloss",
        n_estimators=100,
        learning_rate=0.1,
        max_depth=4,
        random_state=42,
    )
    xgb_clf.fit(X_train, train_labels)

    # Predictions
    predictions = xgb_clf.predict(X_test)

    # Evaluation
    print(f"Accuracy: {accuracy_score(test_labels, predictions):.4f}")
    print(
        classification_report(
            test_labels, predictions, target_names=["Analysis", "Play-by-Play"]
        )
    )

## MLP Classifier with Sentence Embeddings

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
from torch.utils.data import DataLoader, TensorDataset, random_split

# Encode labels: Play-by-play → 1, Analysis → 0
texts, labels = zip(*data)
labels = np.array([1 if label == "play-by-play" else 0 for label in labels])

# Encode sentences into SBERT embeddings
embeddings = sent_emb_model.encode(list(texts))

# Convert to PyTorch tensors (float features, integer class ids for CrossEntropyLoss)
X_tensor = torch.tensor(embeddings, dtype=torch.float32)
y_tensor = torch.tensor(labels, dtype=torch.long)

# Train-validation-test split (70% train, 15% validation, 15% test).
# The test split takes the remainder so the three sizes always sum to the total.
train_size = int(0.7 * len(X_tensor))
val_size = int(0.15 * len(X_tensor))
test_size = len(X_tensor) - train_size - val_size

# Seed the split (same seed as the cross-validation experiments) so the reported
# test metrics refer to the same held-out samples on every run.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    TensorDataset(X_tensor, y_tensor),
    [train_size, val_size, test_size],
    generator=split_generator,
)

# Only the training batches are shuffled; evaluation order is fixed
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)


# Single-hidden-layer MLP over sentence embeddings
class MLPClassifier(nn.Module):
    """Feed-forward classifier: Linear -> ReLU -> Dropout -> Linear.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, input_dim, hidden_dim=128, dropout=0.2):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # Two output scores, one per class (play-by-play vs analysis)
        self.fc2 = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        """Return the unnormalized class scores (logits) for ``x``."""
        hidden = self.dropout(self.relu(self.fc1(x)))
        return self.fc2(hidden)


# Model initialization
input_dim = embeddings.shape[1]  # Dimension of SBERT embeddings
model = MLPClassifier(input_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Early stopping settings
best_val_loss = float("inf")
patience = 3  # Stop training if validation loss doesn't improve for `patience` epochs
patience_counter = 0
best_model_state = None

# Training loop with early stopping
num_epochs = 50  # Upper bound; early stopping usually ends training sooner
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Validation step (dropout disabled, no gradients)
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()

    # Compute average per-batch loss
    train_loss = total_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)
    print(
        f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
    )

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # state_dict() hands back tensors that share storage with the live
        # parameters, which the optimizer keeps updating in place. Clone them so
        # the snapshot really stays at the best epoch.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

# Restore the best checkpoint; it is None only if the validation loss never
# improved on infinity (e.g. it was NaN), in which case keep the final weights.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

# Evaluation on the held-out test split
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# Print classification report
print(
    "\n"
    + classification_report(
        all_labels, all_preds, target_names=["Analysis", "Play-by-Play"]
    )
)

## BiLSTM Sequence Classification with Sentence Embeddings

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
from torch.utils.data import DataLoader, TensorDataset, random_split

# Create sequences: the BiLSTM consumes sliding windows of `sequence_length`
# consecutive sentence embeddings; each window is labeled with the class of its
# LAST sentence.
sequence_length = 5

# A series of N sentences has N - sequence_length + 1 full windows (the last one
# ends on the final sentence).
num_windows = len(X_tensor) - sequence_length + 1
if num_windows < 1:
    raise ValueError(
        f"Need at least {sequence_length} sentences to build a window, got {len(X_tensor)}"
    )

# Convert to dataset tensors: (num_windows, sequence_length, embedding_dim) and (num_windows,)
X_sequences = torch.stack(
    [X_tensor[start : start + sequence_length] for start in range(num_windows)]
)
y_sequences = y_tensor[sequence_length - 1 :]

# Train-validation-test split (70% train, 15% validation, 15% test)
# NOTE(review): consecutive windows overlap in all but one sentence, so a random
# split puts near-duplicate inputs in train and test; a chronological split would
# give a stricter estimate.
train_size = int(0.7 * len(X_sequences))
val_size = int(0.15 * len(X_sequences))
test_size = len(X_sequences) - train_size - val_size

# Seed the split (same seed as the cross-validation experiments) so the reported
# test metrics refer to the same held-out windows on every run.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    TensorDataset(X_sequences, y_sequences),
    [train_size, val_size, test_size],
    generator=split_generator,
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)


# Bidirectional LSTM over a window of sentence embeddings
class BiLSTMClassifier(nn.Module):
    """Classify a sequence from the final hidden states of a stacked BiLSTM.

    Args:
        input_dim: Size of each element of the input sequence.
        hidden_dim: Hidden size per direction.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the concatenated final states
            before the output layer.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(dropout)
        # Forward and backward states are concatenated, hence twice the width
        self.fc = nn.Linear(2 * hidden_dim, 2)

    def forward(self, x):
        """Return two class scores (logits) per sequence in the batch."""
        # final_hidden: (num_layers * 2, batch_size, hidden_dim)
        _, (final_hidden, _) = self.lstm(x)
        # The last two entries belong to the top layer: forward, then backward
        top_forward = final_hidden[-2]
        top_backward = final_hidden[-1]
        summary = torch.cat([top_forward, top_backward], dim=1)
        return self.fc(self.dropout(summary))


# Model initialization
input_dim = embeddings.shape[1]  # SBERT embedding size
model = BiLSTMClassifier(input_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Early stopping settings
best_val_loss = float("inf")
patience = 3  # Epochs without validation improvement before stopping
patience_counter = 0
best_model_state = None

# Training loop with early stopping
num_epochs = 50  # Upper bound; early stopping usually ends training sooner
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Validation step (dropout disabled, no gradients)
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()

    # Compute average per-batch loss
    train_loss = total_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)
    print(
        f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
    )

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # state_dict() hands back tensors that share storage with the live
        # parameters, which the optimizer keeps updating in place. Clone them so
        # the snapshot really stays at the best epoch.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

# Restore the best checkpoint; it is None only if the validation loss never
# improved on infinity (e.g. it was NaN), in which case keep the final weights.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

# Evaluation on the held-out test split
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# Print classification report
print(
    "\n"
    + classification_report(
        all_labels, all_preds, target_names=["Analysis", "Play-by-Play"]
    )
)

## MLP Sequence Classification with Sentence Embeddings

In [None]:
# Sliding windows of `sequence_length` consecutive sentence embeddings; each
# window is labeled with the class of its LAST sentence.
sequence_length = 10

# A series of N sentences has N - sequence_length + 1 full windows (the last one
# ends on the final sentence).
num_windows = len(X_tensor) - sequence_length + 1
if num_windows < 1:
    raise ValueError(
        f"Need at least {sequence_length} sentences to build a window, got {len(X_tensor)}"
    )

# Convert to dataset tensors: (num_windows, sequence_length, embedding_dim) and (num_windows,)
X_sequences = torch.stack(
    [X_tensor[start : start + sequence_length] for start in range(num_windows)]
)
y_sequences = y_tensor[sequence_length - 1 :]

# Train-validation-test split (70% train, 15% validation, 15% test)
# NOTE(review): consecutive windows overlap in all but one sentence, so a random
# split puts near-duplicate inputs in train and test; a chronological split would
# give a stricter estimate.
train_size = int(0.7 * len(X_sequences))
val_size = int(0.15 * len(X_sequences))
test_size = len(X_sequences) - train_size - val_size

# Seed the split (same seed as the cross-validation experiments) so the reported
# test metrics refer to the same held-out windows on every run.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    TensorDataset(X_sequences, y_sequences),
    [train_size, val_size, test_size],
    generator=split_generator,
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)


class MLPSeqClassifier(nn.Module):
    """MLP over a flattened window of embeddings: Linear -> ReLU -> Dropout -> Linear.

    Args:
        input_dim: Size of each element of the input window.
        seq_len: Number of elements per window; the first layer expects
            ``input_dim * seq_len`` features after flattening.
        hidden_dim: Width of the hidden layer.
        num_classes: Number of output scores.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(
        self, input_dim, seq_len=3, hidden_dim=256, num_classes=2, dropout=0.3
    ):
        super().__init__()
        flattened_dim = input_dim * seq_len
        self.fc1 = nn.Linear(flattened_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        # Produces raw scores (logits); no softmax is applied in this module
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        flat = x.flatten(start_dim=1)  # collapse every dimension after the batch axis
        hidden = self.relu(self.fc1(flat))
        hidden = self.dropout(hidden)
        return self.fc2(hidden)


# Model initialization
input_dim = embeddings.shape[1]  # SBERT embedding size
model = MLPSeqClassifier(input_dim, seq_len=sequence_length)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Early stopping settings
best_val_loss = float("inf")
patience = 3  # Epochs without validation improvement before stopping
patience_counter = 0
best_model_state = None

# Training loop with early stopping
num_epochs = 50  # Upper bound; early stopping usually ends training sooner
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Validation step (dropout disabled, no gradients)
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()

    # Compute average per-batch loss
    train_loss = total_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)
    print(
        f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
    )

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # state_dict() hands back tensors that share storage with the live
        # parameters, which the optimizer keeps updating in place. Clone them so
        # the snapshot really stays at the best epoch.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

# Restore the best checkpoint; it is None only if the validation loss never
# improved on infinity (e.g. it was NaN), in which case keep the final weights.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

# Evaluation on the held-out test split
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# Print classification report
print(
    "\n"
    + classification_report(
        all_labels, all_preds, target_names=["Analysis", "Play-by-Play"]
    )
)