# Part 1: Data Cleaning

For the prediction model


## Step 1: Isolate Debuts
Database: billboard_global_200.csv<br>
All of the Billboard Global 200 rankings since its beginning

In [None]:
import csv
import re

# Input: the raw chart file read by the de-duplication pass below.
address = "../data/billboard_global_200.csv"
# Output: receives the header plus the first row seen for each song.
first_appearance_addy = "../data/clean1.csv"

# Canonical (title, artists) keys of the rows already written.
seen = set()

# Separators between artist names. The word separators are anchored on word
# boundaries so they cannot fire inside a name ("Brandy", "Daft Punk",
# "Taylor Swift", "Max" all stay in one piece).
_ARTIST_SEPARATORS = re.compile(
    r',|&|\+|\bw/|\b(?:feat|ft)\b\.?|\b(?:and|featuring|x|with)\b',
    flags=re.IGNORECASE,
)


def normalize(s: str) -> str:
    """Lower-case ``s``, trim it and collapse each whitespace run to one space."""
    return re.sub(r'\s+', ' ', s.lower().strip())


def canonical_key(row):
    """Build an order-insensitive identity for the song described by ``row``.

    Args:
        row: A chart row laid out as
            ``[date, rank, title, main_artist, featured_artists]``.

    Returns:
        ``(title, artists)`` where ``title`` is the normalised song title and
        ``artists`` is a sorted tuple of the distinct normalised artist names
        found in both artist columns. The placeholders "n/a" and "none" are
        ignored, so the same line-up gives the same key whichever column or
        separator was used.
    """
    song = normalize(row[2])

    # Treat both artist columns as one comma-separated credit line.
    artist_blob = f"{row[3]},{row[4]}"
    pieces = _ARTIST_SEPARATORS.split(artist_blob)

    artists = sorted({
        normalize(piece) for piece in pieces
        if piece and piece.strip() and piece.strip().lower() not in {"n/a", "none"}
    })

    return (song, tuple(artists))


# Copy the header, then keep only the first row met for each canonical song key.
with open(address, 'r', encoding='utf-8') as file, \
     open(first_appearance_addy, 'w', newline='', encoding='utf-8') as write_file:

    reader = csv.reader(file)
    writer = csv.writer(write_file)

    header = next(reader)
    writer.writerow(header)

    for row in reader:
        # Rows with fewer than five columns cannot be keyed, so they are dropped.
        if len(row) >= 5:
            key = canonical_key(row)
            if key in seen:
                continue
            seen.add(key)
            writer.writerow(row)

print(f"{len(seen)} unique songs written.")
print("Data cleaning phase 1 done.")


4325 unique songs written.
Data cleaning phase 1 done.


## Step 2: Sweep the Data
- Convert dates to a numerical format: decades elapsed since September 5th, 2020. Measuring in decades keeps the value below 1 (until 2030) without losing accuracy.
- Normalize ranking into (0, 1] as 1/sqrt(rank): the #1 rank maps to 1 and the #200 rank to about 0.07, so greater is better.
- Separate collaborations properly(!!!)
- Replace N/A values (when applicable) with "None"

In [None]:
import csv
import datetime
import math
import re

# Input (output of step 1) and output paths for this cleaning pass.
address = "../data/clean1.csv"
target = "../data/clean2.csv"

# Comma-separated header written as the first row of the output file.
columns = "decades_since_2020_Sept,date,rank, rank_norm, title,main_artist,featured_artists"

# Reference date: each row's time value is measured from this day.
day_one = datetime.datetime(2020, 9, 5)

