In [None]:
!pip install -q yt-dlp split-folders opencv-python-headless tqdm

In [None]:
!pip install -q tensorflow pillow matplotlib seaborn

In [None]:
import os,random,json,math,shlex,subprocess
from pathlib import Path
from tqdm import tqdm
import numpy as np
import shutil

In [None]:
from PIL import Image
import cv2
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
# Report the installed TensorFlow build, then display the GPUs TensorFlow can see
# (the bare last expression is rendered as the cell output).
print("Tensorflow Version: ", tf.__version__)
tf.config.list_physical_devices(device_type='GPU')

In [None]:
# Project layout: every artifact (videos, frames, dataset splits, models) lives
# in its own sub-directory of a single root.
root = "./content/game_classifier"
raw_video_dir, frames_dir, dataset_dir, models_dir = (
    os.path.join(root, sub) for sub in ("videos", "frames", "dataset", "models")
)

# Create the whole tree up front; exist_ok keeps the cell re-runnable.
for d in (raw_video_dir, frames_dir, dataset_dir, models_dir):
  os.makedirs(d, exist_ok=True)

In [None]:
# Restore previously extracted frames from the cached archive, when one is available.
# Pure-Python extraction overwrites existing files without asking, so re-running this
# cell cannot stall on unzip's interactive "replace?" prompt, and a missing archive is
# reported instead of producing a shell error.
frames_archive = "frame_file.zip"
if os.path.isfile(frames_archive):
    shutil.unpack_archive(frames_archive, extract_dir=".", format="zip")
    print("Restored frames from:", frames_archive)
else:
    print(f"{frames_archive} not found; skipping restore.")

In [None]:
# Image edge length (pixels) and batch size shared by the dataset and model cells.
IMG_SIZE, BATCH_SIZE = 224, 32

# Seed Python's and TensorFlow's random number generators with the same value.
random.seed(42)
tf.random.set_seed(42)

In [None]:
# Registry of gameplay videos: class label -> list of source video URLs.
games = dict(
    gta5=[
        "https://www.youtube.com/watch?v=K89JWVEDmV0&list=WL&index=8",
        "https://www.youtube.com/watch?v=BiZtze2u2TA",
    ],
    indiana_jones=[
        "https://www.youtube.com/watch?v=8vuxip2nO-M&list=WL&index=9",
        "https://www.youtube.com/watch?v=0ciyN9mgMFs",
        "https://www.youtube.com/watch?v=lDo6AkgaAJs",
    ],
    tomb_raider=[
        "https://www.youtube.com/watch?v=cqGCtwxMuWQ&list=WL&index=7",
        "https://www.youtube.com/watch?v=XHtTcebsQcE",
        "https://www.youtube.com/watch?v=J7EPtPmt62c",
    ],
    spiderman=[
        "https://www.youtube.com/watch?v=fAnIUbnOekA",
    ],
)

# Sampling interval, in seconds, used when extracting frames for each game.
step_seconds = dict(
    gta5=4,
    indiana_jones=3,
    tomb_raider=4,
    spiderman=4,
)

print("Games Registered: ", list(games))

In [None]:
import yt_dlp
def download_videos(url, out_path):
  """Download a single video with yt-dlp and report whether it succeeded.

  Args:
    url: Video URL handed to yt-dlp.
    out_path: Destination file path; used verbatim as yt-dlp's output template.

  Returns:
    True when yt-dlp finishes without raising, False otherwise (the error is
    printed rather than propagated, so a batch of downloads can continue).
  """
  # A bare file name has no directory component, and os.makedirs('') would raise.
  parent_dir = os.path.dirname(out_path)
  if parent_dir:
    os.makedirs(parent_dir, exist_ok=True)

  ydl_opt = {
      'outtmpl': out_path,
      'format': 'best[ext=mp4]/best',
      # Some URLs carry "&list=...": without this flag yt-dlp treats them as
      # playlist links instead of fetching only the referenced video.
      'noplaylist': True,
  }
  try:
    with yt_dlp.YoutubeDL(ydl_opt) as ydl:
      ydl.download([url])
    return True
  except Exception as e:
    print("download Failed:", e)
    return False

In [None]:
# Fetch every registered video into <raw_video_dir>/<game>/<game>_<index>.mp4.
# Files that are already on disk are reported and left alone.
for game, urls in games.items():
    game_dir = os.path.join(raw_video_dir, game)
    os.makedirs(game_dir, exist_ok=True)

    for idx, url in enumerate(urls):
        out_vid = os.path.join(game_dir, f"{game}_{idx}.mp4")

        if not os.path.exists(out_vid):
            print("downloading", out_vid)
            ok = download_videos(url, out_vid)

            if ok:
                print("Saved:", out_vid)
            else:
                print(f"failed to download {url}")
        else:
            print(f"{out_vid} already exists")

