The code below is the complete implementation of the Network Rock Band exercise.

In [1]:
import re
import json
import requests
import networkx as nx
import os
from concurrent import futures
from threading import Lock
from urllib.parse import quote
import matplotlib.pyplot as plt

In [10]:
# Find all band names from the list of mainstream rock performers
# Matches wiki links written as [[Target]], [[Target|label]] or [[Target#section]].
# The single capture group is the link target (the text before an optional "|" or "#").
# Neither the target nor the trailing part may contain # [ ] < > { } | or _, so links
# using any of those characters elsewhere are not matched at all.
band_pattern = r"\[{2}([^#\[\]<>{}|_]*)[|#]?[^#\[\]<>{}|_]*\]{2}"

# Pieces of the Wikipedia API query string; the URL builders in this file join them with "&".
baseurl = "https://en.wikipedia.org/w/api.php?"
action = "action=query"
content = "prop=revisions&rvprop=content"
dataformat = "format=json"
# Sent with every API request made in this file.
# NOTE(review): the contact address looks like a placeholder — confirm before running at scale.
headers = {"User-Agent": "MyApp/1.0 (your_email@example.com)"}


def fetch_wiki_rock_bands_content() -> str:
    """Fetch the wikitext of the "List of mainstream rock performers" page.

    Returns:
        The content of the first revision returned by the API, as one string.

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status.
        KeyError: If the JSON response lacks the expected query/pages/revisions
            structure.
    """
    title = "titles=List_of_mainstream_rock_performers"
    query = "{}{}&{}&{}&{}".format(baseurl, action, content, title, dataformat)
    # Without a timeout a stalled connection would block the notebook forever.
    wikiresponse = requests.get(query, headers=headers, timeout=30)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON.
    wikiresponse.raise_for_status()
    wikiJSON = wikiresponse.json()
    # Pages are keyed by page id; only one title was requested, so take the sole entry.
    return next(iter(wikiJSON["query"]["pages"].values()))["revisions"][0]["*"]


def generate_band_query_url(band_name: str):
    """Build the API URL that requests the revision content of one page.

    The page title is percent-encoded (underscores are left as-is) and
    combined with the shared query fragments.
    """
    encoded_title = quote(band_name, safe="_")
    title = f"titles={encoded_title}"
    return f"{baseurl}{action}&{content}&{title}&{dataformat}"


# Link targets starting with any of these are not band pages and get dropped.
ignore_matches = [
    "AllMusic",
    "rock music",
    "Category:",
    "Template:",
    "Help:",
    "File:",
    "Special:",
    "Wikipedia:",
    "Portal:",
    "Draft:",
    "Talk:",
]
# str.startswith accepts a tuple of prefixes and returns True if any of them matches.
_ignored_prefixes = tuple(ignore_matches)

# Every link target on the list page, minus the ignored prefixes.
bands: list[str] = [
    link_target
    for link_target in re.findall(band_pattern, fetch_wiki_rock_bands_content())
    if not link_target.startswith(_ignored_prefixes)
]
# One API URL per band, in the same order as `bands`.
bands_query_urls = list(map(generate_band_query_url, bands))

In [11]:
# Number of link targets kept after filtering.
len(bands)

488

In [None]:
# Download Wikipedia pages in parallel
# Create the cache directory for downloaded pages; exist_ok avoids the
# check-then-create race and makes re-running the cell a no-op.
os.makedirs("wiki_pages", exist_ok=True)


def wiki_page_file_exists(page_name: str):
    """Return True when a non-empty cached file exists for ``page_name``.

    The cache file lives under ``wiki_pages/`` and is named after the page
    with spaces and slashes replaced by underscores.
    """
    safe_name = page_name.replace(" ", "_").replace("/", "_")
    cache_path = f"wiki_pages/{safe_name}.txt"
    if not os.path.exists(cache_path):
        return False
    # An empty file counts as "not cached".
    return os.path.getsize(cache_path) > 0


