# PySpark Course Notebook

This notebook aims to make Spark's architecture and the PySpark API easier to understand and use in a Databricks environment.

It is divided into three main sections:
- Spark core concepts: an introduction to the fundamentals, covering SparkContext, SparkSession, and Resilient Distributed Datasets (RDDs).
- Data manipulation with Spark DataFrames: reading files (.txt, .csv, .json) into Spark DataFrames, and a tour of the main transformations and actions available.
- Using Spark SQL.

I would like to thank the coder2j channel for its explanations and practical advice, which enrich this notebook.

# SparkContext vs SparkSession

SparkContext: the original entry point for interacting with Apache Spark. It connects to the Spark cluster and lets you process data. It was the main entry point before Spark 2.0 and is still reachable through spark.sparkContext.

SparkSession: the main entry point since Spark 2.0. It combines the capabilities of SparkContext with the SQL, DataFrame, and streaming APIs, which makes Spark simpler and more complete to use.

# Create SparkSession in Apache Spark

In [0]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark-Session") \
    .getOrCreate()

# RDDs (Resilient Distributed Datasets)

Characteristics:
- Immutability: transformations create new RDDs instead of modifying existing ones
- Distribution: data is partitioned and processed in parallel
- Resilience: lost RDD partitions are rebuilt automatically after a failure
- Lazy evaluation: RDD transformations run only when an action is called

Transformations: create new RDDs from an existing RDD and stay pending until an action is called.

E.g.: map, filter, flatMap, reduceByKey, sortBy, join


Actions: trigger the computation of the pending transformations and produce a final result.

E.g.: collect, count, first, take, saveAsTextFile, foreach


In [0]:
# Create a SparkSession
spark = SparkSession.builder.appName("RDD").getOrCreate()

## How to create RDDs

In [0]:
# Create an RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

# Collect action: Retrieve all elements of the RDD
print("All elements of the rdd: ", rdd.collect())

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


## RDDs Operation: actions

In [0]:
# Count action: Count the number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)

The total number of elements in rdd:  4


In [0]:
# First action: retrieve the first element of the RDD
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [0]:
# Take action: retrieve the first n elements of the RDD
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]


## RDDs Operation: transformation

In [0]:
# Map transformation: convert name to uppercase
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))

result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [0]:
# Filter transformation: filter records where age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

Out[82]: [('Charlie', 35), ('Alice', 40)]

In [0]:
# ReduceByKey transformation: calculate the total age for each name
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

Out[83]: [('Charlie', 35), ('Bob', 30), ('Alice', 65)]

In [0]:
# SortBy transformation: sort the RDD by age in descending order
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

