## Process SoundCloud
This notebook processes the large number of parquet partitions downloaded from GCS into the format needed for training. It uses polars to process the partitions, and additionally creates the required adjacency list containing the user-track graph.

In [1]:
import polars as pl
import numpy as np
import pickle
import os
import math
from collections import defaultdict
import tqdm
import torch
from torch_geometric.data import HeteroData
from torch import nn, Tensor

### Process Listener embeddings

In [3]:
# Lazily scan every listener-embedding partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/listener_embeddings/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 113849050


In [4]:
%%time # takes about 1h 35 min with 256GB RAM & 32 vcpu instance (uses 90% of memory)
embeddings_df = (
    pl.scan_parquet("SoundCloud/listener_embeddings/*.parquet", n_rows=total_rows)
    .select(["user", "embedding"]).collect(streaming=True).iter_slices(3000000)
)
user_embeddings = np.empty((total_rows, 150), dtype=np.float32)
user2idx = {}
offset = 0
for chunk_num, chunk in enumerate(embeddings_df, start=1):
    n_chunk_rows = chunk.shape[0]
    embeddings_chunk = np.vstack(chunk["embedding"].to_list())
    user_embeddings[offset:offset + n_chunk_rows, :] = embeddings_chunk
    user2idx.update(dict(zip(chunk["user"].to_list(), np.arange(offset, offset + n_chunk_rows))))
    offset += n_chunk_rows
np.save("SoundCloud/user_embeddings.npy", user_embeddings)
with open("SoundCloud/user2idx.pkl", "wb") as f:
    pickle.dump(user2idx, f)
del user_embeddings, embeddings_df
print("Saved user embeddings and user2idx mapping")

Saved user embeddings and user2idx mapping
CPU times: user 1h 1min 16s, sys: 17min 50s, total: 1h 19min 6s
Wall time: 1h 57min 48s


### Process Track embeddings

In [6]:
# Lazily scan every track-embedding partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/embeddings/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 25841871


In [9]:
%%time
track_embeddings_df = (
    pl.scan_parquet("SoundCloud/embeddings/*.parquet", n_rows=total_rows)
    .select(["track_id", "embedding"]).with_row_index().collect(streaming=True).iter_slices(3000000)
)
track_embeddings = np.empty((total_rows, 150), dtype=np.float32)
track2idx = {}
idx2track = {}
offset = 0
for chunk_num, chunk in enumerate(track_embeddings_df, start=1):
    n_chunk_rows = chunk.shape[0]
    embeddings_chunk = np.vstack(chunk["embedding"].to_list())
    track_embeddings[offset:offset + n_chunk_rows, :] = embeddings_chunk
    idx = np.arange(offset, offset + n_chunk_rows)
    track2idx.update(dict(zip(chunk["track_id"].to_list(), idx)))
    idx2track.update(dict(zip(idx, chunk["track_id"].to_list())))
    offset += n_chunk_rows
np.save("SoundCloud/track_embeddings.npy", track_embeddings)
with open("SoundCloud/track2idx.pkl", "wb") as f:
    pickle.dump(track2idx, f)
with open("SoundCloud/idx2track.pkl", "wb") as f:
    pickle.dump(idx2track, f)
del track_embeddings, idx2track, track_embeddings_df
print("Saved track embeddings, track2idx, and idx2track mappings")

Saved track embeddings, track2idx, and idx2track mappings
CPU times: user 19min 30s, sys: 3min, total: 22min 31s
Wall time: 27min 37s


### Process follow interactions

In [None]:
pip install polars-u64-idx

In [2]:
# Lazily scan every follows-table partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/follows_table/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 5448194410


In [3]:
# Read in user2idx: the raw user id -> embedding row index mapping pickled by the
# listener-embedding cell (SoundCloud/user2idx.pkl).
# pickle.load can execute arbitrary code; only load artefacts produced by this notebook.
with open("SoundCloud/user2idx.pkl", "rb") as f:
    user2idx = pickle.load(f)

In [None]:
%%time
follows_df = (
    pl.scan_parquet("SoundCloud/follows_table/*.parquet", n_rows=total_rows)
    .select(["fan", "contact"]).collect(streaming=True).iter_slices(500000000)
)
print('loaded')
chunks_edge = []

# Process each chunk efficiently
for chunk in follows_df:
    # Map user IDs to indices (efficient and parallel)
    chunk = chunk.with_columns([
        pl.col("fan").replace_strict(user2idx, default=-1).alias("fan_idx"),
        pl.col("contact").replace_strict(user2idx, default=-1).alias("contact_idx")
    ])
    
    # Drop rows where any ID is unknown (-1)
    chunk = chunk.filter((pl.col("fan_idx") >= 0) & (pl.col("contact_idx") >= 0))

    # Convert to NumPy and store
    chunks_edge.append(chunk.select(["fan_idx", "contact_idx"]).to_numpy())
    print('chunk')
    