In [None]:
def extract_frames_for_game(game_name, video_path, out_dir, step_seconds=4, max_frames=None):
    """Sample one frame every `step_seconds` from a video and save each as a JPEG.

    Files are named `<game_name>_<video stem>_<5-digit index>.jpg` inside `out_dir`.

    Args:
        game_name: Label used as the file-name prefix and in the progress bar.
        video_path: Path of the video file to read.
        out_dir: Directory that receives the JPEG files (created if missing).
        step_seconds: Time between two sampled frames, in seconds.
        max_frames: Optional cap on the number of saved frames; None or 0 means no cap.

    Returns:
        Number of frames actually written to disk (0 when the video cannot be opened).
    """
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    pbar = None
    saved = 0
    video_name = Path(video_path).stem

    try:
        if not cap.isOpened():
            print("Cannot open video:", video_path)
            return 0

        # Fall back to 30 fps when the container does not report a frame rate.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        step_frames = max(1, int(round(fps * step_seconds)))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        frame_idx = 0

        # An unknown frame count (0) is passed as None so tqdm shows a plain counter.
        pbar = tqdm(total=total_frames or None,
                    desc=f"Extracting {game_name} | {video_name}", unit="fr")

        while True:
            # grab() only advances the stream; the frame is decoded by retrieve(),
            # which is called solely for the frames that are kept.
            if not cap.grab():
                break

            if frame_idx % step_frames == 0:
                ok, frame = cap.retrieve()
                if not ok:
                    break

                out_path = os.path.join(
                    out_dir,
                    f"{game_name}_{video_name}_{saved:05d}.jpg"
                )
                # imwrite signals failure through its return value, not an exception.
                if cv2.imwrite(out_path, frame):
                    saved += 1
                else:
                    print("Failed to write frame:", out_path)

                if max_frames and saved >= max_frames:
                    break

            frame_idx += 1
            pbar.update(1)
    finally:
        # Release the progress bar and the capture handle on every exit path.
        if pbar is not None:
            pbar.close()
        cap.release()

    print(f"Saved {saved} frames from {video_name}")
    return saved

# Extract frames for each registered game. A game is skipped when it has no
# downloaded videos or when more than 50 frames already exist for it.
for game in games:
    game_video_dir = os.path.join(raw_video_dir, game)
    out_folder = os.path.join(frames_dir, game)
    os.makedirs(out_folder, exist_ok=True)

    video_files = sorted(Path(game_video_dir).glob("*.mp4"))
    if not video_files:
        print(f"No videos found for {game}, skipping.")
        continue

    existing_frames = sum(1 for _jpg in Path(out_folder).glob("*.jpg"))
    if existing_frames > 50:
        print(f"Frames for {game} already exist ({existing_frames}), skipping extraction.")
        continue

    # Per-game sampling interval, defaulting to 4 seconds.
    ss = step_seconds.get(game, 4)
    print(f"\nExtracting frames for {game} | step_seconds={ss}")

    for vid in video_files:
        extract_frames_for_game(game, str(vid), out_folder, step_seconds=ss)

In [None]:
from IPython.display import display
import matplotlib.pyplot as plt

# Count the extracted JPEGs available for each registered game.
counts = {
    g: len(list(Path(os.path.join(frames_dir, g)).glob("*.jpg")))
    for g in games
}
print("Frame counts:", counts)

# Preview up to six frames of the first registered game in a 2x3 grid.
first = list(games)[0]
sample_files = list(Path(os.path.join(frames_dir, first)).glob("*.jpg"))[:6]

plt.figure(figsize=(12,6))
for i, f in enumerate(sample_files):
    plt.subplot(2, 3, i + 1)
    img = Image.open(f).convert("RGB").resize((320,180))
    plt.imshow(img)
    plt.axis('off')
plt.suptitle(first)
plt.show()

In [None]:
import shutil
# Remove the previous train/val/test split so files from an earlier split cannot
# linger. The path comes from `dataset_dir` rather than a second hard-coded copy,
# and ignore_errors keeps the cell re-runnable once the directory is already gone.
shutil.rmtree(dataset_dir, ignore_errors=True)

