# Identify Songs by Lyrics

This notebook identifies songs within music blocks by reading the screen lyrics and matching them against the database.

It downloads the video (without audio) from a specified YouTube URL, captures image frames and processes them using cv2, reads the screen lyrics using easyocr, and matches them against database lyrics using rapidfuzz.

Inputs:
- YOUTUBE_URL (from settings.py; must be the same one as in notebook 03)
- DB_URL (from .env) - assumes song_lyrics already in database 
- Music blocks (from notebook 03)

Outputs:
- Song blocks with data about individual song matches

## Imports

In [None]:
import os
import sys
import re
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
import easyocr
from rapidfuzz import process, fuzz

## SETTINGS

Add parent folder to python path (needed for retrieving YOUTUBE_URL from settings.py)

In [None]:
# Make the parent folder importable (settings.py lives there).
# Only add it when missing so that re-running this cell does not keep
# appending duplicate entries to sys.path.
_parent_dir = str(Path("..").resolve())
if _parent_dir not in sys.path:
    sys.path.append(_parent_dir)

Initialise path for .env file (located in the parent directory)

In [None]:
from dotenv import load_dotenv

# The .env file sits one level above this notebook
env_path = Path("..", ".env")
load_dotenv(dotenv_path=env_path)

Data Directories (copied from notebook 01)

In [None]:
# Project data root ('..' moves up one level from the notebooks folder)
DATA_DIR = Path("..") / "data"

# First-level data folders
RAW_DATA_DIR = DATA_DIR / "raw"
CLIPS_DATA_DIR = DATA_DIR / "clips"
RESULTS_DIR = DATA_DIR / "results"

# Clip sub-folders
STAGING_DIR = CLIPS_DATA_DIR / "segments"
MUSIC_CLIPS_DIR = CLIPS_DATA_DIR / "music"
NOT_MUSIC_CLIPS_DIR = CLIPS_DATA_DIR / "not-music"

# Create every folder that is written to, along with any missing parents
for _folder in (
    RAW_DATA_DIR,
    RESULTS_DIR,
    STAGING_DIR,
    MUSIC_CLIPS_DIR,
    NOT_MUSIC_CLIPS_DIR,
):
    _folder.mkdir(parents=True, exist_ok=True)

Output Video filepath

In [None]:
# Location of the downloaded video inside the raw data folder
OUTPUT_VIDEO = RAW_DATA_DIR.joinpath("output_video.mp4")

YouTube video URL

In [None]:
from settings import YOUTUBE_URL

## Download Video (without Audio) from YouTube

Download the best video-only stream that is 480p or lower and encoded as mp4 (overwrite)
- Compared with downloading the best quality, this reduced the run time of the `populate_song_blocks` function from 2 min 27 sec to 17 sec while giving the same output

In [None]:
!yt-dlp -q --force-overwrites -f "bv*[height<=480][ext=mp4]" -o "{OUTPUT_VIDEO}" {YOUTUBE_URL}

## Get Lyrics From Database

Load Database URL from environment variables

In [None]:
# Read the connection string from the environment (populated from .env)
DB_URL = os.environ.get("DB_URL")

# Report whether the variable was found
print(
    "Database URL loaded successfully."
    if DB_URL
    else "Error: DB_URL not found. Check your .env file path."
)

Query database to get df_lyrics

In [None]:
from sqlalchemy import create_engine

# Lyrics joined to the first line of the song they belong to
query = """
SELECT song_lyrics.song_id, songs.first_line, song_lyrics.content
FROM song_lyrics
JOIN songs ON song_lyrics.song_id = songs.id;
"""

engine = create_engine(DB_URL)
df_lyrics = pd.read_sql_query(sql=query, con=engine)
df_lyrics

Filter out lyric anomalies

In [None]:
# Keep only rows whose lyric text is longer than 20 characters
mask = df_lyrics["content"].str.len() > 20
# Renumber rows 0..n-1 afterwards: important for iloc and RapidFuzz matches
df_lyrics = df_lyrics.loc[mask].reset_index(drop=True)

Add column for cleaned lyrics

