In [1]:
import pandas as pd
import numpy as np
import sqlite3
import requests
import time
import hashlib
from pathlib import Path
from PIL import Image, ImageFilter
import cv2
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional, Any
import logging
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import pickle
import json
from urllib.parse import urlparse
from contextlib import contextmanager

# Setup logging: INFO level with a "timestamp - level - message" layout, plus
# the module-level logger that the analysis code in this cell reports through.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class ImageFeatures:
    """Container for the visual features extracted from a single image.

    The shapes noted on the fields are the ones ``AlbumCoverAnalyzer``
    produces; the dataclass itself performs no validation.
    """
    # Identifier of the analysed image, passed through from the caller.
    image_id: str
    # Normalised RGB histogram: 8 bins per channel, concatenated (24 values).
    color_histogram: np.ndarray
    # 16-bin gradient-magnitude histogram followed by two scaled scalars
    # (grayscale variance / 1000 and mean gradient / 100) -> 18 values.
    texture_features: np.ndarray
    # 8-bin gradient-direction histogram followed by Canny edge density
    # (9 values).
    edge_features: np.ndarray
    # Five K-means cluster centres as integer (R, G, B) tuples.
    dominant_colors: List[Tuple[int, int, int]]
    # Mean of all RGB channel values.
    brightness: float
    # Standard deviation of the per-pixel channel mean.
    contrast: float
    # Mean of the HSV saturation channel.
    saturation: float
    # Canny edge density plus mean per-channel colour variance / 1000.
    complexity_score: float
    
class AlbumCoverAnalyzer:
    """Extract hand-crafted visual features from album cover images.

    Every extractor resizes its input to ``target_size`` first, so feature
    vectors computed for different covers are directly comparable.
    """

    def __init__(self, target_size: Tuple[int, int] = (224, 224)):
        """Store the size handed to ``Image.resize`` before any analysis."""
        self.target_size = target_size

    @staticmethod
    def _normalize_histogram(counts: np.ndarray) -> np.ndarray:
        """Scale ``counts`` so it sums to 1.

        An all-zero histogram is returned as float zeros instead of dividing
        by zero, which would otherwise put NaN into the feature vector.
        """
        total = np.sum(counts)
        if total > 0:
            return counts / total
        return counts.astype(float)

    def extract_color_features(self, image: Image.Image) -> Tuple[np.ndarray, List[Tuple[int, int, int]]]:
        """Extract a colour histogram and the dominant colours.

        Args:
            image: Source image in any PIL mode; converted to RGB if needed.

        Returns:
            A tuple ``(color_hist, dominant_colors)`` where ``color_hist`` is
            a normalised 24-value histogram (8 bins per RGB channel) and
            ``dominant_colors`` holds five K-means cluster centres as integer
            ``(R, G, B)`` tuples.
        """
        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Resize for consistent analysis
        image_resized = image.resize(self.target_size)
        img_array = np.array(image_resized)

        # Color histogram (8 bins per channel)
        hist_r = np.histogram(img_array[:, :, 0], bins=8, range=(0, 256))[0]
        hist_g = np.histogram(img_array[:, :, 1], bins=8, range=(0, 256))[0]
        hist_b = np.histogram(img_array[:, :, 2], bins=8, range=(0, 256))[0]
        color_hist = self._normalize_histogram(np.concatenate([hist_r, hist_g, hist_b]))

        # Dominant colors using K-means (fixed seed keeps results repeatable)
        pixels = img_array.reshape(-1, 3)
        kmeans = KMeans(n_clusters=5, random_state=42, n_init=10)
        kmeans.fit(pixels)
        dominant_colors = [(int(c[0]), int(c[1]), int(c[2])) for c in kmeans.cluster_centers_]

        return color_hist, dominant_colors

    def extract_texture_features(self, image: Image.Image) -> np.ndarray:
        """Extract texture features from Sobel gradient magnitudes.

        Returns:
            An 18-value array: a normalised 16-bin histogram of gradient
            magnitudes followed by ``variance / 1000`` of the grayscale image
            and ``mean_gradient / 100``.
        """
        # Convert to grayscale
        gray = image.convert('L').resize(self.target_size)
        gray_array = np.array(gray)

        # Gradient magnitude from horizontal and vertical Sobel responses
        grad_x = cv2.Sobel(gray_array, cv2.CV_64F, 1, 0, ksize=3)
        grad_y = cv2.Sobel(gray_array, cv2.CV_64F, 0, 1, ksize=3)
        magnitude = np.sqrt(grad_x**2 + grad_y**2)

        # Texture histogram (16 bins).
        # NOTE: np.histogram ignores values outside ``range``, so magnitudes
        # above 255 are not counted in any bin.
        texture_hist = np.histogram(magnitude, bins=16, range=(0, 255))[0]
        texture_hist = self._normalize_histogram(texture_hist)

        # Additional texture measures
        variance = np.var(gray_array)
        mean_gradient = np.mean(magnitude)

        return np.concatenate([texture_hist, [variance / 1000, mean_gradient / 100]])

    def extract_edge_features(self, image: Image.Image) -> np.ndarray:
        """Extract edge-based features.

        Returns:
            A 9-value array: a normalised 8-bin histogram of gradient
            directions over ``[-pi, pi]`` followed by the fraction of pixels
            that Canny marks as edges.
        """
        # Convert to grayscale
        gray = image.convert('L').resize(self.target_size)
        gray_array = np.array(gray)

        # Canny edge detection
        edges = cv2.Canny(gray_array, 50, 150)

        # Edge density: share of pixels flagged as edge
        edge_density = np.sum(edges > 0) / edges.size

        # Edge direction histogram
        grad_x = cv2.Sobel(gray_array, cv2.CV_64F, 1, 0, ksize=3)
        grad_y = cv2.Sobel(gray_array, cv2.CV_64F, 0, 1, ksize=3)
        angles = np.arctan2(grad_y, grad_x)
        angle_hist = np.histogram(angles, bins=8, range=(-np.pi, np.pi))[0]
        angle_hist = self._normalize_histogram(angle_hist)

        return np.concatenate([angle_hist, [edge_density]])

    def extract_global_features(self, image: Image.Image) -> Tuple[float, float, float, float]:
        """Extract global image properties.

        Returns:
            ``(brightness, contrast, saturation, complexity_score)`` where
            brightness is the mean of all RGB values, contrast the standard
            deviation of the per-pixel channel mean, saturation the mean HSV
            S channel and complexity the Canny edge density plus the mean
            per-channel colour variance divided by 1000.
        """
        # Convert to different color spaces
        rgb_img = image.convert('RGB').resize(self.target_size)
        hsv_img = rgb_img.convert('HSV')

        rgb_array = np.array(rgb_img)
        hsv_array = np.array(hsv_img)

        # Brightness (mean over every channel value)
        brightness = np.mean(rgb_array)

        # Contrast (standard deviation of the channel-averaged image)
        gray = np.mean(rgb_array, axis=2)
        contrast = np.std(gray)

        # Saturation (average saturation in HSV)
        saturation = np.mean(hsv_array[:, :, 1])

        # Complexity (edge density + color variance)
        edges = cv2.Canny(gray.astype(np.uint8), 50, 150)
        edge_density = np.sum(edges > 0) / edges.size
        color_variance = np.var(rgb_array.reshape(-1, 3), axis=0).mean()
        complexity_score = edge_density + (color_variance / 1000)

        return brightness, contrast, saturation, complexity_score

    def analyze_image(self, image: Image.Image, image_id: str) -> ImageFeatures:
        """Run the complete feature extraction pipeline.

        Args:
            image: Image to analyse.
            image_id: Identifier copied into the returned ``ImageFeatures``.

        Returns:
            The extracted features. If any extractor raises, the error is
            logged and an all-zero ``ImageFeatures`` with the same field
            shapes is returned instead.
        """
        try:
            # Extract all feature types
            color_hist, dominant_colors = self.extract_color_features(image)
            texture_features = self.extract_texture_features(image)
            edge_features = self.extract_edge_features(image)
            brightness, contrast, saturation, complexity = self.extract_global_features(image)

            return ImageFeatures(
                image_id=image_id,
                color_histogram=color_hist,
                texture_features=texture_features,
                edge_features=edge_features,
                dominant_colors=dominant_colors,
                brightness=brightness,
                contrast=contrast,
                saturation=saturation,
                complexity_score=complexity
            )
        except Exception as e:
            logger.error(f"Failed to analyze image {image_id}: {e}")
            # Return default features so callers always get a complete record
            return ImageFeatures(
                image_id=image_id,
                color_histogram=np.zeros(24),
                texture_features=np.zeros(18),
                edge_features=np.zeros(9),
                dominant_colors=[(0, 0, 0)] * 5,
                brightness=0.0,
                contrast=0.0,
                saturation=0.0,
                complexity_score=0.0
            )

# Initialize analyzer with its default 224x224 target size
analyzer = AlbumCoverAnalyzer()

In [3]:
# Fixed Cover Generation - Adapts to Your Actual Data Structure

import sqlite3
from pathlib import Path
from PIL import Image, ImageDraw
import numpy as np
import hashlib

# Setup paths
# When launched from a ``notebooks`` directory the project root is one level
# up; otherwise the current working directory is treated as the root.
PROJECT_ROOT = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
DATA_DIR = PROJECT_ROOT / 'data'
DB_PATH = DATA_DIR / 'database' / 'vinyl_catalog.db'
COVERS_DIR = DATA_DIR / 'album_covers'

print("Investigating actual data structure...")

# Imported here so this cell stays runnable on its own.
from contextlib import closing

# ``with sqlite3.connect(...)`` on its own only wraps a transaction and leaves
# the connection open; closing() guarantees the handle is released.
with closing(sqlite3.connect(str(DB_PATH))) as conn:
    # First, let's see what columns actually exist (row[1] is the column name)
    columns = [row[1] for row in conn.execute("PRAGMA table_info(releases)")]
    print(f"Available columns in releases table: {columns}")

    # Get a sample of actual data to see what we're working with
    sample_data = conn.execute("SELECT * FROM releases LIMIT 3").fetchall()

    print("\nSample data (first 3 records):")
    for record_number, row in enumerate(sample_data, start=1):
        print(f"Record {record_number}:")
        # Only non-NULL fields are printed
        for col, value in zip(columns, row):
            if value is not None:
                print(f"  {col}: {value}")

    # Check what could serve as unique identifiers
    print("\nChecking potential identifiers:")

    # Check for non-null values in key columns
    key_columns = ['release_id', 'discogs_id', 'title', 'artist']
    for col in key_columns:
        if col in columns:
            # ``col`` comes from the hard-coded list above, never from user
            # input, so interpolating it into the statement is safe here.
            count = conn.execute(
                f"SELECT COUNT(*) FROM releases WHERE {col} IS NOT NULL"
            ).fetchone()[0]
            print(f"  {col}: {count} non-null values")

print("\n" + "="*60)
print("CREATING COVERS WITH FLEXIBLE APPROACH")
print("="*60)