# Helpers
def graft_exceptions(artist_list, exceptions=None, splitters=None):
    """Re-join name fragments that belong to a known multi-part artist name.

    Splitting a credit line on separators also breaks names that contain a
    separator themselves ("Earth, Wind & Fire", "AC/DC"). Each exception is
    split with the same separators, and wherever ``artist_list`` contains that
    exact run of fragments the run is replaced by the exception name.

    Fragments are compared as whole tokens, not as a space-joined string, so
    an exception matches whichever separator it contains, and unrelated
    neighbouring artists are never merged together.

    Args:
        artist_list: Artist-name fragments in their original order.
        exceptions: Names that must stay whole. Defaults to the module-level
            ``EXCEPTIONS``.
        splitters: Separator strings, applied one after another. Defaults to
            the module-level ``SPLITTERS``.

    Returns:
        A list with matching runs replaced by the exception name. Lists of
        zero or one fragment are returned unchanged.
    """
    if len(artist_list) <= 1:
        return artist_list

    if exceptions is None:
        exceptions = EXCEPTIONS
    if splitters is None:
        splitters = SPLITTERS

    def tokenize(name):
        # Same procedure the caller uses on credit lines: split on every
        # separator in turn, trim, drop empties and repeated fragments.
        pieces = [name]
        for splitter in splitters:
            pieces = [part.strip() for piece in pieces for part in piece.split(splitter)]
        return tuple(dict.fromkeys(part for part in pieces if part))

    # fragment run -> exception name; sorted so ties resolve the same way on every run.
    by_tokens = {}
    for name in sorted(exceptions):
        tokens = tokenize(name)
        if len(tokens) > 1:
            by_tokens.setdefault(tokens, name)
    widest = max(map(len, by_tokens), default=0)

    result = []
    i = 0
    while i < len(artist_list):
        # Try the longest possible run first so the fullest name wins.
        for j in range(min(i + widest, len(artist_list)), i + 1, -1):
            name = by_tokens.get(tuple(artist_list[i:j]))
            if name is not None:
                result.append(name)
                i = j
                break
        else:
            result.append(artist_list[i])
            i += 1
    return result

def split_artists(main, sec):
    """Split the main and featured artist columns into individual names.

    Both columns are normalised, split on every entry of ``SPLITTERS`` and
    passed through ``graft_exceptions``. Only the first name found in ``main``
    stays the main artist; any further names from ``main`` are placed in
    front of the names parsed from ``sec``.

    Args:
        main: Raw text of the main-artist column.
        sec: Raw text of the featured-artists column; 'N/A', 'None' or blank
            mean there are no featured artists.

    Returns:
        ``(main_artists, sec_artists)``: a list holding at most one main
        artist, and a list of featured artists with no repeats and without
        the main artist.
    """
    def normalize_artists(s, sec_column_nonempty=False):
        """Rewrite collaboration markers in ``s`` into the spaced forms that SPLITTERS expects."""
        if not s or s.strip() in {'N/A', 'None'}:
            return ''
        if sec_column_nonempty:
            # Whole words only, so "Swift" and "Daft" keep their "ft" and
            # "featuring" is not cut short after "feat".
            s = re.sub(r'\b(?:[Ff]eaturing|feat|ft)\b\.?', ' feat. ', s)
        # Only a free-standing "x" marks a collaboration; an "x" inside a
        # name ("Alex Warren", "Jax Jones") must not split it.
        s = re.sub(r'\s*\bx\b\s*', ' x ', s)
        # NOTE(review): presumably targets glued credits such as "AWithB" — confirm against the data.
        s = re.sub(r'(?i)(\S)With(\S)', r'\1 With \2', s)
        # Whole word only, so "Withers" is left alone.
        s = re.sub(r'(?i)\s*\bWith\b\s*', ' With ', s)
        s = re.sub(r'\s+', ' ', s)
        return s.strip()

    def split_names(names, is_feature=False):
        """Return the distinct artist names contained in ``names``, in order."""
        names = normalize_artists(names)
        if not names:
            return []

        # A credit that is exactly a protected name is a single artist,
        # whichever column it came from.
        if names in EXCEPTIONS:
            return [names]
        if is_feature and names.endswith("And His Orchestra"):
            return [names]

        pieces = [names]
        for splitter in SPLITTERS:
            pieces = [part.strip() for piece in pieces for part in piece.split(splitter)]

        # dict.fromkeys drops repeats while keeping first-seen order.
        unique = list(dict.fromkeys(part for part in pieces if part))
        return graft_exceptions(unique)

    sec_present = bool(sec.strip() and sec.strip() != "N/A")
    main_artists = split_names(normalize_artists(main, sec_present))
    sec_artists = split_names(normalize_artists(sec), is_feature=True)

    if len(main_artists) > 1:
        sec_artists = main_artists[1:] + sec_artists
        main_artists = [main_artists[0]]

    # A collaborator can be named in both columns; list each one once and
    # never repeat the main artist as their own feature.
    sec_artists = [a for a in dict.fromkeys(sec_artists) if a not in main_artists]

    return main_artists, sec_artists


