In [135]:
 from pyspark.sql import SparkSession 

# Create SparkSession

In [136]:
# Create SparkSession 
spark = ( 
    SparkSession.builder 
.master("local[*]") 
.appName("Lab0-OnlineRetail-Warmup") 
.config("spark.ui.showConsoleProgress", "false") 
.getOrCreate() 
) 
print("Spark version:", spark.version) 
print("Master:", spark.sparkContext.master)

Spark version: 3.5.1
Master: local[*]


In [137]:
# Inspect the compressed file (first 3 lines) 
!gzip -cd OnlineRetail.csv.zip | head -n 3 
# Unzip the CSV file 
!unzip -o OnlineRetail.csv.zip

gzip: OnlineRetail.csv.zip.gz: No such file or directory
unzip:  cannot find or open OnlineRetail.csv.zip, OnlineRetail.csv.zip.zip or OnlineRetail.csv.zip.ZIP.


# Define explicit schema and load DataFrame

In [138]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType

# Define schema for OnlineRetail 
online_retail_schema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True), 
    StructField("Description", StringType(), True), 
    StructField("Quantity", IntegerType(), True), 
    StructField("InvoiceDate", TimestampType(), True), 
    StructField("UnitPrice", FloatType(), True), 
    StructField("CustomerId", IntegerType(), True), 
    StructField("Country", StringType(), True), 
]) 

df = (
    spark.read
    .option("header", "true")
    .option("timestampFormat", "M/d/yyyy H:m")
    .schema(online_retail_schema)
    .csv("/kaggle/input/onlineretail/OnlineRetail.csv")
)

print("Rows:", df.count())
df.printSchema()

Rows: 541909
root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)



#  Basic inspection

In [139]:
 # Show a few rows 
df.show(5, truncate=False) 
# Display columns 
print("Columns:", df.columns) 
# Quick statistical summary on numeric fields 
df.describe(["Quantity", "UnitPrice"]).show() 

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
+---------+-----

# Select columns

In [140]:
from pyspark.sql.functions import col, expr
# Single column
df.select("Country").show(5, truncate=False)
# Multiple columns
df.select("StockCode", "Description", "UnitPrice").show(5, truncate=False)
# First 5 columns using slicing on df.columns
df.select(df.columns[0:5]).show(5, truncate=False)

+--------------+
|Country       |
+--------------+
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
+--------------+
only showing top 5 rows

+---------+-----------------------------------+---------+
|StockCode|Description                        |UnitPrice|
+---------+-----------------------------------+---------+
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER |2.55     |
|71053    |WHITE METAL LANTERN                |3.39     |
|84406B   |CREAM CUPID HEARTS COAT HANGER     |2.75     |
|84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|3.39     |
|84029E   |RED WOOLLY HOTTIE WHITE HEART.     |3.39     |
+---------+-----------------------------------+---------+
only showing top 5 rows

+---------+---------+-----------------------------------+--------+-------------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |
+---------+---------+-----------------------------------+--------+-------------------+
|536365   |851

# Add computed column with selectExpr

In [141]:
from pyspark.sql.functions import sum as spark_sum
# Add a simple indicator column using an expression
df_flagged = df.selectExpr(
"*",
"UnitPrice > 100 as HighValueItem"
)
df_flagged.select("InvoiceNo", "Description", "UnitPrice", "HighValueItem")
# Compute some global aggregates in a single selectExpr
df_agg = df.selectExpr(
"sum(Quantity) as total_quantity",
"sum(UnitPrice * Quantity) as approx_revenue"
)
df_agg.show()

+--------------+----------------+
|total_quantity|  approx_revenue|
+--------------+----------------+
|       5176450|9747747.90762934|
+--------------+----------------+



 #  Add and rename a column 

In [142]:
from pyspark.sql.functions import col, expr
# Add InvoiceValue column, then rename to LineTotal
df_with_value = df.withColumn("InvoiceValue", col("UnitPrice") * col("Quantity"))
df_with_value.select(
"InvoiceNo", "Description", "UnitPrice", "Quantity", "InvoiceValue").show(5, truncate=False)
df_line_total = df_with_value.withColumnRenamed("InvoiceValue", "LineTotal")
df_line_total.select(
"InvoiceNo", "Description", "UnitPrice", "Quantity", "LineTotal").show(5, truncate=False)

+---------+-----------------------------------+---------+--------+------------+
|InvoiceNo|Description                        |UnitPrice|Quantity|InvoiceValue|
+---------+-----------------------------------+---------+--------+------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |2.55     |6       |15.299999   |
|536365   |WHITE METAL LANTERN                |3.39     |6       |20.34       |
|536365   |CREAM CUPID HEARTS COAT HANGER     |2.75     |8       |22.0        |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|3.39     |6       |20.34       |
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |3.39     |6       |20.34       |
+---------+-----------------------------------+---------+--------+------------+
only showing top 5 rows