In [None]:
def clean_text(text):
    """Normalise text for fuzzy matching.

    The input is converted to a string and lower-cased; every run of
    characters other than ``a-z`` / ``0-9`` (punctuation, whitespace,
    accented letters, ...) is collapsed into a single space, and leading
    and trailing spaces are removed.
    """
    lowered = str(text).lower()
    return re.sub(r"[^a-z0-9]+", " ", lowered).strip()


# Normalised copy of each lyric, used as the fuzzy-matching target
df_lyrics["cleaned"] = df_lyrics["content"].map(clean_text)
df_lyrics.head()

## Read lyrics from screen

Initialize the reader (this downloads the model weights once)

In [None]:
# English-only OCR reader, shared by the helper functions below
reader = easyocr.Reader(lang_list=['en'])

Print status helper

In [None]:
def print_status(message, width=80):
    """Print a status line that the next print will overwrite.

    The message is left-justified to ``width`` characters and terminated
    with a carriage return instead of a newline.
    """
    padded = message.ljust(width)
    print(padded, end="\r")

Helper functions relating to video

In [None]:
def populate_song_blocks(music_blocks, file_path, youtube_url, debug=False):
    """Identify the song(s) shown in each music block of a video.

    For every music block the on-screen lyrics are sampled near the end and
    near the start of the block. If both samples match the same song, the
    whole block is attributed to that song; otherwise a binary search locates
    the point where the lyrics switch to a different song, and the process
    repeats from there.

    Args:
        music_blocks: DataFrame with the columns ``block_id``, ``start_sec``
            and ``end_sec``.
        file_path: Path of the video file to read frames from.
        youtube_url: Base URL used to build the timestamped ``link`` entries.
        debug: When True, print progress details while searching.

    Returns:
        dict mapping each block id to a list of song dicts with the keys
        ``id``, ``first_line``, ``start``, ``end``, ``start_format``,
        ``end_format`` and ``link``. Blocks in which no song was found map to
        an empty list.

    Raises:
        IOError: If the video file cannot be opened.
    """
    print_status("Populating song blocks...")

    # Load video capture. str() keeps this working on OpenCV builds that do
    # not accept pathlib.Path objects.
    cap = cv2.VideoCapture(str(file_path))
    if not cap.isOpened():
        # Fail loudly: otherwise every frame read fails and the function
        # silently reports that no block contains a song.
        cap.release()
        raise IOError(f"Could not open video file: {file_path}")

    SHIFT_TIME = 30  # In case lyrics aren't showing at start and end of song

    song_blocks = {}
    try:
        # Define video properties; lyrics are read from the bottom 40% of the frame
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        lyric_start_row = int(height * 0.6)
        lyric_end_row = height

        for _, row in music_blocks.iterrows():
            block_id = row['block_id']
            block_start_time = row['start_sec']
            block_end_time = row['end_sec']

            song_blocks[block_id] = []

            song_start_time = block_start_time
            song_start_time_shifted = song_start_time + SHIFT_TIME
            block_end_time_shifted = block_end_time - SHIFT_TIME

            if debug:
                print(f"\n--- Processing Music Block: {block_start_time}s to {block_end_time}s ---")

            # Get song_id at end (once), stepping backwards until lyrics match
            song_id_end = None
            while (song_id_end is None) and (block_end_time_shifted > block_start_time):
                song_end = _get_best_matching_song(cap, lyric_start_row, lyric_end_row, width, block_end_time_shifted, debug=debug)
                if not song_end:
                    block_end_time_shifted -= SHIFT_TIME  # Shift back if nothing found
                    continue
                song_id_end = song_end["id"]
            if song_id_end is None:
                if debug:
                    print("NO SONG FOUND IN THIS SONG BLOCK")
                continue

            # ===== GET SONG(S) IN SONG BLOCK =====
            while song_start_time_shifted < block_end_time_shifted:

                # Get song_id at start of song, stepping forwards until lyrics match
                song_id_start = None
                while (song_id_start is None) and (song_start_time_shifted < block_end_time_shifted):
                    song_start = _get_best_matching_song(cap, lyric_start_row, lyric_end_row, width, song_start_time_shifted, debug=debug)
                    if not song_start:
                        song_start_time_shifted += SHIFT_TIME  # Shift forward if nothing found
                        continue
                    song_id_start = song_start["id"]
                    song_first_line_start = song_start["first_line"]
                if song_id_start is None:
                    if debug:
                        print("NO SONG FOUND IN THIS SONG BLOCK")
                    break

                # Check if song covers rest of song block
                if song_id_start == song_id_end:
                    if debug:
                        print(f"SONG COMPLETED: {song_start}")

                    song_blocks[block_id].append({
                        "id": song_id_start,
                        "first_line": song_first_line_start,
                        "start": song_start_time,
                        "end": block_end_time,
                        "start_format": _format_timestamp(song_start_time),
                        "end_format": _format_timestamp(block_end_time),
                        # int(): whole seconds in the link, matching _format_timestamp
                        "link": f"{youtube_url}&t={int(song_start_time)}",
                        })
                    break  # Entire block is one song, we are done with this block

                # ============= Multiple songs in song block =============
                if debug:
                    print(f"MULTIPLE SONGS IN BLOCK - commence binary search..")

                # Binary search for the transition point
                end_time = _get_song_end_time(song_start_time_shifted, block_end_time_shifted, lyric_start_row, lyric_end_row, width, song_id_start, cap, debug=debug)

                # Assume delay in changing lyrics to new song
                end_time -= 5

                if debug:
                    print(f"SONG COMPLETED: {song_start}")

                song_blocks[block_id].append({
                    "id": song_id_start,
                    "first_line": song_first_line_start,
                    "start": song_start_time,
                    "end": end_time,
                    "start_format": _format_timestamp(song_start_time),
                    "end_format": _format_timestamp(end_time),
                    # int(): whole seconds in the link, matching _format_timestamp
                    "link": f"{youtube_url}&t={int(song_start_time)}",
                    })

                song_start_time = end_time  # Move to the start of the next song
                song_start_time_shifted = song_start_time + SHIFT_TIME
    finally:
        # Always free the capture handle, even if OCR or matching raises
        cap.release()

    return song_blocks


