# 01 — Jamendo CC Catalog Enumeration

Enumerate Jamendo music catalog via API and compute metadata statistics for CC-BY and CC-BY-SA licensed tracks only.

Outputs: summary JSON, CSV breakdowns, optional Parquet archive.

In [1]:
%%bash
set -euo pipefail
export JAMENDO_CLIENT_ID="48ecf016"

In [2]:
from pathlib import Path
import os
import json
from datetime import datetime

# Base paths
BASE_DIR = Path("/root/workspace")
OUTPUT_DIR = BASE_DIR / "data" / "jamendo_cc_catalog"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
JAMENDO_CLIENT_ID="48ecf016"
# API Configuration

if not JAMENDO_CLIENT_ID:
    raise RuntimeError("Set JAMENDO_CLIENT_ID environment variable")

JAMENDO_API_BASE = "https://api.jamendo.com/v3.0"

# Processing Configuration
MAX_PAGES = int(os.environ.get("MAX_PAGES", "0"))  # 0 = all, 5 = dry run
PAGE_SIZE = 200  # Max allowed by Jamendo API
CHECKPOINT_INTERVAL = 10  # Save state every N pages
REQUEST_TIMEOUT = 30
RETRY_MAX_ATTEMPTS = 5
RETRY_BACKOFF_FACTOR = 2

# License Allowlist (strict)
ALLOWED_LICENSES = {"cc-by", "cc-by-sa"}

# Output files
STATE_FILE = OUTPUT_DIR / "state.json"
JSONL_FILE = OUTPUT_DIR / "jamendo_cc_tracks_metadata.jsonl"
SUMMARY_FILE = OUTPUT_DIR / "jamendo_cc_hours_summary.json"
LICENSE_CSV = OUTPUT_DIR / "jamendo_cc_hours_by_license.csv"
DURATION_CSV = OUTPUT_DIR / "jamendo_cc_duration_stats.csv"
PARQUET_FILE = OUTPUT_DIR / "jamendo_cc_tracks_metadata.parquet"

print(f"Output directory: {OUTPUT_DIR}")
print(f"Client ID configured: {'✓' if JAMENDO_CLIENT_ID else '✗'}")
print(f"Mode: {'DRY RUN (max ' + str(MAX_PAGES) + ' pages)' if MAX_PAGES > 0 else 'FULL CRAWL'}")

Output directory: /root/workspace/data/jamendo_cc_catalog
Client ID configured: ✓
Mode: FULL CRAWL


In [3]:
%pip install -q httpx pandas pyarrow tqdm tenacity

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
import httpx
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from typing import Optional, Dict, List, Any
import time

In [5]:
def canonicalize_license(license_ccurl: Optional[str]) -> Optional[str]:
    """
    Parse Creative Commons license URL and return canonical form.
    
    Returns: 'cc-by', 'cc-by-sa', or None (for rejected licenses)
    
    Accepts:
    - CC-BY 3.0/4.0: http://creativecommons.org/licenses/by/3.0/
    - CC-BY-SA 3.0/4.0: http://creativecommons.org/licenses/by-sa/3.0/
    
    Rejects (returns None):
    - CC-BY-NC (NonCommercial)
    - CC-BY-ND (NoDerivatives)
    - CC-BY-NC-SA
    - CC-BY-NC-ND
    - Any other license
    """
    if not license_ccurl or not isinstance(license_ccurl, str):
        return None
    
    # Normalize URL
    url_lower = license_ccurl.lower().strip().rstrip('/')
    
    # Extract license type from URL pattern
    # Expected: http(s)://creativecommons.org/licenses/{type}/{version}/
    if 'creativecommons.org/licenses/' not in url_lower:
        return None
    
    # Extract type component
    parts = url_lower.split('creativecommons.org/licenses/')
    if len(parts) != 2:
        return None
    
    license_part = parts[1].split('/')[0]  # Get type before version
    
    # Strict allowlist matching
    if license_part == 'by':
        return 'cc-by'
    elif license_part == 'by-sa':
        return 'cc-by-sa'
    else:
        # Reject: by-nc, by-nd, by-nc-sa, by-nc-nd, etc.
        return None

