In [None]:
import polars as pl
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

# Input columns for the model. The order matters: it is the column order used
# when the feature columns are selected to build the training dataset.
FEATURES = [
    "acousticness", "energy", "key",
    "loudness", "speechiness", "time_signature",
    "danceability", "instrumentalness", "liveness",
    "mode", "tempo", "valence",
]

# Column holding the class label to predict
TARGET = "playlist_name"

In [None]:
# Read the two input tables from disk
features = pl.read_parquet("features.parquet")
playlists = pl.read_parquet("playlists.parquet")

# Attach feature columns to every playlist row (left join keeps playlist rows
# that have no matching feature row), then encode the TARGET column as a
# zero-based dense rank stored in a new "target" column.
df = playlists.join(
    features,
    left_on=["track_id", "track_name"],
    right_on=["id", "track"],
    how="left",
).with_columns(target=pl.col(TARGET).rank("dense") - 1)

In [None]:
# Network dimensions: one input per feature column, one output logit per
# distinct value of the encoded target
num_features = len(FEATURES)
num_classes = df["target"].n_unique()

# Pick the compute backend: CUDA first, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# Fully connected classifier: num_features -> 16 -> 32 -> 16 -> num_classes,
# with ReLU between the linear layers. The final layer emits raw logits
# (no softmax), which is what nn.CrossEntropyLoss expects.
model = nn.Sequential(
    nn.Linear(num_features, 16),
    nn.ReLU(),
    nn.Linear(16, 32),
    nn.ReLU(),
    nn.Linear(32, 16),
    nn.ReLU(),
    nn.Linear(16, num_classes),
).to(device)

# Loss and optimizer: cross-entropy over class logits, SGD with momentum
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Display the architecture as the cell output
model

In [None]:
# Get dataset
# The upstream join is a left join, so playlist rows without a matching
# feature row carry null feature values. Such rows cannot be trained on, so
# keep only rows where every feature and the target are present.
train_frame = df[[*FEATURES, "target"]].drop_nulls()

# Fail fast with a clear message instead of silently training on nothing
assert train_frame.height > 0, "no rows with complete features and target to train on"

# Features and label are both cast to Float32 here; the label is converted
# back to an integer type at the point where the loss is computed.
train_dataset = train_frame.to_torch("dataset", label="target", dtype=pl.Float32)

In [None]:
# Initialize dataloader
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Train model
num_epochs = 128
for epoch in range(num_epochs):
    # Loop variables are named batch_* so they do not overwrite the `features`
    # DataFrame loaded earlier in the notebook.
    for batch_features, batch_targets in train_dataloader:
        # The model lives on `device`; inputs and labels must be on the same
        # device or the forward pass / loss raises a device-mismatch error
        # whenever CUDA or MPS is selected.
        batch_features = batch_features.to(device)
        # CrossEntropyLoss needs integer class indices, the dataset stores floats
        batch_targets = batch_targets.to(device).long()

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = model(batch_features)
        loss = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()

    # Print statistics (loss of the last batch of the epoch, every 16 epochs)
    if epoch % 16 == 0:
        print(f"{epoch} loss: {loss.item():.3f}")