### Fetch wikitext and extract the genres for all rock artists

In [8]:
# End-to-end: fetch wikitext -> parse infobox genres -> normalize -> save genres.json

from tracemalloc import start
from matplotlib import lines
import re, html, json, time, unicodedata, requests
from typing import List, Optional

API = "https://en.wikipedia.org/w/api.php"
HEADERS = {"User-Agent": "GenreExtractor/1.0 (student project; email@example.com)"}

# ---------- Fetch (resilient) ----------
def canonicalize_title(s: str) -> str:
    """Turn a raw artist name into an underscore-separated page title.

    Steps, in order: NFC-normalise and trim, collapse every whitespace run
    to one space, rewrite any hyphen / en dash / em dash (with optional
    surrounding whitespace) as a bare en dash, then replace spaces with
    underscores.
    """
    whitespace_run = re.compile(r"\s+")
    spaced_dash = re.compile(r"\s*[-–—]\s*")

    title = unicodedata.normalize("NFC", s).strip()
    title = whitespace_run.sub(" ", title)
    title = spaced_dash.sub("–", title)  # always an en dash, never padded
    return "_".join(title.split(" "))

def search_best_title(query: str) -> Optional[str]:
    """Ask the opensearch endpoint for the single best-matching title.

    Underscores in ``query`` are turned into spaces for the search. The first
    suggested title is returned with spaces converted back to underscores.
    Any failure (HTTP error, bad JSON, unexpected payload shape) is treated
    as "no suggestion" and yields ``None``.
    """
    search_params = {
        "action": "opensearch",
        "search": query.replace("_", " "),
        "limit": 1,
        "namespace": 0,
        "format": "json",
    }
    try:
        response = requests.get(API, params=search_params, headers=HEADERS, timeout=20)
        response.raise_for_status()
        payload = response.json()
        # payload[1] holds the list of suggested titles
        if len(payload) > 1 and payload[1]:
            return payload[1][0].replace(" ", "_")
    except Exception:
        # best-effort lookup: fall through to the "nothing found" result
        return None
    return None

def fetch_via_query(title: str) -> tuple[str, bool]:
    """Fetch a page's wikitext through ``action=query`` / ``prop=revisions``.

    Returns ``(wikitext, missing)``. ``wikitext`` is the main-slot content of
    the first returned revision, or ``""`` when there is none. ``missing`` is
    the truthiness of the page's ``missing`` flag; it is also ``True`` when
    the response has no pages or when anything raises.
    """
    query_params = {
        "action": "query", "format": "json", "redirects": 1, "converttitles": 1,
        "prop": "revisions", "rvslots": "main", "rvprop": "content", "formatversion": 2,
        "titles": title,
    }
    try:
        response = requests.get(API, params=query_params, headers=HEADERS, timeout=30)
        response.raise_for_status()
        page_list = response.json().get("query", {}).get("pages", [])
        if not page_list:
            return "", True

        page = page_list[0]
        revisions = page.get("revisions", [])
        content = ""
        if revisions:
            main_slot = revisions[0].get("slots", {}).get("main", {})
            content = main_slot.get("content", "") or ""
        return content, bool(page.get("missing"))
    except Exception:
        # network / decoding / shape problems all count as "not found"
        return "", True

def fetch_via_parse(title: str) -> tuple[str, bool]:
    """Fetch a page's wikitext through ``action=parse`` / ``prop=wikitext``.

    Returns ``(wikitext, missing)``. ``missing`` is ``True`` when the response
    carries an ``error`` key, when the wikitext is empty, or when anything
    raises along the way.
    """
    parse_params = {
        "action": "parse", "page": title, "prop": "wikitext", "format": "json",
        "formatversion": 2, "redirects": 1, "converttitles": 1,
    }
    try:
        response = requests.get(API, params=parse_params, headers=HEADERS, timeout=30)
        response.raise_for_status()
        payload = response.json()
        if "error" in payload:
            return "", True
        content = payload.get("parse", {}).get("wikitext", "") or ""
        return content, not content
    except Exception:
        # network / decoding / shape problems all count as "not found"
        return "", True