# Genre-based color palettes: (inner, middle, outer) RGB triples
_GENRE_PALETTES = {
    'rock': [(220, 50, 50), (180, 40, 40), (140, 30, 30)],
    'pop': [(255, 20, 147), (200, 15, 110), (150, 10, 80)],
    'jazz': [(50, 100, 150), (40, 80, 120), (30, 60, 90)],
    'electronic': [(100, 255, 100), (70, 200, 70), (40, 150, 40)],
    'classical': [(128, 0, 128), (100, 0, 100), (70, 0, 70)],
    'hip hop': [(255, 215, 0), (200, 170, 0), (150, 120, 0)],
    'country': [(139, 69, 19), (110, 55, 15), (80, 40, 10)],
    'folk': [(34, 139, 34), (25, 100, 25), (15, 70, 15)],
    'blues': [(0, 0, 139), (0, 0, 100), (0, 0, 70)],
    'reggae': [(255, 255, 0), (200, 200, 0), (150, 150, 0)],
}

# Default colors for unknown genres
_DEFAULT_PALETTE = [(100, 100, 100), (70, 70, 70), (40, 40, 40)]


def _pick_genre_palette(raw_genre):
    """Return the three-colour palette matching ``raw_genre``.

    Exact (case-insensitive) names win; otherwise the first known genre that
    contains, or is contained in, the given text is used. Missing, empty or
    'unknown' genres get the default grey palette.
    """
    genre = '' if raw_genre is None else str(raw_genre).lower().strip()

    # An empty string is a substring of every genre name, so without this
    # guard a blank genre would silently pick up the first palette ('rock').
    if not genre or genre == 'unknown':
        return _DEFAULT_PALETTE

    exact = _GENRE_PALETTES.get(genre)
    if exact is not None:
        return exact

    # Find closest match for partial genre names (e.g. "rock, pop", "hip")
    for known_genre, palette in _GENRE_PALETTES.items():
        if known_genre in genre or genre in known_genre:
            return palette

    return _DEFAULT_PALETTE


def _gradient_color(palette, norm_dist):
    """Interpolate the palette at ``norm_dist`` (0 = centre, 1 = outer edge).

    The inner half blends palette[0] -> palette[1], the outer half blends
    palette[1] -> palette[2].
    """
    if norm_dist < 0.5:
        start, end, blend = palette[0], palette[1], norm_dist * 2
    else:
        start, end, blend = palette[1], palette[2], (norm_dist - 0.5) * 2
    return tuple(int(s * (1 - blend) + e * blend) for s, e in zip(start, end))


def create_genre_based_cover(release_info):
    """Create a synthetic 300x300 cover from the available release information.

    Args:
        release_info: Mapping that may provide ``'genre'`` (selects the colour
            palette) and ``'year'`` (selects the decorative overlay). Both
            are optional.

    Returns:
        An RGB ``PIL.Image.Image`` with a radial gradient background and, when
        a usable year is present, an era-specific decoration.
    """
    size = (300, 300)
    colors = _pick_genre_palette(release_info.get('genre', 'unknown'))

    # Create image
    img = Image.new('RGB', size, color=colors[2])
    draw = ImageDraw.Draw(img)

    # Create gradient background
    center_x, center_y = size[0] // 2, size[1] // 2
    max_radius = min(center_x, center_y)

    for y in range(size[1]):
        dy = y - center_y
        for x in range(size[0]):
            dx = x - center_x
            # Distance from center, normalised and capped at 1.0 so the
            # corners reuse the outermost colour
            distance = (dx*dx + dy*dy) ** 0.5
            norm_dist = min(distance / max_radius, 1.0)
            draw.point((x, y), fill=_gradient_color(colors, norm_dist))

    # Add decorative elements based on year
    year = release_info.get('year')
    if year:
        try:
            year = int(year)
            if year < 1970:
                # Vintage style - concentric circles
                for i in range(3):
                    radius = 30 + i * 20
                    draw.ellipse([center_x-radius, center_y-radius,
                                  center_x+radius, center_y+radius],
                                 outline=colors[0], width=2)
            elif year < 1990:
                # 70s-80s style - geometric shapes
                draw.polygon([center_x, center_y-50, center_x+50, center_y+50,
                              center_x-50, center_y+50], outline=colors[0], width=3)
            elif year < 2010:
                # 90s-2000s style - rectangles
                draw.rectangle([center_x-60, center_y-40, center_x+60, center_y+40],
                               outline=colors[0], width=2)
            else:
                # Modern style - minimalist line
                draw.line([50, center_y, size[0]-50, center_y], fill=colors[0], width=4)
        except (ValueError, TypeError):
            pass  # Skip year-based decoration if year is not a valid number

    return img

