In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sha2, concat_ws


class ReadAPI:
    """Fetch JSON records from a REST endpoint and upsert them into a Delta table.

    Each instance is bound to one endpoint / target table pair. Credentials are
    read from the Databricks secret scope ``tip_challenge`` and sent as HTTP
    basic auth. ``dbutils`` is expected to be provided by the notebook runtime.
    """

    # Seconds to wait for the API before giving up; without a timeout
    # ``requests`` blocks indefinitely on an unresponsive endpoint.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, api_url: str, folder_path_silver: str, primary_key: str):
        """
        api_url: URL endpoint that returns JSON (a list of dicts)
        folder_path_silver: Identifier of the target silver Delta table; it is
            interpolated verbatim into the CREATE TABLE / MERGE statements
            (e.g. "silver.workshop_management")
        primary_key: Name of the source column that uniquely identifies a row.
            Pass an empty string (or None) to derive a key by hashing all columns.

        Raises ValueError when either secret resolves to an empty value.
        """
        self.api_url = api_url
        self.folder_path_silver = folder_path_silver
        self.spark = SparkSession.builder.appName("TIP").getOrCreate()

        self.username = dbutils.secrets.get(scope="tip_challenge", key="tip_challenge_username")
        self.password = dbutils.secrets.get(scope="tip_challenge", key="tip_challenge_password")

        self.primary_key = primary_key

        if not self.username or not self.password:
            # The credentials come from the secret scope, not from environment variables.
            raise ValueError(
                "Missing secrets in scope tip_challenge: "
                "tip_challenge_username or tip_challenge_password"
            )

    def fetch_api_data(self):
        """Call the API and return its JSON payload as a Spark DataFrame.

        Best-effort: any failure (HTTP error, timeout, invalid JSON, schema
        inference error) is printed and an empty list is returned instead of
        raising, so callers must check the result before using it as a DataFrame.
        """
        try:
            response = requests.get(
                self.api_url,
                auth=(self.username, self.password),
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
            return self.spark.createDataFrame(data)
        except Exception as e:
            print(f"Error fetching API data: {e}")
            return []

    def incremental_load(self):
        """Fetch the latest API data and MERGE it into the target Delta table.

        A ``primary_key`` column is added to the incoming data: a copy of the
        configured key column, or a SHA-256 hash over all columns when no key
        is configured. The target table is created empty on first use, then
        matched rows are updated and unmatched rows are inserted.

        Raises RuntimeError when the fetch produced no DataFrame.
        """
        new_data = self.fetch_api_data()

        # fetch_api_data() signals failure with an empty list; fail with a clear
        # message instead of an AttributeError on `.withColumn` further down.
        if new_data is None or isinstance(new_data, list):
            raise RuntimeError(
                f"No data fetched from {self.api_url}; "
                f"skipping load into {self.folder_path_silver}"
            )

        # add primary_key, this can also be imported from the config, and then
        # used as part of the metadata ingestion framework
        if self.primary_key:
            new_data = new_data.withColumn("primary_key", col(self.primary_key))
        else:
            print("primary key not found, hashing on all columns")
            # NOTE(review): concat_ws skips NULLs, so rows that differ only in
            # which column is NULL can hash to the same key.
            hashed_columns = [col(c).cast("string") for c in new_data.columns]
            new_data = new_data.withColumn(
                "primary_key",
                sha2(concat_ws("||", *hashed_columns), 256)
            )
            # Rows with the same hash are duplicates of each other; keep one so
            # MERGE does not fail on multiple source rows matching one target row.
            new_data = new_data.dropDuplicates(["primary_key"])

        new_data.createOrReplaceTempView("new_data")

        # Use the session this instance created the DataFrame (and temp view)
        # with, rather than relying on a notebook-global `spark`.
        # `WHERE 1 = 0` creates the table with the incoming schema but no rows.
        self.spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {self.folder_path_silver}
        USING DELTA
        AS SELECT * FROM new_data WHERE 1 = 0
        """)

        self.spark.sql(f"""
            MERGE INTO {self.folder_path_silver} AS t
            USING new_data AS s
            ON t.primary_key = s.primary_key
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)


In [0]:
# Metadata-driven ingestion: each entry describes one API -> Delta table load.
# The configs are inlined here, but could be passed through a database or config files.
# NOTE(review): these endpoints use plain http while ReadAPI sends basic-auth
# credentials with each request -- confirm whether an https endpoint is available.

jobs = [
    {
        "api": "http://hiringchallenge.efdwfudnhvaebpej.northeurope.azurecontainer.io/WorkShopManagement",
        "folder_path_silver": "silver.workshop_management",
        "primary_key": "WSM_Key"
    },

    {
        "api": "http://hiringchallenge.efdwfudnhvaebpej.northeurope.azurecontainer.io/WOTLog",
        "folder_path_silver": "silver.wot_log",
        # empty key: ReadAPI derives one by hashing all columns
        "primary_key": ""
    }

]

for job in jobs:

    table_name = job["folder_path_silver"]
    print(f"loading table {table_name}")

    # Construct and run inside the try so that one failing job (missing
    # secrets, API error, merge error) does not stop the remaining jobs.
    try:
        read_api = ReadAPI(
            api_url=job["api"],
            folder_path_silver=table_name,
            primary_key=job["primary_key"]
        )
        # incremental_load() fetches the API data itself; a separate
        # fetch_api_data() call here would only hit the endpoint twice.
        read_api.incremental_load()
    except Exception as e:
        print(f"failed to load table {table_name}: {e}")

loading table silver.wot_log
primary key not found, hashing on all columns