# Rewrite every debut row with a numeric date, a normalised rank and
# separated artist columns.
with open(address, 'r', encoding='utf-8') as file, \
     open(target, 'w', newline='', encoding='utf-8') as write_file:

    cursor = csv.reader(file)
    writer = csv.writer(write_file)
    # Trim each name: the `columns` string has spaces after some commas,
    # which would otherwise end up inside the header cells.
    writer.writerow([name.strip() for name in columns.split(',')])
    next(cursor, None)  # skip the input header; tolerate an empty file

    SPLITTERS = [',', '&', ' + ', ' and ',
                 ' feat. ', ' ft. ', ' featuring ', ' Featuring ',
                 ' x ', ' X ', ' with ', ' With ', 'Duet With', 'w/', "/"]

    EXCEPTIONS = {
        "Tyler, the Creator",
        "Tyler, The Creator",

        "John Scott Trotter & His Orchestra",
        "Ralph Carmichael Orchestra and Chorus",
        "Georgie Stoll & His Orchestra",

        "AC/DC",
        "Earth, Wind & Fire",
        "TWS: 24/7",
        "HUNTR/X",
    }

    for row in cursor:
        # Columns 0-4 (date, rank, title, main artist, features) are all read below.
        if len(row) < 5:
            continue

        try:
            date = datetime.datetime.strptime(row[0], "%Y-%m-%d")
        except ValueError:
            continue

        # 3652 days is roughly one decade; the small offset keeps the first day above zero.
        diff = ((date - day_one).days / 3652) + 0.000001

        try:
            rank = int(row[1])
        except ValueError:
            continue
        if rank < 1:
            # Rank 0 would divide by zero and negative ranks have no square root.
            continue
        rank_norm = 1 / math.sqrt(rank)

        main = row[3]
        sec = row[4]

        n1, n2 = split_artists(main, sec)

        # Names are wrapped in single quotes; the feature list is additionally
        # wrapped in double quotes, with "None" standing for an empty value.
        main_out = f"'{n1[0]}'" if n1 else "'None'"
        sec_out = '"' + ", ".join([f"'{artist}'" for artist in n2]) + '"' if n2 else '"None"'

        writer.writerow([diff, row[0], row[1], rank_norm, row[2], main_out, sec_out])

print("Finished cleaning phase 2")


Finished cleaning phase 2


## Step 3: Engineer Some Features

- Use embeddings for artist names to see which artists are closer to each other
- Create charting recency bias (more recent data weighted more heavily), using numerical format of date

**Feature Engineering**
- For each song, aggregate the past performance of the main artist up to that point
- For each song, aggregate the combined past performance of collaborators (again, w/ recency bias)

Keep in mind release date and song title will be ignored but kept in case in the future I want to use this data.

* In the future, possibly use non-debut data to examine 'longevity' of artists (might be a waste of time). That is, don't ONLY consider the debut of a given artist's songs, just consider it more heavily.

In [6]:
import csv
import math
import re

# Input (output of step 2) and output paths for this pass.
address = "../data/clean2.csv"
target = "../data/clean3.csv"

