In [None]:
!pip install ultralytics yt-dlp opencv-python pillow matplotlib pandas numpy requests

In [None]:
import os
import random
import time
import requests
import cv2
import json
import yt_dlp
import pandas as pd
import numpy as np
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
from ultralytics import YOLO
import shutil
from scipy.interpolate import splprep, splev
from collections import defaultdict

def create_project_structure():
  base_dir = '/content/sports_ball_detection'
  directories = [
    'data/raw_videos',
    'data/frames',
    'data/frames_augmented',
    'data/annotations',
    'data/yolo_dataset/images/train',
    'data/yolo_dataset/images/val',
    'data/yolo_dataset/labels/train',
    'data/yolo_dataset/labels/val',
    'results/visualisations',
    'results/metrics'
  ]

  for dir_path in directories:
    full_path = os.path.join(base_dir, dir_path)
    os.makedirs(full_path, exist_ok=True)

  return base_dir

def get_dataset_metadata(num):
  # Create a temporary directory for the data
  os.makedirs('/content/temp', exist_ok=True)

  # Clone the repository to get the metadata
  !git clone https://github.com/gtoderici/sports-1m-dataset.git /content/temp/sports-1m-dataset

  # Read the labels file
  with open('/content/temp/sports-1m-dataset/labels.txt', 'r') as f:
      labels = [line.strip() for line in f.readlines()]

  # Read the training partition file
  with open('/content/temp/sports-1m-dataset/original/train_partition.txt', 'r') as f:
      train_videos = [line.strip().split(' ') for line in f.readlines()]

  # Convert to a more manageable format
  video_data = []
  for video in train_videos[:num]:  # Only process first N for speed
    if len(video) >= 2:
      url, labels_str = video
      video_id = url.split('=')[-1]
      labels_list = [int(l) for l in labels_str.split(',')]
      video_data.append({
        'id': video_id,
        'label487': labels_list,
        'stitle': f"Video {video_id}",
        'labels_text': [labels[l] for l in labels_list]
      })

  return video_data

def download_video(video_id, output_path, max_retries=3):
  """
  Download video from youtube using yt-dlp
  """
  url = f"https://www.youtube.com/watch?v={video_id}"
  output_template = os.path.join(output_path, f"{video_id}.mp4")

  # First verify if video exists
  try:
    response = requests.get(url)
    if response.status_code != 200:
      print(f"Video {video_id} is not available (Status code: {response.status_code})")
      return None
  except Exception as e:
    print(f"Error checking video availability for {video_id}: {str(e)}")
    return None

  # Configure yt-dlp options
  ydl_opts = {
    'format': 'worst[ext=mp4]',  # Get lowest quality to save time
    'outtmpl': output_template,
    'quiet': True,
    'no_warnings': True,
    'ignoreerrors': True,
    'nocheckcertificate': True,
    'extract_flat': False,
    'socket_timeout': 30,
  }

  for attempt in range(max_retries):
    try:
      with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

      if os.path.exists(output_template):
        return output_template

    except Exception as e:
      print(f"Attempt {attempt + 1}/{max_retries} failed for video {video_id}: {str(e)}")
      time.sleep(2 * (attempt + 1))  # Exponential backoff

      if attempt == max_retries - 1:
        print(f"Failed to download video {video_id} after {max_retries} attempts")
        return None

  return None

def extract_frames(video_path, output_dir, sample_rate=1):
  """
  Extract frames from video at given sample rate (1 frame every N seconds)
  """
  if not os.path.exists(video_path):
    return []

  cap = cv2.VideoCapture(video_path)
  fps = cap.get(cv2.CAP_PROP_FPS)
  frame_interval = int(fps * sample_rate)
  frame_paths = []

  os.makedirs(output_dir, exist_ok=True)
  frame_count = 0
  while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
      break

    if frame_count % frame_interval == 0:
      frame_path = os.path.join(
        output_dir,
        f"{os.path.basename(video_path)}_{frame_count}.jpg"
      )
      cv2.imwrite(frame_path, frame)
      frame_paths.append(frame_path)

    frame_count += 1

  cap.release()
  return frame_paths