+---------+-----------------------------------+---------+--------+---------+
|InvoiceNo|Description                        |UnitPrice|Quantity|LineTotal|
+---------+-----------------------------------+---------+--------+---------+
|536365   |WHITE HANGING

#  Drop columns

In [143]:
# Drop a couple of columns (for example CustomerId and StockCode)
df_reduced = df.drop("CustomerId", "StockCode")
print("Original number of columns:", len(df.columns))
print("After drop:", len(df_reduced.columns))
df_reduced.show(5, truncate=False)

Original number of columns: 8
After drop: 6
+---------+-----------------------------------+--------+-------------------+---------+--------------+
|InvoiceNo|Description                        |Quantity|InvoiceDate        |UnitPrice|Country       |
+---------+-----------------------------------+--------+-------------------+---------+--------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |United Kingdom|
|536365   |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |United Kingdom|
|536365   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |United Kingdom|
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |United Kingdom|
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |United Kingdom|
+---------+-----------------------------------+--------+-------------------+---------+--------------+
only showing top 5 rows



# GroupBy aggregations

In [144]:
 from pyspark.sql.functions import avg, stddev_pop 
# Average quantity per country 
avg_qty_per_country = ( 
    df.groupBy("Country")
 .agg(avg("Quantity").alias("avg_quantity")) 
) 
avg_qty_per_country.show(10, truncate=False) 
# Summary statistics per invoice 
invoice_stats = ( 
    df.groupBy("InvoiceNo") 
.agg( 
          avg("Quantity").alias("avg_quantity"), 
          stddev_pop("Quantity").alias("std_quantity") 
) 
) 
invoice_stats.show(5, truncate=False)

+---------+------------------+
|Country  |avg_quantity      |
+---------+------------------+
|Sweden   |77.13636363636364 |
|Singapore|22.85589519650655 |
|Germany  |12.369457609268036|
|France   |12.91106696272058 |
|Greece   |10.657534246575343|
|Belgium  |11.18994683421943 |
|Finland  |15.346762589928058|
|Italy    |9.961394769613948 |
|EIRE     |17.40324548560273 |
|Lithuania|18.62857142857143 |
+---------+------------------+
only showing top 10 rows

+---------+------------------+------------------+
|InvoiceNo|avg_quantity      |std_quantity      |
+---------+------------------+------------------+
|536532   |25.36986301369863 |16.850272831671976|
|537632   |1.0               |0.0               |
|538708   |10.61111111111111 |7.150282736359209 |
|538877   |14.258278145695364|27.56989037543246 |
|538993   |9.333333333333334 |2.748737083745107 |
+---------+------------------+------------------+
only showing top 5 rows



In [145]:
from pyspark.sql.functions import avg, stddev_pop 
# Average quantity per country 
avg_qty_per_country = ( 
    df.groupBy("Country")
 .agg(avg("Quantity").alias("avg_quantity")) 
) 
avg_qty_per_country.show(10, truncate=False) 
# Summary statistics per invoice 
invoice_stats = ( 
    df.groupBy("InvoiceNo") 
.agg( 
          avg("Quantity").alias("avg_quantity"), 
          stddev_pop("Quantity").alias("std_quantity") 
) 
) 
invoice_stats.show(5, truncate=False) 


+---------+------------------+
|Country  |avg_quantity      |
+---------+------------------+
|Sweden   |77.13636363636364 |
|Singapore|22.85589519650655 |
|Germany  |12.369457609268036|
|France   |12.91106696272058 |
|Greece   |10.657534246575343|
|Belgium  |11.18994683421943 |
|Finland  |15.346762589928058|
|Italy    |9.961394769613948 |
|EIRE     |17.40324548560273 |
|Lithuania|18.62857142857143 |
+---------+------------------+
only showing top 10 rows

+---------+------------------+------------------+
|InvoiceNo|avg_quantity      |std_quantity      |
+---------+------------------+------------------+
|536532   |25.36986301369863 |16.850272831671976|
|537632   |1.0               |0.0               |
|538708   |10.61111111111111 |7.150282736359209 |
|538877   |14.258278145695364|27.56989037543246 |
|538993   |9.333333333333334 |2.748737083745107 |
+---------+------------------+------------------+
only showing top 5 rows



# # Schema inference vs explicit schema timing

In [146]:
 # Explicit schema timing 
 
df_schema = ( 
    spark.read 
.option("header", "true") 
.option("timestampFormat", "M/d/yyyy H:m") 
.schema(online_retail_schema) 
.csv("/kaggle/input/onlineretail/OnlineRetail.csv") 
) 
# Inferred schema timing 
df_infer = ( 
    spark.read 
.option("header", "true") 
.option("inferSchema", "true") 
.option("timestampFormat", "M/d/yyyy H:m") 
.csv("/kaggle/input/onlineretail/OnlineRetail.csv") 
) 
print("Schema with explicit schema:") 
df_schema.printSchema() 
print("\nSchema with inferSchema:") 
df_infer.printSchema() 

Schema with explicit schema:
root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)


Schema with inferSchema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