def generate_covers_flexible():
    """Generate covers for up to 100 releases using whatever data is available.

    Releases that have both a title and an artist are ranked by popularity
    and then rating. For each one a synthetic cover is rendered, written to
    ``COVERS_DIR`` as a PNG and recorded in the ``album_covers`` table. A
    failure on one release is reported and does not stop the others.

    Returns:
        The number of covers successfully created and recorded.
    """
    from contextlib import closing

    COVERS_DIR.mkdir(parents=True, exist_ok=True)

    # closing() releases the connection; the inner ``conn`` context keeps the
    # usual commit-on-success / rollback-on-error transaction handling.
    with closing(sqlite3.connect(str(DB_PATH))) as conn, conn:
        conn.row_factory = sqlite3.Row

        # Use flexible query - don't require discogs_id
        cursor = conn.execute("""
            SELECT release_id, title, artist, album, year, genre, 
                   popularity_score, rating, discogs_id
            FROM releases 
            WHERE title IS NOT NULL AND artist IS NOT NULL
            ORDER BY CASE 
                WHEN popularity_score IS NOT NULL THEN popularity_score 
                ELSE 0 
            END DESC,
            CASE 
                WHEN rating IS NOT NULL THEN rating 
                ELSE 0 
            END DESC
            LIMIT 100
        """)

        releases = [dict(row) for row in cursor.fetchall()]
        print(f"Found {len(releases)} releases to create covers for")

        covers_created = 0
        for release in releases:
            try:
                # Create the cover
                cover_image = create_genre_based_cover(release)

                # Generate filename using available identifiers
                if release['discogs_id']:
                    filename = f"cover_discogs_{release['discogs_id']}.png"
                else:
                    filename = f"cover_release_{release['release_id']}.png"

                cover_path = COVERS_DIR / filename
                cover_image.save(cover_path)

                # Hash of the raw pixel data (content fingerprint, not security)
                image_hash = hashlib.md5(cover_image.tobytes()).hexdigest()

                # Store in database
                conn.execute("""
                    INSERT INTO album_covers 
                    (release_id, discogs_id, local_path, image_hash, width, height, file_size)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    release['release_id'],
                    release['discogs_id'],
                    str(cover_path),
                    image_hash,
                    cover_image.width,
                    cover_image.height,
                    cover_path.stat().st_size if cover_path.exists() else 0
                ))

                covers_created += 1

                if covers_created % 20 == 0:
                    print(f"Created {covers_created} covers...")

            except Exception as e:
                # Best effort: report and move on to the next release
                print(f"Failed to create cover for '{release['title']}': {e}")
                continue

        conn.commit()
        print(f"\nSuccessfully created {covers_created} album covers!")

        # Verify covers were created
        cursor = conn.execute("SELECT COUNT(*) FROM album_covers WHERE local_path IS NOT NULL")
        db_cover_count = cursor.fetchone()[0]

        file_cover_count = len(list(COVERS_DIR.glob("*.png")))

        print("Verification:")
        print(f"  Covers in database: {db_cover_count}")
        print(f"  Cover files on disk: {file_cover_count}")

        return covers_created

# Run the improved cover generation
covers_created = generate_covers_flexible()

# Imported here so this cell stays runnable on its own.
from contextlib import closing

if covers_created > 0:
    print(f"\n✅ SUCCESS! Created {covers_created} album covers")
    print(f"📁 Covers saved to: {COVERS_DIR}")
    print("\nNow you can re-run Cell 3 from the main notebook to extract visual features!")

    # Show a sample of what was created
    print("\nSample covers created:")
    # closing() releases the connection; sqlite3's own context manager only
    # manages the transaction and would leave it open.
    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute("""
            SELECT r.artist, r.title, r.genre, r.year, ac.local_path
            FROM album_covers ac
            JOIN releases r ON ac.release_id = r.release_id
            LIMIT 5
        """)

        for row in cursor.fetchall():
            print(f"  • {row['artist']} - {row['title']} ({row['year']}) [{row['genre']}]")
            print(f"    File: {Path(row['local_path']).name}")
else:
    print("\n❌ Still no covers created. Let's check what's in the releases table:")

    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        # Count rows overall and rows having each of the fields the cover
        # generator relies on
        cursor = conn.execute("""
            SELECT COUNT(*) as total,
                   COUNT(CASE WHEN title IS NOT NULL THEN 1 END) as has_title,
                   COUNT(CASE WHEN artist IS NOT NULL THEN 1 END) as has_artist,
                   COUNT(CASE WHEN genre IS NOT NULL THEN 1 END) as has_genre
            FROM releases
        """)

        stats = cursor.fetchone()
        print(f"  Total releases: {stats[0]}")
        print(f"  With title: {stats[1]}")
        print(f"  With artist: {stats[2]}")
        print(f"  With genre: {stats[3]}")

Investigating actual data structure...
Available columns in releases table: ['release_id', 'discogs_id', 'title', 'artist', 'album', 'year', 'genre', 'label', 'country', 'format', 'status', 'catalog_number', 'master_id', 'popularity_score', 'rating', 'plays', 'favorites', 'duration', 'data_quality', 'source', 'created_at', 'updated_at']

Sample data (first 3 records):
Record 1:
  release_id: 1
  title: Tentacle Hentaij - Core Noisetone
  artist: Ralph Brown
  album: Spettro Records Volume 2 - Decline
  year: 2015
  genre: Experimental
  label: FMA
  country: US
  format: Unknown
  status: Unknown
  catalog_number: Unknown
  popularity_score: 0.0002024291497975
  rating: 2.0
  plays: 0
  favorites: 1
  duration: 265
  data_quality: Unknown
  source: fma_data
  created_at: 2025-09-04 00:41:16
  updated_at: 2025-09-04 00:41:16
Record 2:
  release_id: 2
  title: Goldfish
  artist: Aoiroooasamusi
  album: Root Of Sorrow
  year: 2010
  genre: Unknown
  label: FMA
  country: US
  format: Unkn

In [4]:
# Cell 3 - Feature Extraction and Similarity Computation

import json
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from contextlib import contextmanager
import sqlite3
from pathlib import Path
from PIL import Image
import logging

# Setup logging: INFO level with a "timestamp - level - message" layout, plus
# the module-level logger used by the classes defined in this cell.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class VisualSimilarityEngine:
    """Extract cover features into the database and derive visual similarities."""

    def __init__(self, db_path: Path, analyzer):
        """Remember where the database lives and which analyzer to use.

        Args:
            db_path: Path of the SQLite database file.
            analyzer: Object providing ``analyze_image(image, image_id)``.
        """
        self.db_path = db_path
        self.analyzer = analyzer

    @contextmanager
    def get_db_connection(self):
        """Yield a connection returning ``sqlite3.Row`` rows; close it on exit.

        Nothing is committed automatically - callers commit explicitly.
        """
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def extract_features_for_all_covers(self) -> int:
        """Extract and store visual features for every cover lacking them.

        Covers whose file is missing, or whose processing raises, are logged
        and skipped.

        Returns:
            The number of covers whose features were stored.
        """
        with self.get_db_connection() as conn:
            # Get all covers that don't have features yet
            cursor = conn.execute("""
                SELECT ac.cover_id, ac.local_path, ac.release_id
                FROM album_covers ac
                LEFT JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE vf.feature_id IS NULL AND ac.local_path IS NOT NULL
            """)

            covers_to_process = [dict(row) for row in cursor.fetchall()]

        logger.info(f"Processing features for {len(covers_to_process)} covers...")
        features_extracted = 0

        for cover in covers_to_process:
            try:
                # Load and analyze image
                image_path = Path(cover['local_path'])
                if not image_path.exists():
                    logger.warning(f"Image file not found: {image_path}")
                    continue

                # The context manager releases the underlying file even when
                # analysis fails part-way through.
                with Image.open(image_path) as image:
                    features = self.analyzer.analyze_image(image, str(cover['cover_id']))

                # Store features in database
                self.store_visual_features(cover['cover_id'], features)
                features_extracted += 1

                if features_extracted % 10 == 0:
                    logger.info(f"Extracted features for {features_extracted} covers...")

            except Exception as e:
                logger.error(f"Failed to extract features for cover {cover['cover_id']}: {e}")
                continue

        logger.info(f"Feature extraction complete: {features_extracted} covers processed")
        return features_extracted

    def store_visual_features(self, cover_id: int, features):
        """Insert or replace the ``visual_features`` row for ``cover_id``.

        Array-valued features and the dominant colours are stored as JSON
        text; the scalar measures are stored as-is.
        """
        with self.get_db_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO visual_features 
                (cover_id, color_histogram, texture_features, edge_features, 
                 dominant_colors, brightness, contrast_score, saturation, complexity_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                cover_id,
                json.dumps(features.color_histogram.tolist()),
                json.dumps(features.texture_features.tolist()),
                json.dumps(features.edge_features.tolist()),
                json.dumps(features.dominant_colors),
                features.brightness,
                features.contrast,
                features.saturation,
                features.complexity_score
            ))
            conn.commit()

    def compute_similarity_matrix(self) -> np.ndarray:
        """Compute and persist cosine similarities between all stored covers.

        Returns:
            The square similarity matrix, ordered like the loaded features,
            or an empty array when fewer than two covers have features.
        """
        # Load all features
        feature_data = self.load_all_features()

        if len(feature_data) < 2:
            logger.warning("Not enough covers with features for similarity computation")
            return np.array([])

        # Combine all feature types into feature vectors
        feature_vectors = []
        cover_ids = []

        for cover_id, features in feature_data.items():
            # Histograms are used as stored; the scalar measures are scaled
            # down so they do not dominate the vector.
            combined_features = np.concatenate([
                features['color_histogram'],
                features['texture_features'],
                features['edge_features'],
                [features['brightness'] / 255, features['contrast_score'] / 100,
                 features['saturation'] / 255, features['complexity_score']]
            ])

            feature_vectors.append(combined_features)
            cover_ids.append(cover_id)

        # Convert to numpy array and compute similarities
        feature_matrix = np.array(feature_vectors)
        similarity_matrix = cosine_similarity(feature_matrix)

        # Store similarities in database
        self.store_similarity_matrix(cover_ids, similarity_matrix)

        return similarity_matrix

    def load_all_features(self) -> dict:
        """Load every ``visual_features`` row, keyed by ``cover_id``.

        JSON columns are decoded (histograms become numpy arrays). Rows that
        cannot be decoded are logged and skipped.
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    cover_id, color_histogram, texture_features, edge_features,
                    dominant_colors, brightness, contrast_score, saturation, complexity_score
                FROM visual_features
            """)

            feature_data = {}
            for row in cursor.fetchall():
                try:
                    feature_data[row['cover_id']] = {
                        'color_histogram': np.array(json.loads(row['color_histogram'])),
                        'texture_features': np.array(json.loads(row['texture_features'])),
                        'edge_features': np.array(json.loads(row['edge_features'])),
                        'dominant_colors': json.loads(row['dominant_colors']),
                        'brightness': row['brightness'],
                        'contrast_score': row['contrast_score'],
                        'saturation': row['saturation'],
                        'complexity_score': row['complexity_score']
                    }
                except (json.JSONDecodeError, TypeError) as e:
                    logger.warning(f"Failed to load features for cover {row['cover_id']}: {e}")
                    continue

            return feature_data

    @staticmethod
    def _top_similar_indices(similarities, self_index: int, top_k: int = 10,
                             min_score: float = 0.1) -> list:
        """Return indices of the best matches for one row of the matrix.

        The row's own index is removed explicitly rather than assumed to sort
        last: with tied scores (identical covers) or an all-zero feature
        vector the cover itself is not guaranteed to be the single maximum.

        Args:
            similarities: Similarity scores of one cover against all covers.
            self_index: Position of that cover within ``similarities``.
            top_k: Maximum number of neighbours to consider.
            min_score: Neighbours must score strictly above this value.

        Returns:
            Up to ``top_k`` indices, best first (ties keep index order).
        """
        scores = np.asarray(similarities, dtype=float)
        ranked = np.argsort(-scores, kind='stable')
        ranked = ranked[ranked != self_index][:top_k]
        return [int(j) for j in ranked if scores[j] > min_score]

    def store_similarity_matrix(self, cover_ids: list, similarity_matrix: np.ndarray):
        """Replace the ``visual_similarities`` table with fresh top matches.

        For each cover the 10 most similar other covers are considered and
        those scoring above 0.1 are stored with type ``'overall'``.

        Args:
            cover_ids: Cover ids in the same order as the matrix rows.
            similarity_matrix: Square matrix of pairwise similarity scores.
        """
        with self.get_db_connection() as conn:
            # Clear existing similarities
            conn.execute("DELETE FROM visual_similarities")

            # Store top similarities for each cover
            for i, cover_id_1 in enumerate(cover_ids):
                similarities = similarity_matrix[i]

                for j in self._top_similar_indices(similarities, i):
                    conn.execute("""
                        INSERT INTO visual_similarities 
                        (cover_id_1, cover_id_2, similarity_score, similarity_type)
                        VALUES (?, ?, ?, ?)
                    """, (cover_id_1, cover_ids[j], float(similarities[j]), 'overall'))

            conn.commit()
            logger.info(f"Stored similarity data for {len(cover_ids)} covers")

# Setup paths (make sure these match your setup)
# When launched from a ``notebooks`` directory the project root is one level
# up; otherwise the current working directory is treated as the root.
PROJECT_ROOT = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
DATA_DIR = PROJECT_ROOT / 'data'
DB_PATH = DATA_DIR / 'database' / 'vinyl_catalog.db'

# You need the analyzer from Cell 1 - if you haven't run it, run this:
from PIL import Image, ImageFilter
import cv2
from sklearn.cluster import KMeans
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class ImageFeatures:
    """Container for the visual features extracted from a single image.

    The shapes noted on the fields are the ones ``AlbumCoverAnalyzer``
    produces; the dataclass itself performs no validation.
    """
    # Identifier of the analysed image, passed through from the caller.
    image_id: str
    # Normalised RGB histogram: 8 bins per channel, concatenated (24 values).
    color_histogram: np.ndarray
    # 16-bin gradient-magnitude histogram followed by two scaled scalars
    # (grayscale variance / 1000 and mean gradient / 100) -> 18 values.
    texture_features: np.ndarray
    # 8-bin gradient-direction histogram followed by Canny edge density
    # (9 values).
    edge_features: np.ndarray
    # Five K-means cluster centres as integer (R, G, B) tuples.
    dominant_colors: List[Tuple[int, int, int]]
    # Mean of all RGB channel values.
    brightness: float
    # Standard deviation of the per-pixel channel mean.
    contrast: float
    # Mean of the HSV saturation channel.
    saturation: float
    # Canny edge density plus mean per-channel colour variance / 1000.
    complexity_score: float

class AlbumCoverAnalyzer:
    """Hand-crafted visual feature extraction for album cover images."""

    def __init__(self, target_size: Tuple[int, int] = (224, 224)):
        # Every image is resized to this size before any feature is computed
        self.target_size = target_size

    def _gray_pixels(self, image):
        """Return the image as a resized grayscale numpy array."""
        return np.array(image.convert('L').resize(self.target_size))

    @staticmethod
    def _sobel_gradients(gray_pixels):
        """Return the horizontal and vertical 3x3 Sobel responses."""
        horizontal = cv2.Sobel(gray_pixels, cv2.CV_64F, 1, 0, ksize=3)
        vertical = cv2.Sobel(gray_pixels, cv2.CV_64F, 0, 1, ksize=3)
        return horizontal, vertical

    @staticmethod
    def _unit_sum(counts):
        """Divide by the total when it is positive; otherwise leave as-is."""
        total = np.sum(counts)
        if total > 0:
            return counts / total
        return counts

    def extract_color_features(self, image: Image.Image) -> Tuple[np.ndarray, List[Tuple[int, int, int]]]:
        """Return the 24-bin RGB histogram and five dominant colours."""
        rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
        rgb_pixels = np.array(rgb_image.resize(self.target_size))

        # 8 bins per channel, concatenated in R, G, B order
        per_channel = [
            np.histogram(rgb_pixels[:, :, channel], bins=8, range=(0, 256))[0]
            for channel in range(3)
        ]
        histogram = self._unit_sum(np.concatenate(per_channel))

        # K-means over the flattened pixel list gives the dominant colours
        clusterer = KMeans(n_clusters=5, random_state=42, n_init=10)
        clusterer.fit(rgb_pixels.reshape(-1, 3))
        palette = [
            (int(center[0]), int(center[1]), int(center[2]))
            for center in clusterer.cluster_centers_
        ]

        return histogram, palette

    def extract_texture_features(self, image: Image.Image) -> np.ndarray:
        """Return the gradient-magnitude histogram plus two scalar measures."""
        gray_pixels = self._gray_pixels(image)
        grad_x, grad_y = self._sobel_gradients(gray_pixels)
        magnitude = np.sqrt(grad_x**2 + grad_y**2)

        # 16-bin histogram of gradient strength
        strength_hist = np.histogram(magnitude, bins=16, range=(0, 255))[0]
        strength_hist = self._unit_sum(strength_hist)

        scalars = [np.var(gray_pixels) / 1000, np.mean(magnitude) / 100]
        return np.concatenate([strength_hist, scalars])

    def extract_edge_features(self, image: Image.Image) -> np.ndarray:
        """Return the gradient-direction histogram plus Canny edge density."""
        gray_pixels = self._gray_pixels(image)

        edge_map = cv2.Canny(gray_pixels, 50, 150)
        density = np.sum(edge_map > 0) / edge_map.size

        grad_x, grad_y = self._sobel_gradients(gray_pixels)
        directions = np.arctan2(grad_y, grad_x)
        direction_hist = np.histogram(directions, bins=8, range=(-np.pi, np.pi))[0]
        direction_hist = self._unit_sum(direction_hist)

        return np.concatenate([direction_hist, [density]])

    def extract_global_features(self, image: Image.Image) -> Tuple[float, float, float, float]:
        """Return brightness, contrast, saturation and a complexity score."""
        rgb_image = image.convert('RGB').resize(self.target_size)
        rgb_pixels = np.array(rgb_image)
        hsv_pixels = np.array(rgb_image.convert('HSV'))

        # Channel-averaged intensity drives both contrast and edge detection
        intensity = np.mean(rgb_pixels, axis=2)

        brightness = np.mean(rgb_pixels)
        contrast = np.std(intensity)
        saturation = np.mean(hsv_pixels[:, :, 1])

        edge_map = cv2.Canny(intensity.astype(np.uint8), 50, 150)
        density = np.sum(edge_map > 0) / edge_map.size
        channel_variance = np.var(rgb_pixels.reshape(-1, 3), axis=0).mean()
        complexity = density + (channel_variance / 1000)

        return brightness, contrast, saturation, complexity

    def analyze_image(self, image: Image.Image, image_id: str):
        """Run every extractor; fall back to all-zero features on failure."""
        try:
            histogram, palette = self.extract_color_features(image)
            texture = self.extract_texture_features(image)
            edges = self.extract_edge_features(image)
            brightness, contrast, saturation, complexity = self.extract_global_features(image)
        except Exception as e:
            logger.error(f"Failed to analyze image {image_id}: {e}")
            # Placeholder record with the same shapes as real features
            return ImageFeatures(
                image_id=image_id,
                color_histogram=np.zeros(24),
                texture_features=np.zeros(18),
                edge_features=np.zeros(9),
                dominant_colors=[(0, 0, 0)] * 5,
                brightness=0.0,
                contrast=0.0,
                saturation=0.0,
                complexity_score=0.0
            )

        return ImageFeatures(
            image_id=image_id,
            color_histogram=histogram,
            texture_features=texture,
            edge_features=edges,
            dominant_colors=palette,
            brightness=brightness,
            contrast=contrast,
            saturation=saturation,
            complexity_score=complexity
        )

# Build the analyzer and the similarity engine that drives it
analyzer = AlbumCoverAnalyzer()
similarity_engine = VisualSimilarityEngine(DB_PATH, analyzer)

# Step 1: extract and store features for covers that do not have them yet
print("Extracting visual features from album covers...")
features_extracted = similarity_engine.extract_features_for_all_covers()

# Step 2: compute (and persist) pairwise similarities
print("Computing visual similarities...")
similarity_matrix = similarity_engine.compute_similarity_matrix()

print("Visual similarity computation complete!")
print(f"Feature vectors created for {features_extracted} album covers")

# An empty result means fewer than two covers had features
if len(similarity_matrix) == 0:
    print("No similarities computed - need more covers with features")
else:
    print(f"Similarity matrix shape: {similarity_matrix.shape}")
    print("Ready for visual discovery!")

2025-09-03 21:34:35,851 - INFO - Processing features for 13 covers...


Extracting visual features from album covers...


2025-09-03 21:34:37,314 - INFO - Extracted features for 10 covers...
2025-09-03 21:34:37,909 - INFO - Feature extraction complete: 13 covers processed
2025-09-03 21:34:37,917 - INFO - Stored similarity data for 13 covers


Computing visual similarities...
Visual similarity computation complete!
Feature vectors created for 13 album covers
Similarity matrix shape: (13, 13)
Ready for visual discovery!


In [5]:
# Cell 4 - Visual Discovery Interface

import sqlite3
from pathlib import Path
from contextlib import contextmanager
from typing import List, Dict, Tuple, Optional
import json

class VisualDiscoveryInterface:
    """Query interface for visual album discovery.

    Every public method opens its own short-lived SQLite connection to
    ``db_path``, runs a single SELECT and returns the rows as plain
    dictionaries keyed by the selected column names.
    """

    # Descriptive keyword -> keyword arguments for get_albums_by_visual_style().
    _KEYWORD_FILTERS = {
        'dark': {'brightness_range': (0, 100)},
        'bright': {'brightness_range': (150, 255)},
        'colorful': {'saturation_range': (100, 255)},
        'monochrome': {'saturation_range': (0, 50)},
        'complex': {'complexity_range': (0.5, 2.0)},
        'simple': {'complexity_range': (0.0, 0.3)},
        'vibrant': {'saturation_range': (120, 255), 'brightness_range': (100, 200)},
        'muted': {'saturation_range': (0, 80), 'brightness_range': (50, 150)}
    }

    # Covers whose stored brightness is below this value are grouped as dark.
    _DARK_BRIGHTNESS_THRESHOLD = 80

    # Group that receives albums whose colour data cannot be interpreted.
    _FALLBACK_THEME = 'Dark/Monochrome'

    def __init__(self, db_path: Path):
        """Remember the database location; no connection is opened here."""
        self.db_path = db_path

    @contextmanager
    def get_db_connection(self):
        """Yield a connection with ``sqlite3.Row`` rows and always close it."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def find_similar_albums(self, release_id: int, limit: int = 10) -> List[Dict]:
        """Find visually similar albums to a given release.

        Args:
            release_id: Release whose cover is the comparison seed.
            limit: Maximum number of rows to return.

        Returns:
            Rows ordered by descending ``similarity_score``. Only
            ``visual_similarities`` rows whose ``cover_id_1`` belongs to the
            seed release are considered. The visual feature columns are NULL
            for matches without a ``visual_features`` row (LEFT JOIN).
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    r2.release_id, r2.title, r2.artist, r2.album, r2.year, r2.genre,
                    ac2.local_path as cover_path,
                    vs.similarity_score,
                    vf2.brightness, vf2.saturation, vf2.complexity_score
                FROM visual_similarities vs
                JOIN album_covers ac1 ON vs.cover_id_1 = ac1.cover_id
                JOIN album_covers ac2 ON vs.cover_id_2 = ac2.cover_id
                JOIN releases r2 ON ac2.release_id = r2.release_id
                LEFT JOIN visual_features vf2 ON ac2.cover_id = vf2.cover_id
                WHERE ac1.release_id = ?
                ORDER BY vs.similarity_score DESC
                LIMIT ?
            """, (release_id, limit))

            return [dict(row) for row in cursor.fetchall()]

    def get_albums_by_visual_style(self,
                                  brightness_range: Optional[Tuple[float, float]] = None,
                                  saturation_range: Optional[Tuple[float, float]] = None,
                                  complexity_range: Optional[Tuple[float, float]] = None,
                                  genre: Optional[str] = None,
                                  limit: int = 20) -> List[Dict]:
        """Find albums by visual characteristics.

        Args:
            brightness_range: Inclusive ``(low, high)`` bounds on brightness.
            saturation_range: Inclusive ``(low, high)`` bounds on saturation.
            complexity_range: Inclusive ``(low, high)`` bounds on complexity.
            genre: Substring matched against ``releases.genre`` with LIKE.
            limit: Maximum number of rows to return.

        Returns:
            Matching rows ordered by descending ``popularity_score``. Filters
            that are omitted (or empty) are not applied; with no filters at
            all every album that has visual features is eligible.
        """
        where_clauses = []
        params = []

        # Column names are fixed literals; only the bounds are bound parameters.
        range_filters = (
            ("vf.brightness", brightness_range),
            ("vf.saturation", saturation_range),
            ("vf.complexity_score", complexity_range),
        )
        for column, bounds in range_filters:
            if bounds:
                where_clauses.append(f"{column} BETWEEN ? AND ?")
                params.extend(bounds)

        if genre:
            where_clauses.append("r.genre LIKE ?")
            params.append(f"%{genre}%")

        where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

        query = f"""
            SELECT 
                r.release_id, r.title, r.artist, r.album, r.year, r.genre,
                ac.local_path as cover_path,
                vf.brightness, vf.saturation, vf.complexity_score,
                vf.dominant_colors
            FROM releases r
            JOIN album_covers ac ON r.release_id = ac.release_id
            JOIN visual_features vf ON ac.cover_id = vf.cover_id
            WHERE {where_clause}
            ORDER BY r.popularity_score DESC
            LIMIT ?
        """

        params.append(limit)

        with self.get_db_connection() as conn:
            cursor = conn.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def _classify_color_theme(self, album: Dict) -> str:
        """Return the colour-theme group name for one album row.

        The first entry of the JSON-encoded ``dominant_colors`` is taken as
        the primary colour. Rows whose colour data is missing, malformed or
        not comparable fall back to ``_FALLBACK_THEME`` instead of raising,
        so a single bad row cannot abort the whole grouping.
        """
        try:
            r, g, b = json.loads(album['dominant_colors'])[0]
            brightness = album['brightness']

            # A NULL brightness cannot be compared; classify by hue instead.
            if brightness is not None and brightness < self._DARK_BRIGHTNESS_THRESHOLD:
                return 'Dark/Monochrome'
            if r > g and r > b:  # Red dominant
                return 'Red/Warm'
            if b > r and b > g:  # Blue dominant
                return 'Blue/Cool'
            if g > r and g > b:  # Green dominant
                return 'Green/Natural'
            return 'Bright/Colorful'  # No single channel dominates
        except (json.JSONDecodeError, IndexError, KeyError, TypeError, ValueError):
            return self._FALLBACK_THEME

    def get_color_clusters(self) -> Dict[str, List[Dict]]:
        """Get albums grouped by dominant color themes.

        Considers the 50 most popular albums that have a non-NULL
        ``dominant_colors`` value.

        Returns:
            Mapping of theme name to album rows. All five theme keys are
            always present, possibly with empty lists.
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    r.release_id, r.title, r.artist, r.album, r.year, r.genre,
                    ac.local_path as cover_path,
                    vf.dominant_colors, vf.brightness, vf.saturation
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE vf.dominant_colors IS NOT NULL
                ORDER BY r.popularity_score DESC
                LIMIT 50
            """)

            albums = [dict(row) for row in cursor.fetchall()]

        color_groups = {
            'Red/Warm': [],
            'Blue/Cool': [],
            'Green/Natural': [],
            'Dark/Monochrome': [],
            'Bright/Colorful': []
        }

        for album in albums:
            color_groups[self._classify_color_theme(album)].append(album)

        return color_groups

    def search_by_visual_keywords(self, keywords: List[str], limit: int = 20) -> List[Dict]:
        """Search albums by visual description keywords.

        Keywords are matched case-insensitively against ``_KEYWORD_FILTERS``;
        unknown keywords are ignored. When several keywords constrain the same
        characteristic their ranges are intersected, so contradictory keywords
        (e.g. 'dark' and 'bright') produce an empty range and no results.

        Args:
            keywords: Descriptive terms such as 'dark' or 'vibrant'.
            limit: Maximum number of rows to return.

        Returns:
            Rows from ``get_albums_by_visual_style`` for the combined criteria.
        """
        criteria = {}
        for keyword in keywords:
            filters = self._KEYWORD_FILTERS.get(keyword.lower(), {})
            for key, (low, high) in filters.items():
                if key in criteria:
                    current_low, current_high = criteria[key]
                    # Intersect with the range collected so far
                    criteria[key] = (max(low, current_low), min(high, current_high))
                else:
                    criteria[key] = (low, high)

        return self.get_albums_by_visual_style(limit=limit, **criteria)

    def get_all_albums_with_covers(self) -> List[Dict]:
        """Get all albums that have visual features, most popular first."""
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    r.release_id, r.title, r.artist, r.album, r.year, r.genre,
                    ac.local_path as cover_path,
                    vf.brightness, vf.saturation, vf.complexity_score
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                ORDER BY r.popularity_score DESC
            """)

            return [dict(row) for row in cursor.fetchall()]

# Initialize paths
# When the notebook is started from a 'notebooks' directory the project root
# is its parent; otherwise the current working directory is used as the root.
PROJECT_ROOT = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
DATA_DIR = PROJECT_ROOT / 'data'
DB_PATH = DATA_DIR / 'database' / 'vinyl_catalog.db'

# Initialize visual discovery interface
discovery = VisualDiscoveryInterface(DB_PATH)

print("Visual Discovery Interface Ready!")
print("Available features:")
print("  - find_similar_albums(): Find albums with similar visual style")
print("  - get_albums_by_visual_style(): Filter by brightness, saturation, complexity")
print("  - get_color_clusters(): Group albums by dominant colors")
print("  - search_by_visual_keywords(): Search using descriptive terms")

# Test the interface with your data
print("\nTesting the visual discovery system...")

# Get all albums with covers for testing
all_albums = discovery.get_all_albums_with_covers()
print(f"Found {len(all_albums)} albums with visual features")

if all_albums:
    # Test 1: Find similar albums
    # The first row is used as the seed; get_all_albums_with_covers() orders
    # by popularity_score DESC, so this is the most popular album.
    test_album = all_albums[0]
    print(f"\nTEST 1: Finding albums similar to '{test_album['title']}' by {test_album['artist']}")
    
    similar_albums = discovery.find_similar_albums(test_album['release_id'], limit=5)
    if similar_albums:
        print("Similar albums found:")
        for album in similar_albums:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(similarity: {album['similarity_score']:.3f})")
    else:
        print("  No similar albums found")
    
    # Test 2: Search by visual keywords
    # 'dark' maps to brightness 0-100 and 'complex' to complexity 0.5-2.0;
    # both filters must hold for an album to be listed.
    print(f"\nTEST 2: Searching for 'dark' and 'complex' albums")
    keyword_results = discovery.search_by_visual_keywords(['dark', 'complex'], limit=5)
    if keyword_results:
        print("Albums matching 'dark' and 'complex':")
        for album in keyword_results:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(brightness: {album['brightness']:.1f}, complexity: {album['complexity_score']:.3f})")
    else:
        print("  No albums found matching these criteria")
    
    # Test 3: Color clustering
    # Empty colour groups are skipped in the printout.
    print(f"\nTEST 3: Color-based grouping")
    color_groups = discovery.get_color_clusters()
    for color_theme, albums in color_groups.items():
        if albums:
            print(f"{color_theme}: {len(albums)} albums")
            for album in albums[:2]:  # Show first 2 in each category
                print(f"  • {album['artist']} - {album['title']}")
    
    # Test 4: Visual style filtering
    print(f"\nTEST 4: Albums with high brightness (150-255)")
    bright_albums = discovery.get_albums_by_visual_style(
        brightness_range=(150, 255), 
        limit=5
    )
    if bright_albums:
        print("Bright albums:")
        for album in bright_albums:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(brightness: {album['brightness']:.1f})")
    else:
        print("  No bright albums found")

else:
    print("No albums with visual features found. Make sure Cell 3 completed successfully.")

# NOTE(review): the status lines below are printed unconditionally; the check
# marks do not reflect whether the tests above actually returned any results.
print(f"\nVisual Discovery System Status:")
print(f"  ✓ Interface initialized")
print(f"  ✓ {len(all_albums)} albums ready for visual search")
print(f"  ✓ Similarity matching operational")
print(f"  ✓ Keyword search operational")
print(f"  ✓ Color clustering operational")

Visual Discovery Interface Ready!
Available features:
  - find_similar_albums(): Find albums with similar visual style
  - get_albums_by_visual_style(): Filter by brightness, saturation, complexity
  - get_color_clusters(): Group albums by dominant colors
  - search_by_visual_keywords(): Search using descriptive terms

Testing the visual discovery system...
Found 13 albums with visual features

TEST 1: Finding albums similar to 'Hachiko (The Faithtful Dog)' by The Kyoto Connection
Similar albums found:
  • Charles Atlas - Photosphere (similarity: 1.000)
  • Jahzzar - Take Me Higher (similarity: 1.000)
  • Bitbasic - Be careful, I've Stood On It Too (similarity: 0.842)
  • Eric Skiff - Chibi Ninja (similarity: 0.841)
  • Latché Swing - Menilmontant (similarity: 0.818)

TEST 2: Searching for 'dark' and 'complex' albums
  No albums found matching these criteria

TEST 3: Color-based grouping
Red/Warm: 2 albums
  • Sleeping Policemen - Vogelbird
  • alright lover - Snow Wave
Green/Natural: 

In [6]:
# Cell 5 - Visual Analytics and Clustering

import numpy as np
import sqlite3
from pathlib import Path
from contextlib import contextmanager
import json
import time
from sklearn.cluster import KMeans
from typing import Dict, List, Any
import logging

# Setup logging
# NOTE: logging.basicConfig() does nothing when the root logger already has
# handlers, so if an earlier cell in the same kernel session has configured
# logging this call leaves that configuration in place.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by VisualAnalytics for warnings and status messages.
logger = logging.getLogger(__name__)

class VisualAnalytics:
    """Analytics and clustering for album cover patterns.

    Aggregates the stored visual features by genre and decade, groups covers
    with K-means and persists the cluster assignments. Each method opens its
    own short-lived SQLite connection to ``db_path``.
    """

    # Length of the zero vector used when no cover has parseable features.
    # Presumably 24 + 18 + 9 stored components plus the 4 scalar features;
    # when any cover parses, the observed length is used instead.
    _DEFAULT_FEATURE_DIM = 55

    def __init__(self, db_path: Path):
        """Remember the database location; no connection is opened here."""
        self.db_path = db_path

    @contextmanager
    def get_db_connection(self):
        """Yield a connection with ``sqlite3.Row`` rows and always close it."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _trend_summary(row) -> Dict[str, Any]:
        """Copy the aggregate columns shared by the genre and decade queries."""
        return {
            'avg_brightness': row['avg_brightness'],
            'avg_saturation': row['avg_saturation'],
            'avg_complexity': row['avg_complexity'],
            'avg_contrast': row['avg_contrast'],
            'album_count': row['album_count']
        }

    def analyze_visual_trends_by_genre(self) -> Dict[str, Dict]:
        """Analyze visual characteristics by music genre.

        Returns:
            Mapping of genre to its averaged features and album count, in
            descending album-count order. Releases with a NULL genre are
            excluded.
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    r.genre,
                    AVG(vf.brightness) as avg_brightness,
                    AVG(vf.saturation) as avg_saturation,
                    AVG(vf.complexity_score) as avg_complexity,
                    AVG(vf.contrast_score) as avg_contrast,
                    COUNT(*) as album_count
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE r.genre IS NOT NULL
                GROUP BY r.genre
                HAVING COUNT(*) >= 1
                ORDER BY album_count DESC
            """)

            return {row['genre']: self._trend_summary(row) for row in cursor.fetchall()}

    def analyze_visual_trends_by_decade(self) -> Dict[str, Dict]:
        """Analyze how album cover styles changed over time.

        Only releases with a year between 1960 and 2020 (inclusive) are
        considered.

        Returns:
            Mapping of decade label (e.g. ``"1990s"``) to its averaged
            features and album count, in ascending decade order.
        """
        with self.get_db_connection() as conn:
            # NOTE(review): the decade expression relies on integer division,
            # i.e. on `year` being stored as an integer - confirm the schema.
            cursor = conn.execute("""
                SELECT 
                    (r.year / 10) * 10 as decade,
                    AVG(vf.brightness) as avg_brightness,
                    AVG(vf.saturation) as avg_saturation,
                    AVG(vf.complexity_score) as avg_complexity,
                    AVG(vf.contrast_score) as avg_contrast,
                    COUNT(*) as album_count
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE r.year IS NOT NULL AND r.year >= 1960 AND r.year <= 2020
                GROUP BY decade
                ORDER BY decade
            """)

            return {
                f"{int(row['decade'])}s": self._trend_summary(row)
                for row in cursor.fetchall()
            }

    def _build_feature_matrix(self, data: List[Dict]) -> np.ndarray:
        """Turn feature rows into one fixed-width numeric matrix.

        Each row contributes its colour histogram, texture and edge vectors
        followed by four scaled scalars. Rows that cannot be parsed, or whose
        length differs from the first parseable row, become zero vectors so
        the matrix is never ragged.
        """
        parsed = []
        for item in data:
            try:
                combined = np.concatenate([
                    np.array(json.loads(item['color_histogram'])),
                    np.array(json.loads(item['texture_features'])),
                    np.array(json.loads(item['edge_features'])),
                    [item['brightness'] / 255,
                     item['saturation'] / 255,
                     item['complexity_score'],
                     item['contrast_score'] / 100]
                ]).astype(float)
            except (json.JSONDecodeError, ValueError, TypeError) as e:
                # TypeError covers NULL columns (json.loads(None), None / 255)
                logger.warning(f"Failed to parse features for {item['title']}: {e}")
                combined = None
            parsed.append(combined)

        dim = next((len(vec) for vec in parsed if vec is not None), self._DEFAULT_FEATURE_DIM)

        rows = []
        for item, vec in zip(data, parsed):
            if vec is not None and len(vec) != dim:
                logger.warning(
                    f"Feature length {len(vec)} for {item['title']} differs from {dim}; using defaults"
                )
                vec = None
            rows.append(np.zeros(dim) if vec is None else vec)

        return np.array(rows)

    def create_visual_clusters(self, n_clusters: int = 5) -> Dict[int, List[Dict]]:
        """Create visual clusters using K-means and persist the assignments.

        Args:
            n_clusters: Requested number of clusters. When fewer covers than
                this are available it is reduced to ``max(2, n // 2)``, and
                never to more than the number of covers.

        Returns:
            Mapping of cluster label (plain ``int``) to the album rows in that
            cluster; each row gains ``cluster_label`` and
            ``distance_to_center``. An empty dict when no cover has visual
            features, in which case nothing is stored.
        """
        # Load all visual features
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    ac.cover_id, r.release_id, r.title, r.artist, r.album, r.genre, r.year,
                    vf.color_histogram, vf.texture_features, vf.edge_features,
                    vf.brightness, vf.saturation, vf.complexity_score, vf.contrast_score,
                    ac.local_path
                FROM album_covers ac
                JOIN releases r ON ac.release_id = r.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
            """)

            data = [dict(row) for row in cursor.fetchall()]

        if not data:
            logger.warning("No visual features available; skipping clustering.")
            return {}

        if len(data) < n_clusters:
            logger.warning(f"Not enough data for {n_clusters} clusters. Found {len(data)} albums.")
            # K-means cannot fit more clusters than samples
            n_clusters = min(len(data), max(2, len(data) // 2))

        feature_matrix = self._build_feature_matrix(data)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        cluster_labels = kmeans.fit_predict(feature_matrix)

        # Organize results by cluster
        clusters: Dict[int, List[Dict]] = {}
        for item, vector, raw_label in zip(data, feature_matrix, cluster_labels):
            # Plain int keys: NumPy integers are not bound as integers by
            # sqlite3 and are awkward as JSON/dict keys.
            label = int(raw_label)
            album_data = item.copy()
            album_data['cluster_label'] = label
            album_data['distance_to_center'] = float(
                np.linalg.norm(vector - kmeans.cluster_centers_[label])
            )
            clusters.setdefault(label, []).append(album_data)

        # Store clustering results in database
        self.store_clustering_results(clusters, n_clusters)

        return clusters

    def store_clustering_results(self, clusters: Dict[int, List[Dict]], n_clusters: int):
        """Store clustering results in database.

        Inserts one ``visual_clusters`` run record and one
        ``cluster_assignments`` row per album, committed together.

        Args:
            clusters: Mapping of cluster label to album rows; each row needs
                ``cover_id`` and ``distance_to_center``.
            n_clusters: Number of clusters recorded for the run.
        """
        with self.get_db_connection() as conn:
            # Create cluster record
            cursor = conn.execute("""
                INSERT INTO visual_clusters (cluster_method, n_clusters, cluster_params)
                VALUES (?, ?, ?)
            """, ('kmeans', n_clusters, json.dumps({'n_clusters': n_clusters, 'random_state': 42})))

            cluster_id = cursor.lastrowid

            # Coerce to built-in types so SQLite stores INTEGER/REAL values
            # even when callers pass NumPy scalars.
            assignments = [
                (album['cover_id'], cluster_id, int(cluster_label), float(album['distance_to_center']))
                for cluster_label, albums in clusters.items()
                for album in albums
            ]
            conn.executemany("""
                INSERT OR REPLACE INTO cluster_assignments 
                (cover_id, cluster_id, cluster_label, distance_to_center)
                VALUES (?, ?, ?, ?)
            """, assignments)

            conn.commit()
            logger.info(f"Stored clustering results: {n_clusters} clusters, cluster_id {cluster_id}")

    def get_cluster_characteristics(self, clusters: Dict[int, List[Dict]]) -> Dict[int, Dict]:
        """Analyze the characteristics of each cluster.

        Args:
            clusters: Mapping of cluster label to album rows, as returned by
                ``create_visual_clusters``.

        Returns:
            Per-cluster statistics: album count, most frequent genre ("Mixed"
            when no album has one), feature averages, average year, year range
            and up to three sample albums. Empty clusters are omitted;
            averages default to 0 and the year range to ``(0, 0)`` when no
            values are available.
        """
        cluster_stats = {}

        for cluster_id, albums in clusters.items():
            if not albums:
                continue

            # Calculate statistics for this cluster, ignoring missing values
            brightness_values = [a['brightness'] for a in albums if a['brightness'] is not None]
            saturation_values = [a['saturation'] for a in albums if a['saturation'] is not None]
            complexity_values = [a['complexity_score'] for a in albums if a['complexity_score'] is not None]

            genres = [a['genre'] for a in albums if a['genre']]
            years = [a['year'] for a in albums if a['year'] and a['year'] > 0]

            # Most common genre; ties go to the genre seen first
            genre_counts = {}
            for genre in genres:
                genre_counts[genre] = genre_counts.get(genre, 0) + 1
            dominant_genre = max(genre_counts.items(), key=lambda x: x[1])[0] if genre_counts else "Mixed"

            cluster_stats[cluster_id] = {
                'album_count': len(albums),
                'dominant_genre': dominant_genre,
                'avg_brightness': np.mean(brightness_values) if brightness_values else 0,
                'avg_saturation': np.mean(saturation_values) if saturation_values else 0,
                'avg_complexity': np.mean(complexity_values) if complexity_values else 0,
                'avg_year': np.mean(years) if years else 0,
                'year_range': (min(years), max(years)) if years else (0, 0),
                'sample_albums': albums[:3]  # Show first 3 albums as examples
            }

        return cluster_stats

    def generate_visual_report(self) -> Dict[str, Any]:
        """Generate comprehensive visual analysis report.

        Runs the genre and decade analyses, clusters the covers into 4 groups
        (which also stores the assignments), and adds overall summary
        statistics and per-cluster characteristics.

        Returns:
            Dict with keys ``timestamp``, ``genre_analysis``,
            ``decade_analysis``, ``clusters``, ``summary_statistics`` and
            ``cluster_characteristics``. The averaged/min/max summary values
            are ``None`` when ``visual_features`` is empty.
        """
        report = {
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'genre_analysis': self.analyze_visual_trends_by_genre(),
            'decade_analysis': self.analyze_visual_trends_by_decade(),
            'clusters': self.create_visual_clusters(n_clusters=4)
        }

        # Add summary statistics
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    COUNT(*) as total_covers,
                    AVG(brightness) as avg_brightness,
                    AVG(saturation) as avg_saturation,
                    AVG(complexity_score) as avg_complexity,
                    MIN(brightness) as min_brightness,
                    MAX(brightness) as max_brightness,
                    MIN(saturation) as min_saturation,
                    MAX(saturation) as max_saturation
                FROM visual_features
            """)

            report['summary_statistics'] = dict(cursor.fetchone())

        # Add cluster characteristics
        report['cluster_characteristics'] = self.get_cluster_characteristics(report['clusters'])

        return report