# Concatenate all valid edges into a final array
edge_index_uu = np.concatenate(chunks_edge, axis=0)
np.save("SoundCloud/edge_index_uu.npy", edge_index_uu)
print("Saved edge_index_uu")

### Process track interactions

In [2]:
# Lazily scan every 90-day interaction partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/track_interactions_90_days/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 3180251027


In [3]:
# Read in user2idx & track2idx: the raw id -> embedding row index mappings pickled by
# the listener- and track-embedding cells.
# pickle.load can execute arbitrary code; only load artefacts produced by this notebook.
with open("SoundCloud/user2idx.pkl", "rb") as f:
    user2idx = pickle.load(f)
with open("SoundCloud/track2idx.pkl", "rb") as f:
    track2idx = pickle.load(f)

In [None]:
%%time
# -------- Get track interaction edge index --------
interactions_df = (
    pl.scan_parquet("SoundCloud/track_interactions_90_days/*.parquet", n_rows=total_rows)
      .select(["user", "track", "score"])
      .collect(streaming=True)
      .iter_slices(700000000)
)
print('loaded df')

chunks_edge = []
chunks_weight = []
for chunk in interactions_df:
    # Replace user and track values using the provided dictionaries:
    chunk = chunk.with_columns([
        pl.col("user").replace_strict(user2idx, default=-1).alias("user_idx"),
        pl.col("track").replace_strict(track2idx, default=-1).alias("track_idx")
    ]).filter((pl.col("user_idx") >= 0) & (pl.col("track_idx") >= 0))

    # Select the columns for edge index and convert to NumPy:
    chunk_edge = chunk.select(["user_idx", "track_idx"]).to_numpy()
    chunks_edge.append(chunk_edge)
    
    # Select the score column (as edge weights) and convert to NumPy:
    chunk_weight = chunk.select(["score"]).to_numpy()
    chunks_weight.append(chunk_weight)
    print('chunk')

# Concatenate all chunks into final arrays:
edge_index_ut = np.concatenate(chunks_edge, axis=0)
edge_weight_ut = np.concatenate(chunks_weight, axis=0)
np.save("SoundCloud/edge_index_ut_90.npy", edge_index_ut)
np.save("SoundCloud/edge_weight_ut_90.npy", edge_weight_ut)
del edge_index_ut, edge_weight_ut, interactions_df
print("Saved edge_index_ut, edge_weight_ut")

In [6]:
# Lazily scan every 180-day interaction partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/track_interactions_180_days/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 6054214850


In [7]:
%%time
# -------- Get track interaction edge index --------
interactions_df = (
    pl.scan_parquet("SoundCloud/track_interactions_180_days/*.parquet", n_rows=total_rows)
      .select(["user", "track", "score"])
      .collect(streaming=True)
      .iter_slices(900000000)
)
print('loaded df')

chunks_edge = []
chunks_weight = []
for chunk in interactions_df:
    # Replace user and track values using the provided dictionaries:
    chunk = chunk.with_columns([
        pl.col("user").replace_strict(user2idx, default=-1).alias("user_idx"),
        pl.col("track").replace_strict(track2idx, default=-1).alias("track_idx")
    ]).filter((pl.col("user_idx") >= 0) & (pl.col("track_idx") >= 0))
    # Select the columns for edge index and convert to NumPy:
    chunk_edge = chunk.select(["user_idx", "track_idx"]).to_numpy()
    chunks_edge.append(chunk_edge)
    
    # Select the score column (as edge weights) and convert to NumPy:
    chunk_weight = chunk.select(["score"]).to_numpy()
    chunks_weight.append(chunk_weight)

# Concatenate all chunks into final arrays:
edge_index_ut = np.concatenate(chunks_edge, axis=0)
edge_weight_ut = np.concatenate(chunks_weight, axis=0)
np.save("SoundCloud/edge_index_ut_180.npy", edge_index_ut)
np.save("SoundCloud/edge_weight_ut_180.npy", edge_weight_ut)
del edge_index_ut, edge_weight_ut, interactions_df
print("Saved edge_index_ut, edge_weight_ut")

loaded df
Saved edge_index_ut, edge_weight_ut
CPU times: user 1h 22min 25s, sys: 22min 3s, total: 1h 44min 28s
Wall time: 38min 57s


### Create val and test set

In [3]:
# Lazily scan every offline-evaluation partition and keep only the total row count.
total_rows = (
    pl.scan_parquet("SoundCloud/offline_eval_interactions/*.parquet")
    .select(pl.len())
    .collect(streaming=True)
    .item()
)
print(f"Total rows: {total_rows}")