def fetch_wikitext_resilient(raw_title: str, sleep_between=0.2) -> tuple[str, str]:
    """Fetch wikitext for ``raw_title``, trying progressively looser lookups.

    Order of attempts: query API on the canonicalised title, parse API on the
    same title, then (if a search suggestion exists) query API and parse API
    on the suggested title. ``sleep_between`` seconds are slept before every
    fetch except the very first one.

    Returns ``(title_used, wikitext)``; ``wikitext`` is ``""`` when every
    attempt came back empty.
    """
    primary = canonicalize_title(raw_title)

    text, _ = fetch_via_query(primary)
    if not text:
        time.sleep(sleep_between)
        text, _ = fetch_via_parse(primary)
    if text:
        return primary, text

    # Nothing under the canonical title: fall back to the search suggestion.
    suggestion = search_best_title(primary)
    if not suggestion:
        return primary, ""

    time.sleep(sleep_between)
    text, _ = fetch_via_query(suggestion)
    if not text:
        time.sleep(sleep_between)
        text, _ = fetch_via_parse(suggestion)
    return suggestion, (text or "")

# ---------- Parse infobox -> genre field ----------
def extract_infobox(wikitext: str) -> Optional[str]:
    """Return the most relevant ``{{Infobox ...}}`` template from ``wikitext``.

    Every infobox opener is located (case-insensitively, allowing whitespace
    after ``{{``) and its closing ``}}`` is found by counting brace depth.
    The first balanced infobox whose name contains one of the words
    musical / music / artist / singer / band / group is returned at once.
    Otherwise the first balanced infobox of any kind is returned, or ``None``
    when no balanced infobox exists.

    Scanning resumes just inside each non-preferred infobox, so an infobox
    embedded in another one is still considered.
    """
    opener_re = re.compile(r"\{\{\s*infobox", flags=re.I)
    name_re = re.compile(r"^\s*\{\{\s*infobox\s+([^\n{|}]*)", flags=re.I)
    preferred = re.compile(r"\b(musical|music|artist|singer|band|group)\b", flags=re.I)

    n = len(wikitext)
    i = 0
    best = None

    while i < n:
        # Search from offset i instead of re-slicing the text each round.
        m = opener_re.search(wikitext, i)
        if not m:
            break
        start = m.start()

        # Find the matching "}}" using brace depth.
        depth, j = 0, start
        while j < n:
            if wikitext.startswith("{{", j):
                depth += 1
                j += 2
                continue
            if wikitext.startswith("}}", j):
                depth -= 1
                j += 2
                if depth == 0:
                    box = wikitext[start:j]

                    # Read the infobox name (text after "infobox") to rank it.
                    nm = name_re.match(wikitext[start:start + 200])
                    name = nm.group(1).strip().lower() if nm else ""

                    # Prefer music-flavoured infoboxes.
                    if preferred.search(name):
                        return box
                    if best is None:
                        best = box
                    break
                # Already advanced past "}}"; do not skip an extra character,
                # otherwise an adjacent "{{" or "}}" would be missed.
                continue
            j += 1
        i = start + 2

    return best


def extract_field_from_infobox(infobox: str, field_names: List[str]) -> Optional[str]:
    """Return the raw value of the first matching ``| field =`` in ``infobox``.

    Args:
        infobox: Wikitext of one infobox template, one field per line.
        field_names: Accepted field names, matched case-insensitively. Each
            name may also carry a trailing ``s`` or ``(s)``, so ``"genre"``
            matches ``genre``, ``genres`` and ``genre(s)``. An empty list
            falls back to ``["genre"]``.

    Returns:
        The text after ``=`` on the matching line plus every following line
        up to (not including) the next ``| name =`` line, stripped; or
        ``None`` when no listed field is present.
    """
    # Build the field matcher from the requested names (longest first so a
    # longer name is never shadowed by one of its prefixes).
    names = [name.strip() for name in (field_names or []) if name and name.strip()]
    if not names:
        names = ["genre"]
    alternatives = "|".join(
        re.escape(name) for name in sorted(set(names), key=len, reverse=True)
    )
    field_pat = re.compile(
        rf"^\s*\|\s*(?:{alternatives})(?:s|\(s\))?\s*=", flags=re.IGNORECASE
    )
    leading_comment = re.compile(r"^\s*<!--.*?-->\s*")

    lines = infobox.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    start = None
    for idx, raw_line in enumerate(lines):
        line = raw_line
        # Strip any number of leading comments on this line
        # (e.g. `<!-- ... -->| genre =`) before testing it.
        while True:
            new_line = leading_comment.sub("", line)
            if new_line == line:
                break
            line = new_line
        if field_pat.match(line):
            start = idx
            # Keep only the value after '=' from the cleaned line.
            lines[idx] = line.split("=", 1)[1]
            break

    if start is None:
        return None

    buf = [lines[start].lstrip()]

    # Stop unconditionally at the next real field so that a following
    # field (e.g. instruments=) never bleeds into this value.
    next_field = re.compile(r"^\s*\|\s*[^|=}{\n]+\s*=", flags=re.IGNORECASE)

    for line in lines[start + 1:]:
        if next_field.match(line):
            break
        buf.append(line)

    return "\n".join(buf).strip()