# Initialize paths
# When the notebook is started from a 'notebooks' directory the project root
# is its parent; otherwise the current working directory is used as the root.
PROJECT_ROOT = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
DATA_DIR = PROJECT_ROOT / 'data'
DB_PATH = DATA_DIR / 'database' / 'vinyl_catalog.db'

# Initialize analytics
analytics = VisualAnalytics(DB_PATH)

# generate_visual_report() also runs the clustering and stores its results in
# the database, so re-running this cell inserts a new clustering record.
print("Generating visual analytics report...")
visual_report = analytics.generate_visual_report()

print(f"\n{'='*60}")
print(f"VISUAL ANALYSIS REPORT")
print(f"Generated: {visual_report['timestamp']}")
print(f"{'='*60}")

# Display summary statistics
# NOTE(review): the averages and min/max come from SQL aggregates and are None
# when visual_features is empty, in which case the numeric formats below raise.
stats = visual_report['summary_statistics']
print(f"\nSUMMARY STATISTICS:")
print(f"  Total album covers analyzed: {stats['total_covers']}")
print(f"  Average brightness: {stats['avg_brightness']:.1f} (range: {stats['min_brightness']:.1f}-{stats['max_brightness']:.1f})")
print(f"  Average saturation: {stats['avg_saturation']:.1f} (range: {stats['min_saturation']:.1f}-{stats['max_saturation']:.1f})")
print(f"  Average complexity: {stats['avg_complexity']:.3f}")

