In [3]:
%%capture
!pip install -U yt_dlp youtube-search-python

In [1]:
!pip install moviepy



In [4]:
%%capture
!pip install "httpx<0.27" --force-reinstall

In [5]:
%%capture
!python3 -m pip install opencv-python==4.9.0.80 mediapipe==0.10.5 torch==2.2.0

In [6]:
from youtubesearchpython import VideosSearch
import yt_dlp
import os

def parse_duration(duration_str):
    parts = duration_str.split(':')
    if len(parts) == 2:  # mm:ss
        minutes, seconds = map(int, parts)
        return minutes * 60 + seconds
    elif len(parts) == 3:  # hh:mm:ss
        hours, minutes, seconds = map(int, parts)
        return hours * 3600 + minutes * 60 + seconds
    return 0  # if unknown or invalid

def download_videos(query, label, num_videos=5, save_dir='videos'):
    path = os.path.join(save_dir, label)
    os.makedirs(path, exist_ok=True)

    collected = 0
    search = VideosSearch(query, limit=30)  # Fetch more to filter

    for result in search.result()['result']:
        if 'duration' not in result:
            continue  # Skip livestreams or missing info

        duration_sec = parse_duration(result['duration'])
        if duration_sec >= 300:
            continue  # Skip videos 5 min or longer

        url = result['link']
        output_filename = os.path.join(path, f"{label}_{collected + 1}.mp4")

        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': output_filename,
            'quiet': True,
            'noplaylist': True,
        }

        try:
            print(f"Downloading [{label}] video {collected + 1}: {result['title']}")
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
            collected += 1
        except Exception as e:
            print(f"Failed to download {url}: {e}")

        if collected >= num_videos:
            break

# Download 5 short ballet and 5 short hip-hop videos
download_videos("ballet dance performance", "ballet", num_videos=10)
download_videos("hip hop dance performance", "hiphop", num_videos=10)


Downloading [ballet] video 1: Dance of the Sugar Plum Fairy from The Nutcracker (The Royal Ballet)
Downloading [ballet] video 2: Swan Lake – Dance of the cygnets (The Royal Ballet)
Downloading [ballet] video 3: Don Quixote – Act III Kitri Variation (Akane Takada, The Royal Ballet)
Downloading [ballet] video 4: Winter Waltz - CASA DE BALET
Downloading [ballet] video 5: LED Ballerinas - Ballerina Dance / Modern Ballet Show - Contraband Entertainment
Downloading [ballet] video 6: Ella is FLYING 😍🩰✨ #ballerina #ballet #shorts #ad
Downloading [ballet] video 7: Can’t Help Falling in Love|Emotional Ballet Performance
Downloading [ballet] video 8: Jeeho Lee WOWS the audience with the La Esmeralda Finale!
Downloading [ballet] video 9: 12 years of ballet vs 2 💀😱 #ballet #challenge
Downloading [ballet] video 10: Rewrite The Stars - Daniel Jang | Ballet, PERFORMING ARTS STUDIO PH
Downloading [hiphop] video 1: 10 year old KILLS adult level dance 😱 #notlikeus #dance
Downloading [hiphop] video 2: Hip

In [7]:
from moviepy.editor import VideoFileClip

def split_video(video_path, clip_length=30, output_dir="clips"):
    try:
        video = VideoFileClip(video_path)
        duration = int(video.duration)
        base_name = os.path.splitext(os.path.basename(video_path))[0]

        os.makedirs(output_dir, exist_ok=True)

        clip_count = 0
        for start in range(0, duration, clip_length):
            end = min(start + clip_length, duration)
            subclip = video.subclip(start, end)
            output_path = os.path.join(output_dir, f"{base_name}_part{clip_count + 1}.mp4")
            subclip.write_videofile(output_path, codec="libx264", audio_codec="aac", logger=None)
            clip_count += 1

        print(f"✅ Done: {video_path} → {clip_count} clips.")
    except Exception as e:
        print(f"❌ Failed to process {video_path}: {e}")

# Process all videos in both folders
for label in ['ballet', 'hiphop']:
    input_dir = f"videos/{label}"
    output_dir = f"clips/{label}"
    for filename in os.listdir(input_dir):
        if filename.endswith(".mp4"):
            full_path = os.path.join(input_dir, filename)
            split_video(full_path, clip_length=30, output_dir=output_dir)

✅ Done: videos/ballet/ballet_9.mp4 → 2 clips.
✅ Done: videos/ballet/ballet_10.mp4 → 3 clips.
✅ Done: videos/ballet/ballet_6.mp4 → 1 clips.
✅ Done: videos/ballet/ballet_3.mp4 → 3 clips.
✅ Done: videos/ballet/ballet_7.mp4 → 6 clips.
✅ Done: videos/ballet/ballet_8.mp4 → 1 clips.
✅ Done: videos/ballet/ballet_5.mp4 → 6 clips.
✅ Done: videos/ballet/ballet_4.mp4 → 7 clips.
✅ Done: videos/ballet/ballet_2.mp4 → 4 clips.
✅ Done: videos/ballet/ballet_1.mp4 → 6 clips.
✅ Done: videos/hiphop/hiphop_10.mp4 → 1 clips.
✅ Done: videos/hiphop/hiphop_8.mp4 → 2 clips.
✅ Done: videos/hiphop/hiphop_1.mp4 → 1 clips.
✅ Done: videos/hiphop/hiphop_4.mp4 → 1 clips.
✅ Done: videos/hiphop/hiphop_5.mp4 → 2 clips.
✅ Done: videos/hiphop/hiphop_7.mp4 → 1 clips.





