# Spark DataFrames API


### DataFrame

- DataFrames are distributed collections of records that share a predefined structure (the schema: the names and data types of all columns).
- DataFrames build on Spark's core concepts and add structure, optimization, and SQL-like operations for data manipulation.
- DataFrames track their schema and provide native support for many common SQL functions and relational operators.
- DataFrames are evaluated as DAGs, using lazy evaluation and providing lineage and fault tolerance.
- DataFrames are immutable: every transformation returns a new DataFrame.

### SparkContext vs SparkSession

- `SparkSession` is the entry point of a Spark application.
- It was introduced in Spark 2.0 as a unified entry point that wraps the contexts formerly instantiated individually (SparkContext, SQLContext, HiveContext). The legacy DStream API still needs its own StreamingContext.

<i>Note: In Databricks, a SparkSession is created for you automatically and exposed as `spark`.</i>

### DataFrame API Optimizations

- **Adaptive Query Execution:** Adjusts the query plan dynamically at runtime based on actual data characteristics and execution patterns.
- **In-Memory Columnar Storage (Tungsten):** An in-memory columnar format for DataFrames that enables efficient analytical queries and a reduced memory footprint.
- **Built-in Statistics:** Statistics are collected automatically when saving to optimized formats (Parquet, or Delta in Databricks), which enables smarter query planning and execution.
- **Catalyst Optimizer:** The query optimization engine that converts DataFrame operations into an optimized execution plan.


<i>**Note:** Databricks also ships Photon, a native vectorized query engine that accelerates query execution.</i>

**DataFrame Query Planning:**

- When a DataFrame is evaluated, the driver creates an optimized execution plan through a series of transformations.
- It converts the logical plan into a physical execution plan that minimizes resource usage and execution time (unresolved logical plan -> analyzed logical plan -> optimized logical plan -> physical plan).



In [1]:
# Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CustomerDFExample").getOrCreate()




Picked up JAVA_TOOL_OPTIONS: -XX:+UseContainerSupport -XX:ActiveProcessorCount=1
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/30 18:13:56 WARN Utils: Your hostname, krishnagopi-trng2224dat-g3q9nc1wf47, resolves to a loopback address: 127.0.0.1; using 10.0.5.2 instead (on interface eth0)
25/06/30 18:13:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/30 18:13:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Creating DataFrames - DataFrameReader
# supports formats such as JSON, CSV, Parquet, ORC, text, and binary files, plus external databases via JDBC
# (a DataFrame can also be built from an existing RDD with spark.createDataFrame)


df_customers = spark.read.csv("file:///workspace/TRNG-2224-data-engineering/week2/datasets/customer_data.csv", header=True, inferSchema=True)

df_customers.show(4)




+--------------------+---------------+--------------------+---+------+--------------------+-----------+-------------------+---------+-----------+
|         customer_id|           name|               email|age|gender|             country|signup_date|         last_login|is_active|total_spent|
+--------------------+---------------+--------------------+---+------+--------------------+-----------+-------------------+---------+-----------+
|20780d38-901f-450...| Michael Malone|    dhart@haynes.com| 58|  Male|    Saint Barthelemy| 2021-04-29|2024-10-20 15:56:26|     true|     3733.6|
|a2c56b05-acdc-4a7...|     Edwin Wall| bradley08@yahoo.com| 33|  Male|United Arab Emirates| 2025-01-02|2025-06-19 22:44:59|     true|    3708.71|
|2fe8ff2e-19ea-493...|  Rachel Strong|heather15@schmidt...| 61| Other|              Israel| 2023-02-13|2025-04-12 21:14:26|     true|    2993.41|
|5fd9f4a6-2134-41b...|Eddie Rodriguez|mitchell49@hotmai...| 20|  Male|             Nigeria| 2024-07-06|2025-03-06 17:09:20| 

### DataFrame Data Types

#### Primitive

**`pyspark.sql.types.DataType`**

- `ByteType`
- `ShortType`
- `IntegerType`
- `LongType`
- `FloatType`
- `DoubleType`
- `BooleanType`
- `StringType`
- `BinaryType`
- `TimestampType`
- `DateType`

#### Complex

- `ArrayType`
- `MapType`
- `StructType`



In [6]:
# DataFrame Schema

df_customers.schema

StructType([StructField('customer_id', StringType(), True), StructField('name', StringType(), True), StructField('email', StringType(), True), StructField('age', IntegerType(), True), StructField('gender', StringType(), True), StructField('country', StringType(), True), StructField('signup_date', DateType(), True), StructField('last_login', TimestampType(), True), StructField('is_active', BooleanType(), True), StructField('total_spent', DoubleType(), True)])

In [7]:
# custom schema definition

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, BooleanType, TimestampType, DateType

custom_schema = StructType([
    StructField("customer_id", StringType(), True ),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", DateType(), True),
    StructField("last_login", TimestampType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("total_spent", DoubleType(), True)
])

df_customers = spark.read.csv("file:///workspace/TRNG-2224-data-engineering/week2/datasets/customer_data.csv", header=True, schema=custom_schema)

df_customers.printSchema()




root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)



In [10]:
# DDL Schema

ddl_schema = """
    customer_id STRING,
    name STRING,
    email STRING,
    age INT,
    gender STRING,
    country STRING,
    signup_date DATE,
    last_login TIMESTAMP,
    is_active BOOLEAN,
    total_spent DOUBLE
"""

df_customers = spark.read.csv("file:///workspace/TRNG-2224-data-engineering/week2/datasets/customer_data.csv", header=True, schema=ddl_schema)

df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)



### Common DataFrame API methods

#### Transformations

##### Narrow Transformations

- Narrow transformations process the data within each partition independently, without needing to combine data from other partitions.
- They are faster and more efficient because they avoid shuffling data between partitions.

1. `select()`: Selecting specific columns.
2. `filter()`: Applying a filter condition to rows.
3. `map()`: Applying a function to each row. In PySpark this is an RDD method, reached through `df.rdd.map()`; the DataFrame class itself does not expose it.
4. `union()`: Combining two DataFrames with identical schemas.
5. `withColumn()`: Adding a new column based on existing ones.
6. `drop()`: Removing a column.

##### Wide Transformations

- Wide transformations require data to be redistributed across partitions, often involving shuffling data based on keys.

1. `groupBy()`: Grouping data based on a column, which often requires shuffling to aggregate data from different partitions. 
2. `join()`: Joining two DataFrames, which requires shuffling data to combine rows based on a join key. 
3. `distinct()`: Removing duplicate rows, which might require shuffling to compare rows across partitions. 

#### Actions

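1. `count()`: Returns the number of rows in a DataFrame.
2. `show()`: Displays the DataFrame content.
3. `take(n)`: Returns the first n rows of a DataFrame.
4. `first()`: Returns the first row of a DataFrame.
5. `write`: Saves the DataFrame to storage. It is a property that returns a `DataFrameWriter`; the save itself runs when a writer method such as `df.write.parquet(path)` is called.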

In [13]:
# Map, Shuffle and Reduce

# note: this import shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import sum


df_customers.filter(df_customers.age > 30) \
                    .select("country", "total_spent")\
                    .groupBy("country") \
                    .agg(sum("total_spent").alias("revenue")).show()


+-----------------+------------------+
|          country|           revenue|
+-----------------+------------------+
|            Macao|            3022.4|
|            Yemen|           1407.35|
|         Kiribati|           1833.03|
|           Guyana|           1149.76|
|           Jersey|           1398.78|
|   Norfolk Island|           2929.03|
|         Djibouti|           4291.06|
|            Tonga|           2167.22|
|           Malawi|           4418.94|
|          Germany|2965.7599999999998|
|           Jordan|           2175.54|
|            Sudan|            800.07|
|           Greece|           2734.41|
|             Togo|           3097.22|
|          Ecuador|           3340.19|
|            Qatar|           5966.42|
|          Lesotho|            1977.1|
|       Madagascar|           3372.53|
|Brunei Darussalam|           4201.99|
|             Peru|           2080.31|
+-----------------+------------------+
only showing top 20 rows


In [16]:
# Select Specific Columns

df_customers.select("name", "email", "country").show(4)

+---------------+--------------------+--------------------+
|           name|               email|             country|
+---------------+--------------------+--------------------+
| Michael Malone|    dhart@haynes.com|    Saint Barthelemy|
|     Edwin Wall| bradley08@yahoo.com|United Arab Emirates|
|  Rachel Strong|heather15@schmidt...|              Israel|
|Eddie Rodriguez|mitchell49@hotmai...|             Nigeria|
+---------------+--------------------+--------------------+
only showing top 4 rows


In [19]:
# Filter Active Customers Over 30

df_customers.filter((df_customers.age>30) & (df_customers.is_active == True)).show()

+--------------------+-------------------+--------------------+---+------+--------------------+-----------+-------------------+---------+-----------+
|         customer_id|               name|               email|age|gender|             country|signup_date|         last_login|is_active|total_spent|
+--------------------+-------------------+--------------------+---+------+--------------------+-----------+-------------------+---------+-----------+
|20780d38-901f-450...|     Michael Malone|    dhart@haynes.com| 58|  Male|    Saint Barthelemy| 2021-04-29|2024-10-20 15:56:26|     true|     3733.6|
|a2c56b05-acdc-4a7...|         Edwin Wall| bradley08@yahoo.com| 33|  Male|United Arab Emirates| 2025-01-02|2025-06-19 22:44:59|     true|    3708.71|
|2fe8ff2e-19ea-493...|      Rachel Strong|heather15@schmidt...| 61| Other|              Israel| 2023-02-13|2025-04-12 21:14:26|     true|    2993.41|
|b290eed5-e70c-48d...|       Kayla Powell|johnnash@hotmail.com| 50| Other|        Burkina Faso| 2021

In [20]:
# Group by Country and Get Average Spend

df_customers.groupBy("country").avg("total_spent").show()

+-----------------+------------------+
|          country|  avg(total_spent)|
+-----------------+------------------+
|            Macao|            3022.4|
|            Yemen|           1407.35|
|         Kiribati|           1833.03|
|           Guyana|           1149.76|
|           Jersey|           1398.78|
|   Norfolk Island|           2929.03|
|         Djibouti|           4291.06|
|            Tonga|           2167.22|
|           Malawi|           4418.94|
|          Germany|1482.8799999999999|
|           Jordan|           2175.54|
|     Saint Helena|           4639.94|
|            Sudan|            800.07|
|           Greece|           2734.41|
|             Togo|           3097.22|
|Equatorial Guinea|           4459.59|
|          Ecuador|           3340.19|
|            Qatar|           2983.21|
|          Lesotho|            988.55|
|       Madagascar|           3372.53|
+-----------------+------------------+
only showing top 20 rows


In [21]:
# Add a New Column for Spend Category

from pyspark.sql.functions import when

df_customer_with_catgory = df_customers.withColumn(
    "spend_category",
    when(df_customers.total_spent >3000, "High")
    .when(df_customers.total_spent> 1000, "Medium")
    .otherwise("Low")
)

df_customer_with_catgory.select("name", "email","total_spent" ,"spend_category").show()

+----------------+--------------------+-----------+--------------+
|            name|               email|total_spent|spend_category|
+----------------+--------------------+-----------+--------------+
|  Michael Malone|    dhart@haynes.com|     3733.6|          High|
|      Edwin Wall| bradley08@yahoo.com|    3708.71|          High|
|   Rachel Strong|heather15@schmidt...|    2993.41|        Medium|
| Eddie Rodriguez|mitchell49@hotmai...|    1171.33|        Medium|
|    Kayla Powell|johnnash@hotmail.com|     2850.1|        Medium|
| Kathleen Nelson|     qpena@gmail.com|    1180.16|        Medium|
| Nicolas Kennedy|catherineblack@wa...|    3818.16|          High|
| Jacqueline Reid|smithjoshua@baker...|    1767.23|        Medium|
|  Matthew Mendez|jeremymontoya@har...|     600.83|           Low|
|      Jay Little|brogers@wright-wo...|    4252.72|          High|
|       Amber Ray|   alan33@taylor.net|    3675.69|          High|
| Elizabeth Ellis|jjohnson@smith-th...|     486.12|           

In [22]:
# DataFrameWriter - flexible output formats and partitioning; supports save modes such as overwrite, append, ignore, and error (the default)

df_customer_with_catgory.write.mode("overwrite").parquet("customer_output.parquet")

                                                                                

#### Handling missing values

**Common functions**

- `isNull()`/`isNotNull()` - check whether values are null
- `count(col)` - counts the non-null values in a specific column
- `df.fillna()`/`df.na.fill()` - replace nulls with given values
- `df.dropna()`/`df.na.drop()` - remove rows that contain nulls

#### Referencing columns

- direct - `df.select("col_name")` - basic column selection

- by attribute - `df.select(df.col_name)` - works only for column names that are valid Python identifiers; the reference is bound to `df`, so it can tell columns apart across DataFrames (e.g., in a join)

- column expression - `df.select(df["col_name"])` - works for any column name; also bound to `df`, so it can be used across DataFrames

- column object - `df.select(col("col_name"))` - handy when building complex expressions or calling column-specific operations such as `cast()`, `alias()`, `asc()`, or `desc()`


#### Common column object methods

- `alias()` - rename a column
- `cast()` - change the data type
- `isNull()` or `isNotNull()` - check for nulls
- `contains()` - string matching
- `asc()`/`desc()` - sort direction - `df.sort(col("c1").asc())`


[pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.html)

#### Common built-in functions

- `concat(col1, col2)` - concatenates strings
- `date_format(col, fmt)` - formats a date or timestamp as a string
- `round(col, scale)` - rounds a number to `scale` decimal places
- `regexp_replace(col, pattern, replace)` - replaces matches of a regex
- `coalesce(col1, col2)` - returns the first non-null value


[pyspark built in functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

#### User-defined functions

- allow you to apply Python functions to DataFrame columns
- helpful for creating custom, reusable logic
- can hurt performance: the Catalyst optimizer cannot optimize them, and they add serialization overhead between the JVM and Python

```py
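from pyspark.sql.functions import udf

@udf("string")  # return type given as a DDL type string
def to_upper(name):
    # Spark passes None for null values, so handle them explicitly
    return name.upper() if name is not None else None

# df is any DataFrame with a string column called "name"
df.select(to_upper("name"))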
```

<i><b>Note:</b> pandas UDFs are more efficient because they operate on batches of rows instead of single rows and use Apache Arrow for faster Python-JVM serialization.</i>

### Aggregate functions

Aggregate functions can be applied in the `agg()` method after a `groupBy()` operation or directly within `select()`:

- `sum()`
- `avg()`
- `min()`
- `max()`
- `count()`
- `first()`
- `last()`

### Spark SQL

- Spark SQL is a Spark module that lets you run SQL queries on structured and semi-structured data.
- It provides a SQL interface alongside the DataFrame API; both go through the same optimized execution engine, so SQL and programmatic access can be mixed freely.

- `createOrReplaceTempView()`: Creates (or replaces) a temporary view that you can query with SQL. The view is scoped to the current SparkSession and does not copy the data.
- `createGlobalTempView()`: Creates a global temporary view that is shared across sessions of the same Spark application and is accessed through the `global_temp` database.

In [1]:
# Setup

from pyspark.sql import SparkSession
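# note: the wildcard import from pyspark.sql.functions shadows Python built-ins such as sum, round, min and max
from pyspark.sql.types import *
from pyspark.sql.functions import *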

spark = SparkSession.builder.appName("CustomerDataCleaning").getOrCreate()





In [3]:
custom_schema = StructType([
    StructField("customer_id", StringType(), True ),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", DoubleType(), True),
    StructField("gender", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", DateType(), True),
    StructField("last_login", TimestampType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("preferences", StringType(), True)
])

df_customer = spark.read.csv("file:///workspace/TRNG-2224-data-engineering/week2/datasets/enriched_customer_data.csv", header=True, escape='"', schema=custom_schema )

df_customer.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- preferences: string (nullable = true)



In [4]:
df_customer.select("customer_id", "preferences").show(truncate=False)

+------------------------------------+-----------------------------------------------------------------+
|customer_id                         |preferences                                                      |
+------------------------------------+-----------------------------------------------------------------+
|0e99a07c-c7a5-43df-b5f3-79e2a9f18fc6|{"newsletter": true, "notifications": "push", "language": "en"}  |
|3a69ac3e-6726-431c-82a7-5241ef568188|{"newsletter": true, "notifications": "push", "language": "fr"}  |
|c63cab5f-dc06-4842-926a-118f85a2e7ae|{"newsletter": true, "notifications": "email", "language": "de"} |
|50b165d0-6486-4d5b-bcb8-12147c6f8160|{"newsletter": true, "notifications": "sms", "language": "es"}   |
|4657a2b1-abae-49ab-95d4-d21b878eb69e|{"newsletter": false, "notifications": "email", "language": "es"}|
|0ffe272a-f261-4503-9aa2-33cac8959e0a|{"newsletter": true, "notifications": "sms", "language": "en"}   |
|ca9191a8-f736-46c8-b727-cfaa320a1c89|{"newsletter": fa

#### Data Cleaning

In [None]:
# drop rows where email is null

df_clean = df_customer.na.drop(subset=["email"])

# verify: no rows with a null email should remain (expect an empty result)
df_clean.filter(col("email").isNull()).show()

+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+
|customer_id|name|email|age|gender|country|signup_date|last_login|is_active|total_spent|preferences|
+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+
+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+



In [11]:

# fill missing gender with "Unknown"
df_clean = df_clean.na.fill({"gender": "Unknown"})
df_clean.show()

+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|NULL| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|NULL| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

In [13]:
# filling missing age with median age

median_age = df_clean.approxQuantile("age", [0.5], 0.01)[0]

df_clean = df_clean.na.fill({"age": median_age})

df_clean.show()



+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|50.0| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

In [15]:
# filter out rows with null or negative total_spent (zero is kept)

df_clean = df_clean.filter((col("total_spent").isNotNull()) & (col("total_spent") >=0))
df_clean.show()


+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|50.0| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

#### Enrichment

In [16]:
# add age group

df_enriched = df_clean.withColumn("age_group",
    when(col("age")< 30, "Young")
    .when((col("age") >=30) & (col("age") <= 80), "Adult")
    .otherwise("Senior")
)

df_enriched.show()

+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+---------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|age_group|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+---------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|    Adult|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|    Young|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|50.0| Female|               Kenya| 2023-11-16|2024-0

In [None]:
# parse the preferences JSON column into a struct

preferences_schema = StructType([
    StructField("newsletter", BooleanType(), True),
    StructField("notifications", StringType(), True),
    StructField("language", StringType(), True)
])

df_parsed = df_enriched.withColumn(
    "preferences_struct",
    from_json(col("preferences"), preferences_schema)
)

# flatten struct fields into columns

df_final = df_parsed \
    .withColumn("pref_newsletter", col("preferences_struct.newsletter")) \
    .withColumn("pref_notifications", col("preferences_struct.notifications")) \
    .withColumn("pref_language", col("preferences_struct.language")) \
    .drop("preferences", "preferences_struct")

df_final.select("customer_id","pref_newsletter", "pref_notifications", "pref_language" ).show(9)

+--------------------+---------------+------------------+-------------+
|         customer_id|pref_newsletter|pref_notifications|pref_language|
+--------------------+---------------+------------------+-------------+
|0e99a07c-c7a5-43d...|           true|              push|           en|
|3a69ac3e-6726-431...|           true|              push|           fr|
|c63cab5f-dc06-484...|           true|             email|           de|
|50b165d0-6486-4d5...|           true|               sms|           es|
|4657a2b1-abae-49a...|          false|             email|           es|
|0ffe272a-f261-450...|           true|               sms|           en|
|ca9191a8-f736-46c...|          false|             email|           es|
|808301e6-260a-47a...|           true|             email|           de|
|997d2ea4-5957-43d...|           true|               sms|           de|
+--------------------+---------------+------------------+-------------+
only showing top 9 rows


In [28]:
# custom udf to split name to first_name and last_name

return_type = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
]
)

@udf(returnType=return_type)
def split_full_name(full_name: str) -> dict:
    if full_name is None:
        return None

    # split on the first space only: everything after it counts as the last name
    parts = full_name.split(" ", 1)
    first_name = parts[0]
    last_name = parts[1] if len(parts) > 1 else None

    return {"first_name": first_name, "last_name": last_name}


df_with_split_name = df_final.withColumn(
    "name_parts", split_full_name(col("name"))
)

df_with_split_name.show(truncate=False)





+------------------------------------+--------------+-------------------------------+----+-------+--------------------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+-----------------+
|customer_id                         |name          |email                          |age |gender |country                         |signup_date|last_login         |is_active|total_spent|age_group|pref_newsletter|pref_notifications|pref_language|name_parts       |
+------------------------------------+--------------+-------------------------------+----+-------+--------------------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+-----------------+
|0e99a07c-c7a5-43df-b5f3-79e2a9f18fc6|Thomas Lamb   |robinjackson@wright.com        |50.0|Female |France                          |2023-03-01 |2025-05-29 22:36:25|true     |1438.4     |Adult    |true           |

In [29]:
df_with_split_name.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: double (nullable = false)
 |-- gender: string (nullable = false)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- age_group: string (nullable = false)
 |-- pref_newsletter: boolean (nullable = true)
 |-- pref_notifications: string (nullable = true)
 |-- pref_language: string (nullable = true)
 |-- name_parts: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)



In [30]:
df_with_split_name.withColumn("first_name", col("name_parts.first_name")) \
     .withColumn("last_name", col("name_parts.last_name")) \
     .drop("name_parts", "name").show()

+--------------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+----------+---------+
|         customer_id|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|age_group|pref_newsletter|pref_notifications|pref_language|first_name|last_name|
+--------------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+----------+---------+
|0e99a07c-c7a5-43d...|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|    Adult|           true|              push|           en|    Thomas|     Lamb|
|3a69ac3e-6726-431...|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|    Young|           

In [31]:
df_with_split_name.show(9)

+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+-----------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|age_group|pref_newsletter|pref_notifications|pref_language|       name_parts|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+---------+---------------+------------------+-------------+-----------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|    Adult|           true|              push|           en|   {Thomas, Lamb}|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|

#### Analysis

In [32]:
# total and average revenue by country

df_with_split_name.groupBy("country").agg(
    count("*").alias("customers"),
    sum("total_spent").alias("total_revenue"),
    round(avg("total_spent"), 2).alias("avg_revenue")
).orderBy(col("total_revenue").desc()).show()

+--------------------+---------+------------------+-----------+
|             country|customers|     total_revenue|avg_revenue|
+--------------------+---------+------------------+-----------+
|      United Kingdom|        3|          14057.16|    4685.72|
|             Jamaica|        2|          14054.52|    7027.26|
|Saint Vincent and...|        2|          13525.15|    6762.58|
|           Lithuania|        2|           12224.8|     6112.4|
|             Tunisia|        3|          11639.62|    3879.87|
|              Guinea|        2|          11605.49|    5802.75|
|          Bangladesh|        2|           11274.3|    5637.15|
| Trinidad and Tobago|        2|          10533.08|    5266.54|
|            Anguilla|        2|10513.830000000002|    5256.92|
|              Zambia|        1|           9995.75|    9995.75|
|            Mongolia|        1|           9763.25|    9763.25|
|              Taiwan|        1|            9732.5|     9732.5|
|        Sierra Leone|        1|        
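
The `10513.830000000002` value in the `total_revenue` column is a binary floating-point artifact of summing doubles, not a data error. The snippet below is a minimal plain-Python sketch of the same effect, using made-up amounts rather than the customer data. In the query itself, wrapping the sum in `round(..., 2)`, as is already done for the average, would be one way to tidy the display.

```python
# Plain-Python illustration (no Spark needed) of the floating-point artifact.
# The amounts are hypothetical and only chosen to trigger the effect.
amounts = [0.1, 0.2]

raw_total = sum(amounts)             # binary doubles cannot represent 0.1 or 0.2 exactly
rounded_total = round(raw_total, 2)  # rounding for display hides the representation error

print(raw_total)      # 0.30000000000000004
print(rounded_total)  # 0.3
```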

In [33]:
# count of customers by age_group

df_with_split_name.groupBy("age_group").count().show()

+---------+-----+
|age_group|count|
+---------+-----+
|    Adult|   81|
|    Young|    8|
+---------+-----+



In [34]:
# most common notification type

df_with_split_name.groupBy("pref_notifications").count().orderBy("count", ascending=False).show()

+------------------+-----+
|pref_notifications|count|
+------------------+-----+
|               sms|   35|
|              push|   28|
|             email|   26|
+------------------+-----+



In [35]:
# save to parquet

df_with_split_name.write.mode("overwrite").parquet("final_customers.parquet")

### Compare Spark SQL and DataFrame API

In [36]:
# register view

df_with_split_name.createOrReplaceTempView("customers")




In [41]:
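# explain(True) prints all plans to stdout and returns None,
# so sql_plan ends up holding None rather than the plan text.
sql_plan = spark.sql("""
    SELECT country, COUNT(*) AS total, ROUND(AVG(total_spent), 2) AS avg_spent
    FROM customers
    GROUP BY country
""").explain(True)

sql_plan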

== Parsed Logical Plan ==
'Aggregate ['country], ['country, 'COUNT(1) AS total#960, 'ROUND('AVG('total_spent), 2) AS avg_spent#961]
+- 'UnresolvedRelation [customers], [], false

== Analyzed Logical Plan ==
country: string, total: bigint, avg_spent: double
Aggregate [country#16], [country#16, count(1) AS total#960L, round(avg(total_spent#20), 2) AS avg_spent#961]
+- SubqueryAlias customers
   +- View (`customers`, [customer_id#11, name#12, email#13, age#288, gender#228, country#16, signup_date#17, last_login#18, is_active#19, total_spent#20, age_group#424, pref_newsletter#474, pref_notifications#476, pref_language#478, name_parts#623])
      +- Project [customer_id#11, name#12, email#13, age#288, gender#228, country#16, signup_date#17, last_login#18, is_active#19, total_spent#20, age_group#424, pref_newsletter#474, pref_notifications#476, pref_language#478, split_full_name(name#12)#622 AS name_parts#623]
         +- Project [customer_id#11, name#12, email#13, age#288, gender#228, count

In [42]:
# DataFrame equivalent (df_plan is also None, because explain() only prints)

df_plan = df_with_split_name.groupBy("country").agg(
    count("*").alias("total"),
    round(avg("total_spent"), 2).alias("avg_spent")
).explain(True)

df_plan

== Parsed Logical Plan ==
'Aggregate ['country], ['country, 'count(*) AS total#970, 'round('avg('total_spent), 2) AS avg_spent#971]
+- Project [customer_id#11, name#12, email#13, age#288, gender#228, country#16, signup_date#17, last_login#18, is_active#19, total_spent#20, age_group#424, pref_newsletter#474, pref_notifications#476, pref_language#478, split_full_name(name#12)#622 AS name_parts#623]
   +- Project [customer_id#11, name#12, email#13, age#288, gender#228, country#16, signup_date#17, last_login#18, is_active#19, total_spent#20, age_group#424, pref_newsletter#474, pref_notifications#476, pref_language#478]
      +- Project [customer_id#11, name#12, email#13, age#288, gender#228, country#16, signup_date#17, last_login#18, is_active#19, total_spent#20, preferences#21, age_group#424, preferences_struct#473, pref_newsletter#474, pref_notifications#476, preferences_struct#473.language AS pref_language#478]
         +- Project [customer_id#11, name#12, email#13, age#288, gender#228,

In [43]:
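# Both variables are None (explain() returns None), so this evaluates
# None == None and is always True; it does not show that the plans match.
# Compare the printed plans above instead.
sql_plan == df_plan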

True
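
To compare the two plans as text rather than comparing two `None` values, the printed output of `explain()` can be captured. The following is a minimal sketch, assuming `explain()` writes through Python's `print` (as it does in current PySpark releases) and accepts a `mode` argument (Spark 3.0+). The helper names `capture_explain` and `normalize_plan` are hypothetical and not part of the PySpark API. Expression IDs such as `#960` versus `#970` differ between queries, so they are stripped before comparing.

```python
import contextlib
import io
import re


def capture_explain(df, mode="simple"):
    """Return the text that df.explain(mode=...) prints, instead of None."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def normalize_plan(plan_text):
    """Blank out per-query identifiers so two plans can be compared as strings."""
    plan_text = re.sub(r"#\d+L?", "#", plan_text)             # expression IDs, e.g. total#960L
    plan_text = re.sub(r"plan_id=\d+", "plan_id=", plan_text)  # exchange IDs, e.g. [plan_id=45]
    return plan_text


# Illustration on two plan fragments that differ only in expression IDs
# (the second ID is made up for the example).
print(normalize_plan("count(1) AS total#960L") == normalize_plan("count(1) AS total#970L"))  # True
```

With a live session, `normalize_plan(capture_explain(spark.sql(...)))` and `normalize_plan(capture_explain(df_with_split_name.groupBy(...).agg(...)))` could then be compared. The physical plans would be expected to match because both queries go through the Catalyst optimizer, but that is worth checking rather than assuming, and the parsed logical plans will differ as the output above shows.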