# ---------- Cleanup & normalization ----------
def _unwrap_list_templates(t: str) -> str:
    # unwrap simple wrappers inside (nowrap/nobr/small/span)
    for wrapper in ("nowrap","nobr","small","span"):
        t = re.sub(rf"\{{\{{\s*{wrapper}\s*\|(.+?)\}}\}}", r"\1",
                   t, flags=re.IGNORECASE | re.DOTALL)

    # hlist/ubl/unbulleted list  -> join positional items with ';'
    pat_pipe = re.compile(
        r"\{\{\s*(h\s*list|ubl|unbulleted\s*list)\s*\|(.+?)\}\}",
        flags=re.IGNORECASE | re.DOTALL
    )
    def repl_pipe(m):
        content = m.group(2)
        # keep only positional args (no '=')
        parts = [p.strip() for p in content.split("|") if p.strip() and "=" not in p]
        return ";".join(parts)

    # flatlist/plainlist -> take bullet lines or positional args
    pat_bul = re.compile(
        r"\{\{\s*(flat\s*list|plain\s*list)\s*\|(.+?)\}\}",
        flags=re.IGNORECASE | re.DOTALL
    )

    def repl_bul(m):
        content = m.group(2)
        items = []
        for ln in content.splitlines():
            ln = ln.strip()
            if ln.startswith("*"):
                items.append(re.sub(r"^\*\s*", "", ln))
        if not items:
            items = [p.strip() for p in content.split("|") if p.strip() and "=" not in p]
        return ";".join(items)

    # repeat until nothing changes (nested templates)
    while True:
        new = pat_pipe.sub(repl_pipe, t)
        new = pat_bul.sub(repl_bul, new)
        if new == t:
            break
        t = new

    return t



def strip_wiki_markup(text: str) -> str:
    """Reduce a raw infobox field value to plain text with ``;`` separators.

    HTML entities are decoded; comments, ``<ref>`` footnotes, cite/sfn
    templates, URLs, remaining HTML tags and leftover templates are removed;
    list templates and wikilinks are unwrapped to their visible text.
    ``<br>`` tags, bullets (``*``, ``·``, ``•``) and line breaks all become
    ``;`` so the caller can split the result into items.
    """
    t = html.unescape(text).replace("\xa0", " ")
    t = re.sub(r"<!--.*?-->", " ", t, flags=re.DOTALL)

    # refs: drop self-closing ones first, otherwise the paired pattern would
    # start at `<ref name=x />` and swallow real text up to the next </ref>
    t = re.sub(r"<ref\b[^>]*?/\s*>", " ", t, flags=re.IGNORECASE)
    t = re.sub(r"<ref\b[^>]*>.*?</ref\s*>", " ", t, flags=re.IGNORECASE | re.DOTALL)

    # drop cite/sfn templates
    t = re.sub(r"\{\{\s*cite[^{}]*\}\}", " ", t, flags=re.IGNORECASE | re.DOTALL)
    t = re.sub(r"\{\{\s*sfn[^{}]*\}\}", " ", t, flags=re.IGNORECASE | re.DOTALL)

    # unwrap lists
    t = _unwrap_list_templates(t)

    # external links
    t = re.sub(r"\[https?://[^\s\]]+\s+([^\]]+)\]", r"\1", t)  # [url label] -> label
    t = re.sub(r"https?://\S+", " ", t)                        # bare urls -> drop

    t = re.sub(r"\{\{\s*lang\s*\|\s*[\w-]+\s*\|\s*([^{}|]+)(?:\|[^{}]*)?\}\}", r"\1", t, flags=re.I)
    t = re.sub(r"\{\{\s*nowrap\s*\|\s*([^{}]+?)\s*\}\}", r"\1", t, flags=re.I)

    # <br> is an item separator: convert it BEFORE the generic tag strip
    # below, which would otherwise turn it into a plain space
    t = re.sub(r"<br\s*/?\s*>", ";", t, flags=re.IGNORECASE)

    # html + wiki links
    t = re.sub(r"<[^>]+>", " ", t)
    t = re.sub(r"\[\[([^|\]]+)\|([^\]]+)\]\]", r"\2", t)
    t = re.sub(r"\[\[([^\]]+)\]\]", r"\1", t)

    # bullet-style item separators
    t = t.replace("*", ";")
    t = re.sub(r"[·•]", ";", t)

    # remove any remaining one-line templates
    t = re.sub(r"\{\{[^{}]*\}\}", " ", t, flags=re.DOTALL)

    # rare leftover: naked 'hlist|' / 'flatlist|' beginnings
    t = re.sub(r"\b(?:h\s*list|flat\s*list|plain\s*list|ubl)\s*\|", " ", t, flags=re.IGNORECASE)

    t = t.replace("\n", ";")  # treat residual line breaks as item separators

    t = re.sub(r"\s+", " ", t).strip()
    return t