# Header of the output file: the input columns plus the two new features.
new_cols = "decades_since_2020_Sept,date,rank,rank_norm,title,main_artist,featured_artists,artist_past_performance,features_past_performance"

# Rate of the exponential decay applied to an artist's history, per unit of
# the time column (presumably decades, going by the column name).
DECAY_LAMBDA = 0.1
# Feature-performance value used for rows that list no featured artists.
FEATURES_DEFAULT = 0.5

# Captures the text between a pair of single quotes.
artist_pat = re.compile(r"'([^']*)'")
# Matches a comma together with any whitespace around it.
COMMA_SPLIT = re.compile(r'\s*,\s*')


def parse_time(s: str):
    """Parse the time column into a finite float.

    Accepts a plain number or a number wrapped in square brackets
    (``"[0.25]"``), with optional surrounding whitespace.

    Returns:
        The parsed value, or ``None`` when ``s`` is empty, is not a number,
        or is NaN or infinite (such values cannot be ordered or decayed).
    """
    if not s:
        return None
    text = s.strip()
    if text.startswith("[") and text.endswith("]"):
        text = text[1:-1].strip()
    try:
        value = float(text)
    except ValueError:
        return None
    return value if math.isfinite(value) else None


def parse_main(name: str) -> str:
    """Return the main-artist cell without surrounding whitespace or single quotes.

    An empty or missing cell yields the empty string.
    """
    if name:
        unquoted = name.strip().strip("'")
        return unquoted.strip()
    return ""


def parse_features(s: str) -> list[str]:
    """Parse the featured-artists cell into a list of artist names.

    The expected layout is a comma-separated list of single-quoted names,
    optionally wrapped in double quotes: ``"'A', 'B'"``. The list is split
    on the ``', '`` boundary between two quoted names rather than on every
    quote, so names that contain an apostrophe ("Destiny's Child") or a
    comma ("Tyler, The Creator") stay whole. Unquoted text falls back to a
    plain comma split.

    Returns:
        The non-empty names in order; ``[]`` for an empty cell or ``None``.
    """
    if not s:
        return []
    text = s.strip()
    if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
        text = text[1:-1].strip()
    if not text or text == 'None':
        return []

    if len(text) >= 2 and text[0] == "'" and text[-1] == "'":
        # Drop the outermost quotes, then cut at each closing-quote, comma,
        # opening-quote sequence.
        pieces = re.split(r"'\s*,\s*'", text[1:-1])
    else:
        pieces = [
            part.strip().strip("'").strip().strip('"')
            for part in re.split(r'\s*,\s*', text)
        ]

    return [name for name in (piece.strip() for piece in pieces) if name]


def decay_to(t_now: float, state_tuple):
    """Advance an artist's running totals to time ``t_now``.

    ``state_tuple`` is ``(S, W, t0)``. When ``t_now`` is later than ``t0``
    both totals are multiplied by ``exp(-DECAY_LAMBDA * (t_now - t0))`` and
    the timestamp becomes ``t_now``; otherwise (including ``t_now is None``)
    the state is returned as it was.
    """
    total, weight, last_seen = state_tuple
    if t_now is None or t_now <= last_seen:
        return total, weight, last_seen

    elapsed = t_now - last_seen
    factor = math.exp(-DECAY_LAMBDA * elapsed)
    return total * factor, weight * factor, t_now


