In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import arrays_zip, explode, col, to_timestamp, to_date, date_format, current_timestamp
from pyspark.sql.types import (
    StructType, StructField,
    LongType, StringType, TimestampType
)
from pyspark.errors import PySparkException 
import time
from datetime import datetime

# Reuse the `spark` session if the runtime has already bound that name.
# Referencing the bare name raises NameError when it is not defined, in which
# case a session is obtained (or created) through the builder instead.
try:
    spark
except NameError:
    spark = SparkSession.builder.getOrCreate()

# Source table holding the raw air-quality records that the load job reads.
air_qty_df = spark.table("air_quality_project.air_quality_bronze_data.air_quality_data")

# Value stored in the Process_name column of each error-log row.
process_name = "air_quality_load_job"

# Layout of one error-log row. Every column is nullable, and the field order
# here is the column order of the DataFrame built from it.
error_schema = (
    StructType()
    .add("Process_name", StringType(), True)
    .add("error_message", StringType(), True)
    .add("error_time", TimestampType(), True)
    .add("error_code", StringType(), True)
)

# Transform the source air-quality records into one row per hourly reading and
# append them to the silver Delta table.
#
# Failure handling:
#   * PySparkException -> printed, then recorded as one row in
#     log_master.process_log.error_log (a failure of that write propagates).
#   * any other Exception -> printed only.
# NOTE(review): neither handler re-raises, so a scheduler sees the run as
# successful even when the load failed - confirm this is intended.
try:
    # arrays_zip combines the i-th elements of the three parallel hourly arrays
    # into one struct; explode then yields one row per struct.
    df_zip_array_col = air_qty_df.withColumn(
        "hourly",
        explode(
            arrays_zip(
                col("`hourly.time`"),
                col("`hourly.pm10`"),
                col("`hourly.pm2_5`")
            )
        )
    )

    # The struct fields are addressed by the source column names, which contain
    # a dot, so they are back-quoted when selected out of `hourly`.
    df_struct_col = df_zip_array_col.select(
        col("latitude"),
        col("longitude"),
        col("elevation"),
        col("timezone"),
        to_timestamp(col("hourly.`hourly.time`")).alias("timestamp"),
        col("hourly.`hourly.pm10`").alias("pm10"),
        col("hourly.`hourly.pm2_5`").alias("pm2_5")
    )

    # Derive date/time parts, stamp the load time, then drop rows containing any
    # null and exact duplicate rows.
    df_silver = (
        df_struct_col
        .withColumn("date", to_date("timestamp"))
        .withColumn("time", date_format("timestamp", "HH:mm:ss"))
        .withColumn("insertdate", current_timestamp())
        .dropna()
        .dropDuplicates()
    )

    df_silver.createOrReplaceTempView("people_view")

    # Preview a sample of the rows about to be written. The query text was
    # previously never assigned, so this step raised NameError on every run and
    # the write below was never reached.
    sql_query = "SELECT * FROM people_view LIMIT 20"
    result_df = spark.sql(sql_query)

    # Display the result
    result_df.show()

    df_silver.write.format("delta").mode("append").saveAsTable("air_quality_project.air_quality_silver_data.hourly_air_quality")

except PySparkException as e:
    # Surface the failure in the job output as well as in the log table.
    print(f"A PySpark error occurred: {e}")

    # getCondition() is the PySpark 4.x accessor; 3.4/3.5 expose the same value
    # through getErrorClass(). Either may return None, in which case the
    # exception class name is recorded instead.
    if hasattr(e, "getCondition"):
        error_code = e.getCondition()
    elif hasattr(e, "getErrorClass"):
        error_code = e.getErrorClass()
    else:
        error_code = None

    error_row = Row(
        Process_name=process_name,
        error_message=str(e),
        error_time=datetime.now(),
        error_code=error_code or type(e).__name__,
    )

    # Create DataFrame from Row with schema
    error_df = spark.createDataFrame([error_row], schema=error_schema)

    # Write error log to Delta
    error_df.write.format("delta").mode("append").saveAsTable("log_master.process_log.error_log")

except Exception as e:
    # Handle other general Python exceptions
    print(f"A general Python error occurred: {e}")
finally:
    # Nothing here stops the Spark session, so the message no longer says so.
    print(f"{process_name} run ended.")