# Unit tests
assert canonicalize_license("http://creativecommons.org/licenses/by/3.0/") == "cc-by"
assert canonicalize_license("http://creativecommons.org/licenses/by-sa/4.0/") == "cc-by-sa"
assert canonicalize_license("http://creativecommons.org/licenses/by-nc/3.0/") is None
assert canonicalize_license("http://creativecommons.org/licenses/by-nc-sa/3.0/") is None
assert canonicalize_license("http://creativecommons.org/licenses/by-nd/3.0/") is None
assert canonicalize_license(None) is None
assert canonicalize_license("") is None
print("✓ License canonicalization tests passed")

✓ License canonicalization tests passed


In [6]:
class JamendoAPIClient:
    """Jamendo API client with retry logic and rate limiting."""
    
    def __init__(self, client_id: str, timeout: int = 30):
        self.client_id = client_id
        self.timeout = timeout
        self.client = httpx.Client(timeout=timeout)
        self.last_request_time = 0
        self.min_request_interval = 0.1  # 100ms between requests
    
    def _rate_limit(self):
        """Simple rate limiting: ensure minimum interval between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_request_interval:
            time.sleep(self.min_request_interval - elapsed)
        self.last_request_time = time.time()
    
    @retry(
        stop=stop_after_attempt(RETRY_MAX_ATTEMPTS),
        wait=wait_exponential(multiplier=RETRY_BACKOFF_FACTOR, min=1, max=60),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
    )
    def fetch_tracks(
        self, 
        offset: int = 0, 
        limit: int = 200,
        include_fullcount: bool = False
    ) -> Dict[str, Any]:
        """
        Fetch tracks from Jamendo API with pagination.
        
        Returns: {"headers": {...}, "results": [...]}
        """
        self._rate_limit()
        
        params = {
            "client_id": self.client_id,
            "format": "json",
            "limit": limit,
            "offset": offset,
            "audiodownload": "true",  # Only downloadable tracks
        }
        
        if include_fullcount:
            params["fullcount"] = "true"
        
        url = f"{JAMENDO_API_BASE}/tracks/"
        
        try:
            response = self.client.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                # Rate limit - tenacity will retry with backoff
                print(f"⚠ Rate limited (429), will retry...")
                raise
            elif e.response.status_code >= 500:
                # Server error - tenacity will retry
                print(f"⚠ Server error ({e.response.status_code}), will retry...")
                raise
            else:
                # Client error - don't retry
                print(f"✗ Client error ({e.response.status_code}): {e}")
                raise RuntimeError(f"API error: {e.response.status_code}") from e
    
    def close(self):
        self.client.close()

# Test client initialization
api_client = JamendoAPIClient(JAMENDO_CLIENT_ID, timeout=REQUEST_TIMEOUT)
print("✓ API client initialized")

✓ API client initialized


In [7]:
def load_state() -> Dict[str, Any]:
    """Load checkpoint state or return initial state."""
    if STATE_FILE.exists():
        with open(STATE_FILE, 'r') as f:
            state = json.load(f)
        print(f"✓ Loaded checkpoint: offset={state['last_offset']}, fetched={state['total_fetched']}")
        return state
    else:
        return {
            "last_offset": 0,
            "total_fetched": 0,
            "total_passed_filter": 0,
            "total_rejected": 0,
            "total_catalog_size": None,
            "start_time": datetime.utcnow().isoformat(),
            "last_update_time": None
        }

def save_state(state: Dict[str, Any]):
    """Save checkpoint state."""
    state["last_update_time"] = datetime.utcnow().isoformat()
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

# Load or initialize state
state = load_state()

✓ Loaded checkpoint: offset=596000, fetched=596000


In [8]:
def process_track(track: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Filter and extract metadata from a track.
    Returns None if track doesn't pass license filter.
    """
    license_url = track.get("license_ccurl")
    canonical_license = canonicalize_license(license_url)
    
    if canonical_license is None:
        return None
    
    # Extract relevant metadata
    return {
        "id": track.get("id"),
        "name": track.get("name"),
        "duration": track.get("duration"),  # in seconds
        "artist_id": track.get("artist_id"),
        "artist_name": track.get("artist_name"),
        "album_id": track.get("album_id"),
        "album_name": track.get("album_name"),
        "license": canonical_license,
        "license_ccurl": license_url,
        "releasedate": track.get("releasedate"),
        "audiodownload_allowed": track.get("audiodownload_allowed"),
    }

# Main crawl loop
offset = state["last_offset"]
page_count = 0
audit_passed = []
audit_rejected = []

# Open JSONL in append mode
jsonl_mode = 'a' if offset > 0 else 'w'
jsonl_file = open(JSONL_FILE, jsonl_mode)

