In [0]:

import json
from datetime import datetime, timezone

import boto3
from botocore.exceptions import NoCredentialsError
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Step 1 - Establish AWS connections
def establish_aws_connection():
    """Build a boto3 S3 client from credentials kept in Databricks Secrets.

    Both the access key and the secret key are read from the ``aws-secrets``
    scope through ``dbutils.secrets``.

    Returns:
        The client object returned by ``boto3.client('s3', ...)``.
    """
    secret_scope = "aws-secrets"

    # Dict entries are evaluated top to bottom, so the access key is still
    # fetched before the secret key.
    credentials = {
        "aws_access_key_id": dbutils.secrets.get(scope=secret_scope, key="aws-access-key"),
        "aws_secret_access_key": dbutils.secrets.get(scope=secret_scope, key="aws-secret-key"),
    }

    return boto3.client('s3', **credentials)

# Step 2 - Load key=value settings from a plain-text config file
def load_config_txt(path):
    """Parse a ``key=value`` text file into a dictionary.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored, as are lines that contain no ``=``. Only the first ``=`` on a
    line separates key from value; both sides are stripped of surrounding
    whitespace. A key that appears more than once keeps its last value.

    Args:
        path: Location of the text file to read.

    Returns:
        dict mapping each key string to its value string.
    """
    settings = {}
    with open(path, "r") as handle:
        for raw_line in handle:
            entry = raw_line.strip()

            # Nothing to parse on blank lines or comment lines.
            if entry == "" or entry[0] == "#":
                continue

            # partition() splits on the first "=" only; an empty separator
            # means the line had no "=" at all and is skipped.
            name, separator, setting = entry.partition("=")
            if separator:
                settings[name.strip()] = setting.strip()
    return settings


# Step 2 - create tracking tables for files which have been ingested
def create_table(table_name, schema):
    """Create the Delta table ``table_name`` if it does not exist yet.

    The table is created empty with the given schema. When the table being
    created is ``workspace.silver_schema.last_ingested_times``, one
    placeholder row holding 1900-01-01 00:00:00 is appended right after
    creation, so the table starts with a timestamp older than any real file.

    Relies on the notebook-provided ``spark`` session.

    Args:
        table_name: Fully qualified name of the table to create.
        schema: Schema used for the empty table (and for the placeholder row).

    Returns:
        None.
    """
    # Guard clause: nothing to build or write when the table is already there.
    if spark.catalog.tableExists(table_name):
        print("Tracking table already exists.")
        return

    # An empty DataFrame carrying only the schema is enough to register the table.
    empty_df = spark.createDataFrame([], schema)
    empty_df.write.format("delta").saveAsTable(table_name)
    print("Tracking table created.")

    # Special handling: insert very old timestamp for last_ingested_times
    if table_name == "workspace.silver_schema.last_ingested_times":
        very_old_timestamp = datetime(1900, 1, 1, 0, 0, 0)
        placeholder_row = Row(last_ingested_times=very_old_timestamp)
        init_df = spark.createDataFrame([placeholder_row], schema)
        init_df.write.format("delta").mode("append").saveAsTable(table_name)
        print("Inserted very old timestamp into last_ingested_times.")

# Step 3: Get already ingested file names
def get_ingested_files(tracking_table):
    """Return the set of ``file_name`` values stored in ``tracking_table``.

    A missing table is treated as "nothing ingested yet" and yields an empty
    set. Any other failure while reading the table is allowed to propagate:
    reporting it as "no files ingested" would hide the real error.

    Relies on the notebook-provided ``spark`` session.

    Args:
        tracking_table: Name of the table that has a ``file_name`` column.

    Returns:
        set of the ``file_name`` values found in the table (empty if the
        table does not exist).
    """
    # Explicit existence check instead of a bare ``except:``, which would
    # also swallow KeyboardInterrupt and unrelated read errors.
    if not spark.catalog.tableExists(tracking_table):
        print("Tracking table not found. Assuming no files ingested yet.")
        return set()

    return {row["file_name"] for row in spark.table(tracking_table).collect()}


