In [4]:
from pyspark.sql.functions import col, from_unixtime, to_timestamp
from delta.tables import DeltaTable

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 6, Finished, Available, Finished)

In [5]:
# Read the Carbon Intensity JSON data into a Spark DataFrame.
# `multiline` lets Spark parse a JSON document that spans several lines,
# instead of the default one-record-per-line (JSON Lines) layout.
carbon_json_path = "Files/carbon_emission_UK_data.json"
df = spark.read.option("multiline", "true").json(carbon_json_path)

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 7, Finished, Available, Finished)

In [6]:
# Flatten the nested `intensity` struct and give the window bounds
# descriptive names: (source field, output column).
column_renames = [
    ("from", "time_from"),
    ("to", "time_to"),
    ("intensity.forecast", "forecast_intensity"),
    ("intensity.actual", "actual_intensity"),
    ("intensity.index", "intensity_index"),
]
df = df.select(*[col(source).alias(target) for source, target in column_renames])

# Print up to 200 rows without truncating cell contents.
df.show(n=200, truncate=False)

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 8, Finished, Available, Finished)

+-----------------+-----------------+------------------+----------------+---------------+
|time_from        |time_to          |forecast_intensity|actual_intensity|intensity_index|
+-----------------+-----------------+------------------+----------------+---------------+
|2024-12-20T10:00Z|2024-12-20T10:30Z|96                |87              |low            |
|2024-12-20T10:30Z|2024-12-20T11:00Z|87                |80              |low            |
|2024-12-20T11:00Z|2024-12-20T11:30Z|82                |75              |low            |
|2024-12-20T11:30Z|2024-12-20T12:00Z|77                |73              |low            |
|2024-12-20T12:00Z|2024-12-20T12:30Z|74                |73              |low            |
|2024-12-20T12:30Z|2024-12-20T13:00Z|70                |74              |low            |
|2024-12-20T13:00Z|2024-12-20T13:30Z|68                |73              |low            |
|2024-12-20T13:30Z|2024-12-20T14:00Z|70                |73              |low            |
|2024-12-2

In [7]:
# Parse the ISO-8601 window bounds (e.g. "2024-12-20T10:00Z") into timestamps.
# The trailing "Z" is parsed as a zone offset (pattern letter X) rather than
# as the quoted literal 'Z': a quoted literal is simply skipped, so the
# remaining wall-clock value would be interpreted in the Spark session time
# zone and shifted whenever that zone is not UTC.
# Note that show() still renders timestamps in the session time zone.
ISO_MINUTE_UTC_PATTERN = "yyyy-MM-dd'T'HH:mmX"

for ts_column in ("time_from", "time_to"):
    df = df.withColumn(ts_column, to_timestamp(col(ts_column), ISO_MINUTE_UTC_PATTERN))

# With ANSI mode off (the Spark 3 default), unparseable strings become NULL
# instead of raising. NULL keys never satisfy the equality condition of the
# MERGE that writes the silver table, so such rows would be re-inserted on
# every run. Fail fast instead.
unparsed_rows = df.filter(col("time_from").isNull() | col("time_to").isNull()).count()
assert unparsed_rows == 0, f"{unparsed_rows} rows have unparseable time_from/time_to values"

df.show(n=200, truncate=False)

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 9, Finished, Available, Finished)

+-------------------+-------------------+------------------+----------------+---------------+
|time_from          |time_to            |forecast_intensity|actual_intensity|intensity_index|
+-------------------+-------------------+------------------+----------------+---------------+
|2024-12-20 10:00:00|2024-12-20 10:30:00|96                |87              |low            |
|2024-12-20 10:30:00|2024-12-20 11:00:00|87                |80              |low            |
|2024-12-20 11:00:00|2024-12-20 11:30:00|82                |75              |low            |
|2024-12-20 11:30:00|2024-12-20 12:00:00|77                |73              |low            |
|2024-12-20 12:00:00|2024-12-20 12:30:00|74                |73              |low            |
|2024-12-20 12:30:00|2024-12-20 13:00:00|70                |74              |low            |
|2024-12-20 13:00:00|2024-12-20 13:30:00|68                |73              |low            |
|2024-12-20 13:30:00|2024-12-20 14:00:00|70                |

In [8]:
# Keep one row per (time_from, time_to) window. These are the same keys the
# MERGE into the silver table matches on.
df = df.drop_duplicates(subset=["time_from", "time_to"])

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 10, Finished, Available, Finished)

In [9]:
# Catalog name of the Delta table that receives the cleaned carbon-intensity rows.
table_name: str = "carbon_emission_silver"

StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 11, Finished, Available, Finished)

In [10]:
# Append only windows that are not already stored, so re-running the notebook
# neither duplicates rows nor discards history.
# `DeltaTable.isDeltaTable` treats its argument as a storage *path*, not a
# catalog table name. Passed `table_name`, it does not find the managed table,
# so the overwrite branch would replace all previously stored rows on every
# run. Look the table up in the catalog instead (PySpark >= 3.3).
if spark.catalog.tableExists(table_name):
    delta_table = DeltaTable.forName(spark, table_name)
    (
        delta_table.alias("tgt")
        .merge(
            df.alias("src"),
            # Match on the window bounds. The source is expected to be unique
            # on these keys (see the dropDuplicates step); otherwise this
            # insert-only merge would insert each duplicate.
            "tgt.time_from = src.time_from AND tgt.time_to = src.time_to",
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the Delta table from this batch.
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)


StatementMeta(, cd379c97-b92d-46ad-957e-604f67365c72, 12, Finished, Available, Finished)