In [0]:
# Databricks notebook source
import requests
import json
import time
from pyspark.sql.types import StructType, StringType, FloatType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

# API configuration
API_URL = "http://10.223.106.19:8000"
MAX_RETRIES = 3
RETRY_DELAY = 2  # seconds

# Retrieve the API key from the Databricks secret scope. Every request below
# sends this key, so a failed lookup is fatal: stop here with a clear error
# instead of continuing with API_KEY undefined.
try:
    API_KEY = dbutils.secrets.get(scope="dev_secret_scope", key="fast-api-key")
except Exception as e:
    print(f"Error retrieving API key: {e}")
    raise RuntimeError(
        "Could not read 'fast-api-key' from secret scope 'dev_secret_scope'"
    ) from e
else:
    print("API key retrieved from secrets")

# Get the workflow name from the notebook context, or from the
# "workflow_name" widget when the context carries no job name.
# NOTE(review): tags().get(...) may hand back a Scala Option wrapper rather
# than a plain string on some runtimes -- confirm that str() of it is the
# bare job name.
workflow_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("jobName")
if not workflow_name:
    try:
        workflow_name = dbutils.widgets.get("workflow_name")
    except Exception as e:
        # No job name and no widget: leave the name empty so the main cell
        # reports a missing script name.
        print(f"Could not read 'workflow_name' widget: {e}")
        workflow_name = ""

# The script to run is named after the workflow. Keep script_name empty when
# no workflow name is available, so that "None.py" is never sent to the API.
script_name = f"{workflow_name}.py" if workflow_name else ""

# Create the widget that receives parameters (populated by the workflow)
dbutils.widgets.text("parameters", "", "Parameters (comma-separated key=value pairs)")
parameters_str = dbutils.widgets.get("parameters")

# Parse "key=value,key2=value2" into a dictionary. Pairs without "=" or with
# an empty key are skipped; only the first "=" in a pair separates key from
# value, so values may themselves contain "=".
parameters = {}
for pair in parameters_str.split(","):
    key, sep, value = pair.partition("=")
    if sep and key.strip():
        parameters[key.strip()] = value.strip()

print(f"Running script: {script_name}")
print(f"With parameters: {json.dumps(parameters, indent=2)}")

# COMMAND ----------
# Check if API is accessible
def check_api_health(timeout=10):
    """Probe the API's /health endpoint.

    Sends a GET to ``{API_URL}/health`` with the API key in the
    ``X-API-Key`` header and prints the outcome.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            notebook on an unreachable host.

    Returns:
        True if the endpoint answered with HTTP 200; False on any other
        status code or on a connection/timeout error.
    """
    try:
        response = requests.get(
            f"{API_URL}/health",
            headers={"X-API-Key": API_KEY},
            timeout=timeout,
        )
    except requests.RequestException as e:
        print(f"Error connecting to API: {e}")
        return False

    if response.status_code == 200:
        print("API is accessible!")
        return True

    print(f"API returned status code: {response.status_code}")
    print(f"Response: {response.text}")
    return False

# COMMAND ----------
# Run a script with parameters and get results with retry logic
def run_script(script_name, parameters=None):
    """Ask the API to run a script and return its decoded JSON result.

    POSTs ``{"script_name": ..., "parameters": ...}`` to
    ``{API_URL}/run-script``. Transport errors and HTTP 429 responses are
    retried, up to ``MAX_RETRIES`` attempts in total.

    Args:
        script_name: Name of the script the API should execute.
        parameters: Optional mapping of parameters; omitted from the payload
            when empty or None.

    Returns:
        The decoded JSON body on HTTP 200, or None when authentication
        fails, the API answers with an unexpected status, the body is not
        valid JSON, or all attempts are exhausted.
    """
    # The payload does not change between attempts, so build it once.
    payload = {"script_name": script_name}
    if parameters:
        payload["parameters"] = parameters

    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = requests.post(
                f"{API_URL}/run-script",
                json=payload,
                headers={"X-API-Key": API_KEY},
                timeout=300,  # 5-minute timeout for the client connection
            )
        except requests.RequestException as e:
            print(f"Error running script: {e}")
            retries += 1
            if retries < MAX_RETRIES:
                print(f"Retrying ({retries}/{MAX_RETRIES})...")
                time.sleep(RETRY_DELAY)
            continue

        status = response.status_code

        if status == 200:
            try:
                return response.json()
            except ValueError as e:
                # The request itself succeeded, so re-posting would run the
                # script a second time; report the bad body instead.
                print(f"Error: API returned a non-JSON response: {e}")
                print(f"Response: {response.text}")
                return None

        if status == 401:
            # Authentication error
            print("API key authentication failed. Please check your API key.")
            return None

        if status == 429:  # Rate limit exceeded
            # Retry-After may be missing, an HTTP-date, or otherwise not an
            # integer; fall back to the default delay in those cases.
            try:
                retry_after = max(0, int(response.headers.get("Retry-After", RETRY_DELAY)))
            except (TypeError, ValueError):
                retry_after = RETRY_DELAY
            retries += 1
            if retries < MAX_RETRIES:
                print(f"Rate limit exceeded. Retrying in {retry_after} seconds...")
                time.sleep(retry_after)
            else:
                # No attempt left, so waiting would only delay the failure.
                print("Rate limit exceeded.")
            continue

        print(f"Error: API returned status code {status}")
        print(f"Response: {response.text}")
        return None

    print(f"Maximum retries ({MAX_RETRIES}) exceeded. Unable to execute script.")
    return None

