In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sanity").getOrCreate()

25/09/19 14:12:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
df=spark.range(5)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [3]:
spark.sql("SHOW CATALOGS").show(truncate=False)

+-------------+
|catalog      |
+-------------+
|demo         |
|spark_catalog|
+-------------+



In [1]:
print(spark.version)
print(spark.sparkContext.appName)
spark.sparkContext.getConf().getAll()


3.5.5
PySparkShell


[('spark.eventLog.enabled', 'true'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.sql.catalogImplementation', 'in-memory'),
 ('spark.sql.ca

## Running a small test with sample data


In [59]:
spark.stop()


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("df-basics").getOrCreate()


25/09/19 14:12:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
spark.sparkContext.appName

'df-basics'

In [6]:
orders = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/orders.csv")
customers = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/customers.csv")
products = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/products.csv")


In [41]:

orders.printSchema(); customers.printSchema(); products.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- qty: integer (nullable = true)
 |-- order_date: date (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- sku: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)



In [8]:
from pyspark.sql import functions as F

orders = (orders
  .withColumn("order_date", F.to_date("order_ts"))
        )

              
bg_customers = customers.filter(F.col("country") == "BG").select("customer_id","country")
bg_customers.show()

+-----------+-------+
|customer_id|country|
+-----------+-------+
|          1|     BG|
|          2|     BG|
+-----------+-------+



In [33]:
from pyspark import StorageLevel

line_items = orders\
    .join(products, "product_id")\
    .withColumn("line_amount", F.col("qty")*F.col("price"))\
    .persist(StorageLevel.MEMORY_ONLY)  #will bne reused a couple of times. Let's cache it

line_items.count()
               
daily_sales = line_items\
            .groupBy("order_date")\
            .agg(
                F.sum("line_amount").alias("gross_sales"),
                F.countDistinct("order_id").alias("orders"),
                F.sum("qty").alias("units")
            )\
            .orderBy("order_date")

daily_sales.show()

+----------+-----------+------+-----+
|order_date|gross_sales|orders|units|
+----------+-----------+------+-----+
|2025-03-21|      47.48|     2|    3|
|2025-03-22|      84.99|     3|    5|
|2025-03-23|      54.98|     2|    4|
|2025-03-24|       41.0|     2|    2|
|2025-03-25|      67.47|     3|    4|
|2025-03-26|       70.0|     2|    3|
|2025-03-27|      67.47|     3|    4|
|2025-03-28|       29.0|     1|    1|
+----------+-----------+------+-----+



In [13]:
#daily_sales.repartition(1).write.mode("overwrite").partitionBy("order_date").parquet("data/testing-spark-output/daily_sales")

daily_sales.repartition(F.col("order_date")).sortWithinPartitions("order_date").write.mode("overwrite").partitionBy("order_date").parquet("data/testing-spark-output/daily_sales")

                                                                                

##### Interesting is that first daily_sales is ordered by date but
##### this seems to be done only for the benefit of the partitionBy 
##### when writing to parquet. 
##### Later on when I read the parquet it reads all the separate partitions but the "ds" dataframe is not ordered by date.

In [10]:
ds = spark.read.parquet("data/testing-spark-output/daily_sales")
#ds.show()
ds.orderBy("order_date").show()

+-----------+------+-----+----------+
|gross_sales|orders|units|order_date|
+-----------+------+-----+----------+
|      47.48|     2|    3|2025-03-21|
|      84.99|     3|    5|2025-03-22|
|      54.98|     2|    4|2025-03-23|
|       41.0|     2|    2|2025-03-24|
|      67.47|     3|    4|2025-03-25|
|       70.0|     2|    3|2025-03-26|
|      67.47|     3|    4|2025-03-27|
|       29.0|     1|    1|2025-03-28|
+-----------+------+-----+----------+



In [18]:
daily_sales.explain(True)

== Parsed Logical Plan ==
'Sort ['order_date ASC NULLS FIRST], true
+- Aggregate [order_date#86], [order_date#86, sum(line_amount#118) AS gross_sales#140, count(distinct order_id#24) AS orders#141L, sum(qty#28) AS units#143L]
   +- Project [product_id#27, order_id#24, order_ts#25, customer_id#26, qty#28, order_date#86, sku#79, category#80, price#81, (cast(qty#28 as double) * price#81) AS line_amount#118]
      +- Project [product_id#27, order_id#24, order_ts#25, customer_id#26, qty#28, order_date#86, sku#79, category#80, price#81]
         +- Join Inner, (product_id#27 = product_id#78)
            :- Project [order_id#24, order_ts#25, customer_id#26, product_id#27, qty#28, to_date(order_ts#25, None, Some(Etc/UTC), false) AS order_date#86]
            :  +- Relation [order_id#24,order_ts#25,customer_id#26,product_id#27,qty#28] csv
            +- Relation [product_id#78,sku#79,category#80,price#81] csv

== Analyzed Logical Plan ==
order_date: date, gross_sales: double, orders: bigint, un

In [19]:
ds.explain(True)

== Parsed Logical Plan ==
Relation [gross_sales#208,orders#209L,units#210L,order_date#211] parquet

== Analyzed Logical Plan ==
gross_sales: double, orders: bigint, units: bigint, order_date: date
Relation [gross_sales#208,orders#209L,units#210L,order_date#211] parquet

== Optimized Logical Plan ==
Relation [gross_sales#208,orders#209L,units#210L,order_date#211] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [gross_sales#208,orders#209L,units#210L,order_date#211] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/notebooks/notebooks/data/testing-spark-output/daily..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<gross_sales:double,orders:bigint,units:bigint>



In [21]:

#ds.cache().count()
#ds.is_cached
ds.storageLevel
#ds.unpersist()  # unpersist only this cache
#ds.is_cached
#spark.stop()

StorageLevel(False, False, False, False, 1)

### Excercises

In [34]:
line_items.show()

+----------+--------+-------------------+-----------+---+----------+------------+-----------+-----+-----------+
|product_id|order_id|           order_ts|customer_id|qty|order_date|         sku|   category|price|line_amount|
+----------+--------+-------------------+-----------+---+----------+------------+-----------+-----+-----------+
|        10|    1001|2025-03-21 10:15:00|          1|  2|2025-03-21|S-RED-TSHIRT|    Apparel|19.99|      39.98|
|        12|    1002|2025-03-21 11:02:00|          1|  1|2025-03-21|  MUG-COFFEE|       Home|  7.5|        7.5|
|        14|    1003|2025-03-22 09:45:00|          2|  1|2025-03-22|  NB-13-CASE|Electronics| 29.0|       29.0|
|        13|    1004|2025-03-22 10:10:00|          3|  3|2025-03-22|HDMI-CABL-2M|Electronics| 12.0|       36.0|
|        10|    1005|2025-03-22 11:35:00|          5|  1|2025-03-22|S-RED-TSHIRT|    Apparel|19.99|      19.99|
|        11|    1006|2025-03-23 08:00:00|          4|  2|2025-03-23|S-BLU-TSHIRT|    Apparel|19.99|     

In [35]:
cat_sales= line_items\
            .groupBy("category")\
            .agg(F.round(F.sum("line_amount"),2).alias("revenue"))\
            .orderBy(F.desc("revenue"))

In [36]:
cat_sales.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [revenue#1325 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(revenue#1325 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1421]
      +- HashAggregate(keys=[category#80], functions=[sum(line_amount#423)])
         +- Exchange hashpartitioning(category#80, 200), ENSURE_REQUIREMENTS, [plan_id=1418]
            +- HashAggregate(keys=[category#80], functions=[partial_sum(line_amount#423)])
               +- InMemoryTableScan [category#80, line_amount#423]
                     +- InMemoryRelation [product_id#27, order_id#24, order_ts#25, customer_id#26, qty#28, order_date#86, sku#79, category#80, price#81, line_amount#423], StorageLevel(memory, 1 replicas)
                           +- AdaptiveSparkPlan isFinalPlan=true
                              +- == Final Plan ==
                                 *(2) Project [product_id#27, order_id#24, order_ts#25, customer_id#26, qty#28, order_date#86, sku#79, category#

#### Note that after caching the polan contains 
 +- AdaptiveSparkPlan isFinalPlan=true
                              +- == Final Plan ==
                                  
which is the orginal plan of the cached bit


In [39]:
by_country = orders.join(customers, "customer_id")\
  .groupBy("country")\
  .agg(F.countDistinct("order_id").alias("orders"))
by_country.show()

+-------+------+
|country|orders|
+-------+------+
|     DE|     3|
|     US|     7|
|     BG|     8|
+-------+------+



In [57]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- qty: integer (nullable = true)
 |-- order_date: date (nullable = true)



In [54]:
from pyspark.sql.window import Window
import time
w = Window.partitionBy("order_date").orderBy(F.desc("units"))

top_per_day = (orders\
  .groupBy("order_date","product_id")\
  .agg(F.sum("qty").alias("units"))\
  .withColumn("rnk", F.row_number().over(w))\
  .filter("rnk = 1"))
top_per_day.show()

print("Spark UI:", spark.sparkContext.uiWebUrl)
time.sleep(120)

+----------+----------+-----+---+
|order_date|product_id|units|rnk|
+----------+----------+-----+---+
|2025-03-21|        10|    2|  1|
|2025-03-22|        13|    3|  1|
|2025-03-23|        11|    2|  1|
|2025-03-24|        13|    1|  1|
|2025-03-25|        10|    2|  1|
|2025-03-26|        14|    2|  1|
|2025-03-27|        11|    2|  1|
|2025-03-28|        14|    1|  1|
+----------+----------+-----+---+

Spark UI: http://839fa80d5d3b:4041


In [56]:
spark.stop()

# Day 4
## Joins, Aggregations, Spark SQL, Windowing

In [3]:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("joins-sql-windows").getOrCreate()


orders = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/orders.csv") \
    .withColumn("order_ts", F.to_timestamp("order_ts")) \
    .withColumn("order_date", F.to_date("order_ts")) \
    .withColumn("qty", F.col("qty").cast("int"))
customers = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/customers.csv")
products = spark.read.option("header", True).option("inferSchema", True).csv("/home/iceberg/data/testing-spark/products.csv")

In [7]:
orders.printSchema(), customers.printSchema(), products.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- qty: integer (nullable = true)
 |-- order_date: date (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- sku: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)



(None, None, None)

In [12]:
# Join Types 

inner = orders.join(customers, "customer_id")
inner.show()
no_order_customers = customers.join(orders, "customer_id", "left_anti")
no_order_customers.show()

+-----------+--------+-------------------+----------+---+----------+----------+---------+-------+-----------+
|customer_id|order_id|           order_ts|product_id|qty|order_date|first_name|last_name|country|signup_date|
+-----------+--------+-------------------+----------+---+----------+----------+---------+-------+-----------+
|          1|    1001|2025-03-21 10:15:00|        10|  2|2025-03-21|      Alex|   Petrov|     BG| 2024-11-01|
|          1|    1002|2025-03-21 11:02:00|        12|  1|2025-03-21|      Alex|   Petrov|     BG| 2024-11-01|
|          2|    1003|2025-03-22 09:45:00|        14|  1|2025-03-22|     Maria|  Ivanova|     BG| 2024-11-15|
|          3|    1004|2025-03-22 10:10:00|        13|  3|2025-03-22|       Sam|       Ng|     US| 2025-01-03|
|          5|    1005|2025-03-22 11:35:00|        10|  1|2025-03-22|     Chris|      Lee|     US| 2025-03-11|
|          4|    1006|2025-03-23 08:00:00|        11|  2|2025-03-23|      Jana|  Schmidt|     DE| 2025-02-20|
|         

In [13]:
#Spark SQL
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")
products.createOrReplaceTempView("products")

spark.sql("""
SELECT o.order_date, c.country, SUM(o.qty * p.price) AS revenue
FROM orders o
JOIN customers c USING (customer_id)
JOIN products p USING (product_id)
GROUP BY o.order_date, c.country
ORDER BY o.order_date, revenue DESC
""").show()


+----------+-------+------------------+
|order_date|country|           revenue|
+----------+-------+------------------+
|2025-03-21|     BG|             47.48|
|2025-03-22|     US|55.989999999999995|
|2025-03-22|     BG|              29.0|
|2025-03-23|     DE|             39.98|
|2025-03-23|     US|              15.0|
|2025-03-24|     BG|              41.0|
|2025-03-25|     US|             47.48|
|2025-03-25|     DE|             19.99|
|2025-03-26|     BG|              70.0|
|2025-03-27|     DE|             39.98|
|2025-03-27|     US|             27.49|
|2025-03-28|     BG|              29.0|
+----------+-------+------------------+



In [19]:
# Null handling
from pyspark.sql.types import DecimalType

li=orders.join(products, "product_id")\
        .withColumn("price", F.col("price").cast(DecimalType(10,2)))\
        .withColumn("line_amount", (F.col("qty")*F.col("price")).cast(DecimalType(12,2)))
li.select("order_id","product_id","qty","price","line_amount").show()

+--------+----------+---+-----+-----------+
|order_id|product_id|qty|price|line_amount|
+--------+----------+---+-----+-----------+
|    1001|        10|  2|19.99|      39.98|
|    1002|        12|  1| 7.50|       7.50|
|    1003|        14|  1|29.00|      29.00|
|    1004|        13|  3|12.00|      36.00|
|    1005|        10|  1|19.99|      19.99|
|    1006|        11|  2|19.99|      39.98|
|    1007|        12|  2| 7.50|      15.00|
|    1008|        14|  1|29.00|      29.00|
|    1009|        13|  1|12.00|      12.00|
|    1010|        10|  2|19.99|      39.98|
|    1011|        11|  1|19.99|      19.99|
|    1012|        12|  1| 7.50|       7.50|
|    1013|        14|  2|29.00|      58.00|
|    1014|        13|  1|12.00|      12.00|
|    1015|        10|  1|19.99|      19.99|
|    1016|        11|  2|19.99|      39.98|
|    1017|        12|  1| 7.50|       7.50|
|    1018|        14|  1|29.00|      29.00|
+--------+----------+---+-----+-----------+



In [29]:
#Window functions (dense_rank, rolling 2-day)
from pyspark.sql.window import Window
w_day = Window.partitionBy("order_date").orderBy(F.desc("revenue"))
ranked = (li.groupBy("order_date","product_id")\
            .agg(F.sum("line_amount").alias("revenue"))\
            .withColumn("rank", F.dense_rank().over(w_day)))
ranked.filter("rank <= 1").orderBy("order_date","rank").show()

+----------+----------+-------+----+
|order_date|product_id|revenue|rank|
+----------+----------+-------+----+
|2025-03-21|        10|  39.98|   1|
|2025-03-21|        12|   7.50|   2|
|2025-03-22|        13|  36.00|   1|
|2025-03-22|        14|  29.00|   2|
|2025-03-22|        10|  19.99|   3|
|2025-03-23|        11|  39.98|   1|
|2025-03-23|        12|  15.00|   2|
|2025-03-24|        14|  29.00|   1|
|2025-03-24|        13|  12.00|   2|
|2025-03-25|        10|  39.98|   1|
|2025-03-25|        11|  19.99|   2|
|2025-03-25|        12|   7.50|   3|
|2025-03-26|        14|  58.00|   1|
|2025-03-26|        13|  12.00|   2|
|2025-03-27|        11|  39.98|   1|
|2025-03-27|        10|  19.99|   2|
|2025-03-27|        12|   7.50|   3|
|2025-03-28|        14|  29.00|   1|
+----------+----------+-------+----+



In [30]:
# Rolling 2-day revenue (requires continuous dates to be meaningful; demo only)
w_roll = Window.orderBy("order_date").rowsBetween(-1, 0)
daily_rev = (li.groupBy("order_date").agg(F.sum("line_amount").alias("rev")))
daily_rev.withColumn("rev_2d", F.sum("rev").over(w_roll)).orderBy("order_date").show()

25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 1

+----------+-----+------+
|order_date|  rev|rev_2d|
+----------+-----+------+
|2025-03-21|47.48| 47.48|
|2025-03-22|84.99|132.47|
|2025-03-23|54.98|139.97|
|2025-03-24|41.00| 95.98|
|2025-03-25|67.47|108.47|
|2025-03-26|70.00|137.47|
|2025-03-27|67.47|137.47|
|2025-03-28|29.00| 96.47|
+----------+-----+------+



25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/22 13:35:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [21]:
help(pyspark.sql.window.Window)
#help(Window)

Help on class Window in module pyspark.sql.window:

class Window(builtins.object)
 |  Utility functions for defining window in DataFrames.
 |  
 |  .. versionadded:: 1.4.0
 |  
 |  .. versionchanged:: 3.4.0
 |      Supports Spark Connect.
 |  
 |  Notes
 |  -----
 |  When ordering is not defined, an unbounded window frame (rowFrame,
 |  unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined,
 |  a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
 |  
 |  Examples
 |  --------
 |  >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
 |  
 |  >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
 |  >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
 |  
 |  Static methods defined here:
 |  
 |  orderBy(*cols: Union[ForwardRef('ColumnOrName'), List[Forwa

In [35]:
(li.write.mode("overwrite")
   .partitionBy("order_date")
   .parquet("data/testing-spark-output/line_items"))

(ranked.write.mode("overwrite")
   .parquet("data/testing-spark-output/ranked_products"))



                                                                                

In [36]:
spark.stop()