In [None]:
import requests
import typing as t
import pandas as pd
import networkx as nx

from time import sleep
from pyvis.network import Network
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict

In [None]:
bearer = (
    input("Input bearer token: ").split("Bearer ")[-1].strip()
)  # See README.md for instructions

In [None]:
STARTING_PROFILE_ID = ""  # The account you want to start from (usually your own)
MAX_DEPTH = 3

In [None]:
@dataclass
class UserNode:
    uid: str
    pfp: str
    name: str

    def as_dict(self):
        return asdict(self)


class SpotifyClient:
    def __init__(self, bearer: str) -> None:
        self.bearer = bearer
        self._get_client_token()

    def _get_client_token(self):
        r = requests.post(
            "https://clienttoken.spotify.com/v1/clienttoken",
            headers={
                "Accept": "application/json",
                "content-type": "application/json",
            },
            json={
                "client_data": {
                    "client_version": "1.2.5.522.g838c1197",
                    "client_id": "d8a5ed958d274c2e8ee717e6a4b0971d",
                    "js_sdk_data": {
                        "device_brand": "unknown",
                        "device_model": "desktop",
                        "os": "Windows",
                        "os_version": "NT 10.0",
                    },
                }
            },
        ).json()
        self.client_token = r["granted_token"]["token"]
        self.exp_time = datetime.now() + timedelta(
            seconds=r["granted_token"]["refresh_after_seconds"] / 10_000
        )
        self.headers = {
            "authorization": f"Bearer {self.bearer}",
            "client-token": self.client_token,
        }

    def _req(self, *args, **kwargs):
        if datetime.now() > self.exp_time:
            self._get_client_token()
        return requests.get(*args, **kwargs, headers=self.headers)

    def _retry_json(self, *args, **kwargs):
        ret = None
        while ret is None:
            try:
                ret = self._req(*args, **kwargs).json()
            except Exception as e:
                sleep(8)
        return ret

    def get_following(self, user_id: str) -> list[UserNode | None]:
        r = self._retry_json(
            f"https://spclient.wg.spotify.com/user-profile-view/v3/profile/{user_id}/following?market=from_token"
        )
        pfs = r.get("profiles")
        if not pfs:
            return [None]
        return [
            UserNode(
                uid=pf["uri"].split(":")[2], pfp=pf.get("image_url"), name=pf["name"]
            )
            for pf in pfs
            if pf["uri"].split(":")[1] == "user"
        ]

    def get_profile_info(self, user_id: str) -> UserNode:
        r = self._retry_json(
            f"https://spclient.wg.spotify.com/user-profile-view/v3/profile/{user_id}?playlist_limit=10&artist_limit=10&episode_limit=10&market=from_token"
        )
        return UserNode(name=r["name"], pfp=r.get("image_url", ""), uid=user_id)

In [None]:
spotify = SpotifyClient(bearer)

ids_done = {}
depth = 1
profiles_to_scrape = [
    {"user": spotify.get_profile_info(STARTING_PROFILE_ID), "depth": depth}
]
while profiles_to_scrape and depth <= MAX_DEPTH:
    np = []
    for index, profile in enumerate(profiles_to_scrape):
        if ids_done.get(f'{profile["user"].name} - {profile["user"].uid}'):
            continue
        ids_done[f'{profile["user"].name} - {profile["user"].uid}'] = []
        if (index + 1) % 10 == 0:
            print(f"{index + 1}/{len(profiles_to_scrape)}")
        depth = profile.get("depth", 1)
        following = spotify.get_following(profile["user"].uid)
        for user in following:
            if not user:
                continue
            np.append({"user": user, "depth": depth + 1})
            ids_done[f'{profile["user"].name} - {profile["user"].uid}'].append(
                f"{user.name} - {user.uid}"
            )
    profiles_to_scrape = np
    depth += 1
    print("Depth", depth)

In [None]:
ds = []
for uid in ids_done:
    for mapped_id in ids_done[uid]:
        ds.append({"source": uid, "target": mapped_id, "Type": "Undirected"})
df = pd.DataFrame(ds)

In [None]:
df.to_csv(f"{STARTING_PROFILE_ID}.csv")

In [None]:
G = nx.from_pandas_edgelist(df, source="source", target="target")
net = Network(notebook=True)
net.from_nx(G)
net.show(f"{STARTING_PROFILE_ID}.html")