In [5]:
!pip install tensorboardX catboost

Collecting tensorboardX
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl.metadata (5.8 kB)
Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorboardX
Successfully installed tensorboardX-2.6.2.2


In [None]:
import shutil
import os

source_dir = '/kaggle/input/ball-detect-train-dataset/datasets'
dest_dir = '/kaggle/working/datasets'

shutil.copytree(source_dir, dest_dir)

print("✅ Dataset copied successfully!")


In [6]:
import os
import numpy as np
import pandas as pd
import cv2
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
import catboost as ctb
from scipy.spatial import distance
from itertools import groupby
from tensorboardX import SummaryWriter

In [None]:
#GT folder generation 
# GT Generation Configuration
SIZE = 30
VARIANCE = 15 
WIDTH = 1280
HEIGHT = 720

def gaussian_kernel(size, variance):
    x, y = np.mgrid[-size:size+1, -size:size+1]
    g = np.exp(-(x**2+y**2)/float(2*variance))
    return g

def create_gaussian(size, variance):
    gaussian_kernel_array = gaussian_kernel(size, variance)
    gaussian_kernel_array = gaussian_kernel_array * 255/gaussian_kernel_array[int(len(gaussian_kernel_array)/2)][int(len(gaussian_kernel_array)/2)]
    gaussian_kernel_array = gaussian_kernel_array.astype(int)
    return gaussian_kernel_array

In [None]:
#GT cell-2
def create_gt_images(path_input, path_output):
    gaussian_kernel_array = create_gaussian(SIZE, VARIANCE)
    for game_id in range(1,11):
        game = 'game{}'.format(game_id)
        clips = os.listdir(os.path.join(path_input, game))
        for clip in clips:
            print('Generating GT for game = {}, clip = {}'.format(game, clip))

            path_out_game = os.path.join(path_output, game)
            os.makedirs(path_out_game, exist_ok=True)

            path_out_clip = os.path.join(path_out_game, clip)    
            os.makedirs(path_out_clip, exist_ok=True)

            path_labels = os.path.join(path_input, game, clip, 'Label.csv')
            labels = pd.read_csv(path_labels)    
            for idx in range(labels.shape[0]):
                file_name, vis, x, y, _ = labels.loc[idx, :]
                heatmap = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
                if vis != 0:
                    x = int(x)
                    y = int(y)
                    for i in range(-SIZE, SIZE+1):
                        for j in range(-SIZE, SIZE+1):
                                if x+i<WIDTH and x+i>=0 and y+j<HEIGHT and y+j>=0:
                                    temp = gaussian_kernel_array[i+SIZE][j+SIZE]
                                    if temp > 0:
                                        heatmap[y+j,x+i] = (temp,temp,temp)

                cv2.imwrite(os.path.join(path_out_clip, file_name), heatmap)
    print("GT image generation complete!")

In [None]:
#GT cell-3
def create_gt_labels(path_input, path_output, train_rate=0.7):
    df = pd.DataFrame()
    for game_id in range(1,11):
        game = 'game{}'.format(game_id)
        clips = os.listdir(os.path.join(path_input, game))
        for clip in clips:
            labels = pd.read_csv(os.path.join(path_input, game, clip, 'Label.csv'))
            labels['gt_path'] = 'gts/' + game + '/' + clip + '/' + labels['file name']
            labels['path1'] = 'images/' + game + '/' + clip + '/' + labels['file name']
            labels_target = labels[2:]
            labels_target.loc[:, 'path2'] = list(labels['path1'][1:-1])
            labels_target.loc[:, 'path3'] = list(labels['path1'][:-2])
            df = pd.concat([df, labels_target])
    
    df = df.reset_index(drop=True) 
    df = df[['path1', 'path2', 'path3', 'gt_path', 'x-coordinate', 'y-coordinate', 'status', 'visibility']]
    df = df.sample(frac=1)
    num_train = int(df.shape[0]*train_rate)
    df_train = df[:num_train]
    df_test = df[num_train:]
    
    os.makedirs(path_output, exist_ok=True)
    df_train.to_csv(os.path.join(path_output, 'labels_train.csv'), index=False)
    df_test.to_csv(os.path.join(path_output, 'labels_val.csv'), index=False)
    print("Label CSVs created successfully!")