# Spelling variants (lower-case) -> canonical genre name.
# Looked up with CANON_MAP.get(...) inside normalize_genre.
CANON_MAP = {
    "pop music": "pop",
    "adult contemporary music": "adult contemporary",
    "rnb": "r&b", "r&b/soul": "r&b",
    "hip hop music": "hip hop",
    "electronic music": "electronic",
    "dance music": "dance",
    "rock & roll": "rock and roll", "rock 'n' roll": "rock and roll",
    "rock'n'roll": "rock and roll", "rock n roll": "rock and roll",
    "middle-of-the-road": "middle of the road",
    "middle of the road (music)": "middle of the road",
}

# Matches an item that consists solely of a list-template name (or the word
# "artist"); postfilter_parts drops such items.
NON_GENRE_TRASH = re.compile(
    r"^\s*(?:h\s*list|flat\s*list|plain\s*list|ubl|artist)\s*$", re.IGNORECASE)

# Lower-case instrument names; postfilter_parts drops any item whose
# lower-cased text is exactly one of these.
INSTRUMENT_WORDS = {
    "guitar","vocals","voice","drums","bass","keyboards","piano","organ","synthesizer",
    "harmonica","violin","cello","saxophone","trumpet","percussion"
}

def postfilter_parts(parts: list[str]) -> list[str]:
    """Trim each candidate and discard the ones that are clearly not genres.

    An item is dropped when it is empty after trimming, contains ``=``
    (key=value junk), is only a template name per ``NON_GENRE_TRASH``, or is
    an instrument listed in ``INSTRUMENT_WORDS``. Order is preserved.
    """
    def looks_like_genre(item: str) -> bool:
        return (
            bool(item)
            and "=" not in item
            and not NON_GENRE_TRASH.match(item)
            and item.lower() not in INSTRUMENT_WORDS
        )

    trimmed = (candidate.strip() for candidate in parts)
    return [item for item in trimmed if looks_like_genre(item)]


def split_genres(raw: str) -> list[str]:
    """Split a cleaned genre string into individual, trimmed items.

    ``;``, ``•``, ``·``, em dash, en dash, ``/`` and ``,`` all act as
    separators. In addition, a run such as ``"hard rock pop rock"`` (two
    "... rock" genres with no separator between them) is split after the
    first ``rock``; that word is kept, giving ``["hard rock", "pop rock"]``.
    Empty items are dropped.
    """
    separators = str.maketrans({ch: ";" for ch in "•·—–/,"})
    s = raw.translate(separators)
    # Insert a separator after "rock" when another "<word> rock" follows.
    # The matched "rock" is written back so the first genre stays intact.
    s = re.sub(r"\b(rock)\s+(?=[a-z]+\s+rock\b)", r"\1;", s, flags=re.I)
    return [part.strip() for part in s.split(";") if part.strip()]


def normalize_genre(g: str) -> Optional[str]:
    """Return the canonical lower-case form of one genre label, or ``None``.

    The label is trimmed, stripped of stray ``{``/``}``, lower-cased and
    whitespace-collapsed. It is then resolved in this order:

    1. A direct ``CANON_MAP`` key is replaced by its canonical value. This
       runs before any rewriting so that keys containing ``&`` or ``music``
       can actually match.
    2. A label that already equals a canonical value (e.g. ``"r&b"``) is
       kept as is, so it agrees with what its aliases map to.
    3. Anything else has trailing disambiguators such as ``(music)``
       removed, ``&`` spelled out as ``and``, the word ``music`` dropped and
       dashes unified, followed by a second ``CANON_MAP`` lookup.

    Finally hyphens become spaces and whitespace is collapsed. ``None`` is
    returned for empty input or when nothing is left.
    """
    if not g:
        return None

    text = re.sub(r"\s+", " ", g.strip().strip("{}").lower()).strip()

    if text in CANON_MAP:
        text = CANON_MAP[text]
    elif text not in set(CANON_MAP.values()):
        # remove common disambiguators
        text = re.sub(r"\s*\((?:music|genre|band|musical group|style)\)\s*", "", text)
        text = text.replace("&", " and ")
        text = re.sub(r"\bmusic\b", "", text)
        text = text.replace("–", "-").replace("—", "-")
        text = re.sub(r"\s*-\s*", "-", text)
        text = CANON_MAP.get(text, text)

    text = text.replace("-", " ")
    text = re.sub(r"\s+", " ", text).strip(" ;,")
    return text or None

