### 1. Custom API Ingestion
API → Raw JSON files (Custom Ingestion) → Load to Datalake/Cloud Buckets → Auto Loader reads API files incrementally → Incremental load from Datalake to Bronze

**Story:**
I implemented Lakeflow custom ingestion by pulling data from a third-party REST API (Inceptezlabs/yext) that serves driver registration information. A Python job lands the raw JSON responses as files in the data lake (cloud storage). Auto Loader, with schema evolution, then incrementally ingests those files, parses and flattens (structurizes) the JSON, and writes the result to a Bronze Delta table and a Delta path, keeping the data as-is apart from that flattening.
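The story relies on Auto Loader's default `addNewColumns` schema evolution mode. When a file brings a column the stored schema does not know, the stream stops, the new column is added to the schema kept in `cloudFiles.schemaLocation`, and a restart continues with the wider schema. The pure-Python sketch below only mimics that behaviour with a list of column names; `evolve_schema` and `UnknownFieldError` are hypothetical names, not Databricks APIs.

```python
class UnknownFieldError(Exception):
    """Hypothetical stand-in for the error that stops the stream when new columns appear."""

    def __init__(self, new_columns):
        super().__init__(f"new columns detected: {new_columns}")
        self.new_columns = new_columns


def evolve_schema(known_columns, record):
    """Check one record against the tracked schema, mimicking addNewColumns.

    New columns are appended to known_columns (standing in for the schema saved in
    cloudFiles.schemaLocation) and an error is raised to stop the 'stream'.
    A retry with the widened schema then succeeds.
    """
    new_cols = [name for name in record if name not in known_columns]
    if new_cols:
        known_columns.extend(new_cols)  # persist the wider schema first
        raise UnknownFieldError(new_cols)  # then fail, so a restart picks it up
    return known_columns


if __name__ == "__main__":
    columns = ["uid", "data"]
    try:
        evolve_schema(columns, {"uid": "1", "data": "{}", "source": "api"})
    except UnknownFieldError as err:
        print(err.new_columns)  # ['source']
    print(evolve_schema(columns, {"uid": "1", "data": "{}", "source": "api"}))  # ['uid', 'data', 'source']
```

Failing on purpose, instead of silently widening mid-run, keeps every micro-batch written with one consistent schema; in practice the job is usually configured to restart automatically.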

### 2. Auto Loader in Databricks
**Auto Loader (the `cloudFiles` streaming source) is Databricks’ cloud-native engine for incrementally ingesting new files as they arrive in object storage.**

Supported Sources:
- AWS S3
- Azure ADLS Gen2
- Google Cloud Storage (GCS)

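Modes (how new files are detected):
- Directory listing (default) - lists the input path on each trigger to find files that have not been processed yet
- File notification - subscribes to the storage service's file-created events through a queue, so new files are picked up without listing the directory; this scales better for directories holding very many files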
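The mode is chosen through reader options. The sketch below is a hypothetical helper that builds the options for either mode as a plain dict. It relies on the `cloudFiles.useNotifications` option; when that option is not set, Auto Loader uses directory listing. File notification mode also needs cloud permissions to create the event subscription and queue, which this sketch does not cover.

```python
def autoloader_options(schema_location, use_notifications=False):
    """Build cloudFiles reader options for either detection mode (hypothetical helper)."""
    opts = {
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": schema_location,
    }
    if use_notifications:
        # File notification mode: storage events are consumed from a queue.
        opts["cloudFiles.useNotifications"] = "true"
    # Without useNotifications, Auto Loader falls back to directory listing.
    return opts


if __name__ == "__main__":
    print(autoloader_options("/tmp/_schema", use_notifications=True))
```

In a notebook the dict would be passed as `spark.readStream.format("cloudFiles").options(**autoloader_options(schema_path)).load(input_path)`, where `schema_path` and `input_path` are placeholders.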

**Directory listing**
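1. Auto Loader lists the input directory (pull model)
2. Detects files not yet recorded in the checkpoint (e.g., a new JSON file)
3. Infers the schema / evolves it if needed
4. Processes the new files
5. Updates the checkpoint (e.g., "file1 is processed") so files are never reprocessed
6. Waits for the next trigger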
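The directory-listing loop above can be sketched in plain Python. The sketch assumes a local folder stands in for cloud storage and an in-memory set stands in for the checkpoint; the real checkpoint is a persistent store that Auto Loader manages. `list_new_files` and `run_trigger` are hypothetical names for illustration.

```python
import json
import os
import tempfile


def list_new_files(input_dir, checkpoint):
    """Pull model: list the whole directory and keep only names not in the checkpoint."""
    return sorted(name for name in os.listdir(input_dir) if name not in checkpoint)


def run_trigger(input_dir, checkpoint):
    """One trigger: process every new file exactly once, then record it."""
    records = []
    for name in list_new_files(input_dir, checkpoint):
        with open(os.path.join(input_dir, name), encoding="utf-8") as fh:
            records.append(json.load(fh))  # stand-in for parsing and writing the file
        checkpoint.add(name)  # remember the file so later triggers skip it
    return records


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as landing:
        checkpoint = set()
        with open(os.path.join(landing, "posts_1.json"), "w", encoding="utf-8") as fh:
            json.dump({"uid": "1"}, fh)
        print(run_trigger(landing, checkpoint))  # [{'uid': '1'}]
        with open(os.path.join(landing, "posts_2.json"), "w", encoding="utf-8") as fh:
            json.dump({"uid": "2"}, fh)
        print(run_trigger(landing, checkpoint))  # [{'uid': '2'}]  (posts_1.json is skipped)
```

Each trigger pays the cost of listing the entire directory, which is why listing slows down as the number of files grows.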

**File Notification**
1. Cloud storage emits a file-created event (S3 event notifications, ADLS via Event Grid, GCS via Pub/Sub)
2. The event is delivered to a queue that Auto Loader reads from (e.g., SQS, Azure Queue Storage, or a Pub/Sub subscription)
3. Auto Loader receives the notification (push model: no directory listing)
4. The new file is registered
5. Schema is inferred / evolved if needed
6. The file is processed in the next micro-batch
7. Checkpoint is updated
8. The stream waits until the next event arrives
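For contrast with the listing sketch, the notification flow can be mimicked with Python's `queue.Queue` standing in for the cloud queue. This is an assumption-laden illustration, not how Auto Loader is implemented; `drain_events` is a hypothetical name. The checkpoint lookup guards against a queue delivering the same event more than once.

```python
import queue


def drain_events(events, checkpoint):
    """Push model: handle queued file-created events without listing any directory."""
    processed = []
    while True:
        try:
            path = events.get_nowait()  # next notification delivered by the storage service
        except queue.Empty:
            break  # nothing queued: stay idle until the next event arrives
        if path in checkpoint:
            continue  # a queue may deliver the same event more than once
        processed.append(path)  # stand-in for reading and writing the file
        checkpoint.add(path)
    return processed


if __name__ == "__main__":
    events = queue.Queue()
    for path in ["apidata/posts_1.json", "apidata/posts_1.json", "apidata/posts_2.json"]:
        events.put(path)
    print(drain_events(events, set()))  # ['apidata/posts_1.json', 'apidata/posts_2.json']
```

The work per micro-batch depends only on how many events arrived, not on how many files already sit in the directory.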

![API to Auto Loader ingestion flow](/Workspace/Users/infoblisstech@gmail.com/databricks-code-repo/6_lakeflow_pipelines/api_autoloader.png)

#### 1. Custom Ingestion: REST API to Data Lake

In [0]:
# This cell uses plain Python (requests + dbutils), not Spark
import requests
import json
from datetime import datetime

url = "http://inceptezlabs.com/api.php"

'''
# Short version of the code below (no error handling)
resp = requests.get(url, timeout=30)  # call the API and get the raw JSON response
data = resp.json()  # parse the JSON response into a Python object (dict/list)
proper_jsondata = json.dumps(data)  # serialize back to a JSON string to store in the data lake for shared use
ts = datetime.now().strftime("%Y%m%d%H%M%S")
output_path = f"/Volumes/lakehousecat1/deltadb/datalake/apidata/posts_{ts}.json"
dbutils.fs.put(output_path, proper_jsondata, overwrite=True)
'''

def fetch_and_save_data():  # pull data from the REST API and land it in the data lake (cloud storage)
    try:
        resp = requests.get(url, timeout=30)  # timeout so a hung API call cannot block the job forever

        if resp.status_code != 200:
            print(f"Failed to fetch data. Status Code: {resp.status_code}")
            print(f"Response: {resp.text}")
            return

        try:
            data = resp.json()
            print("Data received successfully:")
            print(data)
        except ValueError:  # resp.json() raises a ValueError subclass (e.g. json.JSONDecodeError) on invalid JSON
            print("Error: The response is not valid JSON.")
            return

        ts = datetime.now().strftime("%Y%m%d%H%M%S")
        output_path = f"/Volumes/lakehousecat1/deltadb/datalake/apidata/posts_{ts}.json"  # Unity Catalog volume path, standing in for cloud storage
        
        try:
            dbutils.fs.put(
                output_path,
                json.dumps(data),
                overwrite=True
            )
            print(f"Successfully wrote to {output_path}")
        except NameError:
            print("Error: 'dbutils' is not defined. This code must run in a Databricks notebook.")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    fetch_and_save_data()
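The cell above names each file with second precision and sets `overwrite=True`, so two pulls within the same second would replace each other's file. The sketch below shows an append-only landing alternative in plain Python. It assumes a local directory stands in for the volume and `open(..., "x")` stands in for `dbutils.fs.put`; `land_payload` is a hypothetical helper, not part of the original notebook.

```python
import json
import os
import tempfile
import uuid
from datetime import datetime


def land_payload(payload, landing_dir, prefix="posts"):
    """Write one API payload to a new, uniquely named JSON file and return its path."""
    ts = datetime.now().strftime("%Y%m%d%H%M%S")  # same timestamp format as the cell above
    # A short random suffix keeps two pulls within the same second from colliding,
    # so every pull lands as a new file that Auto Loader treats as new input.
    name = f"{prefix}_{ts}_{uuid.uuid4().hex[:8]}.json"
    path = os.path.join(landing_dir, name)
    with open(path, "x", encoding="utf-8") as fh:  # "x": fail rather than overwrite
        json.dump(payload, fh)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as landing:
        land_payload({"uid": "1"}, landing)
        land_payload({"uid": "2"}, landing)
        print(len(os.listdir(landing)))  # 2
```

Keeping the landing zone append-only means every API pull survives as its own raw file, which matches how Auto Loader discovers input: by new file names.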

#### 2. Auto Loader: Data Lake to Bronze Layer (Data Lake & Lakehouse)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

user_schema = StructType([
    StructField("uid", StringType()),
    StructField("user", StructType([
        StructField("name", StringType()),
        StructField("email", StringType()),
        StructField("location", StringType()),
        StructField("registered", StringType()),])),])

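# Auto Loader (cloudFiles) incrementally reads new JSON files from the landing folder.
# With no .schema() supplied, the schema is inferred and tracked in schemaLocation.
# from_json below assumes the "data" column arrives as a JSON string.
df_raw = (spark.readStream
        .format("cloudFiles")  # .schema(user_schema) would not fit here: user_schema describes the "data" payload, not the whole file
        .option("cloudFiles.format", "json")
        # .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # already the default when no schema is supplied
        .option("cloudFiles.schemaLocation", "/Volumes/catalog3_we47/schema3_we47/datalake/bronze/streamwrite41/_schema")
        .option("cloudFiles.maxFilesPerTrigger", 1)  # at most one file per micro-batch
        .load("/Volumes/lakehousecat1/deltadb/datalake/apidata/"))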

# Parse the JSON string in the "data" column into a struct that follows user_schema
parsed_df = df_raw.withColumn("data", F.from_json(F.col("data"), user_schema))

# Flatten the nested struct into top-level (tabular) columns
df_user = (
    parsed_df
        .select(
            F.col("data.uid").alias("uid"),
            F.col("data.user.name").alias("user_name"),
            F.col("data.user.email").alias("user_email"),
            F.col("data.user.location").alias("user_location"),
            F.to_timestamp(
                F.col("data.user.registered")
            ).alias("user_registered_ts"),
            F.current_timestamp().alias("ingestion_ts")
        )
)

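# Sink 1: write Delta files to a volume path (availableNow processes all pending files, then stops)
(
    df_user.writeStream
        .format("delta")
        .trigger(availableNow=True)
        .option(
            "checkpointLocation",
            "/Volumes/catalog3_we47/schema3_we47/datalake/bronze/streamwrite4/_checkpoint")
        .start(
            "/Volumes/catalog3_we47/schema3_we47/datalake/bronze/streamwrite45"
        )
        .awaitTermination()  # block until the run finishes so the queries below see the data
)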

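# Sink 2: write the same data to a Unity Catalog table; each streaming query needs its own checkpoint
(df_user.writeStream
        .format("delta")
        .trigger(availableNow=True)
        .option(
            "checkpointLocation",
            "/Volumes/catalog3_we47/schema3_we47/datalake/bronze/streamwrite41/_checkpoint"
        )
        .toTable("catalog3_we47.schema3_we47.bronze_user_api2")
        .awaitTermination())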
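To make the `from_json` + `select` step concrete without a Spark cluster, here is a plain-Python sketch of the same reshaping for one row. The `data` string is parsed, nested `user` fields are pulled up into top-level keys, and missing or unparseable values become `None`, which loosely mirrors the nulls `from_json` produces. `flatten_user` is a hypothetical helper; the sample record is made up, and timestamp parsing is left out.

```python
import json


def flatten_user(data):
    """Parse the JSON string in 'data' and flatten it into the df_user columns."""
    try:
        parsed = json.loads(data) if data is not None else None
    except (TypeError, ValueError):
        parsed = None  # unparseable input: every output column becomes None
    if not isinstance(parsed, dict):
        parsed = {}
    user = parsed.get("user")
    if not isinstance(user, dict):
        user = {}
    return {
        "uid": parsed.get("uid"),
        "user_name": user.get("name"),
        "user_email": user.get("email"),
        "user_location": user.get("location"),
        "user_registered": user.get("registered"),  # to_timestamp conversion omitted
    }


if __name__ == "__main__":
    # Made-up sample record, not real API output
    sample = '{"uid": "u1", "user": {"name": "Test User", "email": "test@example.com"}}'
    print(flatten_user(sample))
    # {'uid': 'u1', 'user_name': 'Test User', 'user_email': 'test@example.com', 'user_location': None, 'user_registered': None}
```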


In [0]:
display(spark.sql("select * from catalog3_we47.schema3_we47.bronze_user_api2 order by user_registered_ts desc"))

In [0]:
display(spark.read.format("delta").load("/Volumes/catalog3_we47/schema3_we47/datalake/bronze/streamwrite45").orderBy("user_registered_ts", ascending=False))