In [0]:
import requests

# Stack Exchange API key, read from a Databricks secret scope (it was uploaded with
# databricks-cli from a Docker image) so the key never appears in the notebook source.
api_key = dbutils.secrets.get(scope='so_api_scope', key='so_api_key')

def fetch_page(page, timeout=30):
    """Fetch one page of Stack Overflow questions from the Stack Exchange API.

    Questions are requested newest-first (``sort=creation``, ``order=desc``),
    100 per page, authenticated with the module-level ``api_key``.

    Args:
        page: 1-based page number to request.
        timeout: seconds ``requests`` waits for the connection/read before
            raising. Without a timeout a stalled connection can block the
            calling thread indefinitely, e.g. a worker in the parallel fetch.

    Returns:
        The list of question dicts from the response's ``items`` field, or
        ``[]`` when the response carries no ``items`` (such as an API error
        body). Errors are printed rather than raised, so one failing page does
        not abort a batch of fetches.
    """
    url = "https://api.stackexchange.com/2.3/questions"
    params = {
        "page": page,
        "pagesize": 100,
        "order": "desc",
        "sort": "creation",
        "site": "stackoverflow",
        "key": api_key
    }
    data = requests.get(url, params=params, timeout=timeout).json()
    if "error_id" in data:
        # The API reports failures (e.g. throttling or an exhausted quota) as a
        # JSON body without "items"; make that visible instead of silently
        # returning an empty page.
        print(f"fetch_page({page}): API error {data.get('error_id')} "
              f"{data.get('error_name')}: {data.get('error_message')}")
    return data.get("items", [])

In [0]:
# Example: fetch the newest page of questions (this spends one call of the daily API quota).
fetch_page(page=1)

In [0]:
from pyspark.sql import functions as F
import json

def process_page(page, period_seconds=1_000_000):
    """Fetch one API page and turn it into a Spark DataFrame for the bronze table.

    Nested API fields listed below are serialized to JSON strings, so Spark sees
    them as plain string columns instead of inferring nested types for them.
    ``question_id`` and the epoch-second date columns are cast to long. A
    ``creation_period`` bucket is derived from ``creation_date`` for partitioning.

    Args:
        page: 1-based page number passed to ``fetch_page``.
        period_seconds: width of one ``creation_period`` bucket, in seconds.
            The default, 1e6 s, is roughly 11.6 days; 10_000_000 gives buckets
            of roughly 116 days.

    Returns:
        A DataFrame whose first columns are question_id, creation_date,
        creation_period and last_activity_date, followed by the remaining API
        fields; or ``None`` when the page has no items.
    """
    items = fetch_page(page)
    if not items:
        return None

    # Fields whose values are nested objects/lists in the API response.
    complex_fields = ("migrated_to", "migrated_from", "posted_by_collectives", "owner", "closed_details")
    for item in items:
        for field in complex_fields:
            if item.get(field) is not None:
                item[field] = json.dumps(item[field])

    key_columns = ["question_id", "creation_date", "creation_period", "last_activity_date"]
    df = (
        spark.createDataFrame(items)
        .withColumn("question_id", F.col("question_id").cast("long"))
        .withColumn("creation_date", F.col("creation_date").cast("long"))
        # Dates are unix epoch seconds; floor-divide into fixed-width periods
        # that are later used as the Delta partition column.
        # NOTE(review): an earlier comment claimed a 1e7 divisor (~100 days) while
        # the code used 1e6 (~11.6 days). 1e6 stays the default so partition values
        # remain consistent with rows appended by earlier runs.
        .withColumn("creation_period", F.floor(F.col("creation_date") / period_seconds).cast("long"))
        .withColumn("last_activity_date", F.col("last_activity_date").cast("long"))
    )
    # Put the key/date columns first, keep every other API field after them.
    return df.select(*key_columns, *[c for c in df.columns if c not in key_columns])

In [0]:
# Preview one processed page (spends one API call).
preview_df = process_page(1)
if preview_df is None:
    # process_page returns None when the page has no items; calling .display() on it would fail.
    print("Page 1 returned no items; nothing to preview.")
else:
    preview_df.display()

In [0]:
%sql
-- Databases for the bronze (raw API pages) / silver (typed, de-duplicated questions) / gold layers.
CREATE DATABASE IF NOT EXISTS bronze;
CREATE DATABASE IF NOT EXISTS silver;
CREATE DATABASE IF NOT EXISTS gold;

-- External Delta table at a fixed mount path; the Python ingestion cell appends to it via saveAsTable.
-- NOTE(review): no column list is given, so Delta has to take the schema from data already at
-- LOCATION. Confirm this succeeds on a first run when the path is still empty, and that the
-- table's partitioning matches the partitionBy("creation_period") used by the writers.
CREATE TABLE IF NOT EXISTS bronze.questions
USING DELTA
LOCATION '/mnt/bronze/questions';