def normalize_list(genres: list[str]) -> list[str]:
    """Normalise every genre and return the distinct results in first-seen order.

    Items that ``normalize_genre`` turns into ``None``/empty are left out.
    """
    normalised = (normalize_genre(item) for item in genres)
    # dict keys keep insertion order and ignore repeats -> ordered de-dup
    return list(dict.fromkeys(name for name in normalised if name))

# ---------- Run ----------
INPUT_FILE = "../rock_artists.txt"

# One artist title per non-blank line. The context manager closes the file
# as soon as the list is built instead of leaving the handle open.
with open(INPUT_FILE, encoding="utf-8") as fh:
    artists = [line.strip() for line in fh if line.strip()]

# artist (as written in INPUT_FILE) -> list of normalised genres; the list is
# empty when the page, its infobox or its genre field could not be found.
artist_to_genres = {}
for raw in artists:
    title, wtxt = fetch_wikitext_resilient(raw, sleep_between=0.25)
    genres = []
    if wtxt:
        ib = extract_infobox(wtxt)
        if ib:
            raw_field = extract_field_from_infobox(ib, ["genre", "genres"])
            if raw_field:
                clean = strip_wiki_markup(raw_field)
                parts = split_genres(clean)
                parts = postfilter_parts(parts)
                genres = normalize_list(parts)
    artist_to_genres[raw] = genres

# --- Manual fallback for pages that are hard to scrape ---

SKIP_TITLES = {
    "AllMusic",             # not an artist/band
}

MANUAL_MAP = {
    "City and Colour": [
        "folk", "indie folk", "acoustic", "alternative rock", "post hardcore", "screamo", "melodic hardcore"
    ],
    "Eagles (band)": [
        "rock", "country rock", "soft rock", "folk rock", "pop rock"
    ],
    "Electric Light Orchestra": [
        "art rock", "progressive rock", "symphonic rock", "pop rock", "glam rock"
    ],
    "Jack White": [
        "blues rock", "garage rock revival", "alternative rock", "punk blues", "experimental rock"
    ],
    "R.E.M.": [
        "alternative rock", "jangle pop", "college rock", "folk rock", "post punk"
    ],
    "The Monkees": [
        "pop rock", "rock", "bubblegum", "psychedelia"
    ],
    "W.A.S.P. (band)": [
        "heavy metal", "glam metal", "hard rock", "shock rock"
    ],
}

# 1) Drop non-artist pages (so they don't show up as "empty")
for k in SKIP_TITLES:
    artist_to_genres.pop(k, None)

# 2) Fill empties from MANUAL_MAP (only if the scraper returned nothing).
#    A copy is stored so later edits to the result never alter MANUAL_MAP.
for artist, fallback_genres in MANUAL_MAP.items():
    if not artist_to_genres.get(artist):  # empty or missing
        artist_to_genres[artist] = list(fallback_genres)


with open("genres.json", "w", encoding="utf-8") as f:
    json.dump(artist_to_genres, f, ensure_ascii=False, indent=2)

print("\nSaved genres.json")



Saved genres.json


### *Build alias index for network node matching*

In [11]:
import re, unicodedata, json

# --- simple normalizer used for index keys (short & stable) ---
# Typographic quote characters -> ASCII quotes. The curly quotes are spelled
# as escapes so they cannot be silently flattened to ASCII by an editor or
# export step (which would turn the entries into no-ops).
QUOTES = {
    "\u2018": "'",   # left single quotation mark
    "\u2019": "'",   # right single quotation mark / typographic apostrophe
    "\u201c": '"',   # left double quotation mark
    "\u201d": '"',   # right double quotation mark
    "´": "'",        # acute accent used as an apostrophe
    "`": "'",        # backtick used as an apostrophe
}
# Dash-like characters -> ASCII hyphen-minus.
DASHES = {"–":"-", "—":"-", "−":"-"}

