In [0]:
import requests
import json
import time
from pyspark.sql.functions import col, count, isnan, when

# Configurations
# Page size requested from the API on every paginated call.
per_page = 50

# Brewery listing endpoint, and the metadata endpoint derived from it.
url_base = "https://api.openbrewerydb.org/v1/breweries"
metadata_url = url_base + "/meta"

# Destination folder for the raw (Bronze) output.
path_bronze = (
    "abfss://lakehouse@projectabi.dfs.core.windows.net/bronze/open_brewery"
)

# Retry function to handle API failures
def get_page_with_retry(url, retries=3, delay=2):
    """Fetch ``url`` and return its decoded JSON body, retrying on failure.

    An attempt fails when the request raises, when the status code is not
    200, or when the body cannot be decoded as JSON. Every failed attempt is
    printed so that a skipped page is visible in the notebook output.

    Args:
        url: Full URL to request with GET.
        retries: Maximum number of attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        The decoded JSON payload of the first successful response, or an
        empty list when every attempt failed (best-effort: never raises for
        request errors).
    """
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                return response.json()
            # Report non-200 answers too; otherwise they are retried silently.
            print(f"Attempt {attempt} failed: HTTP {response.status_code}")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers an undecodable JSON body on requests versions
            # whose JSON decoding error is not a RequestException.
            print(f"Attempt {attempt} failed: {e}")
        # Only wait when another attempt will follow.
        if attempt < retries:
            time.sleep(delay)
    print(f"Giving up on {url} after {retries} attempts")
    return []

# Get the total number of records
# A timeout keeps the notebook from hanging on a stalled connection, and
# raise_for_status() stops the run on an HTTP error instead of failing later
# on a missing 'total' key.
response = requests.get(metadata_url, timeout=10)
response.raise_for_status()
metadata = response.json()
# NOTE(review): int() guards against the API serializing the count as a
# string, which would break the integer division below - confirm the type.
total = int(metadata['total'])
# Ceiling division: a final partial page still needs its own request.
total_pages = (total + per_page - 1) // per_page

# Collect paginated data
all_breweries = []
failed_pages = []  # pages that returned no data, reported after the loop
for page in range(1, total_pages + 1):
    page_data = get_page_with_retry(f"{url_base}?page={page}&per_page={per_page}")
    if not page_data:
        # get_page_with_retry returns an empty list once its retries are
        # exhausted; remember the page rather than dropping it silently.
        failed_pages.append(page)
        continue
    all_breweries.extend(page_data)

if failed_pages:
    # Best-effort ingestion continues, but the gap must be visible because
    # the Bronze output would otherwise look complete.
    print(
        f"WARNING: {len(failed_pages)} of {total_pages} pages returned no data "
        f"and are missing from the dataset: {failed_pages}"
    )

# Canonical column order used for the output
ordered_columns = [
    "id",
    "name",
    "brewery_type",
    "address_1",
    "address_2",
    "address_3",
    "city",
    "state_province",
    "postal_code",
    "country",
    "longitude",
    "latitude",
    "phone",
    "website_url",
    "state",
    "street",
]

# Build the Spark DataFrame from the collected records, then keep only the
# canonical columns that are actually present, in canonical order
df = spark.createDataFrame(all_breweries)
df = df.select(*(name for name in ordered_columns if name in df.columns))

# Apply basic data quality rules: a record is kept only when both its id
# and its state are populated
df_clean = df.where(col("id").isNotNull()).where(col("state").isNotNull())

# Generate Data Quality Report
def generate_quality_report(df, columns):
    """Print the number and percentage of missing values per column.

    A value counts as missing when it is null, or NaN for float/double
    columns. All per-column counts are computed in one aggregation instead
    of one Spark job per column.

    Args:
        df: Spark DataFrame to inspect.
        columns: Column names to report on, in output order. Names that are
            not present in ``df`` are reported as skipped instead of failing.

    Returns:
        None. The report is written with ``print``.
    """
    total = df.count()
    if total == 0:
        # Avoid dividing by zero when computing percentages.
        print("No rows to analyse; skipping data quality report.")
        return

    dtypes = dict(df.dtypes)
    present = [name for name in columns if name in dtypes]

    def _is_missing(name):
        # isnan is only applied to floating-point columns so that other
        # types are not implicitly cast to double.
        condition = col(name).isNull()
        if dtypes[name] in ("float", "double"):
            condition = condition | isnan(col(name))
        return condition

    null_counts = {}
    if present:
        # count() ignores nulls, so count(when(cond, 1)) counts matching rows.
        row = df.select(
            *[count(when(_is_missing(name), 1)).alias(name) for name in present]
        ).first()
        null_counts = dict(zip(present, row))

    for col_name in columns:
        if col_name not in null_counts:
            print(f"Column: {col_name} | skipped (not present in DataFrame)")
            continue
        nulls = null_counts[col_name]
        percent = round((nulls / total) * 100, 2)
        print(f"Column: {col_name} | Nulls: {nulls} | % Nulls: {percent}%")

print("===== Data Quality Report =====")
# Report on the columns that actually exist in df: the earlier select keeps
# only the ordered columns present in the data, so ordered_columns may name
# columns that df does not have.
generate_quality_report(df, df.columns)

# Save as JSON to Bronze layer
# "overwrite" replaces whatever is already stored at path_bronze.
df_clean.write.mode("overwrite").json(path_bronze)