def _format_timestamp(seconds: int) -> str:
    h, r = divmod(int(seconds), 3600)
    m, s = divmod(r, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def _identify_songs_from_lyrics(search_text, df_lyrics, threshold=80):
    """Fuzzy-match OCR text against the cleaned lyrics.

    Args:
        search_text: Raw text read from the screen, or None when nothing
            was read.
        df_lyrics: DataFrame with the columns ``song_id``, ``first_line``
            and ``cleaned``.
        threshold: Minimum RapidFuzz score for a lyric to count as a match.

    Returns:
        Up to five dicts with the keys ``id``, ``first_line`` and ``score``,
        best match first. Empty when the text is missing, shorter than 10
        characters once cleaned, or nothing reaches the threshold.
    """
    # Reject a missing OCR result explicitly instead of relying on
    # str(None) == "none" happening to be shorter than the length limit.
    if search_text is None:
        return []

    search_text_cleaned = clean_text(search_text)
    if len(search_text_cleaned) < 10:
        return []

    # extract returns a list of (string, score, key) tuples; for a pandas
    # Series the key is the row's index label.
    results = process.extract(
        search_text_cleaned,
        df_lyrics['cleaned'],
        scorer=fuzz.partial_ratio,  # fuzz.token_set_ratio is an untried alternative
        score_cutoff=threshold,
        limit=5
    )

    matches = []
    for _, score, key in results:
        # Label-based lookup: correct whether or not the index was reset
        match_row = df_lyrics.loc[key]
        matches.append({
            "id": int(match_row["song_id"]),
            "first_line": match_row["first_line"],
            "score": round(float(score), 1)
        })

    # RapidFuzz's extract automatically sorts by score DESC
    return matches


def _display_images(lyric_zone, thresh):
    """Plot the cropped frame next to the thresholded image given to OCR."""
    _, (crop_ax, ocr_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # Left panel: colour crop, converted from BGR to RGB for display
    rgb_crop = cv2.cvtColor(lyric_zone, cv2.COLOR_BGR2RGB)
    crop_ax.imshow(rgb_crop)
    crop_ax.set_title("Original Crop")

    # Right panel: the binary image that OCR receives
    ocr_ax.imshow(thresh, cmap='gray')
    ocr_ax.set_title("Thresholded (OCR Input)")

    plt.show()


def _get_screen_text(frame, lyric_start_row, lyric_end_row, width, debug):
    """Read the text shown in the given rows of a video frame.

    Returns the OCR fragments joined by spaces, or None when OCR finds
    nothing or the joined text is shorter than 20 characters. With ``debug``
    set, the crop and its thresholded version are displayed when OCR finds
    nothing at all.
    """
    # Crop to the lyric rows, then convert to grayscale and binarise
    crop = frame[lyric_start_row:lyric_end_row, 0:width]
    grayscale = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
    binary = cv2.threshold(grayscale, 200, 255, cv2.THRESH_BINARY)[1]

    # detail=0 makes easyocr return plain strings
    fragments = reader.readtext(binary, detail=0)
    if fragments:
        joined = " ".join(fragments)
        # Anything under 20 characters is treated as noise or too few lyrics
        return joined if len(joined) >= 20 else None

    if debug:
        _display_images(crop, binary)
    return None


def _get_song_end_time(left, right, lyric_start_row, lyric_end_row, width, song_id, cap, debug):
    """Binary search for time when changes from one song to another"""

    while (right - left) > 2:  # Stop when within 2 seconds
        mid = (left + right) // 2
        
        song = _get_best_matching_song(cap, lyric_start_row, lyric_end_row, width, mid, debug=debug)
        if not song:
            # Catch no song match (e.g. lyrics not displayed on screen)
            right = right - 10
            continue
        
        mid_id = song["id"]

        if mid_id == song_id:
            left = mid
        else:
            right = mid

    return left


def _get_best_matching_song(cap, lyric_start_row, lyric_end_row, width, sec, debug):
    """Return the best lyric match for the frame at ``sec`` seconds.

    Seeks ``cap`` to the requested time, reads one frame, runs OCR on the
    lyric rows and fuzzy-matches the text against the module-level
    ``df_lyrics``.

    Returns:
        The highest-scoring match dict (keys ``id``, ``first_line``,
        ``score``), or None when the frame cannot be read, no usable text is
        found, or nothing matches.
    """
    cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
    ret, frame = cap.read()
    if not ret:
        return None

    text = _get_screen_text(frame, lyric_start_row, lyric_end_row, width, debug=debug)

    # Only call the matcher when OCR returned text; passing None on would
    # have it stringified to "none" and matched as if it were lyrics.
    songs = _identify_songs_from_lyrics(text, df_lyrics) if text is not None else []

    if not songs:
        if debug:
            print(f"{sec}s: NO MATCH FOUND: text={text}")
        return None

    return songs[0]


def display_song_block_summary(song_blocks):
    """Print one line per identified song, with a separator row before each block.

    Args:
        song_blocks: Mapping of block id to a list of song dicts; each dict
            must provide the keys ``start_format``, ``link`` and
            ``first_line``.
    """
    separator = "---".ljust(80)
    for songs in song_blocks.values():
        print(separator)
        for song in songs:
            # Single quotes inside the f-string: reusing the outer double
            # quotes is only valid syntax from Python 3.12 onwards.
            print(f"{song['start_format']} - {song['link']} - {song['first_line']}")
    

Retrieve music blocks from CSV (from notebook 03)

In [None]:
# Music block boundaries saved by notebook 03
music_blocks = pd.read_csv(RESULTS_DIR.joinpath("music_blocks.csv"))
music_blocks

Run function for populating song blocks with songs

In [None]:
# Identify the songs in every music block, then print a per-block summary
song_blocks = populate_song_blocks(
    music_blocks,
    file_path=OUTPUT_VIDEO,
    youtube_url=YOUTUBE_URL,
    debug=False,
)
display_song_block_summary(song_blocks)