### Phase 1: Metadata-Driven Ingestion (1–1.5 Hours)

In [0]:
import json

# Control-table seed data: one entry per source file stored in the project volume.
# Every entry has the same four keys, in this order:
#   file_path   - location of the source file inside the volume
#   table_name  - logical name of the dataset
#   schema_info - Spark DDL string describing the expected columns
#   target_path - "<volume>/<table_name>_data" location associated with the dataset
_VOLUME_ROOT = "/Volumes/databricks_catalog/default/databricks_project_volume"

# (table name, source file extension, DDL schema string)
_SOURCES = [
    (
        "accounts",
        "json",
        "account_id STRING, customer_id STRING, account_type STRING, account_number STRING, branch_code STRING, opening_date DATE, account_status STRING, current_balance FLOAT, available_balance FLOAT, interest_rate FLOAT, minimum_balance FLOAT, overdraft_limit FLOAT, last_transaction_date DATE, is_joint_account BOOLEAN, nominee_name STRING, nominee_relationship STRING, created_timestamp TIMESTAMP",
    ),
    (
        "branches",
        "json",
        "branch_code STRING, branch_name STRING, branch_type STRING, ifsc_code STRING, micr_code STRING, street_address STRING, city STRING, state STRING, pincode STRING, landmark STRING, phone_numbers ARRAY<STRING>, email STRING, fax STRING, opening_time STRING, closing_time STRING, working_days STRING, atm_available BOOLEAN, parking_available BOOLEAN, wheelchair_accessible BOOLEAN, branch_manager STRING, total_employees INT, customer_service_officers INT, services_offered ARRAY<STRING>, region STRING, establishment_date DATE, license_number STRING, is_active BOOLEAN, last_audit_date DATE, compliance_score FLOAT, created_timestamp TIMESTAMP, last_updated TIMESTAMP",
    ),
    (
        "customers",
        "csv",
        "customer_id STRING, first_name STRING, last_name STRING, email STRING, phone STRING, date_of_birth DATE, gender STRING, annual_income FLOAT, pan_number STRING, aadhar_number STRING, city STRING, state STRING, pincode STRING, customer_since DATE, kyc_status STRING, credit_score INT, risk_category STRING, is_active BOOLEAN",
    ),
    (
        "transactions",
        "csv",
        "transaction_id STRING, from_account_id STRING, to_account_id STRING, transaction_type STRING, amount FLOAT, transaction_date DATE, transaction_timestamp TIMESTAMP, channel STRING, merchant_name STRING, merchant_category STRING, description STRING, reference_number STRING, status STRING, currency STRING, exchange_rate FLOAT, fee_amount FLOAT, location_city STRING, location_state STRING, device_type STRING, ip_address STRING",
    ),
]

# Key order matters: the rows are later turned into Row objects and matched
# against a schema listing the fields in this same order.
control_data = [
    {
        "file_path": f"{_VOLUME_ROOT}/{name}.{extension}",
        "table_name": name,
        "schema_info": ddl,
        "target_path": f"{_VOLUME_ROOT}/{name}_data",
    }
    for name, extension, ddl in _SOURCES
]

# Create DataFrame with simple schema_info as DDL string
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# Materialise the control entries as a four-column, all-string DataFrame and
# persist it as the managed table default.control_table (replaced on every run).
control_columns = ['file_path', 'table_name', 'schema_info', 'target_path']
control_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in control_columns]
)
control_rows = [Row(**entry) for entry in control_data]

df = spark.createDataFrame(control_rows, schema=control_schema)

control_writer = df.write.mode("overwrite")
control_writer.saveAsTable("default.control_table")

In [0]:
# Show the DataFrame currently bound to `df`; when the cells run top to bottom
# this is the control-table frame created in the previous cell.
df.display()

In [0]:
%python
# Load the persisted control table; the ingestion loop below reads the
# file_path, table_name, schema_info and target_path values from each of its rows.
control_df = spark.table("default.control_table")

from pyspark.sql.types import _parse_datatype_string

def get_default_value(data_type):
    """Return the placeholder used to replace nulls in a column of ``data_type``.

    Args:
        data_type: A Spark SQL data type instance (e.g. ``StringType()``), or
            the bare class name as a string (e.g. ``"StringType"``).

    Returns:
        ``""`` for strings, ``0`` for integral types, ``0.0`` for
        float/double/decimal, ``False`` for booleans, ``"1970-01-01"`` for
        dates, ``"1970-01-01 00:00:00"`` for timestamps, and ``None`` for any
        other type (callers use ``None`` to mean "do not fill this column").
    """
    # Match on the class name rather than str(data_type): the string form of a
    # data type is not stable (it can render as "StringType()" with parentheses,
    # and parameterised types render as e.g. "DecimalType(10,0)"), which made
    # every comparison against the bare names fail and disabled the fill.
    if isinstance(data_type, str):
        type_name = data_type
    else:
        type_name = type(data_type).__name__

    defaults = {
        "StringType": "",
        "IntegerType": 0,
        "LongType": 0,
        "ShortType": 0,
        "ByteType": 0,
        "FloatType": 0.0,
        "DoubleType": 0.0,
        "DecimalType": 0.0,
        "BooleanType": False,
        "DateType": "1970-01-01",
        "TimestampType": "1970-01-01 00:00:00",
    }
    # Unlisted types (arrays, structs, maps, ...) yield None: not fillable.
    return defaults.get(type_name)

