In [0]:
%sql
CREATE CATALOG IF NOT EXISTS casino_ctg;
USE catalog casino_ctg;
 CREATE SCHEMA IF NOT EXISTS log_db;
 USE schema log_db;
 CREATE VOLUME if not exists sourcefiles;
 --TRUNCATE TABLE IF EXISTS transactions_bronze;

In [0]:
# Define the target location in a Unity Catalog Volume
volume_path = "dbfs:/Volumes/casino_ctg/log_db/sourcefiles"
dbutils.fs.mkdirs(volume_path)

True

In [0]:
import pyspark.pandas as ps
import re
from pyspark.sql.functions import col

# --- CONFIGURATION ---
# Base path where all the raw CSVs were downloaded
BASE_VOLUME_PATH = "/Volumes/casino_ctg/log_db/sourcefiles"
TARGET_CATALOG = "casino_ctg"
TARGET_SCHEMA = "log_db"
DATE_COLUMN = 'date'
TS_COLUMN ='timestamp'
# 1. List all CSV files in the raw_files directory
try:
    # dbutils.fs.ls returns a list of FileInfo objects
    file_list = dbutils.fs.ls(BASE_VOLUME_PATH)
   # print(file_list)
    trans_list =[ll for ll in file_list if ll.name.startswith('transactions')]
except Exception as e:
    print(f"Error listing files in {BASE_VOLUME_PATH}: {e}")
    file_list = []



Tried to attach usage logger `pyspark.databricks.pandas.usage_logger`, but an exception was raised: JVM wasn't initialised. Did you call it on executor side?


In [0]:
filename = 'games_metadata.csv'
table_name = f"games_bronze"
full_table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}"
raw_csv_path = f"{BASE_VOLUME_PATH}/{filename}"

try:
    # Load data using Spark DataFrame API to avoid /dbfs path access issues
    df_spark = spark.read.option("header", True).csv(raw_csv_path)
    
    # Check for the existence of the 'date' column and perform conversion if present
    if DATE_COLUMN in df_spark.columns:
        print(f"  Transforming column '{DATE_COLUMN}' to datetime type.")
        df_spark = df_spark.withColumn(
            DATE_COLUMN,
            col(DATE_COLUMN).cast("timestamp")
        )
    else:
        print(f"  Column '{DATE_COLUMN}' not found, skipping datetime transformation.")

    if TS_COLUMN in df_spark.columns:
        print(f"  Transforming column '{TS_COLUMN}' to datetime type.")
        df_spark = df_spark.withColumn(
            TS_COLUMN,
            col(TS_COLUMN).cast("timestamp")
        )
    else:
        print(f"  Column '{TS_COLUMN}' not found, skipping datetime transformation.")

    # Write to the Bronze Delta Table using the UC naming convention
    df_spark.write.format("delta").mode("overwrite").saveAsTable(full_table_name)
    
    print(f"  Successfully wrote {df_spark.count()} rows to {full_table_name}.")

except Exception as e:
    print(f"  FAILED to process {filename}. Error: {e}")

  Column 'date' not found, skipping datetime transformation.
  Column 'timestamp' not found, skipping datetime transformation.
  Successfully wrote 4 rows to casino_ctg.log_db.games_bronze.


In [0]:
# 2. Iterate over each file, process, and write to Bronze Delta Table
for file_info in trans_list:
    filename=file_info.name
    table_name = f"transactions_bronze"
    full_table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}"
    raw_csv_path = f"{BASE_VOLUME_PATH}/{filename}"
    
    print(f"\nProcessing file: {filename} -> Target table: {full_table_name}")
    
    try:
        # Load data using Spark DataFrame API to avoid /dbfs path access issues
        df_spark = spark.read.option("header", True).csv(raw_csv_path)
        
        # Check for the existence of the 'date' column and perform conversion if present
        if DATE_COLUMN in df_spark.columns:
            print(f"  Transforming column '{DATE_COLUMN}' to datetime type.")
            df_spark = df_spark.withColumn(DATE_COLUMN, col(DATE_COLUMN).cast("timestamp"))
        else:
            print(f"  Column '{DATE_COLUMN}' not found, skipping datetime transformation.")

        if TS_COLUMN in df_spark.columns:
            print(f"  Transforming column '{TS_COLUMN}' to datetime type.")
            df_spark = df_spark.withColumn(TS_COLUMN, col(TS_COLUMN).cast("timestamp"))
        else:
            print(f"  Column '{TS_COLUMN}' not found, skipping datetime transformation.")

        # Write to the Bronze Delta Table using the UC naming convention
        df_spark.write.format("delta").mode("appendcasino_ctg.log_db.transactions_bronze").saveAsTable(full_table_name)
        
        print(f"  Successfully wrote {df_spark.count()} rows to {full_table_name}.")

    except Exception as e:
        print(f"  FAILED to process {filename}. Error: {e}")


Processing file: transactions_2024-01.csv -> Target table: casino_ctg.log_db.transactions_bronze
  Transforming column 'date' to datetime type.
  Transforming column 'timestamp' to datetime type.
  FAILED to process transactions_2024-01.csv. Error: [UNSUPPORTED_OPERATION] appendcasino_ctg.log_db.transactions_bronze is not supported.

Processing file: transactions_2024-02.csv -> Target table: casino_ctg.log_db.transactions_bronze
  Transforming column 'date' to datetime type.
  Transforming column 'timestamp' to datetime type.
  FAILED to process transactions_2024-02.csv. Error: [UNSUPPORTED_OPERATION] appendcasino_ctg.log_db.transactions_bronze is not supported.

Processing file: transactions_2024-03.csv -> Target table: casino_ctg.log_db.transactions_bronze
  Transforming column 'date' to datetime type.
  Transforming column 'timestamp' to datetime type.
  FAILED to process transactions_2024-03.csv. Error: [UNSUPPORTED_OPERATION] appendcasino_ctg.log_db.transactions_bronze is not sup

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
table_name = f"game_metadata_bronze"
full_table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}"
raw_csv_path = f"{BASE_VOLUME_PATH}/{filename}"
    
print(f"\nProcessing file: {filename} -> Target table: {full_table_name}")
    
try:
        # Load data using Spark DataFrame API to avoid /dbfs path access issues
        df_spark = spark.read.option("header", True).csv(raw_csv_path)
        
        # Check for the existence of the 'date' column and perform conversion if present
        if DATE_COLUMN in df_spark.columns:
            print(f"  Transforming column '{DATE_COLUMN}' to datetime type.")
            df_spark = df_spark.withColumn(DATE_COLUMN, col(DATE_COLUMN).cast("timestamp"))
        else:
            print(f"  Column '{DATE_COLUMN}' not found, skipping datetime transformation.")

        if TS_COLUMN in df_spark.columns:
            print(f"  Transforming column '{TS_COLUMN}' to datetime type.")
            df_spark = df_spark.withColumn(TS_COLUMN, col(TS_COLUMN).cast("timestamp"))
        else:
            print(f"  Column '{TS_COLUMN}' not found, skipping datetime transformation.")

        # Write to the Bronze Delta Table using the UC naming convention
        df_spark.write.format("delta").mode("appendcasino_ctg.log_db.transactions_bronze").saveAsTable(full_table_name)
        
        print(f"  Successfully wrote {df_spark.count()} rows to {full_table_name}.")

except Exception as e:
        print(f"  FAILED to process {filename}. Error: {e}")