In [6]:
from pathlib import Path
import glob
from PIL import Image
import typing as t
import cv2 as cv
from cv2.typing import MatLike
import datetime
import shutil


# Configuration
ANALYZE_WIDTH = 220
ANALYZE_HEIGHT = 88
TRAIN_IMAGE_SIZE = (64, 32)
TRAIN_TEST_SPLIT = 0.8

PATH_TRAINING_DATA = Path("./data/training")
PATH_TESTING_DATA = Path("./data/testing")


def setup_directories():
    """Create clean training and testing directories."""
    for path in [PATH_TRAINING_DATA, PATH_TESTING_DATA]:
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
        path.mkdir(parents=True, exist_ok=True)


def find_playhead_position(img: MatLike, template: MatLike) -> t.Tuple[int, int]:
    """Find the position of the playhead in the image using template matching."""
    res = cv.matchTemplate(img, template, cv.TM_CCOEFF)
    min_val, max_val, min_loc, max_loc = cv.minMaxLoc(res)
    return (max_loc[0], max_loc[1])


def extract_regions(img: MatLike, playhead_pos: t.Tuple[int, int], template_shape: t.Tuple[int, int]) -> t.Tuple[MatLike, MatLike]:
    """Extract left and right analysis regions around the playhead."""
    analyze_bbox_left = (playhead_pos[0] - ANALYZE_WIDTH, playhead_pos[1], 
                        playhead_pos[0], playhead_pos[1] + ANALYZE_HEIGHT)
    analyze_bbox_right = (playhead_pos[0] + template_shape[1], playhead_pos[1], 
                         playhead_pos[0] + template_shape[1] + ANALYZE_WIDTH, playhead_pos[1] + ANALYZE_HEIGHT)
    
    img_left = img[analyze_bbox_left[1]:analyze_bbox_left[3], analyze_bbox_left[0]:analyze_bbox_left[2], :]
    img_right = img[analyze_bbox_right[1]:analyze_bbox_right[3], analyze_bbox_right[0]:analyze_bbox_right[2], :]
    
    return img_left, img_right


def save_training_images(img_bass: MatLike, img_breakdown: MatLike, is_training: bool):
    """Save bass and breakdown images to appropriate directories."""
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')
    file_name_bass = f"bass_{timestamp}.png"
    file_name_breakdown = f"breakdown_{timestamp}.png"
    
    # Convert BGR to RGB and resize
    img_pil_bass = Image.fromarray(cv.cvtColor(img_bass, cv.COLOR_BGR2RGB)).resize(TRAIN_IMAGE_SIZE)
    img_pil_breakdown = Image.fromarray(cv.cvtColor(img_breakdown, cv.COLOR_BGR2RGB)).resize(TRAIN_IMAGE_SIZE)
    
    # Save to appropriate directory
    save_dir = PATH_TRAINING_DATA if is_training else PATH_TESTING_DATA
    img_pil_bass.save(save_dir / file_name_bass)
    img_pil_breakdown.save(save_dir / file_name_breakdown)


def process_images():
    """Main function to process all images and create training/testing datasets."""
    # Load template and input images
    playhead_template = cv.imread("./templates/playhead.png")
    images_bass = glob.glob("./data/raw/bass/*.png")
    images_breakdown = glob.glob("./data/raw/breakdown/*.png")
    all_images = images_bass + images_breakdown
    
    setup_directories()
    
    n_images = len(all_images)
    print(f"Processing {n_images} images...")
    
    for i, img_path in enumerate(all_images):
        img = cv.imread(img_path)
        playhead_pos = find_playhead_position(img, playhead_template)
        img_left, img_right = extract_regions(img, playhead_pos, playhead_template.shape)
        
        # Determine which region corresponds to which class
        is_breakdown_source = "breakdown" in img_path
        train_image_breakdown = img_right if is_breakdown_source else img_left
        train_image_bass = img_left if is_breakdown_source else img_right
        
        # Determine if this goes to training or testing set
        is_training = i < n_images * TRAIN_TEST_SPLIT
        
        save_training_images(train_image_bass, train_image_breakdown, is_training)
    
    print(f"Created {int(n_images * TRAIN_TEST_SPLIT * 2)} training images")
    print(f"Created {int(n_images * (1 - TRAIN_TEST_SPLIT) * 2)} testing images")


# Run the processing
process_images()

Processing 59 images...
Created 94 training images
Created 23 testing images
Created 94 training images
Created 23 testing images


In [7]:
from pathlib import Path
import numpy as np
from PIL import Image
from sklearn.metrics import accuracy_score, classification_report
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC
import joblib


def load_dataset(folder: Path) -> tuple[np.ndarray, np.ndarray]:
    images: list[np.ndarray] = []
    labels: list[int] = []
    for img_path in sorted(folder.glob("*.png")):
        label = 1 if "breakdown" in img_path.stem else 0
        with Image.open(img_path) as img:
            arr = np.asarray(img.convert("RGB"), dtype=np.float32).reshape(-1)
        images.append(arr)
        labels.append(label)
    if not images:
        raise ValueError(f"No PNG images found in {folder}")
    features = np.stack(images) / 255.0
    targets = np.asarray(labels, dtype=np.int64)
    return features, targets


train_dir = Path("./data/training")
test_dir = Path("./data/testing")

x_train, y_train = load_dataset(train_dir)
x_test, y_test = load_dataset(test_dir)