Total rows: 258232772


In [4]:
# Reload the raw id -> index mappings produced by the embedding cells.
# pickle.load can execute arbitrary code; only load artefacts produced by this notebook.
with open("SoundCloud/user2idx.pkl", "rb") as f:
    user2idx = pickle.load(f)
with open("SoundCloud/track2idx.pkl", "rb") as f:
    track2idx = pickle.load(f)

val_ratio = 0.2
# Seeded generator so the random val/test assignment is identical on every re-run.
split_rng = np.random.default_rng(42)

evaluation_df = (
    pl.scan_parquet("SoundCloud/offline_eval_interactions/*.parquet", n_rows=total_rows).select(["user_id", "track_id"])
    .collect(streaming=True).iter_slices(100000000)
)

# user_idx -> set of track_idx, accumulated across slices.
val_data = defaultdict(set)
test_data = defaultdict(set)
for chunk in evaluation_df:
    n_chunk_rows = chunk.shape[0]
    # Map raw ids to indices (-1 when unknown), attach one uniform draw per row that
    # decides the split, and drop rows with an unknown user or track.
    chunk = chunk.with_columns([
        pl.col("user_id").map_elements(lambda x: user2idx.get(x, -1), return_dtype=pl.Int64).alias("user_idx"),
        pl.col("track_id").map_elements(lambda x: track2idx.get(x, -1), return_dtype=pl.Int64).alias("track_idx"),
        pl.Series("rnd", split_rng.random(n_chunk_rows))
    ]).filter((pl.col("user_idx") >= 0) & (pl.col("track_idx") >= 0))
    # Rows with rnd < val_ratio go to validation, all others to test.
    for split_data, in_split in ((val_data, pl.col("rnd") < val_ratio), (test_data, pl.col("rnd") >= val_ratio)):
        grouped = chunk.filter(in_split).select(["user_idx", "track_idx"]).group_by("user_idx").agg(pl.col("track_idx"))
        # Merge into the existing per-user sets in place instead of rebuilding them.
        for u, t in zip(grouped["user_idx"].to_list(), grouped["track_idx"].to_list()):
            split_data[u].update(t)
    del chunk
    print(n_chunk_rows)

with open("SoundCloud/val_data.pkl", "wb") as f:
    pickle.dump(val_data, f)
with open("SoundCloud/test_data.pkl", "wb") as f:
    pickle.dump(test_data, f)

100000000
100000000
58232772


In [5]:
# Sanity check: reload the validation split and show its largest key.
# The keys are the user_idx values written by the split cell.
with open("SoundCloud/val_data.pkl", "rb") as f:
    val_data = pickle.load(f)
max(val_data.keys())

113849049

### Push to GCP