def nn(s: str) -> str:
    """Normalise a name into the lower-case key used by the alias index.

    Applies NFKC and trims, maps the characters listed in ``QUOTES`` and
    ``DASHES``, turns underscores into spaces and `` & `` into `` and ``,
    removes whitespace around hyphens, collapses whitespace runs and
    lower-cases the result.
    """
    key = unicodedata.normalize("NFKC", s).strip()
    for table in (QUOTES, DASHES):
        for source, target in table.items():
            key = key.replace(source, target)
    key = key.replace("_", " ").replace(" & ", " and ")
    key = re.sub(r"\s*-\s*", "-", key)      # "a - b" -> "a-b"
    return re.sub(r"\s+", " ", key).lower()

def strip_the(s: str) -> str:
    """Drop a leading "the" (any case, optional leading whitespace) plus the
    whitespace that follows it; other strings are returned unchanged."""
    leading_article = re.compile(r"^\s*the\s+", re.IGNORECASE)
    return leading_article.sub("", s)

def drop_paren_suffix(s: str) -> str:
    """Remove a parenthesised group at the very end of ``s`` (together with
    the whitespace around it) and trim the result."""
    trailing_group = re.compile(r"\s*\([^)]*\)\s*$")
    shortened = trailing_group.sub("", s)
    return shortened.strip()

def variants(name: str):
    """Yield the lookup keys derived from ``name``, in this order:

    the normalised name, the same without a leading "the", the normalised
    name without a trailing parenthesised suffix, and that last form without
    a leading "the". Duplicates are not removed here.
    """
    normalised = nn(name)
    unqualified = drop_paren_suffix(normalised)
    yield from (
        normalised,
        strip_the(normalised),
        unqualified,
        strip_the(unqualified),
    )

# --- build alias index: normalized variant -> canonical JSON key (original title) ---
# Titles with an empty genre list contribute nothing. When two titles share a
# variant, the title that comes later in artist_to_genres wins.
alias_index = {
    variant: title
    for title, genre_list in artist_to_genres.items()
    if genre_list
    for variant in set(variants(title))
}

# --- hard aliases you discovered (graph label -> canonical title) ---
# (stored in the same index under the normalized LEFT side; they override
#  any generated variant with the same key, and None marks "skip this node")
hard_aliases = {
    "Bachman-Turner_Overdrive": "Bachman –Turner Overdrive",
    "The_Go-Go's":              "The Go -Go's",
    "Parliament-Funkadelic":    "Parliament -Funkadelic",
    "The_All-American_Rejects": "The All -American Rejects",
    "Dallas_Green_(musician)":  "City and Colour",
    "Dallas Green":             "City and Colour",
    "AllMusic":                 None,                         # skip
}
alias_index.update(
    (nn(graph_label), canonical_title)
    for graph_label, canonical_title in hard_aliases.items()
)

# --- save the alias index for name matching ---
with open("alias_index.json", "w", encoding="utf-8") as f:
    json.dump(alias_index, f, ensure_ascii=False, indent=2)

### *Create subgraph with genres*

In [None]:
# --- Create and save processed network subgraph ---
import networkx as nx

# Load the original network
G = nx.read_gexf("../graph/rock_network.gexf")

# Resolve each network node to a canonical artist name. A node is kept only
# when its normalized label is in alias_index AND that artist has genres.
resolved = {}
for n in G.nodes():
    canonical = alias_index.get(nn(n))
    if not canonical:
        continue  # unknown label, or explicitly mapped to None
    if artist_to_genres.get(canonical):
        resolved[n] = canonical

# Filtered, independent copy of the graph restricted to the matched nodes
matched_nodes = list(resolved)
H = G.subgraph(matched_nodes).copy()

# Save the processed subgraph and the node -> artist mapping
nx.write_gexf(H, "rock_network_with_genres.gexf")
with open("resolved_mapping.json", "w", encoding="utf-8") as f:
    json.dump(resolved, f, ensure_ascii=False, indent=2)

print(f"\n✅ Saved processed network:")
print(f"rock_network_with_genres.gexf: {H.number_of_nodes():,} nodes, {H.number_of_edges():,} edges")
print(f"resolved_mapping.json: {len(resolved):,} node mappings")


✅ Saved processed network:
   • rock_network_with_genres.gexf: 485 nodes, 8,019 edges
   • resolved_mapping.json: 485 node mappings