In [None]:
# Set your paths (modify according to your Kaggle dataset structure)
INPUT_PATH = '/kaggle/input/ball-detect-train-dataset/datasets/trackNet/images'  # Where original data is
OUTPUT_PATH = '/kaggle/working/datasets/trackNet'  # Where to save generated data

# Run the generation
create_gt_images(INPUT_PATH, os.path.join(OUTPUT_PATH, 'gts'))
create_gt_labels(INPUT_PATH, OUTPUT_PATH)

print("Ground truth generation complete!")

In [7]:
#dataset class
class trackNetDataset(Dataset):
    def __init__(self, mode, input_width=1280 , input_height=720):
        self.path_dataset = '/kaggle/working/datasets/trackNet'  # Update path as needed
        assert mode in ['train', 'val'], 'incorrect mode'
        self.data = pd.read_csv(os.path.join(self.path_dataset, 'labels_{}.csv'.format(mode)))
        print('mode = {}, samples = {}'.format(mode, self.data.shape[0]))         
        self.height = input_height
        self.width = input_width
        
    def __len__(self):
        return self.data.shape[0]
    
    def __getitem__(self, idx):
        path, path_prev, path_preprev, path_gt, x, y, status, vis = self.data.loc[idx, :]
        
        path = os.path.join(self.path_dataset, path)
        path_prev = os.path.join(self.path_dataset, path_prev)
        path_preprev = os.path.join(self.path_dataset, path_preprev)
        path_gt = os.path.join(self.path_dataset, path_gt)
        if math.isnan(x):
            x = -1
            y = -1
        
        inputs = self.get_input(path, path_prev, path_preprev)
        outputs = self.get_output(path_gt)
        
        return inputs, outputs, x, y, vis
    
    def get_output(self, path_gt):
        img = cv2.imread(path_gt)
        img = cv2.resize(img, (self.width, self.height))
        img = img[:, :, 0]
        img = np.reshape(img, (self.width * self.height))
        return img
        
    def get_input(self, path, path_prev, path_preprev):
        img = cv2.imread(path)
        img = cv2.resize(img, (self.width, self.height))

        img_prev = cv2.imread(path_prev)
        img_prev = cv2.resize(img_prev, (self.width, self.height))
        
        img_preprev = cv2.imread(path_preprev)
        img_preprev = cv2.resize(img_preprev, (self.width, self.height))
        
        imgs = np.concatenate((img, img_prev, img_preprev), axis=2)
        imgs = imgs.astype(np.float32)/255.0

        imgs = np.rollaxis(imgs, 2, 0)
        return imgs

In [8]:
#Model defination

class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, pad=1, stride=1, bias=True):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=pad, bias=bias),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        return self.block(x)

