## Simulate ingestion of new fitness data row-by-row into the S3 landing zone for Autoloader to pick up

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_date, date_format, col

In [0]:
# --- Ingestion simulation settings -------------------------------------------

# Hive Metastore backup table (data March–April 2016) that rows are replayed from
source_table = "hive_metastore.default.daily_activity_merged_march_april"

# Name of the control table that records ingestion progress
control_table = "workspace.fitness_dlt.ingestion_control"

# S3 folder watched by Autoloader; every simulated row is written here
landing_path = (
    "s3://databricks-745bwkyiddeq9fthttjahg-cloud-storage-bucket"
    "/ohio-prod/3903799048317088/landing/bronze/daily_activity_stream/"
)

# S3 location backing the control table's Delta files
control_table_path = (
    "s3://databricks-745bwkyiddeq9fthttjahg-cloud-storage-bucket"
    "/dlt-control-tables/ingestion_control"
)

In [0]:
# Make sure the Delta control table exists at its external storage location.
# CREATE TABLE IF NOT EXISTS leaves an already-existing table untouched.
create_control_sql = f"""
CREATE TABLE IF NOT EXISTS {control_table} (
    last_inserted_row INT
)
USING DELTA
LOCATION '{control_table_path}'
COMMENT 'Tracks last inserted row for simulated data ingestion.'
"""
spark.sql(create_control_sql)

# Seed the counter with 0 the first time around (table present but holding no rows)
control_row_count = spark.table(control_table).count()
if not control_row_count:
    spark.sql(f"INSERT INTO {control_table} VALUES (0)")

In [0]:
# Read the progress counter from the control table. It holds the number of source
# rows already written to the landing zone, which is also the 0-based position of
# the next row to emit.
last_row_df = spark.sql(f"SELECT last_inserted_row FROM {control_table}")
control_row = last_row_df.first()  # None when the table has no rows

# Fail with an actionable message instead of an IndexError when the counter is missing
# (e.g. cells run out of order, or a previous counter update was interrupted).
if control_row is None:
    raise RuntimeError(
        f"Control table {control_table} is empty; run the control-table "
        "initialization cell before ingesting."
    )
last_inserted_row = control_row[0]

print(f"Last inserted row index: {last_inserted_row}")

# Read full dataset from Hive Metastore
df_all = spark.table(source_table)

# Check if more rows are available
total_rows = df_all.count()

if last_inserted_row >= total_rows:
    raise Exception("All rows have already been inserted! No new data available.")

# Get the next row to insert: take the first (counter + 1) rows and keep the last one.
# NOTE(review): limit() is applied without an explicit ordering, so this relies on the
# table being scanned in the same order on every run. Spark does not guarantee that;
# confirm the source table's layout is stable or add a sort key to avoid
# duplicated/skipped rows.
df_next_row = df_all.limit(last_inserted_row + 1).tail(1)

# Recreate as DataFrame (because tail() gives you a list of Row objects)
df_next = spark.createDataFrame(df_next_row, schema=df_all.schema)

# Convert date format to "M/d/yyyy" to ensure a consistent format in the Bronze table.
# NOTE(review): to_date() is called without a pattern, so ActivityDate is presumably a
# DATE or an ISO-formatted string in the source table - TODO confirm.
df_next = (
    df_next
    .withColumn("ActivityDate", date_format(to_date(col("ActivityDate")), "M/d/yyyy"))
)

In [0]:
# Append the prepared single-row DataFrame to the landing folder as CSV,
# leaving files from earlier runs in place.
landing_writer = df_next.write.format("csv").mode("append")
landing_writer.save(landing_path)

print("Inserted new row successfully into landing zone.")

In [0]:
# Advance the progress counter by one now that the row has been written.
next_inserted_row = last_inserted_row + 1

# Replace the stored counter in a single statement. A separate DELETE followed by
# INSERT would leave the table empty if the INSERT failed, and the initialization
# cell would then reset the counter to 0 and replay rows that were already ingested.
spark.sql(f"INSERT OVERWRITE TABLE {control_table} VALUES ({next_inserted_row})")

print(f"Control table updated to last_inserted_row = {next_inserted_row}")