# Display genre trends
# The genre query orders by album count, so the slice keeps the ten genres
# with the most albums.
print(f"\nVISUAL TRENDS BY GENRE:")
genre_analysis = visual_report['genre_analysis']
for genre, data in list(genre_analysis.items())[:10]:  # Show top 10 genres
    print(f"  {genre:15} ({data['album_count']:2d} albums): "
          f"brightness={data['avg_brightness']:5.1f}, "
          f"saturation={data['avg_saturation']:5.1f}, "
          f"complexity={data['avg_complexity']:.3f}")

# Display decade trends
print(f"\nVISUAL TRENDS BY DECADE:")
decade_analysis = visual_report['decade_analysis']
for decade, data in decade_analysis.items():
    print(f"  {decade:6} ({data['album_count']:2d} albums): "
          f"brightness={data['avg_brightness']:5.1f}, "
          f"saturation={data['avg_saturation']:5.1f}, "
          f"complexity={data['avg_complexity']:.3f}")

# Display cluster information
clusters = visual_report['clusters']
cluster_chars = visual_report['cluster_characteristics']
print(f"\nVISUAL CLUSTERS DISCOVERED:")
print(f"Found {len(clusters)} distinct visual styles:")

for cluster_id in sorted(cluster_chars.keys()):
    data = cluster_chars[cluster_id]
    print(f"\n  Cluster {cluster_id}: {data['album_count']} albums")
    print(f"    Dominant genre: {data['dominant_genre']}")
    print(f"    Visual profile: brightness={data['avg_brightness']:.1f}, "
          f"saturation={data['avg_saturation']:.1f}, "
          f"complexity={data['avg_complexity']:.3f}")
    
    # year_range is (0, 0) when the cluster has no usable years
    if data['year_range'][0] > 0:
        print(f"    Time period: {int(data['year_range'][0])}-{int(data['year_range'][1])}")
    
    print(f"    Sample albums:")
    for album in data['sample_albums']:
        print(f"      • {album['artist']} - {album['title']} ({album['year']})")