# Metadata-driven CSV ingestion: for every control-table row that points at a
# .csv file, read the file with the DDL schema stored in the row, replace nulls
# with per-type defaults, and save the result as default.raw_<table_name>.
# Rows pointing at other file types are skipped without a message.
for row in control_df.collect():
    file_path = row['file_path']
    table_name = row['table_name']
    schema_info = row['schema_info']

    # Only process CSV files
    if not file_path.endswith('.csv'):
        continue

    schema = _parse_datatype_string(schema_info)
    expected_columns = [field.name for field in schema.fields]

    # Validate the file's real header against the expected schema. This has to
    # be a separate, schema-less read: when an explicit schema is supplied the
    # CSV reader ignores the header by default (enforceSchema=true), so the
    # typed frame's column names always equal the schema's and comparing them
    # can never detect a mismatch. Order is compared as well because the schema
    # is applied to the file's columns by position.
    header_columns = spark.read.option("header", "true").csv(file_path).columns
    normalized_header = [name.strip().lower() for name in header_columns]
    normalized_expected = [name.strip().lower() for name in expected_columns]
    if normalized_header != normalized_expected:
        print(f"Schema mismatch for {table_name}: skipping ingestion.")
        continue

    # Named csv_df (not df) so the control-table DataFrame bound to `df` in an
    # earlier cell is not overwritten by this loop.
    csv_df = (
        spark.read
        .option("header", "true")
        .option("mode", "PERMISSIVE")
        .option("inferSchema", "false")
        .schema(schema)
        .csv(file_path)
    )

    # Only include supported types in fill_dict (None means "no default").
    fill_dict = {}
    for field in schema.fields:
        default_value = get_default_value(field.dataType)
        if default_value is not None:
            fill_dict[field.name] = default_value

    df_clean = csv_df.fillna(fill_dict) if fill_dict else csv_df

    df_clean.write.mode("overwrite").saveAsTable(f"default.raw_{table_name}")

# Snapshot the control-table rows into a second table, default.metadata_config:
# collect the rows to the driver as plain dicts, rebuild a DataFrame from them
# (schema inferred from the dicts), and overwrite the table.
control_records = [record.asDict() for record in control_df.collect()]
metadata_df = spark.createDataFrame(control_records)

metadata_writer = metadata_df.write.mode("overwrite")
metadata_writer.saveAsTable("default.metadata_config")

In [0]:
# Show the rows that were written to default.metadata_config.
metadata_df.display()

In [0]:

# Load accounts.json from the project volume and overwrite default.raw_accounts.
# No schema is supplied, so Spark infers it from the data; "multiline" allows a
# single JSON record to span several lines.
# NOTE(review): the DDL schema stored for "accounts" in the control table is not
# applied here - confirm whether that is intended.
accounts_source = "/Volumes/databricks_catalog/default/databricks_project_volume/accounts.json"
accounts_reader = spark.read.option("multiline", "true")
df_accounts = accounts_reader.json(accounts_source)

accounts_writer = df_accounts.write.mode("overwrite")
accounts_writer.saveAsTable("default.raw_accounts")

In [0]:

# Load branches.json from the project volume and overwrite default.raw_branches.
# No schema is supplied, so Spark infers it from the data; "multiline" allows a
# single JSON record to span several lines.
# NOTE(review): the DDL schema stored for "branches" in the control table is not
# applied here - confirm whether that is intended.
branches_source = "/Volumes/databricks_catalog/default/databricks_project_volume/branches.json"
branches_reader = spark.read.option("multiline", "true")
df_branches = branches_reader.json(branches_source)

branches_writer = df_branches.write.mode("overwrite")
branches_writer.saveAsTable("default.raw_branches")

In [0]:

# Load customers.csv (first line is the header, column types inferred by Spark)
# and overwrite default.raw_customers, with the mergeSchema write option enabled.
# NOTE(review): the metadata-driven loop earlier in this notebook also writes
# default.raw_customers using the explicit schema from the control table; this
# cell overwrites that table with inferred types - confirm which one is intended.
customers_source = "/Volumes/databricks_catalog/default/databricks_project_volume/customers.csv"
customers_reader = spark.read.option("inferSchema", "true").option("header", "true")
df_customers = customers_reader.csv(customers_source)

customers_writer = df_customers.write.mode("overwrite").option("mergeSchema", "true")
customers_writer.saveAsTable("default.raw_customers")

In [0]:
# Load transactions.csv (first line is the header; neither a schema nor
# inferSchema is given, so every column is read as a string) and overwrite
# default.raw_transactions.
# NOTE(review): the metadata-driven loop earlier in this notebook also writes
# default.raw_transactions using the explicit schema from the control table;
# this cell overwrites that table with untyped data - confirm which one is intended.
transactions_source = "/Volumes/databricks_catalog/default/databricks_project_volume/transactions.csv"
transactions_reader = spark.read.option("header", "true")
df_transactions = transactions_reader.csv(transactions_source)

transactions_writer = df_transactions.write.mode("overwrite")
transactions_writer.saveAsTable("default.raw_transactions")