# SpotiSplit MVP: Розкид треків з плейліста на N плейлістів за спорідненістю

Цей ноутбук:
1) тягне треки з вихідного плейліста Spotify,
2) забирає їх **audio features**,
3) кластеризує у **N** груп (KMeans),
4) створює **N** нових плейлістів та розкладає треки по них.

> ⚠️ Потрібен Spotify Developer App і права `playlist-read-private` та `playlist-modify-private` (або `playlist-modify-public` якщо хочете публічні).

In [None]:
# ⬇️ Інсталяції (Colab: запускати цей блок один раз)
!pip -q install spotipy==2.23.0 scikit-learn==1.5.1 pandas==2.2.2 numpy==1.26.4 matplotlib==3.9.0

In [None]:
# ⚙️ Налаштування
# 1) Створіть застосунок на https://developer.spotify.com/dashboard/
# 2) Додайте Redirect URI, напр.: http://localhost:8080/callback
# 3) Заповніть змінні нижче та запустіть блок авторизації

CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
REDIRECT_URI = "http://localhost:8080/callback"
USERNAME = "your_spotify_username"  # можна лишити порожнім, якщо oauth сам підставить
SOURCE_PLAYLIST_URL = "https://open.spotify.com/playlist/37i9dQZF1DXcBWIGoYBM5M"
N_CLUSTERS = 5  # скільки плейлістів створювати
MAKE_PUBLIC = False  # True -> публічні плейлісти

# Додаткові опції:
PLAYLIST_NAME_PREFIX = "SpotiSplit"
RANDOM_STATE = 42


In [None]:
# 🔐 Авторизація
import os
import re
import time
from typing import List, Dict, Any

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import spotipy
from spotipy.oauth2 import SpotifyOAuth

SCOPES = [
    "playlist-read-private",
    "playlist-read-collaborative",
    "playlist-modify-private",
]
if MAKE_PUBLIC:
    SCOPES.append("playlist-modify-public")

os.environ["SPOTIPY_CLIENT_ID"] = CLIENT_ID
os.environ["SPOTIPY_CLIENT_SECRET"] = CLIENT_SECRET
os.environ["SPOTIPY_REDIRECT_URI"] = REDIRECT_URI

auth_manager = SpotifyOAuth(scope=" ".join(SCOPES), show_dialog=True, cache_path=".cache-spotisplit")
sp = spotipy.Spotify(auth_manager=auth_manager)

me = sp.me()
USER_ID = me["id"]
print(f"✅ Увійшли як: {me['display_name']} ({USER_ID})")

In [None]:
# 🧰 Хелпери
def extract_playlist_id(url_or_id: str) -> str:
    m = re.search(r"playlist/([a-zA-Z0-9]+)", url_or_id)
    if m:
        return m.group(1)
    return url_or_id.strip()

def get_all_playlist_tracks(sp, playlist_id: str) -> List[Dict[str, Any]]:
    results = sp.playlist_items(playlist_id, additional_types=["track"], market=None)
    items = results.get("items", [])
    while results.get("next"):
        results = sp.next(results)
        items.extend(results.get("items", []))
    # Фільтруємо треки (без episodes/local)
    items = [it for it in items if it.get("track") and it["track"].get("id")]
    return items

def batched(iterable, n=100):
    batch = []
    for x in iterable:
        batch.append(x)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch

def fetch_audio_features(sp, track_ids: List[str]) -> Dict[str, Dict[str, Any]]:
    feats = {}
    for chunk in batched(track_ids, 100):
        af = sp.audio_features(chunk)
        for t_id, f in zip(chunk, af):
            if f:
                feats[t_id] = f
    return feats

def track_row(item, features_map):
    t = item["track"]
    t_id = t["id"]
    f = features_map.get(t_id, {})
    return {
        "track_id": t_id,
        "track_name": t["name"],
        "artist": ", ".join([a["name"] for a in t["artists"]]),
        "album": t["album"]["name"] if t.get("album") else None,
        "added_at": item.get("added_at"),
        "duration_ms": t.get("duration_ms"),
        "popularity": t.get("popularity"),
        # audio features:
        "danceability": f.get("danceability"),
        "energy": f.get("energy"),
        "speechiness": f.get("speechiness"),
        "acousticness": f.get("acousticness"),
        "instrumentalness": f.get("instrumentalness"),
        "liveness": f.get("liveness"),
        "valence": f.get("valence"),
        "tempo": f.get("tempo"),
        "loudness": f.get("loudness"),
        "key": f.get("key"),
        "mode": f.get("mode"),
        "time_signature": f.get("time_signature"),
        "uri": t.get("uri"),
        "external_url": t.get("external_urls", {}).get("spotify"),
    }