-- Same pattern for the silver table (later rewritten with mode("overwrite") + overwriteSchema).
CREATE TABLE IF NOT EXISTS silver.questions
USING DELTA
LOCATION '/mnt/silver/questions';

With a given (API key, IP) pair, we have 1000 API calls per day.

Each call returns up to 100 entries (questions), since we request `pagesize=100`.

Since we have already made some calls today, we budget 900 calls (pages 1–900) to stay safely under the limit.

To finish faster, we fetch these pages with 10 worker threads in parallel and write them to the bronze table in a single append.

In [0]:
from concurrent.futures import ThreadPoolExecutor
from typing import List
from functools import reduce

def fetch_and_write_parallel(
    pages: List[int],
    table_name: str = "bronze.questions",
    max_workers: int = 10
):
    """Fetch question pages concurrently and append them to a Delta table.

    Each page is turned into a Spark DataFrame by ``process_page`` on a thread
    pool; pages that yield no items (``None``) are skipped. The per-page frames
    are combined with ``unionByName(allowMissingColumns=True)`` because pages may
    expose different optional fields. The result is appended to ``table_name``,
    partitioned by ``creation_period``.

    Args:
        pages: page numbers to request (any iterable of ints, e.g. a ``range``).
        table_name: table to append to (previously ignored; now honored).
        max_workers: number of concurrent fetch threads.

    Raises:
        Whatever ``process_page`` raises for a page; ``executor.map`` re-raises
        it while the results are collected.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        dfs = [df for df in executor.map(process_page, pages) if df is not None]

    if not dfs:
        # reduce() on an empty list raises TypeError; there is nothing to write anyway.
        print(f"No pages returned any items; nothing written to {table_name}")
        return

    # Union by column name so frames with different optional columns line up (missing ones become null).
    combined_df = reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs,
    )

    (combined_df.write
        .format("delta")
        .mode("append")
        .partitionBy("creation_period")
        .saveAsTable(table_name))

# Pages 1..900 of the newest questions: one API call per page, inside the daily budget.
DAILY_PAGE_BUDGET = 900

fetch_and_write_parallel(
    pages=range(1, DAILY_PAGE_BUDGET + 1),
    table_name="bronze.questions",
    max_workers=10,
)

In [0]:
%sql
-- Delta table details for bronze.questions before OPTIMIZE; compare with the DESCRIBE DETAIL run after it.
DESCRIBE DETAIL bronze.questions;

In [0]:
%sql
-- Compact the files written by the parallel append and Z-order them by question_id,
-- so filters/joins on question_id can skip more files.
OPTIMIZE bronze.questions 
ZORDER BY (question_id)

In [0]:
%sql
-- Delta table details after OPTIMIZE, to compare against the snapshot taken before it.
DESCRIBE DETAIL bronze.questions

In [0]:
# now the silver layer: typed, de-duplicated questions built from the append-only bronze table
from pyspark.sql import functions as F
from pyspark.sql import Window

bronze_df = spark.table("bronze.questions")

# bronze is append-only, so a question fetched by more than one run appears several times.
# Keep its most recently active snapshot (ties broken by the higher view count) instead of
# dropDuplicates, which keeps an arbitrary one of the copies.
latest_snapshot_first = Window.partitionBy("question_id").orderBy(
    F.col("last_activity_date").desc_nulls_last(),
    F.col("view_count").desc_nulls_last(),
)

silver_df = (
    bronze_df
    .filter(F.col("question_id").isNotNull())
    .withColumn("_snapshot_rank", F.row_number().over(latest_snapshot_first))
    .filter(F.col("_snapshot_rank") == 1)
    .select(
        F.col("question_id"),
        # timestamp_seconds converts epoch seconds directly. from_unixtime(...).cast("timestamp")
        # round-trips through a session-time-zone string, which is ambiguous during DST fall-back
        # hours when the session time zone observes DST.
        F.timestamp_seconds(F.col("creation_date")).alias("creation_date"),
        F.col("creation_period"),
        F.timestamp_seconds(F.col("last_activity_date")).alias("last_activity_date"),
        F.col("tags"),
        F.col("answer_count").cast("int").alias("answer_count"),
        F.col("is_answered").cast("boolean").alias("is_answered"),
        F.col("view_count").cast("int").alias("view_count"),
    )
)

# Rebuild the silver table from scratch on every run; overwriteSchema lets column types evolve.
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("creation_period") \
    .saveAsTable("silver.questions")

In [0]:
%sql
-- Preview 20 (arbitrary, unordered) rows of the silver table.
SELECT * FROM silver.questions LIMIT 20