def create_initial_dataset(video_data, base_dir, num_videos=5, max_attempts=10):
  """
  Create a initial dataset
  """
  filtered_videos = []
  for video in video_data:
    if any('ball' in label.lower() for label in video['labels_text']): # select videos with 'ball' in label
      filtered_videos.append(video)

  if not filtered_videos:
    raise ValueError("No videos found matching the sports criteria")

  dataset_info = []
  attempts = 0
  processed_video_ids = set()

  # Keep trying until we get the desired number of videos or reach max attempts
  while len(dataset_info) < num_videos and attempts < max_attempts:
    # Get random videos we haven't tried yet
    remaining_videos = [v for v in filtered_videos if v['id'] not in processed_video_ids]
    if not remaining_videos:
      print("No more videos available to try")
      break

    selected_video = random.choice(remaining_videos)
    video_id = selected_video['id']
    processed_video_ids.add(video_id)

    print(f"\nAttempting to process video {video_id} ({len(dataset_info) + 1}/{num_videos})")

    try:
      video_path = download_video(video_id, os.path.join(base_dir, 'data/raw_videos'))
      if video_path:
        frames_dir = os.path.join(base_dir, 'data/frames', video_id)
        print(frames_dir)
        frame_paths = extract_frames(video_path, frames_dir)

        if frame_paths:
          dataset_info.append({
            'video_id': video_id,
            'frame_paths': frame_paths,
            'labels': selected_video['labels_text'],
            'label_ids': selected_video['label487']
          })
          print(f"Successfully processed video {video_id}")
        else:
          print(f"No frames could be extracted from video {video_id}")
      else:
        print(f"Failed to download video {video_id}")
    except Exception as e:
      print(f"Error processing video {video_id}: {str(e)}")
    finally:
      # Clean up video file
      try:
          os.remove(video_path)
          print(f"Cleaned up video file: {video_path}")
      except Exception as e:
          print(f"Error cleaning up video file {video_path}: {str(e)}")

    attempts += 1

  print(f"\nFinished processing. Successfully downloaded {len(dataset_info)} videos after {attempts} attempts")
  return dataset_info

class BallDetectionMetrics:
    def __init__(self, base_dir):
      self.metrics_history = []
      self.base_dir = base_dir

    def calculate_metrics(self, model):
      """Calculate precision, recall, and mAP for the model"""
      dataset_yaml = os.path.join(self.base_dir, 'data', 'yolo_dataset', 'dataset.yaml')

      if not os.path.exists(dataset_yaml):
        raise ValueError("No dataset.yaml found. Please ensure you have valid data before training.")

      results = model.val(data=dataset_yaml)
      metrics = {
        'mAP50-95': results.box.map,
        'mAP50': results.box.map50,
        'mAP75': results.box.map75
      }
      self.metrics_history.append(metrics)
      return metrics

    def plot_metrics_history(self, save_path):
        """Plot metrics history over iterations"""
        metrics = pd.DataFrame(self.metrics_history)
        plt.figure(figsize=(12, 6))
        for column in metrics.columns:
          plt.plot(metrics[column], label=column)
        plt.xlabel('Iteration')
        plt.ylabel('Score')
        plt.title('Model Performance Over Iterations')
        plt.legend()
        plt.savefig(save_path)
        plt.close()

