In [0]:
import requests, json
from pyspark.sql.functions import *
from pyspark.sql.types import *

# --- Notebook configuration ---------------------------------------------
# Target Delta table for raw issue records, and records requested per API page.
bronze_table, per_page = "github_issues_bronze", 30

# GitHub API token, read from a Databricks secret scope rather than hardcoded.
token = dbutils.secrets.get(key="github_token", scope="my_scope")

def fetch_issues(pages=5, since=None, timeout=30):
    """Download issue records for apache/spark from the GitHub REST API.

    Requests up to ``pages`` pages of ``per_page`` records each (``per_page``
    and ``token`` are read from the notebook configuration). Paging stops early
    on a short page or on a response body that is not a JSON list.

    Args:
        pages: Maximum number of pages to request.
        since: Optional ISO-8601 timestamp such as ``"2025-07-01T00:00:00Z"``.
            When given, it is forwarded as GitHub's ``since`` filter and results
            are ordered by most recent update so paging walks the
            recently-updated issues. When ``None`` (the default) the request
            parameters are exactly those used before this option existed.
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        list: Issue dicts as decoded from the API JSON. Per the GitHub docs, this
        endpoint also returns pull requests.

    Raises:
        Exception: On a non-200 status code or an unparseable JSON body.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
    }

    base_params = {"state": "all", "per_page": per_page}
    if since is not None:
        # Without sort=updated GitHub pages in its default order, so the
        # `since` window would not line up with what the pages cover.
        base_params.update(since=since, sort="updated", direction="desc")

    all_data = []
    for page in range(1, pages + 1):
        print(f"🔄 Fetching page {page}...")
        res = requests.get(
            "https://api.github.com/repos/apache/spark/issues",
            params={**base_params, "page": page},
            headers=headers,
            # Without a timeout a stalled connection blocks the cell forever.
            timeout=timeout,
        )

        if res.status_code != 200:
            raise Exception(f"GitHub API error: {res.status_code} - {res.text}")

        try:
            batch = res.json()
        except Exception as e:
            raise Exception(f"Failed to parse JSON: {e}") from e

        # Error payloads (e.g. rate limiting) come back as a JSON object, not a list.
        if not isinstance(batch, list):
            print(f"Unexpected response structure: {batch}")
            break

        all_data.extend(batch)

        # A short page means there is nothing further to fetch.
        if len(batch) < per_page:
            print("Reached last page.")
            break

    print(f"Total issues fetched: {len(all_data)}")
    return all_data


In [0]:
# Incremental high-water mark: the latest `updated_at` already stored in the
# bronze table, or a far-past default on the first run (table absent or empty).
DEFAULT_SINCE_TS = "2000-01-01T00:00:00Z"

# Check for the table explicitly instead of a bare `except:`. The bare except
# also swallowed unrelated failures (auth, cluster, interrupts) and silently
# reset to the default, which triggers a full reload and duplicate appends.
if spark.catalog.tableExists(bronze_table):
    last_updated = spark.read.table(bronze_table).agg({"updated_at": "max"}).collect()[0][0]
else:
    last_updated = None

if last_updated is None:
    since_ts = DEFAULT_SINCE_TS
else:
    # NOTE(review): appending "Z" assumes the collected datetime is UTC
    # wall-clock time. Confirm the driver/session timezone is UTC.
    since_ts = last_updated.strftime("%Y-%m-%dT%H:%M:%SZ")

print(f"Loading issues updated after: {since_ts}")


Loading issues updated after: 2000-01-01T00:00:00Z


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Subset of the GitHub issue payload persisted to bronze. `updated_at` arrives
# as an ISO-8601 string and is parsed to a timestamp below.
schema = StructType([
    StructField("id", LongType()),
    StructField("number", LongType()),
    StructField("title", StringType()),
    StructField("state", StringType()),
    StructField("updated_at", StringType()),
    StructField("user", StructType([
        StructField("login", StringType())
    ]))
])

# Fetch data.
# NOTE(review): only `pages` is passed here, so the download itself is not
# restricted to `since_ts`. The high-water mark is applied by the filter below,
# after download.
raw_data = fetch_issues(pages=5)

if not raw_data:
    print("No data fetched")
else:
    df = spark.createDataFrame(raw_data, schema)

    # Keep only issues updated after the high-water mark, flatten the nested
    # user struct to a login column, and stamp the ingestion time.
    df_filtered = (
        df
        .withColumn("updated_at", to_timestamp("updated_at"))
        .filter(col("updated_at") > lit(since_ts))
        .withColumn("user_login", col("user.login"))
        .drop("user")
        .withColumn("ingest_ts", current_timestamp())
    )

    # Count once: every .count() re-executes the whole plan.
    new_count = df_filtered.count()
    if new_count == 0:
        print("No new issues to ingest after filtering.")
    else:
        df_filtered.write.format("delta").mode("append").saveAsTable(bronze_table)
        print(f"Ingested {new_count} new records")


🔄 Fetching page 1...
🔄 Fetching page 2...
🔄 Fetching page 3...
🔄 Fetching page 4...
🔄 Fetching page 5...
Total issues fetched: 150
Ingested 150 new records


In [0]:
# Inspect the bronze table, most recently updated issues first. Reuse the
# configured table name rather than repeating the literal.
display(spark.read.table(bronze_table).orderBy(col("updated_at").desc()))

id,number,title,state,updated_at,user_login,ingest_ts
3199460855,51364,[SPARK-52675][ML][CONNECT] Interrupt hanging ML handlers in tests,open,2025-07-03T13:44:39.000Z,WeichenXu123,2025-07-03T13:44:46.381Z
3198911743,51361,[WIP][SPARK-52618][SQL] Casting TIME(n) to TIME(m),open,2025-07-03T13:08:37.000Z,MaxGekk,2025-07-03T13:44:46.381Z
3197564924,51354,[SPARK-52660][SQL] Add time type to `CodeGenerator#javaClass`,closed,2025-07-03T12:23:07.000Z,bersprockets,2025-07-03T13:44:46.381Z
3198964388,51363,[SPARK-52673][CONNECT][CLIENT] Add grpc RetryInfo handling to Spark Connect retry policies,open,2025-07-03T12:22:06.000Z,khakhlyuk,2025-07-03T13:44:46.381Z
3198921208,51362,[SPARK-52674][SQL] Clean up the usage of deprecated APIs related to `RandomStringUtils`,open,2025-07-03T10:49:51.000Z,LuciferYang,2025-07-03T13:44:46.381Z
3161928074,51227,[SPARK-52535][SQL] Improve code readability of rule ApplyColumnarRulesAndInsertTransitions,closed,2025-07-03T10:33:52.000Z,zhztheplayer,2025-07-03T13:44:46.381Z
3198817544,51360,[SPARK-52671][SQL] RowEncoder shall not lookup a resolved UDT,open,2025-07-03T10:04:31.000Z,yaooqinn,2025-07-03T13:44:46.381Z
3198708344,51359,[SPARK-52672][SQL] Don't replace Sort/Having expressions with aliases if expression exists in Aggregate,open,2025-07-03T09:47:59.000Z,mihailotim-db,2025-07-03T13:44:46.381Z
3187748015,51323,[SPARK-52381][CORE][3.5] JsonProtocol: Only accept subclasses of SparkListenerEvent,closed,2025-07-03T09:35:46.000Z,pjfanning,2025-07-03T13:44:46.381Z
3197859941,51355,[SPARK-52665][BUILD] Fix make-distribution.sh [: missing `]',open,2025-07-03T09:24:50.000Z,cxzl25,2025-07-03T13:44:46.381Z


In [0]:
# Check that every fetched issue newer than the high-water mark survived the
# Spark parse/filter step. Comparing against len(raw_data) alone only holds on
# a first load; incremental runs legitimately filter older rows out.
from datetime import datetime


def _parse_github_ts(value):
    """Parse an ISO-8601 timestamp like '2025-07-03T13:44:39Z' into an aware datetime."""
    # datetime.fromisoformat() accepts a trailing 'Z' only from Python 3.11 on.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


since_dt = _parse_github_ts(since_ts)
# len() of a list rather than sum(): the `from pyspark.sql.functions import *`
# star import shadows Python builtins such as sum.
expected_count = len([
    issue for issue in raw_data
    if issue.get("updated_at") and _parse_github_ts(issue["updated_at"]) > since_dt
])
# `df_filtered` is only built when the fetch returned data.
actual_count = df_filtered.count() if raw_data else 0

assert expected_count == actual_count, f"Mismatch! Expected: {expected_count}, Got: {actual_count}"
print(f"Row count validation passed: {expected_count}")

Row count validation passed: 150
