In [0]:
%sh
# Sets PYSPARK_PYTHON to python3.11 for the shell process started by this %sh cell.
# NOTE(review): an `export` made here presumably does not reach the Python/Spark
# session that runs the other cells of this notebook; if the goal is to pin the
# cluster's Python version, confirm whether this needs to be cluster configuration.
export PYSPARK_PYTHON=python3.11

In [0]:
# Databricks Notebook - Load JSON Files into Bronze Layer and Delete Source Files

from pyspark.sql.functions import current_timestamp, input_file_name,col
from pyspark.sql.types import StructType
from delta.tables import DeltaTable
import os,json

# 🏗️ Step 1: Define Parameters
# Every widget that is read below is declared here with a default, so the notebook
# also runs (e.g. Restart & Run All) when the caller does not supply a value.
dbutils.widgets.text("file_path", "/Volumes/dev_catalog/default/landing_zone/game_analytics/Game/*")
dbutils.widgets.text("file_name", "")  # Optional; empty means "load everything under file_path"
dbutils.widgets.text("bronze_table_name", "game")

file_path = dbutils.widgets.get("file_path")  # Directory (or glob) where JSON files are stored
file_name = dbutils.widgets.get("file_name").strip()  # Specific file (optional)
bronze_table_name = dbutils.widgets.get("bronze_table_name")

# 📂 Construct full JSON path (single file or all files in the directory)
if file_name:
    # A trailing "/*" glob cannot be combined with a concrete file name, so the
    # file is resolved against the directory part of file_path.
    source_dir = file_path[:-2] if file_path.endswith("/*") else file_path
    json_source_path = os.path.join(source_dir, file_name)
else:
    # os.path.join(file_path, "") would append a trailing "/" (turning ".../*" into
    # ".../*/"), so the path is used unchanged when no file name is given.
    json_source_path = file_path

# 🏗️ Step 2: Define Delta Table Metadata
# Target location of the bronze table, expressed as <catalog>.<schema>.<table>.
catalog_name = "dev_catalog"
schema_name = "bronze"
bronze_table_path = f"{catalog_name}.{schema_name}.{bronze_table_name}"

# 📌 Set Active Catalog & Schema
# The catalog is selected first, then the schema inside it.
for use_statement in (
    f"USE CATALOG {catalog_name} ;",
    f"USE SCHEMA {schema_name} ;",
):
    spark.sql(use_statement)

# 🏗️ Step 3: Read JSON Files into DataFrame and create the table

# Expected JSON structure for each supported bronze table, stored as Spark schema
# JSON and keyed by the value of bronze_table_name.
_SCHEMA_JSON_BY_TABLE = {
    "game": """
    {
        "type": "struct",
        "fields": [
            {"name": "GameID", "type": "string", "nullable": true},
            {"name": "Publisher", "type": "string", "nullable": true},
            {"name": "Rating", "type": "string", "nullable": true},
            {"name": "Genre", "type": "string", "nullable": true},
            {"name": "Game_Length", "type": "integer", "nullable": true},
            {"name": "Game_release_date", "type": "date", "nullable": true}
        ]
    }
    """,
    "player_activity": """
    {
        "type": "struct",
        "fields": [
            {"name": "PlayerID", "type": "string", "nullable": true},
            {"name": "GameID", "type": "string", "nullable": true},
            {"name": "SessionID", "type": "string", "nullable": true},
            {"name": "Activity_type", "type": "string", "nullable": true},
            {"name": "Level", "type": "integer", "nullable": true},
            {"name": "ExperiencePoints", "type": "float", "nullable": true},
            {"name": "AchievementUnlocked", "type": "integer", "nullable": true},
            {"name": "CurrencyEarned", "type": "float", "nullable": true},
            {"name": "CurrencySpent", "type": "float", "nullable": true},
            {"name": "QuestCompleted", "type": "integer", "nullable": true},
            {"name": "EnemiesDefeated", "type": "integer", "nullable": true},
            {"name": "ItemsCollected", "type": "integer", "nullable": true},
            {"name": "Deaths", "type": "integer", "nullable": true},
            {"name": "DistanceTravelled", "type": "float", "nullable": true},
            {"name": "ChatMessagesSent", "type": "integer", "nullable": true},
            {"name": "TeamEventsParticipated", "type": "integer", "nullable": true},
            {"name": "PlayMode", "type": "string", "nullable": true}
        ]
    }
    """,
}

# Look up the schema for the requested table; anything not listed is rejected.
schema_json = _SCHEMA_JSON_BY_TABLE.get(bronze_table_name)
if schema_json is None:
    raise ValueError(f"❌ Unknown JSON Schema of the bronze_table_name: {bronze_table_name}")

# Turn the schema JSON text into a dictionary, then into a Spark StructType
schema_dict = json.loads(schema_json)
schema = StructType.fromJson(schema_dict)

# Read JSON using schema
# The reader applies the schema defined above, accepts multi-line JSON documents
# and looks for files recursively below the source path.
json_reader = (
    spark.read
    .schema(schema)
    .option("recursiveFileLookup", "true")
    .option("multiline", "true")
)
raw_df = json_reader.json(json_source_path)

# Add the audit columns: originating file path and time of insert.
with_source_df = raw_df.withColumn("source_file", col("_metadata.file_path"))
df = with_source_df.withColumn("dt_insert", current_timestamp())

# Probe for a single row rather than counting all of them: head(1) returns an
# empty list when there is no data, which is all this check needs to know.
if not df.head(1):
    print(f"❌ No data found in {json_source_path}")
else:
    # NOTE(review): "overwrite" replaces the table contents on every run while the
    # source files are deleted below, so earlier loads are not kept anywhere —
    # confirm this is intended (as opposed to appending).
    df.write.format("delta").mode("overwrite").saveAsTable(bronze_table_path)
    # 🏗️ Step 6: Delete JSON Files After Successful Load
    if file_name:
        # Delete single file
        dbutils.fs.rm(json_source_path, True)
        print(f"🗑️ Deleted file: {json_source_path}")
    else:
        # Delete all files in the directory.
        # file_path may end with a "/*" glob (the widget default does); the listing
        # needs the plain directory, so that suffix is dropped before calling ls.
        landing_dir = file_path[:-2] if file_path.endswith("/*") else file_path
        # NOTE(review): this removes everything currently in the directory, including
        # files that may have arrived after the read above — confirm that is acceptable.
        for entry in dbutils.fs.ls(landing_dir):
            dbutils.fs.rm(entry.path, True)
            print(f"🗑️ Deleted files in the folder: {entry.path}")

# 🏗️ Step 7: Verify Data
# display(spark.table(bronze_table_path))