In [None]:
# Cap every game at TARGET frames by deleting a random surplus.
# WARNING: this permanently deletes files from the frames directory.
TARGET = 450
frames_dir = Path(frames_dir)
for game in sorted(frames_dir.iterdir()):
    # Stray files next to the per-game folders are not games.
    if not game.is_dir():
        continue

    # Glob order depends on the filesystem; sorting first makes the seeded
    # random.sample pick the same files on every machine.
    imgs = sorted(game.glob("*.jpg"))
    if len(imgs) > TARGET:
        remove = random.sample(imgs, len(imgs) - TARGET)
        for f in remove:
            f.unlink()
        print(f"{game.name}: reduced to {TARGET}")
    else:
        print(f"{game.name}: kept {len(imgs)}")

In [None]:
# Rebuild the frame archive from scratch. `zip -r` on an existing archive only adds
# or updates entries, so frames deleted since the last run would survive inside it.
# Entries keep the "content/game_classifier/frames/..." prefix relative to the
# current directory.
frames_archive_path = shutil.make_archive(
    "frame_file",
    "zip",
    root_dir=".",
    base_dir=os.path.normpath(os.path.join(root, "frames")),
)
print("Wrote archive:", frames_archive_path)

In [None]:
# The split root is the dataset directory itself; create it when it is absent,
# otherwise just report that it is already there.
SPLIT_ROOT = os.path.join(dataset_dir)
if not os.path.exists(SPLIT_ROOT):
    os.makedirs(SPLIT_ROOT, exist_ok=True)
else:
    print("Dataset split root exists:", SPLIT_ROOT)

def make_splits(frames_root, out_root, train_ratio=0.7, val_ratio=0.2, seed=None):
    """Copy each class's JPEG frames into train/val/test folders under `out_root`.

    Every sub-directory of `frames_root` is treated as one class. Its `*.jpg`
    files are shuffled and copied to `<out_root>/<split>/<class>/`; whatever is
    left after the train and validation shares goes to the test split.

    Args:
        frames_root: Directory holding one sub-directory of frames per class.
        out_root: Directory that receives the `train`, `val` and `test` trees.
        train_ratio: Fraction of each class assigned to the training split.
        val_ratio: Fraction of each class assigned to the validation split.
        seed: Optional seed for a private shuffler. When None, the module-level
            `random` state is used.

    Raises:
        ValueError: If a ratio is negative or the two ratios add up to more than 1.
    """
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            f"train_ratio and val_ratio must be >= 0 and sum to at most 1, "
            f"got {train_ratio} and {val_ratio}"
        )

    rng = random if seed is None else random.Random(seed)

    # Sorted class order keeps the sequence of shuffles independent of listdir order.
    for cls in sorted(os.listdir(frames_root)):
        src_dir = os.path.join(frames_root, cls)
        if not os.path.isdir(src_dir):
            continue  # stray file next to the class folders

        files = sorted(str(p) for p in Path(src_dir).glob("*.jpg"))
        if not files:
            continue
        rng.shuffle(files)

        n = len(files)
        n_train = int(n * train_ratio)
        n_val = int(n * val_ratio)
        partitions = {
            "train": files[:n_train],
            "val": files[n_train:n_train + n_val],
            "test": files[n_train + n_val:],
        }

        for split, flist in partitions.items():
            out_dir = os.path.join(out_root, split, cls)
            # Drop copies left by an earlier run: with a different shuffle they
            # would put the same frame into more than one split.
            shutil.rmtree(out_dir, ignore_errors=True)
            os.makedirs(out_dir, exist_ok=True)
            for src in flist:
                shutil.copy2(src, os.path.join(out_dir, os.path.basename(src)))

# Build the train/val/test folders under SPLIT_ROOT from the extracted frames.
make_splits(frames_dir, SPLIT_ROOT)
print("Created train/val/test in:", SPLIT_ROOT)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE


def _load_split(split_name, shuffle):
    """Load one split folder under SPLIT_ROOT as a batched image dataset."""
    return tf.keras.preprocessing.image_dataset_from_directory(
        os.path.join(SPLIT_ROOT, split_name),
        image_size=(IMG_SIZE, IMG_SIZE),
        batch_size=BATCH_SIZE,
        seed=42,
        shuffle=shuffle,
    )


# Only the training split is shuffled; validation and test keep directory order.
train_ds = _load_split("train", shuffle=True)
val_ds = _load_split("val", shuffle=False)
test_ds = _load_split("test", shuffle=False)

# Class order as inferred from the training directory.
class_names = train_ds.class_names
print("Classes:", class_names)