Out[84]: [('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

# DataFrame imports

In [0]:
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-Import").getOrCreate()

## Read CSV with header

In [0]:
# Read CSV file into DataFrame
csv_file_path = "/FileStore/tables/products.csv"
df = spark.read.csv(csv_file_path, header=True)

# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read CSV with an explicit schema definition

In [0]:
# Import necessary types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define the schema
schema = StructType([
    StructField(name="id", dataType=IntegerType(), nullable=True),
    StructField(name="name", dataType=StringType(), nullable=True),
    StructField(name="category", dataType=StringType(), nullable=True),
    StructField(name="quantity", dataType=IntegerType(), nullable=True),
    StructField(name="price", dataType=DoubleType(), nullable=True)
])

# Read CSV file into DataFrame with schema definition
csv_file_path = "/FileStore/tables/products.csv"
df = spark.read.csv(csv_file_path, header=True, schema=schema)

# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read CSV with inferSchema

In [0]:
# Read CSV file into DataFrame with inferSchema
csv_file_path = "/FileStore/tables/products.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read JSON file into DataFrame

### Single Line JSON

In [0]:
%fs head /FileStore/tables/products_singleline.json

In [0]:
# Read single line JSON
json_file_path = "/FileStore/tables/products_singleline.json"
df = spark.read.json(json_file_path)

# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



### Multi-line JSON

In [0]:
%fs head /FileStore/tables/products_multiline.json

In [0]:
# Read multi-line JSON
json_file_path = "/FileStore/tables/products_multiline.json"
df = spark.read.json(json_file_path, multiLine=True)

# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



# DataFrame operations

In [0]:
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-Operations").getOrCreate()

# Load the synthetic data into a DataFrame
data_file_path = "/FileStore/tables/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

# Display schema of DataFrame
df.printSchema()

# Show the initial DataFrame
print("Initial DataFrame:")
df.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

Initial DataFrame:
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



## Select: choose specific columns

In [0]:
selected_columns = df.select("id", "name", "price")
print("Selected Columns:")
selected_columns.show(10)

Selected Columns:
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



## Filter: apply conditions to filter rows

In [0]:
filtered_data = df.filter(df.quantity > 20)
print("Filtered Data:", filtered_data.count())
filtered_data.show()

Filtered Data: 12
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
|  6|    Nike Shoes|   Clothing|      30|99.99|
|  7|  Adidas Shoes|   Clothing|      25|89.99|
| 12|        Apples|       Food|     100|  0.5|
| 13|       Bananas|       Food|     150| 0.25|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 25|      Backpack|Accessories|      30|24.99|
| 28|         Jeans|   Clothing|      30|59.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 30|      Sneakers|   Clothing|      40|79.99|
+---+--------------+-----------+--------+-----+



## GroupBy: group data based on specific columns
## Aggregations: perform functions on grouped data

In [0]:
grouped_data = df.groupBy("category").agg({"quantity": "sum", "price": "avg"})
print("Grouped and Aggregated Data:")
grouped_data.show()

Grouped and Aggregated Data:
+-----------+-------------+------------------+
|   category|sum(quantity)|        avg(price)|
+-----------+-------------+------------------+
|Electronics|           98| 586.6566666666665|
|       Food|          450|2.2960000000000003|
|   Clothing|          200|  99.2757142857143|
|  Furniture|           41|            141.99|
|     Sports|           35|             34.99|
|Accessories|           55|             27.49|
+-----------+-------------+------------------+



## Join: combine multiple DataFrames based on specified columns

In [0]:
df2 = df.select("id", "category").limit(10)
joined_data = df.join(df2, "id", "inner")
print("Joined Data:")
joined_data.show()

Joined Data:
+---+----------------+-----------+--------+-------+-----------+
| id|            name|   category|quantity|  price|   category|
+---+----------------+-----------+--------+-------+-----------+
|  1|          iPhone|Electronics|      10| 899.99|Electronics|
|  2|         Macbook|Electronics|       5|1299.99|Electronics|
|  3|            iPad|Electronics|      15| 499.99|Electronics|
|  4|      Samsung TV|Electronics|       8| 799.99|Electronics|
|  5|           LG TV|Electronics|      10| 699.99|Electronics|
|  6|      Nike Shoes|   Clothing|      30|  99.99|   Clothing|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|   Clothing|
|  8| Sony Headphones|Electronics|      12| 149.99|Electronics|
|  9|Beats Headphones|Electronics|      20| 199.99|Electronics|
| 10|    Dining Table|  Furniture|      10| 249.99|  Furniture|
+---+----------------+-----------+--------+-------+-----------+



## Sort: arrange rows based on one or more columns

In [0]:
sorted_data = df.orderBy("price")
print("Sorted Data:")
sorted_data.show(10)

Sorted Data:
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
| 13|       Bananas|       Food|     150| 0.25|
| 12|        Apples|       Food|     100|  0.5|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 19|      Yoga Mat|     Sports|      20|19.99|
| 25|      Backpack|Accessories|      30|24.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 20|  Dumbbell Set|     Sports|      15|49.99|
+---+--------------+-----------+--------+-----+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import col
sorted_data = df.orderBy(col("price").desc(), col("id").desc())
print("Sorted Data Descending:")
sorted_data.show(10)

Sorted Data Descending:
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  2|         Macbook|Electronics|       5|1299.99|
|  1|          iPhone|Electronics|      10| 899.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
| 26|          Camera|Electronics|      10| 599.99|
|  3|            iPad|Electronics|      15| 499.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
| 17|  Leather Jacket|   Clothing|      15| 199.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 18|     Winter Coat|   Clothing|      10| 149.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



## Distinct: get unique rows

In [0]:
distinct_rows = df.select("category").distinct()
print("Distinct Product Categories:")
distinct_rows.show()

Distinct Product Categories:
+-----------+
|   category|
+-----------+
|Electronics|
|       Food|
|   Clothing|
|  Furniture|
|     Sports|
|Accessories|
+-----------+



## Drop: remove specified columns

In [0]:
dropped_columns = df.drop("quantity", "category")
print("Dropped Columns:")
dropped_columns.show(10)

Dropped Columns:
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



## WithColumn: add new calculated columns

In [0]:
df_with_new_column = df.withColumn("revenue", df.quantity * df.price)
print("DataFrame with New Column:")
df_with_new_column.show(10)

DataFrame with New Column:
+---+----------------+-----------+--------+-------+-------+
| id|            name|   category|quantity|  price|revenue|
+---+----------------+-----------+--------+-------+-------+
|  1|          iPhone|Electronics|      10| 899.99| 8999.9|
|  2|         Macbook|Electronics|       5|1299.99|6499.95|
|  3|            iPad|Electronics|      15| 499.99|7499.85|
|  4|      Samsung TV|Electronics|       8| 799.99|6399.92|
|  5|           LG TV|Electronics|      10| 699.99| 6999.9|
|  6|      Nike Shoes|   Clothing|      30|  99.99| 2999.7|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|2249.75|
|  8| Sony Headphones|Electronics|      12| 149.99|1799.88|
|  9|Beats Headphones|Electronics|      20| 199.99| 3999.8|
| 10|    Dining Table|  Furniture|      10| 249.99| 2499.9|
+---+----------------+-----------+--------+-------+-------+
only showing top 10 rows



## Alias: rename columns

In [0]:
df_with_alias = df.withColumnRenamed("price", "product_price")
print("DataFrame with Aliased Column:")
df_with_alias.show(10)

DataFrame with Aliased Column:
+---+----------------+-----------+--------+-------------+
| id|            name|   category|quantity|product_price|
+---+----------------+-----------+--------+-------------+
|  1|          iPhone|Electronics|      10|       899.99|
|  2|         Macbook|Electronics|       5|      1299.99|
|  3|            iPad|Electronics|      15|       499.99|
|  4|      Samsung TV|Electronics|       8|       799.99|
|  5|           LG TV|Electronics|      10|       699.99|
|  6|      Nike Shoes|   Clothing|      30|        99.99|
|  7|    Adidas Shoes|   Clothing|      25|        89.99|
|  8| Sony Headphones|Electronics|      12|       149.99|
|  9|Beats Headphones|Electronics|      20|       199.99|
| 10|    Dining Table|  Furniture|      10|       249.99|
+---+----------------+-----------+--------+-------------+
only showing top 10 rows



# Spark SQL

In [0]:
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-SQL").getOrCreate()

# Load the synthetic data into a DataFrame
data_file_path = "/FileStore/tables/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

# Display schema of DataFrame
df.printSchema()

# Show the initial DataFrame
print("Initial DataFrame:")
df.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

Initial DataFrame:
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



In [0]:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("my_table")

In [0]:
# Perform SQL query
result = spark.sql("SELECT * FROM my_table WHERE quantity > 25")
result.show()

+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
|  6|    Nike Shoes|   Clothing|      30|99.99|
| 12|        Apples|       Food|     100|  0.5|
| 13|       Bananas|       Food|     150| 0.25|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 25|      Backpack|Accessories|      30|24.99|
| 28|         Jeans|   Clothing|      30|59.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 30|      Sneakers|   Clothing|      40|79.99|
+---+--------------+-----------+--------+-----+

