In [0]:
# Declare the job parameters as text widgets; each defaults to an empty string
# so the caller (job or user) must supply a real value.
for _widget_name in ("catalog_name", "schema_name", "table_name", "s3_bucket"):
    dbutils.widgets.text(_widget_name, "")


In [0]:
# Read the job parameters supplied through the widgets declared above.
catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")
table_name = dbutils.widgets.get("table_name")
s3_bucket = dbutils.widgets.get("s3_bucket")

# Every parameter is required. The widgets default to "", and an empty value
# would otherwise only surface much later as a malformed Volume path, an
# invalid three-part table name, or an empty Auto Loader load() location.
_missing_params = [
    param_name
    for param_name, param_value in (
        ("catalog_name", catalog_name),
        ("schema_name", schema_name),
        ("table_name", table_name),
        ("s3_bucket", s3_bucket),
    )
    if not param_value.strip()
]
if _missing_params:
    raise ValueError(
        f"Missing required notebook parameter(s): {', '.join(_missing_params)}"
    )

In [0]:
# Fetch the S3 access key pair from the Databricks secret scope rather than
# embedding credentials in the notebook.
s3_key, s3_secret = (
    dbutils.secrets.get(scope="geekcoders_llm", key=secret_name)
    for secret_name in ("s3_key", "s3_secret")
)

In [0]:
# Hand the credentials to the S3A filesystem settings of this Spark session.
for _conf_key, _conf_value in (
    ("fs.s3a.access.key", s3_key),
    ("fs.s3a.secret.key", s3_secret),
):
    spark.conf.set(_conf_key, _conf_value)
# Don't leave an extra module-level copy of the secret lying around.
del _conf_key, _conf_value

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from pyspark.sql.functions import col

# Unity Catalog locations derived from the widget parameters. One Volume
# directory serves as both the Auto Loader schemaLocation and the streaming
# checkpointLocation.
checkpoint_path = f"/Volumes/{catalog_name}/{schema_name}/metadata_files/checkpoints/{table_name}"
bronze_table_path = f"{catalog_name}.{schema_name}.{table_name}"

# If the target table does not exist yet, backfill every file already in the
# bucket; if it does, only pick up newly arriving files.
# NOTE: Auto Loader only honours includeExistingFiles when a stream starts with
# an empty checkpoint, so this only matters on the first run or after a reset.
includeExistingFiles_value = (
    "False" if spark.catalog.tableExists(bronze_table_path) else "True"
)

# Read data from S3 using Auto Loader.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")  # change as needed (csv, json, parquet, ...)
    # Auto Loader's type-inference switch (the previous "cloudFiles.inferSchema"
    # is not an Auto Loader option). Without it, CSV columns are read as strings.
    # Remove it if a predefined schema / schemaHints is supplied instead.
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.includeExistingFiles", includeExistingFiles_value)
    # NOTE(review): Spark's CSV reader defaults to header=false — confirm whether
    # the source files carry a header row and add .option("header", "true") if so.
    .load(s3_bucket)
    # _metadata.file_path replaces input_file_name(), which is deprecated on
    # Databricks Runtime 13.3+ and not supported on Unity Catalog shared clusters.
    .withColumn("source_file", col("_metadata.file_path"))
)

# Append to the Bronze Delta table, processing everything currently available
# and then stopping. availableNow replaces the deprecated trigger(once=True) and
# may split a large backlog into several micro-batches instead of one.
query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(bronze_table_path)
)
# Block until the run completes; re-raises the stream's error if it failed.
query.awaitTermination()