In [1]:
# Copy every .npy and .pkl artefact under SoundCloud/ to the staging bucket.
# -m runs the copies in parallel; the -o option sets the parallel composite upload threshold to 150M.
!gsutil -o "GSUtil:parallel_composite_upload_threshold=150M" -m cp SoundCloud/*.npy SoundCloud/*.pkl gs://sc-reco-stage-sachin/

Copying file://SoundCloud/edge_index_ut_180.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/edge_index_ut_90.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/edge_index_uu.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/edge_weight_ut_180.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/edge_weight_ut_90.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/user_embeddings.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/val_data.pkl [Content-Type=application/octet-stream]...
Copying file://SoundCloud/track_embeddings.npy [Content-Type=application/octet-stream]...
Copying file://SoundCloud/idx2track.pkl [Content-Type=application/octet-stream]...
Copying file://SoundCloud/test_data.pkl [Content-Type=application/octet-stream]...
Copying file://SoundCloud/track2idx.pkl [Content-Type=application/octet-stream]...
Copying file://SoundCloud/user2idx.pkl 

In [16]:
# List the top-level contents of the staging bucket.
!gsutil ls gs://sc-reco-stage-sachin/

gs://sc-reco-stage-sachin/creator_interactions/
gs://sc-reco-stage-sachin/embeddings/
gs://sc-reco-stage-sachin/follows_table/
gs://sc-reco-stage-sachin/listener_embeddings/
gs://sc-reco-stage-sachin/offline_eval_interactions/
gs://sc-reco-stage-sachin/track_interactions/


### Create Adjacency list

In [2]:
%%time
# Get number of nodes
# Only the sizes of the two id -> index mappings are needed from here on, so the
# mappings themselves are deleted right after their lengths are taken.
with open("SoundCloud/user2idx.pkl", "rb") as f:
    user2idx = pickle.load(f) # 2.6 Gb
with open("SoundCloud/track2idx.pkl", "rb") as f:
    track2idx = pickle.load(f) # 0.6 Gb
num_users = len(user2idx)
num_tracks = len(track2idx)
del user2idx, track2idx

CPU times: user 55.9 s, sys: 13.8 s, total: 1min 9s
Wall time: 1min 11s


In [5]:
%%time
# Full graph
# Requires ~175 Gb RAM
# Create graph
data = HeteroData()
data['user'].num_nodes = num_users
data['track'].num_nodes = num_tracks
# Add user-track interactions
edge_index_ut = np.load('SoundCloud/edge_index_ut_90.npy') # 36 Gb
edge_index_ut = edge_index_ut[edge_index_ut[:, 0] < data['user'].num_nodes]
data['user', 'listens', 'track'].edge_index = torch.tensor(edge_index_ut, dtype=torch.long)
# Include reverse edges too:
data['track', 'listened_by', 'user'].edge_index = torch.tensor(edge_index_ut[:, [1,0]], dtype=torch.long)
del edge_index_ut
# Add edge weights
edge_weight_ut = torch.tensor(np.load('SoundCloud/edge_weight_ut_90.npy'), dtype=torch.float32)
data['user', 'listens', 'track'].edge_weight = edge_weight_ut
data['track', 'listened_by', 'user'].edge_weight = edge_weight_ut
del edge_weight_ut
# Add user-user interactions
edge_index_uu = np.load('SoundCloud/edge_index_uu.npy') # 19 Gb
data['user', 'follows', 'user'].edge_index = torch.tensor(edge_index_uu, dtype=torch.long)
del edge_index_uu

CPU times: user 184 μs, sys: 47 μs, total: 231 μs
Wall time: 235 μs


In [3]:
import torch
import pickle
import numpy as np
import numba
from torch_geometric.data import HeteroData

@numba.njit
def fill_col(src, dst, insertion_ptr, col):
    """
    Numba JIT function that scatters destination ids into the CSR column array.

    For each edge i, looks up the next free slot of source node src[i] in
    insertion_ptr, writes dst[i] into col at that slot, and advances the slot.

    The loop is deliberately sequential: reading insertion_ptr[s] and writing it
    back is a read-modify-write on shared state, so iterations that share a source
    node would race under numba.prange and overwrite each other's slots. Running
    in order also keeps the edges of each source node in their input order.

    Args:
      src: source node index per edge.
      dst: value to store per edge (destination node index).
      insertion_ptr: next free position in col for each source node; modified in place.
      col: output array; modified in place.
    """
    for i in range(len(src)):
        s = src[i]
        pos = insertion_ptr[s]
        col[pos] = dst[i]
        insertion_ptr[s] = pos + 1

def build_csr_two_pass_numba(src: np.ndarray, dst: np.ndarray, num_src: int):
    """
    Turn an edge list (src[i], dst[i]) into CSR arrays using two passes.

    Pass one counts the edges of every source node with np.bincount and
    accumulates the counts into row offsets. Pass two hands a working copy of
    those offsets to fill_col, which writes each dst value into its row.

    Returns:
      rowptr: int64 array of shape [num_src + 1]; rowptr[0] is 0
      col: int64 array of shape [len(src)]
    """
    # Pass 1: per-source edge counts, accumulated behind a leading zero.
    offsets = np.zeros(num_src + 1, dtype=np.int64)
    np.cumsum(np.bincount(src, minlength=num_src), out=offsets[1:])

    # Pass 2: fill the column array; fill_col advances its cursor array in place,
    # so it receives a copy and the returned offsets stay untouched.
    neighbours = np.zeros(len(src), dtype=np.int64)
    fill_col(src, dst, offsets.copy(), neighbours)

    return offsets, neighbours

def build_and_save_adjacency_numba(data: HeteroData, save_path: str):
    """
    For each edge_type in data.edge_index_dict, builds a CSR adjacency
    (rowptr, col, rowcount) with build_csr_two_pass_numba and pickles the result.
    Assumes edge_index is shape [E, 2] (i.e., each row is [src, dst]).

    Edge weights are stored in the same CSR order as col: the edge ids are routed
    through the CSR builder to obtain the edge permutation, and that permutation is
    applied to both the destination ids and the weights. Saving the weights in
    their original edge order would leave them misaligned with col.

    Args:
      data: graph whose node types expose num_nodes and whose edge types expose
        edge_index and, optionally, edge_weight.
      save_path: path of the pickle file to write.

    The pickled dict has the keys "rowptr_dict", "col_dict", "rowcount_dict",
    "num_nodes_dict" and "edge_weights_dict", each keyed by edge type (node type
    for num_nodes_dict). Edge types without an edge_weight map to None.
    """
    # Gather node counts for each node type
    num_nodes_dict = {node_type: data[node_type].num_nodes for node_type in data.node_types}

    rowptr_dict = {}
    col_dict = {}
    rowcount_dict = {}
    edge_weight_dict = {}

    # Iterate over each edge_type in data
    for keys, edge_index in data.edge_index_dict.items():
        src_type, _rel_type, _dst_type = keys
        src_size = num_nodes_dict[src_type]

        # edge_index: shape (E, 2); src = first column, dst = second column
        edge_index_cpu = edge_index.cpu().numpy()
        src = edge_index_cpu[:, 0]
        dst = edge_index_cpu[:, 1]

        # Feed edge ids (instead of dst) through the builder: the returned "col" is
        # then the permutation that lists the original edge ids in CSR order.
        edge_ids = np.arange(len(src), dtype=np.int64)
        rowptr_np, perm_np = build_csr_two_pass_numba(src, edge_ids, num_src=src_size)
        del edge_ids
        col_np = dst[perm_np]
        rowcount_np = rowptr_np[1:] - rowptr_np[:-1]

        # Convert to torch tensors and store
        rowptr_dict[keys] = torch.from_numpy(rowptr_np)
        col_dict[keys] = torch.from_numpy(col_np)
        rowcount_dict[keys] = torch.from_numpy(rowcount_np)

        # Reorder this edge type's weights (if any) with the same permutation.
        edge_weight = getattr(data[keys], "edge_weight", None)
        if edge_weight is not None:
            edge_weight = edge_weight.cpu()[torch.from_numpy(perm_np)]
        edge_weight_dict[keys] = edge_weight
        del perm_np

        print(f"Processed edge type {keys} with {len(src)} edges.")

    # Build final output
    save_dict = {
        "rowptr_dict": rowptr_dict,
        "col_dict": col_dict,
        "rowcount_dict": rowcount_dict,
        "num_nodes_dict": num_nodes_dict,
        "edge_weights_dict": edge_weight_dict
    }

    # Save to disk
    with open(save_path, "wb") as f:
        pickle.dump(save_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
    print(f"Saved adjacency structures (Numba two-pass) to: {save_path}")

In [5]:
%%time
# Build and pickle the CSR adjacency for the graph currently held in `data`.
build_and_save_adjacency_numba(
    data,
    save_path="SoundCloud/adj_data_90.pkl"
)

Processed edge type ('user', 'listens', 'track') with 2397045181 edges.
Processed edge type ('track', 'listened_by', 'user') with 2397045181 edges.
Processed edge type ('user', 'follows', 'user') with 1213157842 edges.
Saved adjacency structures (Numba two-pass) to: SoundCloud/adj_data_90.pkl
CPU times: user 11min 50s, sys: 1min 58s, total: 13min 49s
Wall time: 4min 34s


### Sampled dataset

In [4]:
# 10% Sampled graph
# The sample is the prefix of the user index range: users with index < int(num_users*0.1).
# All tracks are kept.
data = HeteroData()
data['user'].num_nodes = int(num_users*0.1)
data['track'].num_nodes = num_tracks
# Load data
edge_index_ut = np.load('SoundCloud/edge_index_ut_90.npy')  # Shape: [num_edges, 2]
edge_weight_ut = np.load('SoundCloud/edge_weight_ut_90.npy')  # One weight row per edge
# Keep only edges whose user (column 0) falls inside the sampled prefix.
mask = edge_index_ut[:, 0] < data['user'].num_nodes
filtered_edge_index_ut = edge_index_ut[mask]
filtered_edge_weight_ut = edge_weight_ut[mask]  # Keep only corresponding weights
data['user', 'listens', 'track'].edge_index = torch.tensor(filtered_edge_index_ut, dtype=torch.long)
data['user', 'listens', 'track'].edge_weight = torch.tensor(filtered_edge_weight_ut, dtype=torch.float32)
# Include reverse edges too (columns swapped so each row is [track, user]):
data['track', 'listened_by', 'user'].edge_index = torch.tensor(filtered_edge_index_ut[:, [1, 0]], dtype=torch.long)
data['track', 'listened_by', 'user'].edge_weight = torch.tensor(filtered_edge_weight_ut, dtype=torch.float32)
del edge_index_ut, edge_weight_ut, filtered_edge_index_ut, filtered_edge_weight_ut
# Add user-user interactions; both endpoints must be inside the sampled prefix.
edge_index_uu = np.load('SoundCloud/edge_index_uu.npy') # 19 Gb
mask = (edge_index_uu[:, 0] < data['user'].num_nodes) & (edge_index_uu[:, 1] < data['user'].num_nodes)
filtered_edge_index_uu = edge_index_uu[mask]
data['user', 'follows', 'user'].edge_index = torch.tensor(filtered_edge_index_uu, dtype=torch.long)
del edge_index_uu, filtered_edge_index_uu

In [5]:
%%time
# Build and pickle the CSR adjacency for the graph currently held in `data`.
build_and_save_adjacency_numba(
    data,
    save_path="SoundCloud/adj_data_90_sampled.pkl"
)

Processed edge type ('user', 'listens', 'track') with 239689843 edges.
Processed edge type ('track', 'listened_by', 'user') with 239689843 edges.
Processed edge type ('user', 'follows', 'user') with 12303491 edges.
Saved adjacency structures (Numba two-pass) to: SoundCloud/adj_data_90_sampled.pkl
CPU times: user 51.8 s, sys: 10.3 s, total: 1min 2s
Wall time: 43.6 s


In [6]:
# Sampled User embeddings
# Keep the first data['user'].num_nodes rows of the full user-embedding matrix and save them.
user_embeddings = torch.tensor(np.load('SoundCloud/user_embeddings.npy'), dtype=torch.float32) # 64 Gb
user_embeddings_filtered = user_embeddings[:data['user'].num_nodes]
np.save("SoundCloud/user_embeddings_sampled.npy", user_embeddings_filtered.numpy())

In [7]:
# Sampled users Val and Test set
# Load validation & test set (30Gb)
# pickle.load can execute arbitrary code; only load artefacts produced by this notebook.
with open("SoundCloud/val_data.pkl", "rb") as f:
    val_data = pickle.load(f)
with open("SoundCloud/test_data.pkl", "rb") as f:
    test_data = pickle.load(f)

In [8]:
def filter_users(data_dict, num_users):
    """Return the entries of data_dict whose user key, cast to int, is below num_users.

    Keys of the returned dict are plain ints; the values are passed through unchanged.
    """
    limit = int(num_users)
    kept = {}
    for user, tracks in data_dict.items():
        user_id = int(user)
        if user_id < limit:
            kept[user_id] = tracks
    return kept
# Restrict both evaluation splits to the users kept in the sampled graph
# (user index < data['user'].num_nodes) and report how many users survive.
val_data_filtered = filter_users(val_data, data['user'].num_nodes)
test_data_filtered = filter_users(test_data, data['user'].num_nodes)
print(f"Original Val Users: {len(val_data)}, Filtered: {len(val_data_filtered)}")
print(f"Original Test Users: {len(test_data)}, Filtered: {len(test_data_filtered)}")

Original Val Users: 12159002, Filtered: 1215355
Original Test Users: 17615887, Filtered: 1760029


In [9]:
# Save filtered data (optional)
with open("SoundCloud/val_data_sampled.pkl", "wb") as f:
    pickle.dump(val_data_filtered, f)
with open("SoundCloud/test_data_sampled.pkl", "wb") as f:
    pickle.dump(test_data_filtered, f)

### Filtered Sampled Graph

In [4]:
# 90-day user-track edges; each row is [user_idx, track_idx].
edge_index_ut = np.load('SoundCloud/edge_index_ut_90.npy')  # Shape: [num_edges, 2]

In [5]:
# 90-day edge scores, row-aligned with edge_index_ut_90.npy.
edge_weight_ut = np.load('SoundCloud/edge_weight_ut_90.npy')  # Saved from a one-column frame, so presumably [num_edges, 1] — TODO confirm

In [6]:
# User-user follow edges; each row is [fan_idx, contact_idx].
edge_index_uu = np.load('SoundCloud/edge_index_uu.npy')

In [7]:
# User activity (listens)
# bincount counts edge rows per user, i.e. user-track interaction rows rather than raw plays.
user_listens = np.bincount(edge_index_ut[:, 0], minlength=num_users)
active_users_mask = user_listens > 10  # more than 10 interaction rows
active_users_idx = np.where(active_users_mask)[0]
print(f"Users with >10 listens: {len(active_users_idx)}/{num_users} "
      f"({len(active_users_idx) / num_users * 100:.2f}%)")

Users with >10 listens: 30615287/113849050 (26.89%)


In [8]:
# Track activity: count edge rows per track and keep tracks with more than 5 of them.
# np.where returns the indices in ascending order.
track_listeners = np.bincount(edge_index_ut[:, 1], minlength=num_tracks)
active_tracks_mask = track_listeners > 5  # >5 listeners
active_tracks_idx = np.where(active_tracks_mask)[0]
print(f"Tracks with >5 listeners: {len(active_tracks_idx)}/{num_tracks} "
      f"({len(active_tracks_idx) / num_tracks * 100:.2f}%)")

Tracks with >5 listeners: 13033110/25841871 (50.43%)


In [9]:
target_users = int(0.3*len(active_users_idx))  # Sample 30%
# Seeded generator so the sample is reproducible across re-runs. The result is sorted
# because the re-indexing below assigns new ids by ascending old index; keeping
# sampled_users_idx in that same order lets it be used directly to gather rows
# (e.g. embeddings) that line up with the new ids.
sampled_users_idx = np.sort(
    np.random.default_rng(42).choice(active_users_idx, size=target_users, replace=False)
)
# Boolean membership mask over all users for fast edge filtering.
sampled_users_mask = np.zeros(num_users, dtype=bool)
sampled_users_mask[sampled_users_idx] = True

In [10]:
# Keep user-track edges whose user was sampled and whose track is active;
# the same mask is applied to the weights so they stay row-aligned with the edges.
ut_mask = sampled_users_mask[edge_index_ut[:, 0]] & active_tracks_mask[edge_index_ut[:, 1]]
filtered_edge_index_ut = edge_index_ut[ut_mask]
filtered_edge_weight_ut = edge_weight_ut[ut_mask]
# Keep follow edges only when both endpoints were sampled.
uu_mask = sampled_users_mask[edge_index_uu[:, 0]] & sampled_users_mask[edge_index_uu[:, 1]]
filtered_edge_index_uu = edge_index_uu[uu_mask]

In [11]:
# Old index -> new contiguous index, assigned in ascending order of the old index.
# New id k therefore corresponds to the k-th smallest kept old index.
user_old_to_new = {old: new for new, old in enumerate(sorted(sampled_users_idx))}
track_old_to_new = {old: new for new, old in enumerate(sorted(active_tracks_idx))}

In [12]:
# Dense old -> new lookup tables: position = old index, value = new index (-1 = dropped).
# They encode the same mapping as enumerate(sorted(...)), but applying them is one
# vectorised gather instead of a Python-level dict lookup per edge endpoint.
user_lookup = np.full(num_users, -1, dtype=np.int64)
user_lookup[np.sort(sampled_users_idx)] = np.arange(len(sampled_users_idx), dtype=np.int64)
track_lookup = np.full(num_tracks, -1, dtype=np.int64)
track_lookup[np.sort(active_tracks_idx)] = np.arange(len(active_tracks_idx), dtype=np.int64)

# Rewrite the filtered edge arrays in place (run this cell only once per filtering pass).
filtered_edge_index_ut[:, 0] = user_lookup[filtered_edge_index_ut[:, 0]]
filtered_edge_index_ut[:, 1] = track_lookup[filtered_edge_index_ut[:, 1]]
filtered_edge_index_uu[:, 0] = user_lookup[filtered_edge_index_uu[:, 0]]
filtered_edge_index_uu[:, 1] = user_lookup[filtered_edge_index_uu[:, 1]]
del user_lookup, track_lookup

# Every endpoint was pre-filtered to kept users/tracks, so no -1 may remain.
assert filtered_edge_index_ut.size == 0 or filtered_edge_index_ut.min() >= 0
assert filtered_edge_index_uu.size == 0 or filtered_edge_index_uu.min() >= 0

In [14]:
# Assemble the filtered, re-indexed graph: sampled users, active tracks,
# listens edges in both directions (sharing the same weights) and follow edges.
data = HeteroData()
data['user'].num_nodes = len(sampled_users_idx)
data['track'].num_nodes = len(active_tracks_idx)
data['user', 'listens', 'track'].edge_index = torch.tensor(filtered_edge_index_ut, dtype=torch.long)
data['user', 'listens', 'track'].edge_weight = torch.tensor(filtered_edge_weight_ut, dtype=torch.float32)
# Reverse direction: columns swapped so each row is [track, user].
data['track', 'listened_by', 'user'].edge_index = torch.tensor(filtered_edge_index_ut[:, [1, 0]], dtype=torch.long)
data['track', 'listened_by', 'user'].edge_weight = torch.tensor(filtered_edge_weight_ut, dtype=torch.float32)
data['user', 'follows', 'user'].edge_index = torch.tensor(filtered_edge_index_uu, dtype=torch.long)

In [15]:
# Release the raw and filtered edge arrays; `data` holds its own tensor copies.
del edge_index_ut, edge_weight_ut, filtered_edge_index_ut, filtered_edge_weight_ut, ut_mask
del edge_index_uu, filtered_edge_index_uu, uu_mask, track_listeners, user_listens

In [16]:
# Build and pickle the CSR adjacency for the graph currently held in `data`.
build_and_save_adjacency_numba(data, "SoundCloud/adj_data_90_active_sampled.pkl")

Processed edge type ('user', 'listens', 'track') with 687379887 edges.
Processed edge type ('track', 'listened_by', 'user') with 687379887 edges.
Processed edge type ('user', 'follows', 'user') with 21620345 edges.
Saved adjacency structures (Numba two-pass) to: SoundCloud/adj_data_90_active_sampled.pkl


In [17]:
# Drop the graph object now that its adjacency has been saved.
del data

In [18]:
# Sampled User embeddings
user_embeddings = torch.tensor(np.load('SoundCloud/user_embeddings.npy'), dtype=torch.float32) # 64 Gb
user_embeddings_filtered = user_embeddings[sampled_users_idx]
del user_embeddings
np.save("SoundCloud/user_embeddings_active_sampled.npy", user_embeddings_filtered.numpy())
del user_embeddings_filtered

In [19]:
# Active track embeddings: active_tracks_idx comes from np.where and is therefore ascending,
# which matches the order in which the new track ids were assigned.
track_embeddings = torch.tensor(np.load('SoundCloud/track_embeddings.npy'), dtype=torch.float32) # ~15.5 GB (25,841,871 x 150 float32)
track_embeddings_filtered = track_embeddings[active_tracks_idx]
del track_embeddings
np.save("SoundCloud/track_embeddings_active.npy", track_embeddings_filtered.numpy())
del track_embeddings_filtered

In [20]:
# Reload the full evaluation splits and build set views of the kept users / tracks
# for constant-time membership tests during filtering.
# pickle.load can execute arbitrary code; only load artefacts produced by this notebook.
with open("SoundCloud/val_data.pkl", "rb") as f:
    val_data = pickle.load(f)
with open("SoundCloud/test_data.pkl", "rb") as f:
    test_data = pickle.load(f)
sampled_users_set = set(sampled_users_idx)
active_tracks_set = set(active_tracks_idx)

In [21]:
def reindex_and_filter_data(data_dict, user_map, track_map, sampled_users, active_tracks, min_listens=1):
    """Restrict an evaluation split to kept users/tracks and translate it to new indices.

    Args:
        data_dict: mapping of old user index -> iterable of old track indices.
        user_map: old user index -> new user index.
        track_map: old track index -> new track index.
        sampled_users: container of old user indices to keep.
        active_tracks: container of old track indices to keep.
        min_listens: minimum number of surviving tracks a user needs to be kept.

    Returns:
        dict of new user index -> list of new track indices, in the iteration
        order of the user's original track collection.
    """
    remapped = {}
    for user_id, track_ids in data_dict.items():
        # Users outside the sample are skipped entirely.
        if user_id not in sampled_users:
            continue
        kept = []
        for track_id in track_ids:
            if track_id in active_tracks:
                kept.append(track_map[track_id])
        # Users left with too few tracks are dropped as well.
        if len(kept) < min_listens:
            continue
        remapped[user_map[user_id]] = kept
    return remapped

In [22]:
# Validation split restricted to sampled users / active tracks, expressed in new indices;
# users left with no active track are dropped (min_listens=1).
val_data_filtered = reindex_and_filter_data(
    val_data,
    user_old_to_new,
    track_old_to_new,
    sampled_users_set,
    active_tracks_set,
    min_listens=1
)

In [24]:
# Test split restricted to sampled users / active tracks, expressed in new indices;
# users left with no active track are dropped (min_listens=1).
test_data_filtered = reindex_and_filter_data(
    test_data,
    user_old_to_new,
    track_old_to_new,
    sampled_users_set,
    active_tracks_set,
    min_listens=1
)

In [25]:
# Report how many users and interactions each split has before and after filtering.
print(f"Original Val Users: {len(val_data)}, Filtered: {len(val_data_filtered)}")
print(f"Original Test Users: {len(test_data)}, Filtered: {len(test_data_filtered)}")

# Interaction count = total number of tracks summed over all users of a split.
val_interactions_before = sum(len(tracks) for tracks in val_data.values())
test_interactions_before = sum(len(tracks) for tracks in test_data.values())

print(f"Total interactions in original validation set: {val_interactions_before}")
print(f"Total interactions in original test set: {test_interactions_before}")

val_interactions_after = sum(len(tracks) for tracks in val_data_filtered.values())
test_interactions_after = sum(len(tracks) for tracks in test_data_filtered.values())
print(f"Total interactions in reindexed validation set: {val_interactions_after}")
print(f"Total interactions in reindexed test set: {test_interactions_after}")

Original Val Users: 12159002, Filtered: 3160384
Original Test Users: 17615887, Filtered: 4325513
Total interactions in original validation set: 49693105
Total interactions in original test set: 192883774
Total interactions in reindexed validation set: 13781882
Total interactions in reindexed test set: 53620557


In [26]:
# Persist the filtered, re-indexed evaluation splits.
with open("SoundCloud/val_data_active_sampled.pkl", "wb") as f:
    pickle.dump(val_data_filtered, f)
with open("SoundCloud/test_data_active_sampled.pkl", "wb") as f:
    pickle.dump(test_data_filtered, f)