# COMMAND ----------
# Convert JSON result to DataFrame
def json_to_dataframe(json_data):
    """Convert an API result dictionary into a Spark DataFrame.

    The conversion tries, in order:

    1. ``json_data["output"]["data"]`` as a non-empty list of records,
       giving one row per record.
    2. ``json_data["output"]["summary"]`` as a dict, giving a single-row
       DataFrame.
    3. Otherwise the output (or the whole payload when there is no
       ``"output"`` key) is serialized into a ``json_data`` string column,
       with a ``parsed_data`` struct column added when Spark can infer a
       schema for it.

    Args:
        json_data: Decoded JSON response.

    Returns:
        A Spark DataFrame, or None when ``json_data`` is empty or not a dict.
    """
    if not json_data or not isinstance(json_data, dict):
        print("Invalid JSON data")
        return None

    spark = SparkSession.builder.getOrCreate()

    # Handle various JSON structures generically
    output = json_data.get("output")
    if isinstance(output, dict):
        records = output.get("data")
        summary = output.get("summary")

        # A non-empty list of records under "data"
        if isinstance(records, list) and records:
            return spark.createDataFrame(records)

        # A summary dict becomes a single-row DataFrame
        if isinstance(summary, dict):
            return spark.createDataFrame([summary])

    # No recognised structure: keep the raw JSON text in a one-column frame
    json_str = json.dumps(json_data["output"]) if "output" in json_data else json.dumps(json_data)
    df = spark.createDataFrame([(json_str,)], ["json_data"])

    # Try to parse the JSON structure
    try:
        schema = spark.read.json(spark.sparkContext.parallelize([json_str])).schema
        return df.withColumn("parsed_data", from_json(col("json_data"), schema))
    except Exception:
        # Best effort: if schema inference or parsing fails, return the raw
        # JSON. Catching Exception (not a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return df

# COMMAND ----------
# Main execution logic
# Flow: validate the script name, probe the API, run the script, then report
# the result. Each failure prints a message and stops; nothing is raised here.
if not script_name:
    print("No script name provided. Please provide a script_name parameter.")
elif not check_api_health():
    print("Cannot run script. API is not accessible.")
else:
    # Run the script with parameters
    print(f"Running script: {script_name} with parameters: {parameters}")
    result = run_script(script_name, parameters)

    if not result:
        print("Failed to execute script or get results.")
    elif not isinstance(result, dict):
        # The reporting below indexes the result by key, so a non-object
        # JSON payload (e.g. a list) is only dumped as-is.
        print("\nRaw output:")
        print(json.dumps(result, indent=2))
    else:
        # .get() avoids a KeyError when the API omits the "success" field
        print(f"\nScript execution successful: {result.get('success')}")

        # Display any errors
        if result.get("error"):
            print(f"\nExecution Errors:\n{result['error']}")

        # Display summary information if available
        output = result.get("output")
        if isinstance(output, dict) and "summary" in output:
            print("\nSummary:")
            for key, value in output["summary"].items():
                print(f"  {key}: {value}")

        # Convert to DataFrame and display
        df = json_to_dataframe(result)
        if df is not None:
            print("\nData from script execution:")
            display(df)

            # Optional: Save to a Delta table
            # table_name = f"fast_api_results_{script_name.replace('.', '_')}"
            # df.write.format("delta").mode("append").saveAsTable(table_name)
            # print(f"Results saved to table: {table_name}")
        else:
            print("\nRaw output:")
            print(json.dumps(output, indent=2))

API key retrieved from secrets
Raw widget values - script_name: 'budget_finance.py', parameters: 'category=Sales,month=January'
Running script: budget_finance.py
With parameters: {
  "category": "Sales",
  "month": "January"
}
API is accessible!
Running script: budget_finance.py with parameters: {'category': 'Sales', 'month': 'January'}

Script execution successful: True

Summary:
  total_budget: 75000
  total_actual: 78200
  total_variance: 3200
  variance_percent: 4.27
  record_count: 1
  run_timestamp: 2025-03-25 10:06:50

Data from script execution:


actual,budget,category,month,quarter,variance
78200,75000,Sales,January,Q1,3200