In [None]:
from tensorflow.keras.applications.efficientnet import preprocess_input
# Random transformations applied to training batches, in this order.
augmentation_layers = [
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.06),
    layers.RandomZoom(0.08),
    layers.RandomContrast(0.08),
]
data_augmentation = keras.Sequential(augmentation_layers)

def prepare(ds, augment=False):
    """Resize, preprocess, cache and prefetch a dataset of (image, label) batches.

    Args:
        ds: A `tf.data.Dataset` yielding (images, labels).
        augment: When True, apply `data_augmentation` to the images.

    Returns:
        The transformed dataset, ending in `prefetch(AUTOTUNE)`.
    """
    def _resize_and_preprocess(x, y):
        x = tf.image.resize(x, (IMG_SIZE, IMG_SIZE))
        return preprocess_input(x), y

    ds = ds.map(_resize_and_preprocess, num_parallel_calls=AUTOTUNE)

    # Cache only the deterministic part of the pipeline. Caching after the random
    # augmentation would replay the first epoch's augmented images in every later
    # epoch, so the augmentation would add no variety.
    ds = ds.cache()

    if augment:
        ds = ds.map(lambda x, y: (data_augmentation(x, training=True), y),
                    num_parallel_calls=AUTOTUNE)

    return ds.prefetch(AUTOTUNE)


# Augmentation is enabled for the training split only; validation and test
# batches are just resized and preprocessed.
train_ds_pre = prepare(train_ds, augment=True)
val_ds_pre = prepare(val_ds, augment=False)
test_ds_pre = prepare(test_ds, augment=False)

In [None]:
num_classes = len(class_names)

# ImageNet-pretrained EfficientNetB0 without its classification head, frozen
# for the first training stage.
base_model = tf.keras.applications.EfficientNetB0(
    include_top=False,
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
    weights='imagenet',
)
base_model.trainable = False

# New head: global average pooling -> dropout -> softmax over the game classes.
# The backbone is called with training=False.
inputs = keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
x = base_model(inputs, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Integer labels with softmax probabilities, hence from_logits=False.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
model.compile(
    optimizer=keras.optimizers.Adam(1e-4),
    loss=loss_fn,
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Materialise the integer label of every training image.
# NOTE(review): `labels` is not passed to either fit call in this cell.
labels = np.array([y.numpy() for _, y in train_ds.unbatch()])

EPOCH_HEAD, EPOCH_FINE = 5, 8

# Stage 1: fit with the backbone still frozen.
history_head = model.fit(
    train_ds_pre,
    validation_data=val_ds_pre,
    epochs=EPOCH_HEAD,
)

# Stage 2: mark the backbone trainable, then re-freeze its first `freeze_until`
# layers so only the remaining ones are fine-tuned.
base_model.trainable = True
freeze_until = 200
for layer in base_model.layers[:freeze_until]:
    layer.trainable = False

# Re-compile so the trainable changes take effect, with a lower learning rate.
model.compile(
    optimizer=keras.optimizers.Adam(1e-5),
    loss=loss_fn,
    metrics=['accuracy'],
)

history_ft = model.fit(
    train_ds_pre,
    validation_data=val_ds_pre,
    epochs=EPOCH_FINE,
)

In [None]:
# Persist the trained model and the class order needed to decode its predictions.
os.makedirs(models_dir, exist_ok=True)
model_path, class_json = (
    os.path.join(models_dir, file_name)
    for file_name in ("game_effnetb0.h5", "class_names.json")
)

model.save(model_path)
with open(class_json, "w") as f:
    f.write(json.dumps(class_names))

print("Saved model to:", model_path)
print("Saved class names to:", class_json)
print("Class order:", class_names)

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns

# Collect predictions and ground-truth labels over the whole test set.
y_true, y_pred = [], []
for images, batch_labels in test_ds_pre:
    # verbose=0 stops predict() from printing one progress bar per batch.
    preds = model.predict(images, verbose=0)
    y_pred.extend(np.argmax(preds, axis=1).tolist())
    y_true.extend(batch_labels.numpy().tolist())

acc = accuracy_score(y_true, y_pred)
print(f"Test accuracy: {acc*100:.2f}%")

# Pin the label set to every class index. Otherwise a class missing from both
# y_true and y_pred shrinks the report and matrix, and they no longer line up
# with class_names.
class_ids = list(range(len(class_names)))
print(classification_report(y_true, y_pred, labels=class_ids,
                            target_names=class_names, zero_division=0))

cm = confusion_matrix(y_true, y_pred, labels=class_ids)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", xticklabels=class_names, yticklabels=class_names, cmap="Blues")
plt.xlabel("Predicted");
plt.ylabel("True");
plt.show()