
### Tables to pull from the API and load into the Bronze layer:
- Address
- Customer
- Person
- Product
- ProductCategory
- ProductDescription
- ProductModel
- SalesOrderDetail
- SalesOrderHeader
- Store

Script for pulling data from the public AdventureWorks API:

In [None]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import schema_of_json, from_json, col, lit

def fetch_api_data(endpoint: str, timeout: float = 60):
    """
    Fetch one collection from the public AdventureWorks demo API as a Spark DataFrame.

    The endpoint is expected to return a JSON array of objects. Each object is
    serialized back to a JSON string and parsed with a schema inferred from the
    first record. Its top-level fields are returned as columns, in the order in
    which they appear in that first record.

    Args:
        endpoint (str): The API endpoint (e.g., 'addresses', 'customers', etc.)
        timeout (float): Seconds to wait for the HTTP response before giving up.

    Returns:
        DataFrame | None: Spark DataFrame with one row per API record. Returns
        None if the request failed, the body was not valid JSON, or the payload
        was not a non-empty JSON array of objects.
    """
    base_url = "https://demodata.grapecity.com/adventureworks/api/v1/"
    url = f"{base_url}{endpoint}"
    headers = {
        "Accept": "application/json"
    }

    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError: older `requests` versions raise a plain ValueError
        # (not a RequestException) when the body is not valid JSON.
        print(f"Error fetching data: {e}")
        return None

    # Everything below indexes data[0] and calls .keys() on it, so reject
    # empty or unexpectedly shaped payloads up front instead of crashing.
    if not isinstance(data, list) or not data or not isinstance(data[0], dict):
        print(
            f"Unexpected or empty payload from '{endpoint}'; "
            "expected a non-empty JSON array of objects."
        )
        return None

    # Column order as sent by the API, taken from the first record.
    column_order = list(data[0].keys())

    spark = SparkSession.builder.appName("APIData").getOrCreate()

    # Serialize each record back to a JSON string so Spark can parse it with from_json.
    json_strings = [json.dumps(record) for record in data]
    df_raw = spark.createDataFrame(json_strings, "string").toDF("raw_json")

    # Infer a DDL schema from the first record. That sample is already in
    # driver memory, so there is no need for a Spark job (df_raw.head()) to
    # read it back.
    # NOTE(review): only the first record is sampled. Fields absent from it
    # (top-level or nested) are dropped, and fields that are null in it are
    # likely inferred as STRING. Confirm the first record is representative.
    sample_json = json_strings[0]
    inferred_schema = (
        spark.range(1)
        .select(schema_of_json(lit(sample_json)))
        .collect()[0][0]
    )

    # Parse JSON strings into a structured DataFrame.
    df_parsed = df_raw.withColumn("parsed", from_json("raw_json", inferred_schema))
    df_structured = df_parsed.select("parsed.*")

    # Reorder columns to match the original API order.
    return df_structured.select(
        [col(c) for c in column_order if c in df_structured.columns]
    )




Configuration: the API endpoints to ingest, plus the target Unity Catalog catalog and Bronze schema:

In [None]:
# API endpoints to ingest. Each name is also used as the target Delta table name.
tables = [
    "addresses",
    "customers",
    "persons",
    "products",
    "productCategories",
    "productDescriptions",
    "productModels",
    "salesOrderDetails",
    "salesOrders",
    "stores",
]

# Destination tables are written as <catalog>.<schema>.<table>.
catalog = "adventureworks_dev"  # Unity Catalog name
schema = "bronze_adworks_rich"  # Bronze schema name

# Debugging aids: narrow a run with e.g. `tables = ["persons"]`, or preview a
# single endpoint with `display(fetch_api_data(table))` inside a loop over tables.


Flatten tables that contain nested JSON (`stores`, `persons`) into new top-level columns; load all other tables as-is.

Then write each DataFrame to its Delta table:

In [None]:
%python
from pyspark.sql.functions import col, current_timestamp

for table in tables:
    df = fetch_api_data(table)

    # fetch_api_data returns None when the request or payload is bad; skip that
    # table instead of crashing the whole run on None.count().
    if df is None:
        print(f"Could not fetch '{table}', skipping save.")
        continue

    # head(1) only needs a single row, unlike count() which scans everything.
    if not df.head(1):
        print(f"No data found for '{table}', skipping save.")
        continue

    # Promote the fields of the nested `demographics` object to top-level
    # columns for the tables that carry it; other tables are written as fetched.
    if table == "stores":
        df = df.select(
            col("storeId"),
            col("name"),
            col("salesPersonId"),
            col("demographics.AnnualSales").alias("AnnualSales"),
            col("demographics.AnnualRevenue").alias("AnnualRevenue"),
            col("demographics.BankName").alias("BankName"),
            col("demographics.BusinessType").alias("BusinessType"),
            col("demographics.YearOpened").alias("YearOpened"),
            col("demographics.Specialty").alias("Specialty"),
            col("demographics.SquareFeet").alias("SquareFeet"),
            col("demographics.Brands").alias("Brands"),
            col("demographics.Internet").alias("Internet"),
            col("demographics.NumberEmployees").alias("NumberEmployees"),
            col("modifiedDate"),
        )
    elif table == "persons":
        df = df.select(
            col("personId"),
            col("personType"),
            col("nameStyle"),
            col("title"),
            col("firstName"),
            col("middleName"),
            col("lastName"),
            col("suffix"),
            col("emailPromotion"),
            col("additionalContactInfo"),
            col("demographics.TotalPurchaseYTD").alias("TotalPurchaseYTD"),
            col("modifiedDate"),
        )

    table_full_name = f"{catalog}.{schema}.{table}"

    # NOTE(review): mode("append") adds another full copy of each API snapshot
    # on every run, and no load timestamp is recorded (current_timestamp is
    # imported but unused). Confirm this is the intended Bronze pattern.
    (
        df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(table_full_name)
    )
    print(f"Saved '{table}' to Delta table: {table_full_name}")