## This notebook is part of the Apache Spark training delivered by CERN IT


Run this notebook from Jupyter with a Python kernel:
- When running on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run it locally on the SWAN container (which is the default)
- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`


In order to run this notebook as slides:
 - on SWAN click on the button "Enter/Exit RISE slideshow" in the ribbon
 - on other environments, please make sure to have the RISE extension installed: `pip install RISE`

### SPARK DataFrame Hands-On Lab
Contact: Luca.Canali@cern.ch

### Objective: Perform Basic DataFrame Operations
1. Creating DataFrames
2. Select columns
3. Add, rename and drop columns
4. Filtering rows
5. Aggregations

## Getting started: create the SparkSession

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)

In [1]:
# !pip install pyspark

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("DataFrame HandsOn 1")
         .config("spark.ui.showConsoleProgress", "false")
         .getOrCreate()
        )

spark

The master `local[*]` runs Spark in local mode: the driver and the executor share a single JVM on the machine running the notebook. The `*` tells Spark to use as many worker threads as there are logical cores available.
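
To get a feel for how many worker threads `local[*]` is likely to use on your machine, you can check the logical core count from Python. This is a small sketch; it assumes that the JVM started by Spark sees the same number of cores as `os.cpu_count()`, which may not hold in containers with CPU limits.

```python
import os

# Number of logical cores reported by the operating system.
# Assumption: Spark's local[*] resolves to the same number.
logical_cores = os.cpu_count()
print(f"local[*] would use about {logical_cores} worker threads")
```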

### Hands-On 1 - Construct a DataFrame from csv file
This demonstrates how to read a csv file and construct a DataFrame.  
We will use the online retail dataset from Kaggle, credits: https://www.kaggle.com/datasets/vijayuv/onlineretail


#### First, let's inspect the csv content

In [None]:
# modify the path below to point to the downloaded dataset

!gzip -cd ../data/online-retail-dataset.csv.gz 2>&1| head -n3

gzip: ../data/online-retail-dataset.csv.gz: No such file or directory
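
The `gzip`/`head` command above only works where those shell tools and that path exist. As an alternative sketch, the first lines can be printed with the Python standard library alone. `preview_lines` is a hypothetical helper name, and `OnlineRetail.csv` is the file name used by the read cells in this notebook; the preview is skipped if the file is not in the working directory.

```python
import gzip
from pathlib import Path

def preview_lines(path, n=3):
    """Return the first n lines of a plain or gzip-compressed text file."""
    opener = gzip.open if str(path).endswith(".gz") else open
    lines = []
    with opener(path, "rt", encoding="utf-8", errors="replace") as f:
        for _ in range(n):
            line = f.readline()
            if not line:          # end of file reached before n lines
                break
            lines.append(line.rstrip("\n"))
    return lines

csv_path = Path("OnlineRetail.csv")
if csv_path.exists():
    for line in preview_lines(csv_path):
        print(line)
```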


In [2]:
online_retail_schema="InvoiceNo int, StockCode string, Description string, Quantity int,\
InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string"

In [19]:
df = (spark.read
        .option("header", "true")
        .option("timestampFormat", "M/d/yyyy H:m")
        .csv("OnlineRetail.csv",
             schema=online_retail_schema)
     )
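
The `timestampFormat` pattern `M/d/yyyy H:m` accepts month, day, hour and minute written without leading zeros. As a rough plain-Python analogy (not Spark's parser), `datetime.strptime` parses a string of that shape as shown below. The sample value is an assumed raw CSV field, used for illustration only.

```python
from datetime import datetime

raw_value = "12/1/2010 8:26"  # assumed raw CSV field, for illustration only

# %m, %d, %H and %M also accept values without leading zeros when parsing
parsed = datetime.strptime(raw_value, "%m/%d/%Y %H:%M")
print(parsed)  # 2010-12-01 08:26:00
```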

#### Inspect the data

In [20]:
df.show(2, False)

+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |2010-12-01 08:26:00|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN               |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



#### Show columns

In [21]:
df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)



### Hands-On 2 - Spark Transformations - select, add, rename and drop columns

Select dataframe columns

In [22]:
# select single column

df.select("Country").show(5)

+--------------+
|       Country|
+--------------+
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
+--------------+
only showing top 5 rows



Select multiple columns


In [23]:
df.select("StockCode","Description","UnitPrice").show(n=5, truncate=False)

+---------+-----------------------------------+---------+
|StockCode|Description                        |UnitPrice|
+---------+-----------------------------------+---------+
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER |2.55     |
|71053    |WHITE METAL LANTERN                |3.39     |
|84406B   |CREAM CUPID HEARTS COAT HANGER     |2.75     |
|84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|3.39     |
|84029E   |RED WOOLLY HOTTIE WHITE HEART.     |3.39     |
+---------+-----------------------------------+---------+
only showing top 5 rows



In [24]:
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerId',
 'Country']

In [25]:
# select first 5 columns
df.select(df.columns[0:5]).show(2)

+---------+---------+--------------------+--------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|
+---------+---------+--------------------+--------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|
+---------+---------+--------------------+--------+-------------------+
only showing top 2 rows



In [26]:
# select first 5 rows of all columns
df.select(df.columns[:]).show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [27]:
# selects all the original columns and adds a new column that specifies high value item
(df.selectExpr("*", "(UnitPrice > 100) as HighValueItem").show(5))

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|HighValueItem|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|        false|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|        false|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
+---------+---------+---

In [None]:
# selects all the original columns and adds a new column that specifies high value item
(df.selectExpr(
   "*", # all original columns
   "(UnitPrice > 100) as HighValueItem")
   .show(5)
)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|HighValueItem|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|        false|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|        false|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
+---------+---------+---

In [28]:
# aggregate over all rows: total quantity and sum of unit prices (cast to int)
(df.selectExpr("sum(Quantity) as TotalQuantity", "cast(sum(UnitPrice) as int) as InventoryValue").show())

+-------------+--------------+
|TotalQuantity|InventoryValue|
+-------------+--------------+
|      5176450|       2498803|
+-------------+--------------+



#### Adding, renaming and dropping columns

In [29]:
# add a new column called InvoiceValue
from pyspark.sql.functions import expr
df_1 = (df
        .withColumn("InvoiceValue", expr("UnitPrice * Quantity"))
        .select("InvoiceNo","Description","UnitPrice","Quantity","InvoiceValue")
       )
df_1.show(2, False)

# rename InvoiceValue to LineTotal
df_2 = df_1.withColumnRenamed("InvoiceValue","LineTotal")
df_2.show(2, False)

# drop a column
df_2.drop("LineTotal").show(2, False)

+---------+----------------------------------+---------+--------+------------+
|InvoiceNo|Description                       |UnitPrice|Quantity|InvoiceValue|
+---------+----------------------------------+---------+--------+------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |6       |15.299999   |
|536365   |WHITE METAL LANTERN               |3.39     |6       |20.34       |
+---------+----------------------------------+---------+--------+------------+
only showing top 2 rows

+---------+----------------------------------+---------+--------+---------+
|InvoiceNo|Description                       |UnitPrice|Quantity|LineTotal|
+---------+----------------------------------+---------+--------+---------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |6       |15.299999|
|536365   |WHITE METAL LANTERN               |3.39     |6       |20.34    |
+---------+----------------------------------+---------+--------+---------+
only showing top 2 rows

+---------+---------
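
The value `15.299999` in the output above is expected: the schema declares `UnitPrice` as `float`, a 32-bit type that cannot represent 2.55 exactly. The sketch below reproduces the effect in plain Python by rounding to 32-bit precision with `struct`. `to_float32` is a hypothetical helper, and the `Decimal` lines only illustrate exact decimal arithmetic; they are not something this notebook uses.

```python
import struct
from decimal import Decimal

def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]

unit_price = to_float32(2.55)            # what a 32-bit float column stores
line_total = to_float32(unit_price * 6)  # product rounded back to 32 bits
print(unit_price, line_total)            # both differ slightly from 2.55 and 15.3

exact_total = Decimal("2.55") * 6        # exact decimal arithmetic
print(exact_total)                       # 15.30
```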

### Hands-On 3 - Spark Transformations - filter, sort and cast

In [30]:
from pyspark.sql.functions import col

# select invoice lines with quantity > 20 and unitprice > 50 (three equivalent forms)
df.where(col("Quantity") > 20).where(col("UnitPrice") > 50).show(2)
df.filter(df.Quantity > 20).filter(df.UnitPrice > 50).show(2)
df.filter("Quantity > 20 and UnitPrice > 50").show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   556444|    22502|PICNIC BASKET WIC...|      60|2011-06-10 15:28:00|    649.5|     15098|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   556444|    22502|PICNIC BASKET WIC...|      60|2011-06-10 15:28:00|    649.5|     15098|United Kingdom|
+---------+---------+------

In [None]:
# select invoice lines with quantity > 100 or unitprice > 20
df.where((col("Quantity") > 100) | (col("UnitPrice") > 20)).show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536378|    21212|PACK OF 72 RETROS...|     120|2010-12-01 09:37:00|     0.42|     14688|United Kingdom|
|     NULL|        D|            Discount|      -1|2010-12-01 09:41:00|     27.5|     14527|United Kingdom|
|   536387|    79321|       CHILLI LIGHTS|     192|2010-12-01 09:58:00|     3.82|     16029|United Kingdom|
|   536387|    22780|LIGHT GARLAND BUT...|     192|2010-12-01 09:58:00|     3.37|     16029|United Kingdom|
|   536387|    22779|WOODEN OWLS LIGHT...|     192|2010-12-01 09:58:00|     3.37|     16029|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [31]:
from pyspark.sql.functions import col, expr

# sort in the default order: ascending
df.orderBy(expr("UnitPrice")).show(2)

# sort by Quantity descending, then by UnitPrice ascending
df.orderBy(col("Quantity").desc(), col("UnitPrice").asc()).show(10)

+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|    Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|     NULL|        B|Adjust bad debt|       1|2011-08-12 14:51:00|-11062.06|      NULL|United Kingdom|
|     NULL|        B|Adjust bad debt|       1|2011-08-12 14:52:00|-11062.06|      NULL|United Kingdom|
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   581483|    23843|PAPER CRAFT 

### Hands-On 4 - Spark Transformations - aggregations
Full list of built-in functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions

In [32]:
%%time
# Count distinct customers
from pyspark.sql.functions import countDistinct
df.select(countDistinct("CustomerID")).show()

+--------------------------+
|count(DISTINCT CustomerID)|
+--------------------------+
|                      4372|
+--------------------------+

CPU times: user 19.7 ms, sys: 2.44 ms, total: 22.1 ms
Wall time: 2.71 s


In [33]:
%%time
# approximate count of distinct customers (0.1 = maximum allowed relative standard deviation)
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("CustomerID", 0.1)).show()

+---------------------------------+
|approx_count_distinct(CustomerID)|
+---------------------------------+
|                             4336|
+---------------------------------+

CPU times: user 11.8 ms, sys: 0 ns, total: 11.8 ms
Wall time: 1.51 s
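
The second argument of `approx_count_distinct` is the maximum relative standard deviation allowed for the estimate. Using the two results printed above, the observed relative error can be checked with plain arithmetic. It is well inside the 0.1 requested, although the parameter describes a standard deviation rather than a hard bound.

```python
exact_count = 4372    # from countDistinct("CustomerID") above
approx_count = 4336   # from approx_count_distinct("CustomerID", 0.1) above

relative_error = abs(exact_count - approx_count) / exact_count
print(f"relative error: {relative_error:.2%}")  # relative error: 0.82%
```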


In [34]:
# average, maximum and minimum purchase quantity
# note: this import shadows Python's built-in max and min for the rest of the notebook
from pyspark.sql.functions import avg, max, min
( df.select(
    avg("Quantity").alias("avg_purchases"),
    max("Quantity").alias("max_purchases"),
    min("Quantity").alias("min_purchases"))
   .show()
)

+----------------+-------------+-------------+
|   avg_purchases|max_purchases|min_purchases|
+----------------+-------------+-------------+
|9.55224954743324|        80995|       -80995|
+----------------+-------------+-------------+



### Hands-On 5 - Spark Transformations - grouping and windows

In [35]:
# count of items on the invoice
df.groupBy("InvoiceNo", "CustomerId").count().show(5)

# grouping with expressions
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
  .show(5)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536573|     17025|    4|
|   537228|     17677|    1|
|   537419|     13495|   14|
|   538093|     12682|   33|
|   538648|     17937|    5|
+---------+----------+-----+
only showing top 5 rows

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536532| 25.36986301369863|  16.850272831671976|
|   537632|               1.0|                 0.0|
|   538708| 10.61111111111111|   7.150282736359209|
|   538877|14.258278145695364|   27.56989037543246|
|   538993| 9.333333333333334|   2.748737083745107|
+---------+------------------+--------------------+
only showing top 5 rows



### Read the csv file into a DataFrame

`%%time` is an IPython magic: https://ipython.readthedocs.io/en/stable/interactive/magics.html


It is possible to read files without specifying the schema. Some file formats (Parquet is one of them) embed the schema, so Spark can start reading the file right away. For formats without an embedded schema (csv, json...), Spark can infer it from the data. Let's compare the two approaches in terms of time and results:

In [36]:
online_retail_schema="InvoiceNo int, StockCode string, Description string, Quantity int,\
InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string"

In [37]:
%%time
df = spark.read \
        .option("header", "true") \
        .option("timestampFormat", "M/d/yyyy H:m")\
        .csv("OnlineRetail.csv",
             schema=online_retail_schema)

CPU times: user 2.84 ms, sys: 972 µs, total: 3.81 ms
Wall time: 36.5 ms


In [38]:
%%time
df_infer = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("OnlineRetail.csv")

CPU times: user 13.7 ms, sys: 5.79 ms, total: 19.5 ms
Wall time: 3.37 s
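
The difference in wall time is expected: with an explicit schema, `spark.read.csv` can define the DataFrame lazily, while `inferSchema` needs a pass over the data to guess each column's type. The toy function below sketches that idea in plain Python. It is not Spark's inference algorithm, and `infer_type`, `infer_column_types` and the sample rows are made up for illustration.

```python
import csv
import io

def infer_type(values):
    """Guess the narrowest type (int, float, string) that fits all non-empty values."""
    for type_name, caster in (("int", int), ("float", float)):
        try:
            for v in values:
                if v != "":
                    caster(v)
            return type_name
        except ValueError:
            continue
    return "string"

def infer_column_types(csv_text):
    """Read every row once and return {column name: guessed type}."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {name: infer_type([row[name] for row in rows]) for name in rows[0]}

# made-up sample rows, for illustration only
sample = "InvoiceNo,StockCode,UnitPrice\n536365,85123A,2.55\nC536379,D,27.5\n"
print(infer_column_types(sample))
# {'InvoiceNo': 'string', 'StockCode': 'string', 'UnitPrice': 'float'}
```

A single non-numeric value is enough to turn a whole column into a string, which is one way an inferred schema can differ from a hand-written one.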


## Exercises

Reminder: documentation at
https://spark.apache.org/docs/latest/api/python/index.html

If you didn't run the previous cells, run the following one:

In [39]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("DataFrame HandsOn 1") \
        .config("spark.ui.showConsoleProgress","false") \
        .getOrCreate()

online_retail_schema="InvoiceNo int, StockCode string, Description string, Quantity int,\
InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string"

df = spark.read \
        .option("header", "true") \
        .option("timestampFormat", "M/d/yyyy H:m")\
        .csv("OnlineRetail.csv",
             schema=online_retail_schema)

Task: Show 5 lines of the "description" column

In [40]:
df.select("Description").show(5)

+--------------------+
|         Description|
+--------------------+
|WHITE HANGING HEA...|
| WHITE METAL LANTERN|
|CREAM CUPID HEART...|
|KNITTED UNION FLA...|
|RED WOOLLY HOTTIE...|
+--------------------+
only showing top 5 rows



Task: Count the number of distinct invoices in the dataframe

In [41]:
df.select(countDistinct("InvoiceNo")).show()

+-------------------------+
|count(DISTINCT InvoiceNo)|
+-------------------------+
|                    22061|
+-------------------------+



Task: Find out in which month most invoices have been issued

In [42]:
from pyspark.sql.functions import month, count

df.groupBy(month("InvoiceDate").alias("Month")).agg(count("InvoiceNo").alias("InvoiceCount")) \
               .orderBy("InvoiceCount", ascending=False).show(1)

+-----+------------+
|Month|InvoiceCount|
+-----+------------+
|   11|       83635|
+-----+------------+
only showing top 1 row



Task: Filter the lines where the Quantity is more than 30

In [44]:
df.filter("Quantity > 30").count()         # number of matching rows

25858

In [43]:
df.filter("Quantity > 30").show(20)         # shows only the first 20 of the 25858 matching rows

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|     13047|United Kingdom|
|   536370|    10002|INFLATABLE POLITI...|      48|2010-12-01 08:45:00|     0.85|     12583|        France|
|   536370|    22492|MINI PAINT SET VI...|      36|2010-12-01 08:45:00|     0.65|     12583|        France|
|   536371|    22086|PAPER CHAIN KIT 5...|      80|2010-12-01 09:00:00|     2.55|     13748|United Kingdom|
|   536374|    21258|VICTORIAN SEWING ...|      32|2010-12-01 09:09:00|    10.95|     15100|United Kingdom|
|   536376|    22114|HOT WATER BOTTLE ...|      48|2010-12-01 09:32:00|     3.45|     15291|United Kingdom|
|   536376|    21733|RED HAN

Task: Show the four most sold items (by quantity)

In [50]:
df.orderBy("Quantity", ascending=False).show(4, truncate=False)

+---------+---------+------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                   |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+------------------------------+--------+-------------------+---------+----------+--------------+
|581483   |23843    |PAPER CRAFT , LITTLE BIRDIE   |80995   |2011-12-09 09:15:00|2.08     |16446     |United Kingdom|
|541431   |23166    |MEDIUM CERAMIC TOP STORAGE JAR|74215   |2011-01-18 10:01:00|1.04     |12346     |United Kingdom|
|578841   |84826    |ASSTD DESIGN 3D PAPER STICKERS|12540   |2011-11-25 15:57:00|0.0      |13256     |United Kingdom|
|542504   |37413    |NULL                          |5568    |2011-01-28 12:03:00|0.0      |NULL      |United Kingdom|
+---------+---------+------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 4 rows
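
Note that sorting the raw DataFrame ranks individual invoice lines, not items. If "most sold items" is read as total quantity per item, the quantities have to be summed per `StockCode` before ranking. Here is a plain-Python sketch of that aggregation on made-up rows (the values are hypothetical, not taken from the dataset):

```python
from collections import Counter

# (StockCode, Quantity) pairs; hypothetical sample rows
lines = [("A", 10), ("B", 4), ("A", 7), ("C", 12), ("B", 9)]

totals = Counter()
for stock_code, quantity in lines:
    totals[stock_code] += quantity   # sum quantities per item

print(totals.most_common(2))  # [('A', 17), ('B', 13)]
```

In PySpark the same idea would be a `groupBy("StockCode")` followed by a `sum("Quantity")` aggregation and a descending sort.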



Bonus question: why do these two operations return different results? Hint: look at the documentation

In [53]:
print(df.select("InvoiceNo").distinct().count())
from pyspark.sql.functions import countDistinct
df.select(countDistinct("InvoiceNo")).show()

22062
+-------------------------+
|count(DISTINCT InvoiceNo)|
+-------------------------+
|                    22061|
+-------------------------+



`distinct().count()` counts the null InvoiceNo as one distinct value, while `countDistinct` ignores nulls, hence the difference of one.
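
The same distinction can be mimicked in plain Python, with `None` standing in for SQL null. This is only an analogy on made-up values, not Spark code:

```python
invoice_numbers = [536365, 536365, 536366, None, None, 536367]  # hypothetical values

# like df.select("InvoiceNo").distinct().count(): null is one more distinct row
distinct_rows = len(set(invoice_numbers))

# like countDistinct("InvoiceNo"): nulls are skipped before counting
distinct_non_null = len({v for v in invoice_numbers if v is not None})

print(distinct_rows, distinct_non_null)  # 4 3
```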