model = make_pipeline(StandardScaler(with_mean=False), LinearSVC())
model.fit(x_train, y_train)

# Save the trained model
model_path = Path("./bass_breakdown_classifier.pkl")
joblib.dump(model, model_path)
print(f"Model saved to {model_path}")

y_pred = model.predict(x_test)

print(f"Accuracy: {accuracy_score(y_test, y_pred):.3f}")
print(classification_report(y_test, y_pred, target_names=["bass", "breakdown"]))

Model saved to bass_breakdown_classifier.pkl
Accuracy: 1.000
              precision    recall  f1-score   support

        bass       1.00      1.00      1.00        11
   breakdown       1.00      1.00      1.00        11

    accuracy                           1.00        22
   macro avg       1.00      1.00      1.00        22
weighted avg       1.00      1.00      1.00        22





In [8]:
import time

if "model" not in globals() or "x_train" not in globals():
    raise RuntimeError("Run the training cell before benchmarking inference.")

# Measure single-image inference latency on the training set
start = time.perf_counter()
for sample in x_train:
    _ = model.predict(sample.reshape(1, -1))
total = time.perf_counter() - start
per_image_ms = (total / x_train.shape[0]) * 1000.0

print(f"Predicted {x_train.shape[0]} images in {total:.3f} s")
print(f"Average latency per image: {per_image_ms:.3f} ms")

Predicted 96 images in 0.012 s
Average latency per image: 0.123 ms


In [10]:
from pathlib import Path
import numpy as np
from PIL import Image
import cv2 as cv
from cv2.typing import MatLike
import typing as t
import joblib


# Configuration (must match training parameters)
ANALYZE_WIDTH = 220
ANALYZE_HEIGHT = 88
TRAIN_IMAGE_SIZE = (64, 32)

# Global model variable for efficiency
_model = None


def load_classifier_model(model_path: str = "./bass_breakdown_classifier.pkl"):
    """Load the trained classifier model. Call this once before inference."""
    global _model
    if not Path(model_path).exists():
        raise FileNotFoundError(f"Model file not found: {model_path}")
    _model = joblib.load(model_path)
    print(f"Model loaded from {model_path}")


def predict_bass_or_breakdown(img: MatLike, playhead_pos: t.Tuple[int, int], 
                             template_shape: t.Tuple[int, int] = (19, 88)) -> str:
    """
    Predict whether the current state is 'bass' or 'breakdown' from a live image.
    
    Args:
        img: Full captured image (BGR format from cv2)
        playhead_pos: (x, y) position of the playhead
        template_shape: (width, height) of the playhead template for region calculation
    
    Returns:
        "bass" or "breakdown"
    """
    global _model
    if _model is None:
        raise RuntimeError("Model not loaded. Call load_classifier_model() first.")
    
    # Extract analysis regions around the playhead
    analyze_bbox_left = (playhead_pos[0] - ANALYZE_WIDTH, playhead_pos[1], 
                        playhead_pos[0], playhead_pos[1] + ANALYZE_HEIGHT)
    analyze_bbox_right = (playhead_pos[0] + template_shape[1], playhead_pos[1], 
                         playhead_pos[0] + template_shape[1] + ANALYZE_WIDTH, playhead_pos[1] + ANALYZE_HEIGHT)
    
    img_left = img[analyze_bbox_left[1]:analyze_bbox_left[3], analyze_bbox_left[0]:analyze_bbox_left[2], :]
    img_right = img[analyze_bbox_right[1]:analyze_bbox_right[3], analyze_bbox_right[0]:analyze_bbox_right[2], :]
    
    # Convert regions to RGB, resize, and flatten for model input
    img_left_rgb = cv.cvtColor(img_left, cv.COLOR_BGR2RGB)
    img_right_rgb = cv.cvtColor(img_right, cv.COLOR_BGR2RGB)
    
    img_left_pil = Image.fromarray(img_left_rgb).resize(TRAIN_IMAGE_SIZE)
    img_right_pil = Image.fromarray(img_right_rgb).resize(TRAIN_IMAGE_SIZE)
    
    # Convert to feature vectors (same format as training)
    left_features = np.asarray(img_left_pil, dtype=np.float32).reshape(-1) / 255.0
    right_features = np.asarray(img_right_pil, dtype=np.float32).reshape(-1) / 255.0
    
    # Predict both regions
    left_pred = _model.predict(left_features.reshape(1, -1))[0]
    right_pred = _model.predict(right_features.reshape(1, -1))[0]
    
    # Logic: if left is breakdown, then current state is breakdown
    # if right is breakdown, then current state is bass
    if left_pred == 1:  # left is breakdown
        return "breakdown"
    elif right_pred == 1:  # right is breakdown
        return "bass"
    else:
        # Both predict bass - need to determine which side we're looking at
        # Default to bass if unclear
        return "bass"


# Example usage:
load_classifier_model()  # Call once at startup

# During live inference:
img = cv.imread("data/raw/bass/bass_20251022_142938_107288.png")
template = cv.imread("templates/playhead_template.png")
playhead_pos = find_playhead_position(img, template)  # from cell 1
result = predict_bass_or_breakdown(img, playhead_pos)
print(f"Current state: {result}")

Model loaded from ./bass_breakdown_classifier.pkl


error: OpenCV(4.11.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\templmatch.cpp:1164: error: (-215:Assertion failed) (depth == CV_8U || depth == CV_32F) && type == _templ.type() && _img.dims() <= 2 in function 'cv::matchTemplate'