print(f"\nKEY INSIGHTS:")

# Identify most distinctive genre characteristics
if genre_analysis:
    brightest_genre = max(genre_analysis.items(), key=lambda x: x[1]['avg_brightness'])
    darkest_genre = min(genre_analysis.items(), key=lambda x: x[1]['avg_brightness'])
    most_complex_genre = max(genre_analysis.items(), key=lambda x: x[1]['avg_complexity'])
    
    print(f"  • Brightest covers: {brightest_genre[0]} (avg brightness: {brightest_genre[1]['avg_brightness']:.1f})")
    print(f"  • Darkest covers: {darkest_genre[0]} (avg brightness: {darkest_genre[1]['avg_brightness']:.1f})")
    print(f"  • Most complex covers: {most_complex_genre[0]} (avg complexity: {most_complex_genre[1]['avg_complexity']:.3f})")

# Identify temporal trends
# Compares only the earliest and latest decade; decade labels such as "1990s"
# sort chronologically as strings because they share the same width.
if len(decade_analysis) > 1:
    decades = sorted(decade_analysis.items())
    if len(decades) >= 2:
        earliest = decades[0]
        latest = decades[-1]
        brightness_change = latest[1]['avg_brightness'] - earliest[1]['avg_brightness']
        complexity_change = latest[1]['avg_complexity'] - earliest[1]['avg_complexity']
        
        # A change of exactly zero is reported as 'decreased'
        print(f"  • Brightness trend: {'increased' if brightness_change > 0 else 'decreased'} by {abs(brightness_change):.1f} from {earliest[0]} to {latest[0]}")
        print(f"  • Complexity trend: {'increased' if complexity_change > 0 else 'decreased'} by {abs(complexity_change):.3f} from {earliest[0]} to {latest[0]}")