# Append, for every song, the decayed average past rank score of its main
# artist and of its featured artists, using only rows processed before it.
with open(address, "r", newline="", encoding="utf-8") as f_in, \
     open(target, "w", newline="", encoding="utf-8") as f_out:

    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    writer.writerow([c.strip() for c in new_cols.split(",")])
    next(reader, None)  # skip header

    # Drop blank or short rows before sorting: the sort key and the loop
    # below index columns 0-6, so such rows would raise IndexError.
    rows = [row for row in reader if len(row) >= 7]

    # Chronological order; ties are broken by date text, then by rank.
    def sort_key(row):
        t = parse_time(row[0])
        t_sort = t if t is not None else float("inf")
        try:
            rank_int = int(row[2]) if row[2].strip() != "" else 9999
        except ValueError:
            rank_int = 9999
        return (t_sort, row[1], rank_int)

    rows.sort(key=sort_key)

    # per-artist state: artist -> (S, W, last_time), where S is the decayed
    # sum of rank scores and W the decayed number of songs.
    state = {}

    for row in rows:
        t = parse_time(row[0])
        if t is None:
            continue

        # Use the stored normalised rank; rebuild it from the raw rank if it
        # is unreadable, and fall back to 0.5 if that fails too.
        try:
            r_norm = float(row[3])
        except ValueError:
            try:
                rank_raw = int(row[2])
            except ValueError:
                r_norm = 0.5
            else:
                r_norm = 1.0 / math.sqrt(rank_raw) if rank_raw > 0 else 0.5

        main = parse_main(row[5])
        feats = parse_features(row[6])

        # Past performance = decayed sum / decayed count; 0.5 without history.
        S_m, W_m, _ = decay_to(t, state.get(main, (0.0, 0.0, t)))
        main_perf = (S_m / W_m) if W_m > 1e-8 else 0.5

        feat_perfs = []
        for a in feats:
            S_a, W_a, _ = decay_to(t, state.get(a, (0.0, 0.0, t)))
            feat_perfs.append((S_a / W_a) if W_a > 1e-8 else 0.5)
        feats_perf = (sum(feat_perfs) / len(feat_perfs)) if feat_perfs else FEATURES_DEFAULT

        # Written before the state update, so a song never sees its own rank.
        writer.writerow(row + [main_perf, feats_perf])

        # Credit this song to every artist on it (a set, so each counts once).
        for a in {main, *feats}:
            S0, W0, t0 = state.get(a, (0.0, 0.0, t))
            S0, W0, _ = decay_to(t, (S0, W0, t0))
            state[a] = (S0 + r_norm, W0 + 1.0, max(t, t0))

print("Finished Phase 3")


Finished Phase 3


## Step 4: Centrality & Rank

"Nicer" features that actually capture connectedness the way I want them to.


In [None]:
# NOTE: This takes a long time to run (~2 minutes), will vectorize, clean up, and freshen later.

import csv
import math
import re
from collections import defaultdict
from typing import Dict, List, Tuple

import networkx as nx

# Input (output of step 3) and output paths for this pass.
INPUT = "../data/clean3.csv"
OUTPUT = "../data/clean4.csv"

DECAY_LAMBDA = 0.15          # temporal decay for edges
PR_ALPHA = 0.85              # PageRank damping
# Edges whose decayed weight falls below this value are removed from the graph.
MIN_EDGE_WEIGHT = 1e-6

# Captures the text between a pair of single quotes.
artist_pat = re.compile(r"'([^']*)'")
# Matches a comma together with any whitespace around it.
COMMA_SPLIT = re.compile(r'\s*,\s*')