class BallDetectionPipeline:
  def __init__(self, base_dir):
    self.base_dir = base_dir
    self.model = YOLO('yolov8x.pt', verbose=False)
    self.ball_classes = [32]  # 'sports ball' class
    self.metrics = BallDetectionMetrics(base_dir=base_dir)

  def generate_pseudo_annotations(self, dataset_info):
    """Generate annotations using YOLOv8 pretrained model"""
    annotations = []

    for video in dataset_info:
      print(f"Processing video {video['video_id']}")
      video_annotations = []
      for frame_path in video['frame_paths']:
        results = self.model(frame_path, verbose=False)

        for r in results:
          boxes = r.boxes
          for box in boxes:
            if int(box.cls) in self.ball_classes:
              x1, y1, x2, y2 = box.xyxy[0].tolist()
              img = Image.open(frame_path)
              w, h = img.size
              x_center = (x1 + x2) / (2 * w)
              y_center = (y1 + y2) / (2 * h)
              width = (x2 - x1) / w
              height = (y2 - y1) / h

              video_annotations.append({
                'frame_path': frame_path,
                'bbox': [x_center, y_center, width, height],
                'confidence': float(box.conf),
                'class': int(box.cls)
              })

      if video_annotations:
        annotations.append({
            'video_id': video['video_id'],
            'annotations': video_annotations
        })

    return annotations

  def save_yolo_annotations(self, annotations, confidence_threshold=0.5, is_validation=False):
    """Save annotations in YOLO format with train/val split"""
    if not annotations:
      print("No annotations to save. Skipping dataset creation.")
      return False

    dataset_dir = os.path.join(self.base_dir, 'data', 'yolo_dataset')
    split = 'val' if is_validation else 'train'

    yaml_path = os.path.join(dataset_dir, 'dataset.yaml')

    saved_any_images = False

    for video_data in annotations:
      video_id = video_data['video_id']

      video_img_dir = os.path.join(dataset_dir, 'images', split, str(video_id))
      video_label_dir = os.path.join(dataset_dir, 'labels', split, str(video_id))
      os.makedirs(video_img_dir, exist_ok=True)
      os.makedirs(video_label_dir, exist_ok=True)

      high_conf_annotations = [ann for ann in video_data['annotations']
                            if ann['confidence'] >= confidence_threshold]

      if high_conf_annotations:
        saved_any_images = True
        for ann in high_conf_annotations:
          img_path = Path(ann['frame_path'])
          new_img_path = os.path.join(video_img_dir, img_path.name)
          os.system(f'cp {img_path} {new_img_path}')

          label_path = os.path.join(video_label_dir, img_path.stem + '.txt')
          with open(label_path, 'w') as f:
            f.write(f"0 {' '.join(map(str, ann['bbox']))}\n")

    if saved_any_images:
      if not os.path.exists(yaml_path):
        yaml_content = {
          'path': dataset_dir,
          'train': 'images/train',
          'val': 'images/val',
          'names': {0: 'sports_ball'}
        }
        with open(yaml_path, 'w') as f:
          json.dump(yaml_content, f, indent=2)
    return saved_any_images

  def train_model(self, epochs=100):
    """Train YOLOv8 model on our dataset"""
    dataset_yaml = os.path.join(self.base_dir, 'data', 'yolo_dataset', 'dataset.yaml')

    if not os.path.exists(dataset_yaml):
      raise ValueError("No dataset.yaml found. Please ensure you have valid data before training.")

    images_dir = os.path.join(self.base_dir, 'data', 'yolo_dataset', 'images')
    if not os.path.exists(images_dir) or not any(os.scandir(images_dir)):
      raise ValueError("No training data found. Please ensure you have valid images before training.")

    self.model.train(
      data=dataset_yaml,
      epochs=epochs,
      imgsz=640,
      batch=16,
      name='ball_detection',
      verbose=False
    )

  def visualise_detections(self, annotations, num_frames=5, save_dir=None):
    """
    Visualise ball detections on frames with bounding boxes, organised by video ID
    """
    if save_dir is None:
      save_dir = os.path.join(self.base_dir, 'results', 'visualisations')

    # Process each video separately
    saved_paths = []

    for video_data in annotations:
      video_id = video_data['video_id']
      video_save_dir = os.path.join(save_dir, str(video_id))
      os.makedirs(video_save_dir, exist_ok=True)

      # Sort frames by confidence for this video
      frames_with_balls = sorted(
        video_data['annotations'],
        key=lambda x: x['confidence'],
        reverse=True
      )[:num_frames]

      for i, detection in enumerate(frames_with_balls):
        # Read the image
        frame_path = detection['frame_path']
        image = cv2.imread(frame_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Get image dimensions
        height, width = image.shape[:2]

        # Convert YOLO format to pixel coordinates
        x_center, y_center, box_width, box_height = detection['bbox']
        x_center *= width
        y_center *= height
        box_width *= width
        box_height *= height

        # Calculate box corners
        x1 = int(x_center - box_width/2)
        y1 = int(y_center - box_height/2)
        x2 = int(x_center + box_width/2)
        y2 = int(y_center + box_height/2)

        # Draw the bounding box
        color = (255, 0, 0)  # Red
        thickness = 2
        image = cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

        # Add confidence score and video ID
        conf_text = f"Conf:{detection['confidence']:.2f}"
        cv2.putText(image, conf_text, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, thickness)

        # Save the image
        save_path = os.path.join(video_save_dir, f'detection_{i+1}.jpg')
        image_pil = Image.fromarray(image)
        image_pil.save(save_path)
        saved_paths.append(save_path)

        print(f"Saved visualisation for video {video_id}: {i+1}/{len(frames_with_balls)} to {save_path}")

    return saved_paths

  def generate_detection_video(self, video_path, output_path, confidence_threshold=0.5):
    """
    Generate a video with ball detection boxes overlaid on original footage
    """
    # Open the input video
    cap = cv2.VideoCapture(video_path)

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Setup video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Tracking detections across frames
    frame_count = 0
    total_detections = []

    # Progress tracking
    print(f"Processing video: {os.path.basename(video_path)}")
    print(f"Total frames: {total_frames}")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Detect balls in the current frame
        results = self.model(frame, verbose=False)

        # Store detections for this frame
        frame_detections = []

        for r in results:
            boxes = r.boxes
            for box in boxes:
                # Check if detection is a ball and meets confidence threshold
                if (int(box.cls) in self.ball_classes and
                    float(box.conf) >= confidence_threshold):

                    # Convert YOLO format to pixel coordinates
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
                    confidence = float(box.conf)

                    # Draw bounding box
                    color = (0, 255, 0)  # Green color
                    thickness = 2
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)

                    # Add confidence text
                    text = f'Ball: {confidence:.2f}'
                    cv2.putText(frame, text, (x1, y1-10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

                    # Store detection details
                    frame_detections.append({
                        'frame': frame_count,
                        'bbox': [x1, y1, x2, y2],
                        'confidence': confidence
                    })

        # Write annotated frame to output video
        out.write(frame)

        # Store detections
        if frame_detections:
            total_detections.append({
                'frame': frame_count,
                'detections': frame_detections
            })

        frame_count += 1

        # Progress update
        if frame_count % 100 == 0:
            print(f"Processed {frame_count}/{total_frames} frames")

    # Release video objects
    cap.release()
    out.release()

    # Generate detection summary
    print("\nDetection Summary:")
    print(f"Total frames processed: {frame_count}")
    print(f"Frames with ball detections: {len(total_detections)}")

    # Optional: Save detection details to a JSON for further analysis
    detection_summary_path = output_path.replace('.mp4', '_detections.json')
    with open(detection_summary_path, 'w') as f:
        json.dump(total_detections, f, indent=2)

    return output_path

class DatasetImprovement:
    def __init__(self, pipeline, base_dir):
      self.pipeline = pipeline
      self.base_dir = base_dir

    def cleanup_iteration_data(self):
      """Clean up the YOLO dataset directory between iterations"""
      dataset_dir = os.path.join(self.base_dir, 'data', 'yolo_dataset')
      paths_to_clean = [
        os.path.join(dataset_dir, 'images', 'train'),
        os.path.join(dataset_dir, 'images', 'val'),
        os.path.join(dataset_dir, 'labels', 'train'),
        os.path.join(dataset_dir, 'labels', 'val')
      ]

      for path in paths_to_clean:
        if os.path.exists(path):
          shutil.rmtree(path)
          os.makedirs(path)  # Recreate empty directory

      # Also clean up augmented frames
      aug_dir = os.path.join(self.base_dir, 'data', 'frames_augmented')
      if os.path.exists(aug_dir):
        shutil.rmtree(aug_dir)
        os.makedirs(aug_dir)

    def split_dataset(self, dataset_info, val_split=0.2):
      """Split dataset into training and validation sets"""
      random.shuffle(dataset_info)
      split_idx = int(len(dataset_info) * (1 - val_split))
      return dataset_info[:split_idx], dataset_info[split_idx:]

    def identify_easy_cases(self, annotations, confidence_threshold=0.8):
      """Identify frames where model is most confident"""
      easy_cases = []
      for video in annotations:
        video_easy_cases = [
          ann for ann in video['annotations']
          if confidence_threshold <= ann['confidence'] <= 1.0
        ]
        if video_easy_cases:
          easy_cases.append({
            'video_id': video['video_id'],
            'annotations': video_easy_cases
          })
      return easy_cases

    def augment_annotations(self, annotations, confidence_threshold=0.5):
      """
      Augment high-confidence detections with their corresponding annotations
      Returns new annotations in YOLO format
      """
      augmented_annotations = []

      for video_data in annotations:
        video_id = video_data['video_id']
        high_conf_detections = [
          ann for ann in video_data['annotations']
          if ann['confidence'] >= confidence_threshold
        ]

        if not high_conf_detections:
          continue

        aug_annotations = []

        for detection in high_conf_detections:
          # Read original image
          img = cv2.imread(detection['frame_path'])
          if img is None:
            continue

          original_h, original_w = img.shape[:2]
          bbox = detection['bbox']  # [x_center, y_center, width, height] in normalised coordinates

          # Create augmentations
          augmentations = [
            ('bright', self._brightness_augmentation(img)),
            ('flip', self._horizontal_flip_augmentation(img))
          ]

          for aug_name, aug_img in augmentations:
            # Create augmented image filename and path
            base_name = os.path.basename(detection['frame_path'])
            aug_img_name = f"{os.path.splitext(base_name)[0]}_{aug_name}.jpg"

            # Save augmented image to YOLO dataset directory
            aug_img_path = os.path.join(
              self.base_dir, 'data', 'yolo_dataset',
              'images', 'train', video_id,
              aug_img_name
            )
            os.makedirs(os.path.dirname(aug_img_path), exist_ok=True)
            cv2.imwrite(aug_img_path, aug_img)

            # Calculate augmented bbox
            if aug_name == 'flip':
              # For horizontal flip, adjust x coordinate
              aug_bbox = [
                  1.0 - bbox[0],  # flip x_center
                  bbox[1],
                  bbox[2],
                  bbox[3]
              ]
            else:
              # For brightness, keep same bbox
              aug_bbox = bbox.copy()

            # Add augmented annotation
            aug_annotations.append({
              'frame_path': aug_img_path,
              'bbox': aug_bbox,
              'confidence': detection['confidence'],
              'class': detection['class']
            })

        if aug_annotations:
          augmented_annotations.append({
            'video_id': f"{video_id}_aug",
            'annotations': aug_annotations
          })

      return augmented_annotations

    def _brightness_augmentation(self, img):
      """Apply brightness augmentation"""
      return cv2.convertScaleAbs(img, alpha=1.2, beta=10)

    def _horizontal_flip_augmentation(self, img):
      """Apply horizontal flip augmentation"""
      return cv2.flip(img, 1)

    def iterative_improvement(self, initial_dataset, num_iterations=3):
      """Iteratively improve the dataset and model"""

      for iteration in range(num_iterations):
        print(f"\nIteration {iteration + 1}/{num_iterations}")

        # Clean up data from previous iteration
        print("\nCleaning up previous iteration data...")
        self.cleanup_iteration_data()
        print("Cleaned up")

        # Split dataset
        print("\nSplitting dataset...")
        train_data, val_data = self.split_dataset(initial_dataset)
        print("Split")

        print(f"\nTraining data size: {len(train_data)}")
        print(f"Validation data size: {len(val_data)}")
        print("\nTraining data")
        for data in train_data:
          print(f"Video ID: {data['video_id']}, Labels: {data['labels']}")

        print("\nValidation data")
        for data in val_data:
          print(f"Video ID: {data['video_id']}, Labels: {data['labels']}")

        # Generate annotations
        print("\nGenerating annotations...")
        train_annotations = self.pipeline.generate_pseudo_annotations(train_data)
        val_annotations = self.pipeline.generate_pseudo_annotations(val_data)
        print(f"Generated {len(train_annotations)} training annotations and {len(val_annotations)} validation annotations")

        # Save annotations and train model
        print("\nSaving annotations...")
        self.pipeline.save_yolo_annotations(train_annotations)
        self.pipeline.save_yolo_annotations(val_annotations, is_validation=True)
        print(f"Saved annotations")

        # Generate and save augmented annotations from high-confidence detections
        print("\nGenerating augmented annotations...")
        augmented_annotations = self.augment_annotations(train_annotations)
        if augmented_annotations:
          print(f"Generated {len(augmented_annotations)} augmented annotations")
          print("Saving augmented annotations...")
          self.pipeline.save_yolo_annotations(augmented_annotations)
        else:
          print("No high-confidence detections found for augmentation")

        # Train model
        print("\nTraining model...")
        self.pipeline.train_model(epochs=2)
        print(f"Model trained for iteration {iteration + 1}")

        # Calculate metrics
        print("\nCalculating metrics")
        metrics = self.pipeline.metrics.calculate_metrics(self.pipeline.model)
        print(f"Metrics for iteration {iteration + 1}:", metrics)

      # Plot final metrics
      self.pipeline.metrics.plot_metrics_history(
        os.path.join(self.base_dir, 'results', 'metrics', 'metrics_history.png')
      )

def complete_cleanup(base_dir):
  """Clean up all temporary and iteration-specific data"""
  # Clean up YOLO dataset directories
  dataset_dir = os.path.join(base_dir, 'data', 'yolo_dataset')
  paths_to_clean = [
    os.path.join(dataset_dir, 'images', 'train'),
    os.path.join(dataset_dir, 'images', 'val'),
    os.path.join(dataset_dir, 'labels', 'train'),
    os.path.join(dataset_dir, 'labels', 'val')
  ]

  for path in paths_to_clean:
    if os.path.exists(path):
      shutil.rmtree(path)
      os.makedirs(path)  # Recreate empty directory

  # Clean up augmented frames
  aug_dir = os.path.join(base_dir, 'data', 'frames_augmented')
  if os.path.exists(aug_dir):
    shutil.rmtree(aug_dir)
    os.makedirs(aug_dir)

  # Clean up YOLO training results
  runs_dir = os.path.join(base_dir, 'runs')
  if os.path.exists(runs_dir):
    shutil.rmtree(runs_dir)

  # Clean up results directory
  results_dirs = [
    os.path.join(base_dir, 'results', 'visualisations'),
    os.path.join(base_dir, 'results', 'metrics')
  ]
  for dir_path in results_dirs:
    if os.path.exists(dir_path):
      shutil.rmtree(dir_path)
      os.makedirs(dir_path)

  # Remove dataset.yaml if it exists
  yaml_path = os.path.join(dataset_dir, 'dataset.yaml')
  if os.path.exists(yaml_path):
    os.remove(yaml_path)

# def run_me_first():
  # # Create the project structure
  # base_dir = create_project_structure()
  # print(f"Project directory created at: {base_dir}")

  # # Get the dataset metadata
  # video_data = get_dataset_metadata(num=1000)
  # print(f"Retrieved metadata for {len(video_data)} videos")

  # # Create initial dataset
  # dataset_info = create_initial_dataset(video_data, base_dir, num_videos=20, max_attempts=30)
  # print(f"\nInitial dataset created with {len(dataset_info)} videos")

  # return base_dir, video_data, dataset_info

# def run_me_second(base_dir, video_data, dataset_info):
#   complete_cleanup(base_dir)

#   # Initialise the ball detection pipeline
#   pipeline = BallDetectionPipeline(base_dir)
#   print("\nBall detection pipeline initialised")

#   # Initialise the improvement pipeline
#   improvement_pipeline = DatasetImprovement(pipeline, base_dir)
#   print("\nDataset improvement pipeline initialised")

#   # Run the iterative improvement process
#   print("\nStarting iterative improvement process...")
#   improvement_pipeline.iterative_improvement(dataset_info, num_iterations=2)
#   print("\nIterative improvement complete!")

#   # Generate final annotations
#   print("\nGenerating final annotations...")
#   final_annotations = pipeline.generate_pseudo_annotations(dataset_info)

#   # Generate detection visualisations
#   print("\nGenerating detection visualisations...")
#   visualisation_paths = pipeline.visualise_detections(final_annotations, num_frames=5)
#   print(f"Created {len(visualisation_paths)} visualisation images")

#   print("\nTraining and evaluation complete!")
#   print(f"Results saved in: {os.path.join(base_dir, 'results')}")

In [None]:
# base_dir, video_data, dataset_info = run_me_first()


# Create the project structure
base_dir = create_project_structure()
print(f"Project directory created at: {base_dir}")

# Get the dataset metadata
video_data = get_dataset_metadata(num=1000)
print(f"Retrieved metadata for {len(video_data)} videos")

# Create initial dataset
dataset_info = create_initial_dataset(video_data, base_dir, num_videos=10, max_attempts=30)
print(f"\nInitial dataset created with {len(dataset_info)} videos")

In [None]:
# run_me_second(base_dir, video_data, dataset_info)


complete_cleanup(base_dir)

# Initialise the ball detection pipeline
pipeline = BallDetectionPipeline(base_dir)
print("\nBall detection pipeline initialised")

# Initialise the improvement pipeline
improvement_pipeline = DatasetImprovement(pipeline, base_dir)
print("\nDataset improvement pipeline initialised")

# Run the iterative improvement process
print("\nStarting iterative improvement process...")
improvement_pipeline.iterative_improvement(dataset_info, num_iterations=2)
print("\nIterative improvement complete!")

# Generate final annotations
print("\nGenerating final annotations...")
final_annotations = pipeline.generate_pseudo_annotations(dataset_info)

print("\nTraining and evaluation complete!")
print(f"Results saved in: {os.path.join(base_dir, 'results')}")

In [None]:
# Generate detection visualisations
print("\nGenerating detection visualisations...")
visualisation_paths = pipeline.visualise_detections(final_annotations, num_frames=5)
print(f"Created {len(visualisation_paths)} visualisation images")

print(f"Results saved in: {os.path.join(base_dir, 'results', 'visualisations')}")

In [None]:
# Select a video to annotate
video_id = 'JF2_Evit478'

# Download video
print(f"Downloading video: {video_id}")
video_path = download_video(video_id, os.path.join(base_dir, 'data/raw_videos'))
print(f"Downloaded video: {video_path}")

# Create output directory for detection videos
print("Creating output directory for detection videos...")
detection_video_dir = os.path.join(base_dir, 'results', 'detection_videos')
os.makedirs(detection_video_dir, exist_ok=True)
print("Output directory created")

# Generate output video path
print("Generating output video path...")
output_video_path = os.path.join(detection_video_dir, f"{video_id}_ball_detection.mp4")
print("Output video path generated")

# Generate video with ball detections
print("Generating video with ball detections...")
pipeline.generate_detection_video(
    video_path=video_path,
    output_path=output_video_path,
    confidence_threshold=0.5
)

# Clean raw video
print("Cleaning raw video...")
os.remove(video_path)
print("Raw video cleaned")

print("Video with ball detections generated")
print("Saved video to:", output_video_path)

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"
!zip -r '/content/file.zip' '/content/sports_ball_detection'

In [None]:
from google.colab import files
files.download("/content/file.zip")