print(f"\nVISUAL ANALYTICS COMPLETE!")
print(f"  • Genre analysis: {len(genre_analysis)} genres analyzed")
print(f"  • Temporal analysis: {len(decade_analysis)} decades covered")
print(f"  • Clustering: {len(clusters)} visual styles identified")
print(f"  • Ready for advanced visual discovery features!")

2025-09-03 21:37:31,007 - INFO - Stored clustering results: 4 clusters, cluster_id 1


Generating visual analytics report...

VISUAL ANALYSIS REPORT
Generated: 2025-09-03 21:37:30

SUMMARY STATISTICS:
  Total album covers analyzed: 13
  Average brightness: 69.8 (range: 41.8-95.8)
  Average saturation: 165.9 (range: 0.0-255.0)
  Average complexity: 0.225

VISUAL TRENDS BY GENRE:
  Classical       ( 3 albums): brightness= 56.9, saturation=255.0, complexity=0.147
  Unknown         ( 3 albums): brightness= 55.4, saturation=  0.0, complexity=0.234
  Electronic      ( 2 albums): brightness= 95.8, saturation=175.8, complexity=0.392
  Pop             ( 2 albums): brightness= 94.8, saturation=237.0, complexity=0.312
  Folk            ( 1 albums): brightness= 41.8, saturation=196.3, complexity=0.101
  Jazz            ( 1 albums): brightness= 70.2, saturation=170.2, complexity=0.121
  Rock            ( 1 albums): brightness= 76.8, saturation=199.3, complexity=0.147