✅ Done: videos/hiphop/hiphop_3.mp4 → 1 clips.
✅ Done: videos/hiphop/hiphop_9.mp4 → 1 clips.
✅ Done: videos/hiphop/hiphop_2.mp4 → 2 clips.
✅ Done: videos/hiphop/hiphop_6.mp4 → 1 clips.


In [8]:
import cv2
import numpy as np
import mediapipe as mp
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/lib/python3.11/dist-packages/colab_kernel_launcher.py", line 37, in <module>
    ColabKernelApp.launch_instance()
  File "/usr/local/lib/python3.11/dist-packages/traitlets/config/application.py", line 992, in launch_instance
    app.start()
  File "/usr/local/lib/python3.11/dist-packages/ipykernel/kernelapp.py", line 712, in start
    self.io_loop.start()
  File "/usr/local/lib/python3.11/dist-package

In [9]:
# ----------------------
# Config
# ----------------------
TARGET_SIZE = (224, 224)
SEQUENCE_LENGTH = 32
STRIDE = 16
USE_SKELETON = True
NUM_CLASSES = 5

In [10]:
# ----------------------
# Pose Estimation Setup
# ----------------------
mp_pose = mp.solutions.pose
pose_model = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)

def extract_pose(frame):
    results = pose_model.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if not results.pose_landmarks:
        return np.zeros((33, 3))  # x, y, z
    return np.array([[l.x, l.y, l.z] for l in results.pose_landmarks.landmark])

In [11]:
def process_video(video_path, use_pose=True, frame_skip=5):
    cap = cv2.VideoCapture(video_path)
    frames, keypoints = [], []
    frame_idx = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_idx % frame_skip == 0:
            frame = cv2.resize(frame, TARGET_SIZE)

            if use_pose:
                keypoints.append(extract_pose(frame).flatten())

            frame = frame.astype(np.float32) / 255.0
            frames.append(frame)

        frame_idx += 1

    cap.release()
    return np.array(frames), np.array(keypoints) if use_pose else None

In [12]:
# ----------------------
# Create Fixed-Length Clips
# ----------------------
def create_clips(frames, keypoints=None, sequence_length=32, stride=16):
    clips, pose_clips = [], []
    for i in range(0, len(frames) - sequence_length + 1, stride):
        if keypoints is not None:
            pose_clip = keypoints[i:i + sequence_length]
            if len(pose_clip) == sequence_length:
                pose_clips.append(pose_clip)

    return np.array(pose_clips)

In [13]:
# ----------------------
# LSTM Model
# ----------------------
class PoseLSTM(nn.Module):
    def __init__(self, input_size=99, hidden_size=128, num_layers=2, num_classes=NUM_CLASSES):
        super(PoseLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        return self.fc(out)

In [14]:
# ----------------------
# Train Model
# ----------------------
def train_model(video_paths, labels):
    all_clips = []
    all_labels = []

    for i, video in enumerate(video_paths):
        _, keypoints = process_video(video, use_pose=True, frame_skip=5)
        pose_clips = create_clips(_, keypoints, sequence_length=SEQUENCE_LENGTH, stride=STRIDE)
        all_clips.extend(pose_clips)
        all_labels.extend([labels[i]] * len(pose_clips))

    X = torch.tensor(np.stack(all_clips)).float()
    y = torch.tensor(all_labels).long()

    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=8, shuffle=True)

    model = PoseLSTM()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(5):
        for xb, yb in loader:
            pred = model(xb)
            loss = criterion(pred, yb)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

    return model

In [25]:

# ----------------------
# Predict on New Video
# ----------------------
def predict(video_path, model):
    _, keypoints = process_video(video_path, use_pose=True)
    pose_clips = create_clips(_, keypoints, sequence_length=SEQUENCE_LENGTH, stride=STRIDE)

    model.eval()
    with torch.no_grad():
        inputs = torch.tensor(pose_clips).float()
        outputs = model(inputs)
        avg_probs = torch.softmax(outputs, dim=1).mean(dim=0)
        pred_class = torch.argmax(avg_probs).item()

    return pred_class

In [16]:
import random

def get_labeled_video_paths(clip_dir='clips'):
    train_videos, train_labels = [], []
    test_videos, test_labels = [], []

    for label_name, label_value in [('ballet', 0), ('hiphop', 1)]:
        folder_path = os.path.join(clip_dir, label_name)
        video_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.mp4')])

        if len(video_files) < 2:
            raise ValueError(f"Not enough videos in {folder_path} to split into train/test.")

        random.shuffle(video_files)  # Shuffle to randomize test selection

        test_file = video_files.pop()  # Leave one for testing
        test_videos.append(os.path.join(folder_path, test_file))
        test_labels.append(label_value)

        for file in video_files:
            train_videos.append(os.path.join(folder_path, file))
            train_labels.append(label_value)

    return train_videos, train_labels, test_videos, test_labels


In [18]:
train_videos, train_labels, test_videos, test_labels = get_labeled_video_paths()
model = train_model(train_videos, train_labels)

Epoch 1, Loss: 0.9548
Epoch 2, Loss: 0.6637
Epoch 3, Loss: 0.5656
Epoch 4, Loss: 0.5386
Epoch 5, Loss: 0.5886


In [26]:
print("\n🧪 Test Results:")
for i, video_path in enumerate(test_videos):
    pred = predict(video_path, model)
    print(f"Video: {os.path.basename(video_path)} | Actual: {test_labels[i]} | Predicted: {pred}")


🧪 Test Results:
Video: ballet_1_part5.mp4 | Actual: 0 | Predicted: 0
Video: hiphop_1_part1.mp4 | Actual: 1 | Predicted: 0