def parse_time(x):
    """Convert the time column to a finite float.

    Returns:
        The parsed value, or ``None`` when ``x`` is missing, is not a number,
        or is NaN or infinite (such values would corrupt the sort order and
        the edge decay).
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    return value if math.isfinite(value) else None

def parse_main(x: str) -> str:
    """Return the main-artist cell without surrounding whitespace or single quotes.

    Whitespace left inside the quotes is trimmed as well, so the main artist
    is spelled exactly like the same artist parsed from the feature column
    and both map to one graph node. An empty or missing cell yields ``""``.
    """
    if not x:
        return ""
    return x.strip().strip("'").strip()

def parse_features(x: str) -> List[str]:
    """Parse the featured-artists cell into a list of artist names.

    The expected layout is a comma-separated list of single-quoted names,
    optionally wrapped in double quotes: ``"'A', 'B'"``. The list is split
    on the ``', '`` boundary between two quoted names rather than on every
    quote, so names that contain an apostrophe ("Destiny's Child") or a
    comma ("Tyler, The Creator") stay whole. Unquoted text falls back to a
    plain comma split.

    Returns:
        The non-empty names in order; ``[]`` for an empty cell or ``None``.
    """
    if not x:
        return []
    text = x.strip()
    if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
        text = text[1:-1].strip()
    if not text or text == "None":
        return []

    if len(text) >= 2 and text[0] == "'" and text[-1] == "'":
        # Drop the outermost quotes, then cut at each closing-quote, comma,
        # opening-quote sequence.
        pieces = re.split(r"'\s*,\s*'", text[1:-1])
    else:
        pieces = [
            part.strip().strip("'").strip('"')
            for part in re.split(r'\s*,\s*', text)
        ]

    return [name for name in (piece.strip() for piece in pieces) if name]

# Collaboration edges: (artist_a, artist_b) -> (weight, time of last update).
EdgeState = Dict[Tuple[str, str], Tuple[float, float]]


def decay(w, dt):
    """Return weight ``w`` after exponential decay over an interval of length ``dt``."""
    factor = math.exp(-DECAY_LAMBDA * dt)
    return w * factor


# Load the whole input so rows can be processed in time order.
with open(INPUT, newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    header = next(reader)
    # Keep only rows that reach the featured-artists column (index 6): blank
    # or short rows would raise IndexError in the sort key and when the
    # artist columns are read.
    rows = [row for row in reader if len(row) >= 7]


def sort_key(row):
    """Order rows by their time value; rows without a usable time go last."""
    t = parse_time(row[0])
    return t if t is not None else float("inf")


rows.sort(key=sort_key)

# State of the collaboration graph while the rows are replayed in time order.
edges: EdgeState = {}
last_global_time = None

out_rows = []

# Centrality of the current graph. It is recomputed only after `edges` has
# changed; solo-artist rows within the same week reuse the previous result,
# which is identical to rebuilding the graph for every row.
G = nx.Graph()
deg = {}
pr = {}
graph_stale = False

for row in rows:
    # Columns 0, 5 and 6 are read below.
    if len(row) < 7:
        continue

    t = parse_time(row[0])
    if t is None:
        continue

    main = parse_main(row[5])
    feats = parse_features(row[6])
    # Each artist once, in order: a name repeated within a row would
    # otherwise create a self-loop and count the same edge twice.
    artists = list(dict.fromkeys(a for a in [main, *feats] if a))

    # --- decay all edges forward in time ---
    if last_global_time is not None:
        dt = t - last_global_time
        if dt > 0 and edges:
            for k, (w, _) in list(edges.items()):
                w2 = decay(w, dt)
                if w2 < MIN_EDGE_WEIGHT:
                    del edges[k]
                else:
                    edges[k] = (w2, t)
            graph_stale = True

    last_global_time = t

    if graph_stale:
        G = nx.Graph()
        for (a, b), (w, _) in edges.items():
            G.add_edge(a, b, weight=w)

        if len(G) > 0:
            deg = dict(G.degree(weight="weight"))
            pr = nx.pagerank(G, alpha=PR_ALPHA, weight="weight")
        else:
            deg = {}
            pr = {}
        graph_stale = False

    # Features describe the graph before this song's own collaborations are added.
    deg_main = deg.get(main, 0.0)
    pr_main = pr.get(main, 0.0)

    out_rows.append(row + [deg_main, pr_main])

    # Strengthen the edge between every pair of artists on this song.
    for i, first in enumerate(artists):
        for second in artists[i + 1:]:
            a, b = sorted((first, second))
            w0, _ = edges.get((a, b), (0.0, t))
            edges[(a, b)] = (w0 + 1.0, t)
            graph_stale = True


with open(OUTPUT, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(header + ["deg_centrality", "pagerank"])
    writer.writerows(out_rows)

print("Graph feature build complete.")