def download_wiki_page(band_query: str, band_name: str):
    """Return the wikitext for one band, preferring the on-disk cache.

    Args:
        band_query: Full API URL requesting the page's revision content.
        band_name: Page title; also used to derive the cache file name.

    Returns:
        A dict with keys ``"page_name"`` (the given ``band_name``) and
        ``"data"`` (the page text, or ``None`` when the API response carries
        no revisions for the page).

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status.
        KeyError: If the JSON response lacks the expected query/pages structure.
    """
    cache_path = f"wiki_pages/{band_name.replace(' ', '_').replace('/', '_')}.txt"
    # Check if file already exists and is non-empty
    if wiki_page_file_exists(band_name):
        with open(cache_path, "r", encoding="utf-8") as f:
            return {"page_name": band_name, "data": f.read()}

    # Without a timeout a single stalled connection would pin its worker
    # thread and keep the surrounding executor from ever finishing.
    wikiresponse = requests.get(band_query, headers=headers, timeout=30)
    # Surface 4xx/5xx directly instead of failing later while parsing JSON.
    wikiresponse.raise_for_status()
    # Pages are keyed by page id; one title was requested, so take the sole entry.
    page = next(iter(wikiresponse.json()["query"]["pages"].values()))
    revisions = page.get("revisions")
    # A missing key and an empty list both mean there is no content to return.
    if not revisions:
        return {"page_name": band_name, "data": None}
    return {"page_name": band_name, "data": revisions[0]["*"]}


def save_page_content(page_name: str, content: str):
    """Write ``content`` to the page's cache file and return the file path.

    The file is created under ``wiki_pages/`` (overwriting any existing
    file) and named after the page with spaces and slashes replaced by
    underscores.
    """
    safe_name = page_name.replace(" ", "_").replace("/", "_")
    filename = f"wiki_pages/{safe_name}.txt"
    with open(filename, mode="w", encoding="utf-8") as out_file:
        out_file.write(content)
    return filename


# band - wiki-content mapping
downloaded_pages = {}
# NOTE(review): 100 simultaneous requests is aggressive for a public API —
# confirm this is acceptable or lower it.
max_workers = 100
# NOTE(review): not used by the code in this cell; results are collected on
# the main thread only.
download_lock = Lock()

with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # `bands_query_urls` was built from `bands` in order, so pair the two lists
    # positionally instead of searching the URL list for every submission.
    future_to_page = {
        executor.submit(download_wiki_page, band_query, page_title): page_title
        for band_query, page_title in zip(bands_query_urls, bands)
    }

    for future in futures.as_completed(future_to_page):
        # Re-raises here if the download raised in its worker thread.
        result = future.result()
        if result["data"] is None:
            # No content came back for this title; leave it out of the mapping.
            continue
        downloaded_pages[result["page_name"]] = result["data"]
        # Only save if file does not already exist and is non-empty
        if not wiki_page_file_exists(result["page_name"]):
            save_page_content(result["page_name"], result["data"])

print(f"{len(downloaded_pages)} out of {len(bands)} band wiki pages downloaded")

In [None]:
def find_related_bands(band_name: str):
    """Return every wiki-link target found on a band's downloaded page.

    Args:
        band_name: Key into ``downloaded_pages``.

    Returns:
        A list of link targets matched by ``band_pattern`` (duplicates kept,
        in page order). The list is empty when no page text is stored for
        ``band_name``; pages that came back without content are never added
        to ``downloaded_pages``, so a plain index lookup would raise KeyError.
    """
    bands_text = downloaded_pages.get(band_name)
    if bands_text is None:
        return []
    return re.findall(band_pattern, bands_text)


# Directed graph: an edge A -> B means A's wiki page links to B's page.
G_bands = nx.DiGraph()
# Set membership is O(1); testing against the `bands` list would rescan it
# for every link on every page.
known_bands = set(bands)
for band in bands:
    # Add the node explicitly so bands without any links still appear.
    G_bands.add_node(band)
    related_bands = find_related_bands(band)
    for related_band in related_bands:
        # Keep only links to other bands in the list; skip self-links.
        if related_band != band and related_band in known_bands:
            G_bands.add_edge(band, related_band)

In [None]:
# The output directory must exist before writing; create it here so this
# cell does not depend on a later cell having run first.
os.makedirs("data", exist_ok=True)

# Persist the graph as an adjacency list. "|" is used as the delimiter
# because band names contain spaces.
nx.write_adjlist(
    G_bands,
    "data/rock_band_network.gz",
    comments="#",
    delimiter="|",
)

# Read the file back with the same settings as a round-trip check.
G_read = nx.read_adjlist(
    "data/rock_band_network.gz",
    comments="#",
    delimiter="|",
    encoding="utf-8",
    create_using=nx.DiGraph(),
)
print(G_read)

In [None]:
import gzip
import os

# Export the graph in node-link form as gzip-compressed JSON with compact
# separators and non-ASCII characters written as-is.
os.makedirs("data", exist_ok=True)
out_path = "data/rock_graph_node_link.json.gz"
node_link = nx.readwrite.node_link_data(G_bands)
with gzip.open(out_path, mode="wt", encoding="utf-8") as fh:
    fh.write(json.dumps(node_link, separators=(",", ":"), ensure_ascii=False))
print("wrote", out_path)