try:
    # Get total catalog size on first request
    if state["total_catalog_size"] is None:
        print("Fetching catalog size...")
        first_response = api_client.fetch_tracks(offset=0, limit=1, include_fullcount=True)
        total_size = first_response["headers"].get("results_fullcount", 0)
        state["total_catalog_size"] = total_size
        print(f"Total catalog size: {total_size:,} tracks")
    
    total_size = state["total_catalog_size"]
    total_pages = (total_size // PAGE_SIZE) + (1 if total_size % PAGE_SIZE else 0)
    
    if MAX_PAGES > 0:
        total_pages = min(total_pages, MAX_PAGES)
        print(f"DRY RUN: Processing {total_pages} pages only")
    
    # Progress bar
    with tqdm(total=total_pages, initial=offset // PAGE_SIZE, desc="Crawling pages", unit="page") as pbar:
        while True:
            # Check if we've reached max pages (dry run mode)
            if MAX_PAGES > 0 and page_count >= MAX_PAGES:
                print(f"Reached MAX_PAGES limit ({MAX_PAGES})")
                break
            
            # Check if we've exhausted the catalog
            if offset >= total_size:
                print(f"Completed: processed all {total_size:,} tracks")
                break
            
            # Fetch page
            response = api_client.fetch_tracks(offset=offset, limit=PAGE_SIZE)
            results = response.get("results", [])
            
            if not results:
                print(f"No more results at offset {offset}")
                break
            
            # Process tracks
            for track in results:
                state["total_fetched"] += 1
                
                processed = process_track(track)
                if processed:
                    # Write to JSONL immediately (streaming)
                    jsonl_file.write(json.dumps(processed) + '\n')
                    state["total_passed_filter"] += 1
                    
                    # Collect audit samples
                    if len(audit_passed) < 10:
                        audit_passed.append({
                            "id": track.get("id"),
                            "name": track.get("name"),
                            "license_ccurl": track.get("license_ccurl"),
                            "canonical": processed["license"]
                        })
                else:
                    state["total_rejected"] += 1
                    
                    # Collect audit samples
                    if len(audit_rejected) < 10:
                        audit_rejected.append({
                            "id": track.get("id"),
                            "name": track.get("name"),
                            "license_ccurl": track.get("license_ccurl"),
                            "reason": "license_not_allowed"
                        })
            
            # Update state
            offset += len(results)
            state["last_offset"] = offset
            page_count += 1
            
            # Checkpoint periodically
            if page_count % CHECKPOINT_INTERVAL == 0:
                jsonl_file.flush()
                save_state(state)
            
            pbar.update(1)
            pbar.set_postfix({
                "passed": state["total_passed_filter"],
                "rejected": state["total_rejected"]
            })
    
    # Final checkpoint
    jsonl_file.flush()
    save_state(state)
    
finally:
    jsonl_file.close()
    api_client.close()

print("\n" + "="*60)
print("CRAWL COMPLETE")
print("="*60)
print(f"Total tracks fetched: {state['total_fetched']:,}")
print(f"Passed filter (CC-BY/CC-BY-SA): {state['total_passed_filter']:,}")
print(f"Rejected (NC/ND/other): {state['total_rejected']:,}")
print(f"Pass rate: {100 * state['total_passed_filter'] / state['total_fetched']:.1f}%")

Crawling pages:  70%|######9   | 2980/4258 [00:00<?, ?page/s]

KeyboardInterrupt: 

In [None]:
print("\n" + "="*60)
print("AUDIT TRAIL - Sample Tracks")
print("="*60)

print("\n✓ PASSED FILTER (CC-BY/CC-BY-SA):")
for i, track in enumerate(audit_passed, 1):
    print(f"{i}. [{track['id']}] {track['name']}")
    print(f"   License URL: {track['license_ccurl']}")
    print(f"   Canonical: {track['canonical']}")
    print()

print("\n✗ REJECTED (NC/ND/other):")
for i, track in enumerate(audit_rejected, 1):
    print(f"{i}. [{track['id']}] {track['name']}")
    print(f"   License URL: {track['license_ccurl']}")
    print(f"   Reason: {track['reason']}")
    print()

In [None]:
# Load JSONL into DataFrame
print("Loading metadata for aggregation...")
df = pd.read_json(JSONL_FILE, lines=True)
print(f"Loaded {len(df):,} tracks")
print(f"\nColumns: {list(df.columns)}")
print(f"\nFirst few rows:")
df.head()

In [None]:
# Summary statistics
total_tracks = len(df)
total_duration_seconds = df['duration'].sum()
total_hours = total_duration_seconds / 3600

summary = {
    "generated_at": datetime.utcnow().isoformat(),
    "source": "jamendo_api",
    "license_filter": list(ALLOWED_LICENSES),
    "total_tracks": int(total_tracks),
    "total_duration_seconds": float(total_duration_seconds),
    "total_duration_hours": float(total_hours),
    "total_catalog_tracks_fetched": state["total_fetched"],
    "pass_rate_percent": float(100 * state["total_passed_filter"] / state['total_fetched']),
    "crawl_start_time": state["start_time"],
    "crawl_end_time": state["last_update_time"],
}

# Save summary JSON
with open(SUMMARY_FILE, 'w') as f:
    json.dump(summary, f, indent=2)

print("="*60)
print("SUMMARY STATISTICS")
print("="*60)
print(f"Total tracks: {summary['total_tracks']:,}")
print(f"Total duration: {summary['total_duration_hours']:,.1f} hours")
print(f"Average track duration: {total_duration_seconds / total_tracks / 60:.1f} minutes")
print(f"Pass rate: {summary['pass_rate_percent']:.1f}%")
print(f"\n✓ Saved: {SUMMARY_FILE}")

In [None]:
# Group by license type
license_stats = df.groupby('license').agg({
    'duration': ['count', 'sum']
}).reset_index()

license_stats.columns = ['license', 'track_count', 'total_duration_seconds']
license_stats['total_duration_hours'] = license_stats['total_duration_seconds'] / 3600
license_stats['percentage_of_tracks'] = 100 * license_stats['track_count'] / total_tracks

# Save CSV
license_stats.to_csv(LICENSE_CSV, index=False)

print("="*60)
print("BREAKDOWN BY LICENSE TYPE")
print("="*60)
print(license_stats.to_string(index=False))
print(f"\n✓ Saved: {LICENSE_CSV}")

In [None]:
# Compute duration percentiles
duration_minutes = df['duration'] / 60

duration_stats = pd.DataFrame({
    'metric': ['mean', 'median (p50)', 'p90', 'p95', 'p99', 'min', 'max'],
    'duration_minutes': [
        duration_minutes.mean(),
        duration_minutes.quantile(0.50),
        duration_minutes.quantile(0.90),
        duration_minutes.quantile(0.95),
        duration_minutes.quantile(0.99),
        duration_minutes.min(),
        duration_minutes.max(),
    ]
})

# Save CSV
duration_stats.to_csv(DURATION_CSV, index=False)

print("="*60)
print("DURATION STATISTICS")
print("="*60)
print(duration_stats.to_string(index=False))
print(f"\n✓ Saved: {DURATION_CSV}")

In [None]:
# Save as Parquet (more efficient for large datasets)
try:
    df.to_parquet(PARQUET_FILE, index=False, compression='snappy')
    print(f"✓ Saved Parquet: {PARQUET_FILE}")
    print(f"  JSONL size: {JSONL_FILE.stat().st_size / 1024 / 1024:.1f} MB")
    print(f"  Parquet size: {PARQUET_FILE.stat().st_size / 1024 / 1024:.1f} MB")
except Exception as e:
    print(f"⚠ Failed to save Parquet: {e}")

In [None]:
print("\n" + "="*60)
print("ALL OUTPUTS GENERATED")
print("="*60)
print(f"\nOutput directory: {OUTPUT_DIR}")
print(f"\nFiles created:")
print(f"  1. {JSONL_FILE.name} - Raw filtered metadata (append-only)")
print(f"  2. {SUMMARY_FILE.name} - Topline metrics")
print(f"  3. {LICENSE_CSV.name} - License breakdown")
print(f"  4. {DURATION_CSV.name} - Duration statistics")
print(f"  5. {STATE_FILE.name} - Checkpoint state (for resume)")
if PARQUET_FILE.exists():
    print(f"  6. {PARQUET_FILE.name} - Parquet archive (optional)")

print(f"\n✓ Pipeline complete!")
print(f"\nTo resume crawl if interrupted:")
print(f"  - Re-run this notebook (it will resume from offset {state['last_offset']})")
print(f"\nTo start fresh:")
print(f"  - Delete {STATE_FILE} and {JSONL_FILE}")