In [None]:
# If the data is not stored locally, run this cell to fetch it.
# The code below is disabled by wrapping it in a string literal; remove the
# enclosing triple quotes to execute it.
"""

import kagglehub
import os
import shutil

# Define paths
download_dir = os.path.join("data_full", "raw")
os.makedirs(download_dir, exist_ok=True)

# Download dataset. kagglehub stores it in its own cache and returns that
# location; its `path` argument names a single file inside the dataset, not a
# destination folder, so download_dir must not be passed there.
path = kagglehub.dataset_download("steubk/wikiart")

# Copy the cached files into the project folder (needs room for a second copy)
shutil.copytree(path, download_dir, dirs_exist_ok=True)

def flatten_directory(directory):
    # Move all files from subdirectories to parent directory and delete subdirs
    for root, dirs, files in os.walk(directory, topdown=False):
        # Skip the parent directory itself
        if root == directory:
            continue

        for file in files:
            src_path = os.path.join(root, file)
            dest_path = os.path.join(directory, file)

            # Handle potential filename conflicts
            if os.path.exists(dest_path):
                base, extension = os.path.splitext(file)
                counter = 1
                # Generate unique filename by appending _n
                while os.path.exists(dest_path):
                    new_name = f"{base}_{counter}{extension}"
                    dest_path = os.path.join(directory, new_name)
                    counter += 1

            shutil.move(src_path, dest_path)

        # Remove the emptied directory
        shutil.rmtree(root)

# Flatten the directory structure
flatten_directory(download_dir)
print(f"All files consolidated in: {download_dir}")

"""

In [None]:
import kagglehub
import os
import shutil
import random

def ensure_flat_directory(directory):
    """Flatten ``directory`` in place if it holds at least one subdirectory.

    Only the immediate entries of ``directory`` are inspected. As soon as one
    of them turns out to be a directory, a notice is printed and the work is
    handed to ``flatten_directory``. A directory without subdirectories is
    left untouched.
    """
    for entry in os.listdir(directory):
        if os.path.isdir(os.path.join(directory, entry)):
            print("Subdirectories detected - flattening structure...")
            flatten_directory(directory)
            return

def flatten_directory(directory):
    """Move every file below ``directory`` up into ``directory`` itself.

    The tree is walked bottom-up. Files that already sit at the top level are
    left alone; files in nested directories are moved to the top level. When
    the target name is already taken, ``_1``, ``_2``, ... is inserted before
    the extension until a free name is found. Each nested directory is deleted
    once its files have been moved.
    """
    for current, _subdirs, names in os.walk(directory, topdown=False):
        if current != directory:
            for name in names:
                dst = os.path.join(directory, name)
                base, ext = os.path.splitext(name)
                counter = 1
                # Probe "<base>_<n><ext>" until the name is unused
                while os.path.exists(dst):
                    dst = os.path.join(directory, f"{base}_{counter}{ext}")
                    counter += 1

                shutil.move(os.path.join(current, name), dst)

            # Drop the nested directory if it is still present
            if os.path.exists(current):
                shutil.rmtree(current)

def create_sample_dataset(source_dir, dest_dir, n=500, seed=None):
    """Copy a random sample of up to ``n`` files from ``source_dir`` to ``dest_dir``.

    Only regular files directly inside ``source_dir`` are candidates;
    subdirectories are ignored. ``dest_dir`` is created if it does not exist.
    Files already present in ``dest_dir`` are left in place (same-named files
    are overwritten by the copy).

    Args:
        source_dir: Directory to sample from.
        dest_dir: Directory that receives the copies.
        n: Requested number of files. If fewer are available, all of them are
            copied and a warning is printed.
        seed: Optional seed for a reproducible draw. ``None`` (the default)
            draws from the module-level ``random`` generator.

    Raises:
        FileNotFoundError: If ``source_dir`` does not exist.
        ValueError: If ``n`` is negative (raised by the sampler).
    """
    # Sort the listing: os.listdir() returns entries in arbitrary order, and a
    # seeded draw only selects the same files when the population order is stable.
    all_files = sorted(
        f for f in os.listdir(source_dir)
        if os.path.isfile(os.path.join(source_dir, f))
    )

    # Adjust sample size
    sample_size = min(n, len(all_files))
    if sample_size < n:
        print(f"Warning: Only {len(all_files)} files available")

    # Create destination
    os.makedirs(dest_dir, exist_ok=True)

    # A private generator keeps a seeded draw from disturbing global RNG state
    rng = random if seed is None else random.Random(seed)

    # Copy random sample (copy2 also preserves file metadata)
    for file in rng.sample(all_files, sample_size):
        src = os.path.join(source_dir, file)
        dst = os.path.join(dest_dir, file)
        shutil.copy2(src, dst)

# Main execution flow
if __name__ == "__main__":
    # Setup paths
    raw_dir = os.path.join("data_full", "raw")
    sample_dir = os.path.join("data_sample", "raw")

    # Fail early with an actionable message rather than the bare error that
    # os.listdir() would raise on a missing folder further down.
    if not os.path.isdir(raw_dir):
        raise FileNotFoundError(
            f"Raw data directory not found: {raw_dir}. "
            "Download the dataset into it before running this cell."
        )

    # Seed the global RNG so that re-running the notebook draws the same
    # sample (as long as the directory listing order is unchanged).
    random.seed(42)

    # Ensure flat structure
    ensure_flat_directory(raw_dir)

    # Create sample dataset
    create_sample_dataset(raw_dir, sample_dir)
    print(f"Sample dataset created with files from {raw_dir} to {sample_dir}")

