In [1]:
# NOTE(review): `%SparkSession` is not a built-in IPython magic; presumably it is
# registered by this environment and renders the service/endpoint summary
# shown in the cell output — confirm where it is defined.
%SparkSession

# SparkSession - Delta
| Spark Session        | http://localhost:4040        |
|----------------------|------------------------------|
| Spark Master         | http://localhost:5050        |
| Spark Worker A       | http://localhost:5051        |
| Spark Worker B       | http://localhost:5052        | 
| Hive Metastore       | thrift://hive-metastore:9083 |

| MinIO Object Storage | http://localhost:9090 |
|----------------------|-----------------------|
| user                 | `admin`               |
| password             | `password`            |

- Use `%trino` or `%%trino` to run queries using Trino
- Use `%sparksql` or `%%sparksql` to run queries using SparkSQL


In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook-wide Spark session with Delta Lake enabled.
spark = (
    SparkSession.builder
    .appName("SparkSession-Delta")
    # Resolve the Delta Lake jars from Maven when the session starts.
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    # Register Delta's SQL extension and make Delta the session catalog
    # implementation so `USING DELTA` tables can be created through spark.sql.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Persist table metadata in the Hive metastore instead of an in-memory catalog.
    .enableHiveSupport()
    .getOrCreate()
)

# Bare expression: renders the session summary as the cell output.
spark

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-19f823f5-c6ef-4ca0-a0b3-757cbe9d8c0c;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.0.0/delta-spark_2.12-3.0.0.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.0.0!delta-spark_2.12.jar (477ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/3.0.0/delta-storage-3.0.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;3.0.0!delta-storage.jar (22ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (50ms)
:: resolution report :: resolve 2121ms :: artifacts dl 55

25/03/26 20:21:46 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
25/03/26 20:21:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:981)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import  functions as F
from pyspark.sql import SparkSession
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, BooleanType, ByteType
import requests

In [20]:
# Remove any previous catalog entry so the table can be re-created below.
# IF EXISTS keeps this cell re-runnable: a plain DROP TABLE raises an error
# when the table has not been created yet (or was already dropped).
%sparksql DROP TABLE IF EXISTS conformed.date_dimension

Execution time: 0.03 seconds


                                                                                

SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

In [5]:
# Make sure the `conformed` database exists before tables are registered in it;
# IF NOT EXISTS makes the statement a no-op on subsequent runs.
spark.sql("CREATE DATABASE IF NOT EXISTS conformed")

Hive Session ID = 5519f7d5-8b6f-4eab-b369-9da7a6ca9d57


DataFrame[]

In [21]:
# DDL that registers `conformed.date_dimension` in the metastore as a Delta
# table whose data lives at the given s3a:// location.
# NOTE(review): no partitioning is declared here, while a later cell writes this
# same location partitioned by Year — confirm the two definitions are meant to differ.
create_date_dimension_sql = """
    CREATE TABLE IF NOT EXISTS conformed.date_dimension(
    Date date,
    Date_ID long,
    Day integer ,
    Day_Name string ,
    Is_Weekend boolean ,
    Week integer ,
    Month integer ,
    Month_Name string ,
    Quarter integer ,
    Year integer ,
    Is_Holiday boolean)
    USING DELTA
    LOCATION 's3a://test/date_dimension'
"""

# Last expression of the cell: run the DDL (displays the empty result frame).
spark.sql(create_date_dimension_sql)

25/03/26 07:51:37 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`conformed`.`date_dimension` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


DataFrame[]

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, lit

# Obtain the Spark session. getOrCreate() returns the already-running session
# when one exists.
# NOTE(review): when a session already exists the appName given here is
# presumably not applied — confirm whether this re-initialisation is needed.
spark = SparkSession.builder.appName("DateDimension").getOrCreate()

# Inclusive bounds of the calendar covered by the dimension.
start_date = "2024-01-01"
end_date = "2027-12-31"

# Date_ID counts whole days since this date (2000-01-01 -> 0).
date_id_epoch = "2000-01-01"

# One row per calendar day: build the whole range as a single array, then
# explode it into rows.
date_df = spark.sql(f"""
    SELECT sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day) AS date_range
""").selectExpr("explode(date_range) AS Date")

# Derive Date_ID with calendar arithmetic. The previous formula,
# (unix_timestamp(Date) - 946684800) / 86400 cast to long, depended on the
# session time zone: local midnight in a zone ahead of UTC (or across a DST
# change) is not a whole number of days after 2000-01-01T00:00:00Z, so the
# cast truncated to the previous day. datediff compares dates only, so the
# result is the same in every time zone (2024-01-01 -> 8766).
date_df_with_id = date_df.withColumn(
    "Date_ID",
    F.datediff(F.col("Date"), F.to_date(F.lit(date_id_epoch))).cast("long"),
)

# Preview the first rows to sanity-check the generated dates and ids.
date_df_with_id.show()

# Add the descriptive calendar attributes.
date_dimension = (date_df_with_id
    .withColumn("Day", F.dayofmonth("Date"))
    .withColumn("Day_Name", F.date_format("Date", "EEEE"))
    # dayofweek numbers days 1 (Sunday) .. 7 (Saturday).
    .withColumn("Is_Weekend", when(F.dayofweek("Date").isin(1, 7), lit(True)).otherwise(lit(False)))
    # weekofyear follows ISO 8601, so days at the very start or end of a year
    # can carry a week number that belongs to the adjacent year.
    .withColumn("Week", F.weekofyear("Date"))
    .withColumn("Month", F.month("Date"))
    .withColumn("Month_Name", F.date_format("Date", "MMMM"))
    .withColumn("Quarter", F.quarter("Date"))
    .withColumn("Year", F.year("Date"))
    .withColumn("Is_Holiday", lit(False)))  # Placeholder for holiday logic


+----------+-------+
|      Date|Date_ID|
+----------+-------+
|2024-01-01|   8766|
|2024-01-02|   8767|
|2024-01-03|   8768|
|2024-01-04|   8769|
|2024-01-05|   8770|
|2024-01-06|   8771|
|2024-01-07|   8772|
|2024-01-08|   8773|
|2024-01-09|   8774|
|2024-01-10|   8775|
|2024-01-11|   8776|
|2024-01-12|   8777|
|2024-01-13|   8778|
|2024-01-14|   8779|
|2024-01-15|   8780|
|2024-01-16|   8781|
|2024-01-17|   8782|
|2024-01-18|   8783|
|2024-01-19|   8784|
|2024-01-20|   8785|
+----------+-------+
only showing top 20 rows



In [24]:
# Storage location of the Delta table.
s3_path = "s3a://test/date_dimension"

# Persist the dimension as a Delta table at the target path, split into one
# partition per Year. "overwrite" replaces the existing contents, and
# overwriteSchema lets the stored schema be replaced along with the data.
(
    date_dimension.write
    .format("delta")
    .partitionBy("Year")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(s3_path)
)

print(f"Delta table saved to {s3_path}")

                                                                                

Delta table saved to s3a://test/date_dimension