class BallTrackerNet(nn.Module):
    def __init__(self, out_channels=256):
        super().__init__()
        self.out_channels = out_channels

        self.conv1 = ConvBlock(in_channels=9, out_channels=64)
        self.conv2 = ConvBlock(in_channels=64, out_channels=64)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = ConvBlock(in_channels=64, out_channels=128)
        self.conv4 = ConvBlock(in_channels=128, out_channels=128)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = ConvBlock(in_channels=128, out_channels=256)
        self.conv6 = ConvBlock(in_channels=256, out_channels=256)
        self.conv7 = ConvBlock(in_channels=256, out_channels=256)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv8 = ConvBlock(in_channels=256, out_channels=512)
        self.conv9 = ConvBlock(in_channels=512, out_channels=512)
        self.conv10 = ConvBlock(in_channels=512, out_channels=512)
        self.ups1 = nn.Upsample(scale_factor=2)
        self.conv11 = ConvBlock(in_channels=512, out_channels=256)
        self.conv12 = ConvBlock(in_channels=256, out_channels=256)
        self.conv13 = ConvBlock(in_channels=256, out_channels=256)
        self.ups2 = nn.Upsample(scale_factor=2)
        self.conv14 = ConvBlock(in_channels=256, out_channels=128)
        self.conv15 = ConvBlock(in_channels=128, out_channels=128)
        self.ups3 = nn.Upsample(scale_factor=2)
        self.conv16 = ConvBlock(in_channels=128, out_channels=64)
        self.conv17 = ConvBlock(in_channels=64, out_channels=64)
        self.conv18 = ConvBlock(in_channels=64, out_channels=self.out_channels)

        self.softmax = nn.Softmax(dim=1)
        self._init_weights()
                  
    def forward(self, x, testing=False): 
        batch_size = x.size(0)
        x = self.conv1(x)
        x = self.conv2(x)    
        x = self.pool1(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.pool2(x)
        x = self.conv5(x)
        x = self.conv6(x)
        x = self.conv7(x)
        x = self.pool3(x)
        x = self.conv8(x)
        x = self.conv9(x)
        x = self.conv10(x)
        x = self.ups1(x)
        x = self.conv11(x)
        x = self.conv12(x)
        x = self.conv13(x)
        x = self.ups2(x)
        x = self.conv14(x)
        x = self.conv15(x)
        x = self.ups3(x)
        x = self.conv16(x)
        x = self.conv17(x)
        x = self.conv18(x)
        out = x.reshape(batch_size, self.out_channels, -1)
        if testing:
            out = self.softmax(out)
        return out                       
    
    def _init_weights(self):
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.uniform_(module.weight, -0.05, 0.05)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

In [9]:
#Utility functions

def postprocess(feature_map, scale=2):
    feature_map *= 255
    feature_map = feature_map.reshape((360, 640))
    feature_map = feature_map.astype(np.uint8)
    ret, heatmap = cv2.threshold(feature_map, 127, 255, cv2.THRESH_BINARY)
    circles = cv2.HoughCircles(heatmap, cv2.HOUGH_GRADIENT, dp=1, minDist=1, param1=50, param2=2, minRadius=2,
                               maxRadius=7)
    x,y = None, None
    if circles is not None:
        if len(circles) == 1:
            x = circles[0][0][0]*scale
            y = circles[0][0][1]*scale
    return x, y

def train(model, train_loader, optimizer, device, epoch, max_iters=200):
    start_time = time.time()
    losses = []
    criterion = nn.CrossEntropyLoss()
    for iter_id, batch in enumerate(train_loader):
        optimizer.zero_grad()
        model.train()
        out = model(batch[0].float().to(device))
        gt = torch.tensor(batch[1], dtype=torch.long, device=device)
        loss = criterion(out, gt)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        end_time = time.time()
        duration = time.strftime("%H:%M:%S", time.gmtime(end_time - start_time))
        print('train | epoch = {}, iter = [{}|{}], loss = {}, time = {}'.format(epoch, iter_id, max_iters,
                                                                                round(loss.item(), 6), duration))
        losses.append(loss.item())
        
        if iter_id > max_iters - 1:
            break
        
    return np.mean(losses)

def validate(model, val_loader, device, epoch, min_dist=5):
    losses = []
    tp = [0, 0, 0, 0]
    fp = [0, 0, 0, 0]
    tn = [0, 0, 0, 0]
    fn = [0, 0, 0, 0]
    criterion = nn.CrossEntropyLoss()
    model.eval()
    for iter_id, batch in enumerate(val_loader):
        with torch.no_grad():
            out = model(batch[0].float().to(device))
            gt = torch.tensor(batch[1], dtype=torch.long, device=device)
            loss = criterion(out, gt)
            losses.append(loss.item())
            # metrics
            output = out.argmax(dim=1).detach().cpu().numpy()
            for i in range(len(output)):
                x_pred, y_pred = postprocess(output[i])
                x_gt = batch[2][i]
                y_gt = batch[3][i]
                vis = batch[4][i]
                if x_pred:
                    if vis != 0:
                        dst = distance.euclidean((x_pred, y_pred), (x_gt, y_gt))
                        if dst < min_dist:
                            tp[vis] += 1
                        else:
                            fp[vis] += 1
                    else:        
                        fp[vis] += 1
                if not x_pred:
                    if vis != 0:
                        fn[vis] += 1
                    else:
                        tn[vis] += 1
            print('val | epoch = {}, iter = [{}|{}], loss = {}, tp = {}, tn = {}, fp = {}, fn = {} '.format(epoch,
                                                                                                            iter_id,
                                                                                                            len(val_loader),
                                                                                                            round(np.mean(losses), 6),
                                                                                                            sum(tp),
                                                                                                            sum(tn),
                                                                                                            sum(fp),
                                                                                                            sum(fn)))
    eps = 1e-15
    precision = sum(tp) / (sum(tp) + sum(fp) + eps)
    vc1 = tp[1] + fp[1] + tn[1] + fn[1]
    vc2 = tp[2] + fp[2] + tn[2] + fn[2]
    vc3 = tp[3] + fp[3] + tn[3] + fn[3]
    recall = sum(tp) / (vc1 + vc2 + vc3 + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)
    print('precision = {}'.format(precision))
    print('recall = {}'.format(recall))
    print('f1 = {}'.format(f1))

    return np.mean(losses), precision, recall, f1

In [None]:
#test
def main():
    # Configuration
    batch_size = 1
    num_epochs = 1000
    lr = 0.0001
    val_intervals = 1
    train_steps_per_epoch = 200
    val_steps_per_epoch = 200
    exp_id = 'default'
    
    # Initialize datasets and loaders (same as before)
    train_dataset = trackNetDataset('train',1280,720)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
    
    val_dataset = trackNetDataset('val',1280,720)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
    
    # Initialize model
    model = BallTrackerNet()
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    
    # Setup logging
    exps_path = f'/kaggle/working/exps/{exp_id}'
    os.makedirs(exps_path, exist_ok=True)
    log_writer = SummaryWriter(os.path.join(exps_path, 'plots'))
    
    # Model paths
    model_last_path = os.path.join(exps_path, 'model_last.pth')
    model_best_path = os.path.join(exps_path, 'model_best.pth')
    
    # Training setup
    optimizer = optim.Adadelta(model.parameters(), lr=lr)
    val_best_metric = -1  # Initialize to -1 instead of 0
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_losses = []
        for batch_idx, batch in enumerate(train_loader):
            if batch_idx >= train_steps_per_epoch:
                break
                
            optimizer.zero_grad()
            out = model(batch[0].float().to(device))
            gt = batch[1].long().to(device)
            loss = nn.CrossEntropyLoss()(out, gt)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())
        
        avg_train_loss = np.mean(train_losses)
        print(f'Epoch {epoch+1} Train Loss: {avg_train_loss:.4f}')
        
        # Validation phase
        if epoch % val_intervals == 0:
            model.eval()
            val_losses = []
            all_tp, all_fp, all_tn, all_fn = 0, 0, 0, 0
            
            with torch.no_grad():
                for batch_idx, batch in enumerate(val_loader):
                    if batch_idx >= val_steps_per_epoch:
                        break
                        
                    out = model(batch[0].float().to(device))
                    gt = batch[1].long().to(device)
                    val_losses.append(nn.CrossEntropyLoss()(out, gt).item())
                    
                    # Calculate metrics
                    preds = out.argmax(dim=1).cpu().numpy()
                    for i in range(len(preds)):
                        x_pred, y_pred = postprocess(preds[i])
                        x_gt, y_gt, vis = batch[2][i], batch[3][i], batch[4][i]
                        
                        if x_pred is not None:  # Prediction exists
                            if vis != 0:  # Ball is visible
                                dst = distance.euclidean((x_pred, y_pred), (x_gt, y_gt))
                                if dst < 5:  # Correct prediction
                                    all_tp += 1
                                else:  # Wrong position
                                    all_fp += 1
                            else:  # False positive (predicted but no ball)
                                all_fp += 1
                        else:  # No prediction
                            if vis != 0:  # Missed ball
                                all_fn += 1
                            else:  # Correct non-detection
                                all_tn += 1
            
            # Calculate metrics
            eps = 1e-7
            precision = all_tp / (all_tp + all_fp + eps)
            recall = all_tp / (all_tp + all_fn + eps)
            f1 = 2 * (precision * recall) / (precision + recall + eps)
            
            print(f'Validation Metrics - Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}')
            
            # Always save last model
            torch.save(model.state_dict(), model_last_path)
            
            # Save best model
            if f1 > val_best_metric:
                val_best_metric = f1
                torch.save(model.state_dict(), model_best_path)
                print(f'New best model saved with F1: {f1:.4f}')
                
    print("Training complete!")
    print(f"Final best validation F1: {val_best_metric:.4f}")

if __name__ == '__main__':
    main()