Paginated Export API to Databricks

In [None]:
import json
import os
import urllib.parse

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Obtain the Spark session for this notebook (getOrCreate reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName("API Data Fetch")
    .getOrCreate()
)

The page size is about 50,000 records. Because this is a large amount of data, the request has to be filtered so that the cluster does not crash and the process performs better.

In [None]:
# Base endpoint of the export API for this model
url_intersections = "https://api/public/v1/models/12121212/"

# HTTP headers passed to each API request.
# The Authorization value is read from the API_AUTHORIZATION environment variable when it
# is set, so the credential does not have to live in the notebook source.
# TODO: once the variable is provisioned, remove the hardcoded fallback and rotate it.
headers = {
    "accept": "application/json",
    "authorization": os.environ.get(
        "API_AUTHORIZATION", "Basic MTIxMjEyMTIxMjExMjE6MTIxMjEyMTEx"
    ),
}

def fetch_data_by_scenario(scenario, page_size=50000, timeout=None):
    """
    Fetch every page of one scenario from the API as a list of Spark DataFrames.

    The first request goes to ``url_intersections`` with a ``Scenario == scenario``
    filter; subsequent requests follow ``metadata.nextPage`` from each response until
    it is missing or empty. Each non-empty page is converted pandas -> Spark and kept
    as its own batch.

    Args:
        scenario: Scenario value to filter on (converted with ``str()``).
        page_size: Value sent as the ``pageSize`` query parameter.
        timeout: Optional ``requests`` timeout, in seconds or as a
            ``(connect, read)`` tuple. ``None`` (the default) waits indefinitely,
            which matches the previous behaviour.

    Returns:
        list: One Spark DataFrame per non-empty page, in fetch order. May be empty.

    Raises:
        requests.HTTPError: If any page responds with an HTTP error status.
    """
    # Define filters for the scenario (JSON-encoded, then URL-quoted)
    filters = [{"field": "Scenario", "eq": str(scenario)}]
    filters_param = urllib.parse.quote(json.dumps(filters))

    # Define start URL
    next_page_url = f"{url_intersections}?filters={filters_param}&pageSize={page_size}"
    all_batches = []  # List to store batches as DataFrames
    batch_count = 0  # Counter to track batches

    # One session for the whole pagination loop so the connection can be reused
    with requests.Session() as session:
        while next_page_url:
            response = session.get(next_page_url, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()

            if "data" not in data:
                print("No 'data' field in response, stopping.")
                break

            df_batch = pd.DataFrame(data["data"])

            if df_batch.empty:
                # Spark cannot infer a schema from an empty pandas DataFrame, so an
                # empty page is skipped rather than converted.
                print("Received an empty page, skipping it.")
            else:
                batch_count += 1

                # Convert Pandas DataFrame to Spark DataFrame and keep it as a batch
                all_batches.append(spark.createDataFrame(df_batch))

                print(f"Fetched and added batch {batch_count} with {len(df_batch)} records to memory.")

            # Update the next page URL; a null "metadata" is treated like a missing one
            metadata = data.get("metadata") or {}
            next_page_url = metadata.get("nextPage")
            if not next_page_url:
                print("No more pages to fetch.")

    print(f"Finished processing scenario '{scenario}' in {batch_count} batches.")
    return all_batches  # Return the list of DataFrame batches

# Pull every page of the "Live" scenario; each page is held as its own Spark DataFrame
data_batches = fetch_data_by_scenario("Live")

if data_batches:
    # Stack the per-page DataFrames (positional union) into a single DataFrame
    combined_df, *remaining_batches = data_batches
    for df in remaining_batches:
        combined_df = combined_df.union(df)

    # Report the resulting schema and a short sample
    print("\nSchema of the combined DataFrame:")
    combined_df.printSchema()

    print("\nPreview of the combined DataFrame:")
    combined_df.show(n=5, truncate=False)



In [None]:
# Show the first 5 rows; a bare Spark DataFrame.head() would return only the first Row
display(combined_df.limit(5))

In this case the column names had to be set manually, although they are usually inherited from the source.

In [None]:
from pyspark.sql.functions import current_timestamp

# Labels applied to the columns by position (replace with your desired column names).
# NOTE(review): 'Curency' looks like a misspelling of 'Currency' — confirm before changing,
# because this string becomes the DataFrame's column name.
column_names = [
    'Values', 'Account', 'Entity', 'Department', 'Project',
    'Year', 'Period', 'Scenario', 'Curency',
]

# Relabel the master DataFrame's columns
combined_df = combined_df.toDF(*column_names)

# Quick look at the relabelled data
print("\nPreview of the master DataFrame after assigning column names:")
display(combined_df.limit(5))

# Stamp every row with the current timestamp so each load can be told apart later
combined_df = combined_df.withColumn('timeseries', current_timestamp())

# Confirm the new column is present in the schema
print("\nSchema after adding the 'timeseries' column:")
combined_df.printSchema()

# Confirm the new column is populated
print("\nMaster DataFrame after adding the 'timeseries' column:")
display(combined_df.limit(5))

In [None]:
# Destination Delta table (replace with your table name)
target_table = "andrey.default.live"

# Append the rows to the table; mergeSchema is enabled on the write
(
    combined_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(target_table)
)

print(f"Data successfully appended to Delta table: {target_table}")

# Read the table back to check the result of the write
appended_data = spark.table(target_table)

# Schema and a short sample of the table after the append
print("\nSchema of the updated Delta table:")
appended_data.printSchema()

print("\nPreview of the updated Delta table:")
appended_data.show(n=10, truncate=False)

Queries to check the results

In [None]:
%sql 

-- Total number of rows currently in the target table
SELECT count(*)
FROM andrey.default.live


In [None]:
%sql
-- Up to 100 rows whose timeseries value starts with 2024-12-03
SELECT *
FROM andrey.default.live
WHERE timeseries LIKE '2024-12-03%'
LIMIT 100