VISUAL TRENDS BY DECADE:
  2000s  ( 4 albums): brightness= 69.6, saturation=150.3, complexity=0.228
  2010s  ( 4 albu

In [None]:
# Cell 6 - Demo Interface and Usage Examples

import sqlite3
from pathlib import Path
from contextlib import contextmanager
import random
import json

class VisualDiscoveryDemo:
    """Console walkthrough of the visual discovery features.

    Every ``demo_*`` method prints a human-readable report to stdout and
    returns ``None``.  The wrapped ``discovery_interface`` must provide the
    methods used below: ``get_db_connection()`` (a context manager yielding a
    SQLite connection whose rows convert with ``dict(row)``),
    ``find_similar_albums()``, ``search_by_visual_keywords()``,
    ``get_color_clusters()`` and ``get_albums_by_visual_style()``.
    """

    # Only these visual_features columns may be interpolated into the SQL
    # built by _fetch_extreme (identifiers cannot be bound as parameters).
    _EXTREME_COLUMNS = frozenset({"brightness", "complexity_score"})

    # Shown when a cover has no usable dominant colour.
    _FALLBACK_COLOR = (0, 0, 0)

    def __init__(self, discovery_interface):
        """Store the discovery backend that all demos query."""
        self.discovery = discovery_interface

    @staticmethod
    def _fmt_metric(value, spec):
        """Format a numeric metric with ``spec``; NULL/None becomes ``'n/a'``.

        Metrics can be ``None`` when the SQL column is NULL or when a LEFT
        JOIN found no feature row, and ``None`` cannot take a float format.
        """
        return "n/a" if value is None else format(value, spec)

    @classmethod
    def _primary_color(cls, raw_colors):
        """Return the first colour of a JSON-encoded colour list as a tuple.

        Missing, empty or malformed data yields ``(0, 0, 0)`` instead of
        raising, so one bad row cannot abort the demo.
        """
        if not raw_colors:
            return cls._FALLBACK_COLOR
        try:
            colors = json.loads(raw_colors)
            return tuple(colors[0]) if colors else cls._FALLBACK_COLOR
        except (ValueError, TypeError, LookupError):
            # json.JSONDecodeError is a ValueError; TypeError/LookupError
            # cover payloads that are not a list of colour sequences.
            return cls._FALLBACK_COLOR

    def _fetch_extreme(self, conn, column, descending):
        """Return the release with the highest/lowest ``column`` value.

        Args:
            conn: open SQLite connection yielded by ``get_db_connection()``.
            column: a name from ``_EXTREME_COLUMNS``.
            descending: ``True`` for the maximum, ``False`` for the minimum.

        Returns:
            A dict with ``artist``, ``title``, ``genre`` and ``column``, or
            ``None`` when no cover has a non-NULL value for ``column``.

        Raises:
            ValueError: if ``column`` is not whitelisted.
        """
        if column not in self._EXTREME_COLUMNS:
            raise ValueError(f"Unsupported metric column: {column!r}")
        direction = "DESC" if descending else "ASC"
        # NULLs sort first in ascending order in SQLite, so they are filtered
        # out; otherwise the "lowest" row could be one without a measurement.
        row = conn.execute(f"""
            SELECT r.artist, r.title, r.genre, vf.{column}
            FROM releases r
            JOIN album_covers ac ON r.release_id = ac.release_id
            JOIN visual_features vf ON ac.cover_id = vf.cover_id
            WHERE vf.{column} IS NOT NULL
            ORDER BY vf.{column} {direction}
            LIMIT 1
        """).fetchone()
        return dict(row) if row is not None else None

    def demo_similar_albums(self):
        """Pick a random analysed album and print its visually similar albums."""
        fmt = self._fmt_metric
        print("\n🔍 VISUAL SIMILARITY DEMO")
        print("=" * 40)

        # Get a random album to start with
        with self.discovery.get_db_connection() as conn:
            row = conn.execute("""
                SELECT r.release_id, r.title, r.artist, r.genre, r.year,
                       vf.brightness, vf.saturation, vf.complexity_score
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                ORDER BY RANDOM()
                LIMIT 1
            """).fetchone()
            seed_album = dict(row) if row is not None else None

        if seed_album is None:
            print("No albums available for demo")
            return

        print("Finding albums visually similar to:")
        print(f"  '{seed_album['title']}' by {seed_album['artist']} ({seed_album['year']})")
        print(f"  Genre: {seed_album['genre']}")
        print(f"  Visual profile: brightness={fmt(seed_album['brightness'], '.1f')}, "
              f"saturation={fmt(seed_album['saturation'], '.1f')}, "
              f"complexity={fmt(seed_album['complexity_score'], '.3f')}")

        # Find similar albums
        similar = self.discovery.find_similar_albums(seed_album['release_id'], limit=5)

        if not similar:
            print("  No similar albums found")
            return

        print("\nVisually similar albums:")
        for i, album in enumerate(similar, 1):
            print(f"  {i}. {album['artist']} - {album['title']} ({album['year']})")
            print(f"     Similarity: {fmt(album['similarity_score'], '.3f')} | "
                  f"Brightness: {fmt(album['brightness'], '.1f')} | "
                  f"Genre: {album['genre']}")

    def demo_visual_search(self):
        """Run two keyword searches (dark+complex, bright+colorful) and print the hits."""
        fmt = self._fmt_metric
        print("\n🎨 VISUAL CHARACTERISTIC SEARCH DEMO")
        print("=" * 50)

        # Search by visual keywords
        search_terms = ['dark', 'complex']
        print(f"Searching for albums that are: {', '.join(search_terms)}")

        results = self.discovery.search_by_visual_keywords(search_terms, limit=5)

        if results:
            print(f"\nFound {len(results)} matching albums:")
            for album in results:
                print(f"  • {album['artist']} - {album['title']} ({album['genre']})")
                print(f"    Visual metrics: brightness={fmt(album['brightness'], '.1f')}, "
                      f"saturation={fmt(album['saturation'], '.1f')}, "
                      f"complexity={fmt(album['complexity_score'], '.3f')}")
        else:
            print("  No albums found matching these criteria")

        # Try another search
        print("\nSearching for 'bright' and 'colorful' albums:")
        bright_results = self.discovery.search_by_visual_keywords(['bright', 'colorful'], limit=3)

        if bright_results:
            print(f"Found {len(bright_results)} bright & colorful albums:")
            for album in bright_results:
                print(f"  • {album['artist']} - {album['title']} ({album['genre']})")
                print(f"    Visual metrics: brightness={fmt(album['brightness'], '.1f')}, "
                      f"saturation={fmt(album['saturation'], '.1f')}")
        else:
            # Mirror the first search so an empty result is not silent.
            print("  No albums found matching these criteria")

    def demo_color_clustering(self):
        """Print up to three albums for each non-empty colour group."""
        print("\n🌈 COLOR CLUSTERING DEMO")
        print("=" * 35)

        color_groups = self.discovery.get_color_clusters()

        for color_theme, albums in color_groups.items():
            if not albums:
                continue
            print(f"\n{color_theme} Albums ({len(albums)}):")
            for album in albums[:3]:  # Show first 3 in each category
                primary_color = self._primary_color(album['dominant_colors'])
                print(f"  • {album['artist']} - {album['title']}")
                print(f"    Primary color: RGB{primary_color} | "
                      f"Brightness: {self._fmt_metric(album['brightness'], '.1f')}")

    def demo_advanced_filtering(self):
        """Show filtering by brightness, by complexity, and by genre plus saturation."""
        fmt = self._fmt_metric
        print("\n⚙️  ADVANCED VISUAL FILTERING DEMO")
        print("=" * 45)

        # Filter by brightness ranges
        print("Albums with medium brightness (60-100):")
        medium_bright = self.discovery.get_albums_by_visual_style(
            brightness_range=(60, 100),
            limit=4
        )
        for album in medium_bright:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(brightness: {fmt(album['brightness'], '.1f')})")

        # Filter by complexity
        print("\nHighly complex album covers (complexity > 0.3):")
        complex_albums = self.discovery.get_albums_by_visual_style(
            complexity_range=(0.3, 2.0),
            limit=4
        )
        for album in complex_albums:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(complexity: {fmt(album['complexity_score'], '.3f')})")

        # Filter by genre and visual characteristics
        print("\nElectronic albums with high saturation:")
        electronic_colorful = self.discovery.get_albums_by_visual_style(
            genre="Electronic",
            saturation_range=(150, 255),
            limit=3
        )
        for album in electronic_colorful:
            print(f"  • {album['artist']} - {album['title']} "
                  f"(saturation: {fmt(album['saturation'], '.1f')})")

    def demo_statistical_insights(self):
        """Print per-genre metric averages and the brightest/darkest/most complex covers.

        An empty catalog is reported as ``n/a`` entries instead of raising.
        """
        fmt = self._fmt_metric
        print("\n📊 COLLECTION INSIGHTS")
        print("=" * 30)

        with self.discovery.get_db_connection() as conn:
            # Genre distribution
            cursor = conn.execute("""
                SELECT r.genre, COUNT(*) as count,
                       AVG(vf.brightness) as avg_brightness,
                       AVG(vf.saturation) as avg_saturation,
                       AVG(vf.complexity_score) as avg_complexity
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE r.genre IS NOT NULL
                GROUP BY r.genre
                ORDER BY count DESC
            """)
            genres = [dict(row) for row in cursor.fetchall()]

            # (label, column, display name, float format, descending?)
            extreme_specs = (
                ("Brightest cover", "brightness", "brightness", ".1f", True),
                ("Darkest cover", "brightness", "brightness", ".1f", False),
                ("Most complex cover", "complexity_score", "complexity", ".3f", True),
            )
            extremes = [
                (label, column, shown_as, spec,
                 self._fetch_extreme(conn, column, descending))
                for label, column, shown_as, spec, descending in extreme_specs
            ]

        print("Visual characteristics by genre:")
        for genre in genres:
            print(f"  {genre['genre']} ({genre['count']} albums):")
            # AVG() is NULL when every value in the group is NULL.
            print(f"    Avg brightness: {fmt(genre['avg_brightness'], '.1f')}")
            print(f"    Avg saturation: {fmt(genre['avg_saturation'], '.1f')}")
            print(f"    Avg complexity: {fmt(genre['avg_complexity'], '.3f')}")

        print("\nExtreme Examples:")
        for label, column, shown_as, spec, album in extremes:
            if album is None:
                print(f"  {label}: n/a (no analyzed covers)")
                continue
            print(f"  {label}: {album['artist']} - {album['title']} "
                  f"({shown_as}: {fmt(album[column], spec)})")

# Initialize demo with the discovery interface from Cell 4
# (Make sure you've run Cell 4 first)
# Resolve the project root: when the working directory is named 'notebooks'
# the root is its parent, otherwise the working directory itself.
PROJECT_ROOT = Path.cwd()
if PROJECT_ROOT.name == 'notebooks':
    PROJECT_ROOT = PROJECT_ROOT.parent

# Catalog database location relative to the project root.
DATA_DIR = PROJECT_ROOT.joinpath('data')
DB_PATH = DATA_DIR.joinpath('database', 'vinyl_catalog.db')

# Recreate discovery interface for demo
class VisualDiscoveryInterface:
    """Read-only query helper over the catalog's visual-feature tables.

    All queries join ``releases``, ``album_covers`` and ``visual_features``
    (plus ``visual_similarities`` for similarity lookups) in the SQLite
    database at ``db_path``.  Every method opens and closes its own
    connection and returns plain ``dict`` rows.
    """

    # Visual keyword -> keyword arguments for get_albums_by_visual_style().
    _KEYWORD_CRITERIA = {
        'dark': {'brightness_range': (0, 100)},
        'bright': {'brightness_range': (150, 255)},
        'colorful': {'saturation_range': (100, 255)},
        'complex': {'complexity_range': (0.3, 2.0)},
    }

    def __init__(self, db_path):
        """Remember the database location; no connection is opened yet."""
        self.db_path = db_path

    @contextmanager
    def get_db_connection(self):
        """Yield a connection with ``sqlite3.Row`` rows; always close it on exit."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def find_similar_albums(self, release_id, limit=10):
        """Return up to ``limit`` releases most similar to ``release_id``.

        Rows are ordered by ``similarity_score`` descending.  Visual metrics
        come from a LEFT JOIN and are ``None`` for covers without features.

        NOTE(review): only pairs where the seed cover is ``cover_id_1`` are
        matched; if ``visual_similarities`` stores each pair once, matches
        with the seed as ``cover_id_2`` are missed - confirm against the
        code that fills the table.
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT 
                    r2.release_id, r2.title, r2.artist, r2.album, r2.year, r2.genre,
                    vs.similarity_score,
                    vf2.brightness, vf2.saturation, vf2.complexity_score
                FROM visual_similarities vs
                JOIN album_covers ac1 ON vs.cover_id_1 = ac1.cover_id
                JOIN album_covers ac2 ON vs.cover_id_2 = ac2.cover_id
                JOIN releases r2 ON ac2.release_id = r2.release_id
                LEFT JOIN visual_features vf2 ON ac2.cover_id = vf2.cover_id
                WHERE ac1.release_id = ?
                ORDER BY vs.similarity_score DESC
                LIMIT ?
            """, (release_id, limit))
            return [dict(row) for row in cursor.fetchall()]

    def search_by_visual_keywords(self, keywords, limit=20):
        """Translate descriptive keywords into metric ranges and search.

        Recognised keywords (case-insensitive): ``dark``, ``bright``,
        ``colorful``, ``complex``.  Unknown keywords are ignored.  When two
        keywords set the same range (e.g. ``dark`` and ``bright``) the later
        one wins.  With no recognised keyword the search is unfiltered.
        """
        criteria = {}
        for keyword in keywords:
            criteria.update(self._KEYWORD_CRITERIA.get(keyword.lower(), {}))

        return self.get_albums_by_visual_style(limit=limit, **criteria)

    def get_albums_by_visual_style(self, brightness_range=None, saturation_range=None,
                                   complexity_range=None, genre=None, limit=20):
        """Return releases matching every supplied filter.

        Args:
            brightness_range: inclusive ``(low, high)`` pair, or ``None``.
            saturation_range: inclusive ``(low, high)`` pair, or ``None``.
            complexity_range: inclusive ``(low, high)`` pair, or ``None``.
            genre: substring matched against ``releases.genre`` with LIKE.
            limit: maximum number of rows.

        Returns:
            Row dicts ordered by ``popularity_score`` descending.
        """
        where_clauses = []
        params = []

        # Column names below are fixed literals; only values are bound.
        range_filters = (
            ("vf.brightness", brightness_range),
            ("vf.saturation", saturation_range),
            ("vf.complexity_score", complexity_range),
        )
        for column, bounds in range_filters:
            if bounds:
                where_clauses.append(f"{column} BETWEEN ? AND ?")
                params.extend(bounds)
        if genre:
            where_clauses.append("r.genre LIKE ?")
            params.append(f"%{genre}%")

        # "1=1" keeps the WHERE clause valid when no filter was supplied.
        where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

        query = f"""
            SELECT r.release_id, r.title, r.artist, r.album, r.year, r.genre,
                   vf.brightness, vf.saturation, vf.complexity_score, vf.dominant_colors
            FROM releases r
            JOIN album_covers ac ON r.release_id = ac.release_id
            JOIN visual_features vf ON ac.cover_id = vf.cover_id
            WHERE {where_clause}
            ORDER BY r.popularity_score DESC
            LIMIT ?
        """

        params.append(limit)

        with self.get_db_connection() as conn:
            cursor = conn.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def get_color_clusters(self):
        """Group analysed releases into five colour themes.

        A cover with brightness below 80 is 'Dark/Monochrome'; otherwise the
        strictly largest channel of its first dominant colour selects
        'Red/Warm', 'Blue/Cool' or 'Green/Natural', and ties fall into
        'Bright/Colorful'.  Rows whose colour data or brightness cannot be
        interpreted are kept in 'Dark/Monochrome' rather than dropped.

        Returns:
            Dict mapping theme name to a list of row dicts, in
            ``popularity_score`` descending order within each theme.
        """
        with self.get_db_connection() as conn:
            cursor = conn.execute("""
                SELECT r.release_id, r.title, r.artist, r.year, r.genre,
                       vf.dominant_colors, vf.brightness, vf.saturation
                FROM releases r
                JOIN album_covers ac ON r.release_id = ac.release_id
                JOIN visual_features vf ON ac.cover_id = vf.cover_id
                WHERE vf.dominant_colors IS NOT NULL
                ORDER BY r.popularity_score DESC
            """)

            albums = [dict(row) for row in cursor.fetchall()]

        color_groups = {
            'Red/Warm': [],
            'Blue/Cool': [],
            'Green/Natural': [],
            'Dark/Monochrome': [],
            'Bright/Colorful': []
        }

        for album in albums:
            try:
                dominant_colors = json.loads(album['dominant_colors'])
                r, g, b = dominant_colors[0]
                is_dark = album['brightness'] < 80
            except (ValueError, TypeError, LookupError):
                # Best-effort fallback.  ValueError covers JSONDecodeError and
                # a colour without exactly three channels; TypeError covers a
                # non-sequence payload or NULL brightness; LookupError covers
                # an empty list or a JSON object.
                color_groups['Dark/Monochrome'].append(album)
                continue

            if is_dark:
                theme = 'Dark/Monochrome'
            elif r > g and r > b:
                theme = 'Red/Warm'
            elif b > r and b > g:
                theme = 'Blue/Cool'
            elif g > r and g > b:
                theme = 'Green/Natural'
            else:
                theme = 'Bright/Colorful'
            color_groups[theme].append(album)

        return color_groups

# Wire the demo to the catalog database.
discovery = VisualDiscoveryInterface(DB_PATH)
demo = VisualDiscoveryDemo(discovery)

# Banner.
# NOTE(review): the cover count in the banner is a literal, not read from the database.
print(
    "🎵 ALBUM COVER VISUAL DISCOVERY SYSTEM - COMPLETE DEMO",
    "=" * 70,
    "Computer vision system operational with 13 analyzed covers",
    sep="\n",
)

# Run all demonstrations
demo.demo_similar_albums()
demo.demo_visual_search()
demo.demo_color_clustering()
demo.demo_advanced_filtering()
demo.demo_statistical_insights()

# Closing summary: one print per section, one argument per output line.
print(
    "\n🚀 VISUAL DISCOVERY FEATURES SUMMARY:",
    "  🔍 Similarity matching: Find albums that 'look like' a target",
    "  🎨 Visual keyword search: Search by descriptive terms",
    "  🌈 Color clustering: Browse by dominant color themes",
    "  ⚙️  Advanced filtering: Filter by brightness, saturation, complexity",
    "  📊 Analytics: Genre trends and statistical insights",
    sep="\n",
)

print(
    "\n💡 POTENTIAL APPLICATIONS:",
    "  • Visual music discovery: 'Show me dark, complex albums'",
    "  • Mood-based browsing: Find albums that match a visual aesthetic",
    "  • Genre analysis: Compare visual styles across music genres",
    "  • Recommendation engine: Suggest based on visual preferences",
    "  • Collection insights: Understand your music's visual landscape",
    sep="\n",
)

print(
    "\n🎯 NEXT STEPS FOR PRODUCTION:",
    "  • Download real album covers from Discogs API",
    "  • Implement CNN-based deep features for better accuracy",
    "  • Build interactive web interface for browsing",
    "  • Add user preference learning",
    "  • Create visual similarity API endpoints",
    sep="\n",
)

print(
    "\n✅ VISUAL DISCOVERY SYSTEM COMPLETE!",
    "   Ready to revolutionize how people discover music through visual similarity!",
    sep="\n",
)