In [None]:
import os
from PIL import Image
import torchvision.transforms as transforms
from torchvision.utils import save_image

# Define paths
# Built with os.path.join instead of backslash literals: inside a normal string
# "\r" is a carriage-return escape, so "data_sample\raw" does not name the
# sample folder, and backslashes are not a path separator outside Windows.
input_dir = os.path.join("data_sample", "raw")  # Folder containing raw artwork images
output_dir = os.path.join("data_sample", "preprocessed_images")  # Output folder for preprocessed images

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Define transformations, applied to each image in the order listed
preprocess_steps = [
    transforms.Resize(256),      # scale so the shorter side is 256
    transforms.CenterCrop(224),  # keep the central 224x224 region
    transforms.ToTensor(),       # convert to tensor, [0, 255] -> [0, 1]
]
# Normalization - choose ONE method:
#   Method 1: scale to [0, 1]. Nothing to add; ToTensor() already does this
#             (this is what the pipeline currently uses).
#   Method 2: ImageNet mean/std. Add the following step after ToTensor():
#             transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
preprocess = transforms.Compose(preprocess_steps)

# Walk the input folder and write a preprocessed copy of every image found
for folder, _, filenames in os.walk(input_dir):
    for filename in filenames:
        # Only PNG/JPEG files are handled; anything else is skipped
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue

        img_path = os.path.join(folder, filename)
        try:
            # Force three channels, then run the image through `preprocess`
            rgb_image = Image.open(img_path).convert("RGB")
            processed = preprocess(rgb_image)

            # Output keeps the original file name, placed directly in output_dir
            save_image(processed, os.path.join(output_dir, filename))
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest
            print(f"Error processing {img_path}: {e}")

In [None]:
# Load a preprocessed image to check values
# The sample is drawn at random, so no particular file name is guaranteed to
# exist in output_dir: use "sample_image.jpg" when present, otherwise fall back
# to the first preprocessed image in sorted order.
test_name = "sample_image.jpg"
if not os.path.exists(os.path.join(output_dir, test_name)):
    preprocessed_files = sorted(
        name for name in os.listdir(output_dir)
        if name.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    if not preprocessed_files:
        raise FileNotFoundError(f"No preprocessed images found in {output_dir}")
    test_name = preprocessed_files[0]

test_img = Image.open(os.path.join(output_dir, test_name))
test_tensor = transforms.ToTensor()(test_img)

print("Tensor shape:", test_tensor.shape)  # Should be (3, 224, 224)
# Should lie within [0, 1]: ToTensor() rescales the reloaded 8-bit image, so
# ImageNet-normalized values would not show up here even if Normalize were enabled.
print("Min/Max pixel value:", test_tensor.min(), test_tensor.max())

In [None]:
# Convert images to embeddings.
import torch
from torchvision import models, transforms

# Load pre-trained ResNet-50 and remove the last layer
if hasattr(models, "ResNet50_Weights"):
    # torchvision >= 0.13 deprecates `pretrained=` in favour of `weights=`;
    # IMAGENET1K_V1 is the checkpoint that `pretrained=True` loads.
    model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
else:
    model = models.resnet50(pretrained=True)
model = torch.nn.Sequential(*(list(model.children())[:-1]))
model.eval()  # inference mode: fixes batch-norm statistics

# Transformation pipeline applied to every image before it enters the network
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def load_image_batch(image_path):
    """Load one image file as a normalized batch of size 1.

    The image is converted to RGB first so that grayscale or RGBA files match
    the three-channel mean/std used by ``transform``.
    """
    with Image.open(image_path) as img:
        return transform(img.convert("RGB")).unsqueeze(0)


def embed_image(image_path):
    """Return the flattened feature vector of one image as a 1-D tensor."""
    # no_grad: gradients are not needed for feature extraction
    with torch.no_grad():
        return model(load_image_batch(image_path)).flatten()


# Extract embeddings for every image in the sample folder
embedding_dir = os.path.join("data_sample", "raw")
image_paths = sorted(
    os.path.join(embedding_dir, name)
    for name in os.listdir(embedding_dir)
    if name.lower().endswith(('.png', '.jpg', '.jpeg'))
)

embedded_paths = []   # paths that were embedded, aligned with rows of `embeddings`
feature_vectors = []
for image_path in image_paths:
    try:
        feature_vectors.append(embed_image(image_path))
    except OSError as e:
        # Unreadable or corrupt file: report it and continue with the rest
        print(f"Error embedding {image_path}: {e}")
        continue
    embedded_paths.append(image_path)

if not feature_vectors:
    raise RuntimeError(f"No images could be embedded from {embedding_dir}")

# One row per embedded image; this is the array the later cells consume
embeddings = torch.stack(feature_vectors).numpy()

# Single-image example values, taken from the first embedded image
image = load_image_batch(embedded_paths[0])
embedding = embeddings[0]

In [None]:
# EDA
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Reduce embeddings to 2D for visualization
# random_state is fixed because PCA may select a randomized SVD solver for
# wide inputs; the seed matches the one used for UMAP in this notebook.
pca = PCA(n_components=2, random_state=42)
embeddings_2d = pca.fit_transform(embeddings)

plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.5)
plt.title("PCA of Artwork Embeddings")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.show()

In [None]:
# Dimensionality Reduction
import umap
# Project the embeddings to two dimensions with UMAP, using a fixed seed
reducer = umap.UMAP(
    n_components=2,
    random_state=42,
)
embeddings_umap = reducer.fit_transform(embeddings)