# step 4: get s3 data
def _as_utc(moment):
    """Return ``moment`` as a timezone-aware datetime.

    NOTE(review): a naive datetime is assumed to already be in UTC - confirm
    against how callers obtain the watermark.
    """
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


def get_s3_data(s3, bucket, prefix, files_tracking, main_schema, last_ingested_times, files_tracking_schema):
    """Ingest new ``.json`` objects from S3 into the silver Delta table.

    Every object under ``bucket``/``prefix`` whose key ends in ``.json`` and
    whose ``LastModified`` is strictly later than ``last_ingested_times`` is
    downloaded and parsed with ``json.loads``. If at least one object was
    read, three writes happen in this order:

    1. the parsed records are appended to
       ``workspace.silver_schema.silver_delta_table``;
    2. one ``(key, ingestion time)`` row per object is appended to
       ``files_tracking``;
    3. ``workspace.silver_schema.last_ingested_times`` is overwritten with
       the newest ``LastModified`` among the ingested objects.

    The watermark is written last so that a failure in an earlier write
    leaves the old watermark in place and the files are picked up again on
    the next run instead of being skipped.

    Relies on the notebook-provided ``spark`` session.

    Args:
        s3: S3 client exposing ``get_paginator`` and ``get_object``.
        bucket: Bucket name passed to the listing and download calls.
        prefix: Key prefix passed to the listing call.
        files_tracking: Name of the table receiving the per-file tracking rows.
        main_schema: Schema applied to the DataFrame built from the parsed JSON.
        last_ingested_times: datetime watermark; only objects modified after
            it are ingested.
        files_tracking_schema: Schema applied to the tracking rows.

    Returns:
        None.
    """
    # Normalize both sides of the comparison: an aware LastModified compared
    # with a naive watermark would raise TypeError.
    watermark = _as_utc(last_ingested_times)

    all_data = []  # parsed JSON payloads of the new files
    new_ingested_json = []  # (key, ingestion time) rows for the tracking table
    latest_modified_time = None  # newest LastModified among ingested objects

    try:
        # A single list_objects_v2 call returns at most one page of keys;
        # the paginator walks every page.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.json'):
                    continue

                last_modified = _as_utc(obj['LastModified'])
                # Only ingest files modified after the watermark
                if last_modified <= watermark:
                    continue

                # Download and parse
                file_obj = s3.get_object(Bucket=bucket, Key=key)
                json_data = file_obj['Body'].read().decode('utf-8')
                all_data.append(json.loads(json_data))

                # Log the file for tracking
                new_ingested_json.append((key, datetime.now(timezone.utc)))

                if latest_modified_time is None or last_modified > latest_modified_time:
                    latest_modified_time = last_modified

        print(f"New JSON files to ingest: {len(new_ingested_json)}")

    except NoCredentialsError:
        print("AWS credentials not found!")
        all_data = []

    if not all_data:
        print("No new files ingested. Timestamp not updated.")
        print("No new files to process.")
        return

    silver_df = spark.createDataFrame(all_data, schema=main_schema)

    # Append new data
    silver_df.write.format("delta").mode("append").saveAsTable("workspace.silver_schema.silver_delta_table")
    print("New data appended to Delta table.")

    # Update tracking table
    print("new files is:", new_ingested_json)
    new_ingested_json_df = spark.createDataFrame(new_ingested_json, schema=files_tracking_schema)
    new_ingested_json_df.write \
        .format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable(files_tracking)

    # Advance the watermark once, after the loop and after the data is
    # safely written. It is based on S3's LastModified rather than the local
    # clock, so objects uploaded while this run was in progress are not
    # skipped by the next run.
    updated_row = Row(last_ingested_times=latest_modified_time)
    updated_df = spark.createDataFrame([updated_row])
    updated_df.write.format("delta").mode("overwrite").saveAsTable("workspace.silver_schema.last_ingested_times")
    print(f"Updated last_ingested_times to {latest_modified_time}")