def create_playlist(sp, user_id: str, name: str, description: str = "", public: bool = False) -> str:
    pl = sp.user_playlist_create(user=user_id, name=name, public=public, description=description)
    return pl["id"]

def add_tracks_to_playlist(sp, playlist_id: str, uris: List[str]):
    for chunk in batched(uris, 100):
        sp.playlist_add_items(playlist_id, chunk)

In [None]:
# 1) Завантажуємо треки та audio features
playlist_id = extract_playlist_id(SOURCE_PLAYLIST_URL)
src_playlist = sp.playlist(playlist_id, fields="name,owner,tracks.total,external_urls")
print(f"🎧 Джерело: '{src_playlist['name']}' | Треків: {src_playlist['tracks']['total']}")

items = get_all_playlist_tracks(sp, playlist_id)
track_ids = [it["track"]["id"] for it in items]
features_map = fetch_audio_features(sp, track_ids)

df = pd.DataFrame([track_row(it, features_map) for it in items])
print(f"✅ Отримано {len(df)} треків з features.")
df.head(3)

In [None]:
# 2) Підготовка даних і кластеризація (KMeans)
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

feature_cols = [
    "danceability","energy","speechiness","acousticness","instrumentalness",
    "liveness","valence","tempo","loudness"
]

# Приберемо рядки з відсутніми features
X = df[feature_cols].dropna().copy()
valid_idx = X.index
if len(X) < N_CLUSTERS:
    print(f"⚠️ Треків з валідними features менше, ніж N_CLUSTERS={N_CLUSTERS}. Зменшу N_CLUSTERS до {max(1, len(X))}.")
    N_CLUSTERS = max(1, len(X))

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

kmeans = KMeans(n_clusters=int(N_CLUSTERS), random_state=RANDOM_STATE, n_init=10)
labels = kmeans.fit_predict(X_scaled)

df["cluster"] = -1
df.loc[valid_idx, "cluster"] = labels

sil = None
if int(N_CLUSTERS) > 1 and len(np.unique(labels)) > 1:
    sil = silhouette_score(X_scaled, labels)
print(f"✅ Кластерів: {N_CLUSTERS} | Silhouette: {sil:.3f}" if sil is not None else f"✅ Кластерів: {N_CLUSTERS}")

In [None]:
# 3) Візуалізація (PCA 2D)
from sklearn.decomposition import PCA

if len(X_scaled) >= 2 and len(np.unique(labels)) >= 1:
    pca = PCA(n_components=2, random_state=RANDOM_STATE)
    pts = pca.fit_transform(X_scaled)

    plt.figure(figsize=(6, 5))
    plt.scatter(pts[:,0], pts[:,1])
    plt.title("PCA 2D проекція треків")
    plt.xlabel("PC1")
    plt.ylabel("PC2")
    plt.show()
else:
    print("Недостатньо даних для PCA-проекції.")

In [None]:
# 4) Створення плейлістів та розкладка треків
created = {}
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
base_name = f"{PLAYLIST_NAME_PREFIX}: {src_playlist['name']}"

for c in sorted(df["cluster"].unique()):
    if c == -1:
        continue
    name = f"{base_name} · Cluster {int(c)} / {int(N_CLUSTERS)}"
    desc = f"Створено SpotiSplit {timestamp}. Джерело: {src_playlist['external_urls']['spotify']}"
    pl_id = create_playlist(sp, USER_ID, name=name, description=desc, public=MAKE_PUBLIC)
    created[int(c)] = pl_id
    cluster_uris = df.loc[df["cluster"] == c, "uri"].dropna().tolist()
    add_tracks_to_playlist(sp, pl_id, cluster_uris)
    print(f"📦 {name}: додано {len(cluster_uris)} треків")

total_assigned = (df["cluster"] != -1).sum()
print(f"🎉 Готово. Розкладено {total_assigned}/{len(df)} треків у {len(created)} плейлістів.")

In [None]:
# 5) Експорт результатів (CSV)
out_csv = "spotisplit_clusters.csv"
df.to_csv(out_csv, index=False)
print(f"Збережено: {out_csv}")