# PySpark


## 1. Installed packages

### !pip for High Concurrency sessions

In [1]:
# Shell out to pip and print the installed-package metadata for duckdb.
# NOTE(review): the recorded output below is empty, which suggests duckdb is
# not installed in this session's environment — confirm.
!pip show duckdb

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 3, Finished, Available, Finished)

[0m

### Where'd my DuckDB go?

In [2]:
from importlib.metadata import distributions
import pandas as pd

# Collect (name, version) for every distribution that importlib.metadata can
# discover from this kernel's Python environment.
packages = [(dist.metadata['Name'], dist.version) for dist in distributions()]
df = pd.DataFrame(packages, columns=['Package', 'Version'])

# Sort case-insensitively: a plain sort puts every capitalised name ahead of
# all lowercase ones, so a package such as "duckdb" would not appear next to
# its alphabetical neighbours when scanning the list.
display(df.sort_values('Package', key=lambda names: names.str.lower()))

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c0dbdf52-ccec-498a-a9ed-41c1d576d5ef)

## 2. How to get data

### Load from files (2 Options - Spark or Pandas)
### Load from Tables (1 Option - Spark) 
### Drag and drop (Files, Tables)


In [3]:
# Query the first 1000 rows of the Lakebeach.sets table through Spark SQL.
sets_preview_query = "SELECT * FROM Lakebeach.sets LIMIT 1000"
df = spark.sql(sets_preview_query)
display(df)

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ea1666fc-00f9-4095-8813-3f2692ab93ba)

In [45]:
# Same preview as before: up to 1000 rows of Lakebeach.sets via Spark SQL.
df = spark.sql(
    "SELECT * FROM Lakebeach.sets LIMIT 1000"
)
display(df)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 47, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 02a412ab-c7e1-4c63-b0c7-4d5fb9cef1f4)

In [46]:
# Configure a CSV reader that treats the first line as the header row.
sets_csv_reader = spark.read.format("csv").options(header="true")
# df is now a Spark DataFrame containing the CSV data from "Files/Legos/sets.csv".
df = sets_csv_reader.load("Files/Legos/sets.csv")
display(df)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 48, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bbeea655-2543-472a-9662-b4004a159eb7)

## 3. Magic Commands

### Set language at the cell level

In [47]:
%%sql

-- Preview five rows of the sets table
SELECT *
FROM Lakebeach.sets
LIMIT 5

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 49, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 6 fields>

## 4. DataFrames aka Tables

### What is it
- ### RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark
- ### DataFrames are easier to work with - higher level abstraction
- ### type()

In [48]:
# Distribute a small in-memory list as an RDD, then inspect its Python type
sample_values = [1, 2, 3, 4, 5]
rdd = sc.parallelize(sample_values)
type(rdd)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 50, Finished, Available, Finished)

pyspark.rdd.RDD

In [49]:
import os

# Set the flag before pyspark.pandas is imported.
# NOTE(review): presumably this silences the pandas-on-Spark PyArrow timezone
# warning — confirm against the pandas API on Spark docs.
os.environ.update({'PYARROW_IGNORE_TIMEZONE': '1'})

import pyspark.pandas as ps

# Read the CSV through the pandas API on Spark, using set_num as the index.
sets_csv_path = "Files/Legos/sets.csv"
pdf = ps.read_csv(sets_csv_path, index_col='set_num')

type(pdf)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 51, Finished, Available, Finished)

pyspark.pandas.frame.DataFrame

In [4]:
import pandas as pd

# Read the same CSV with plain pandas from the /lakehouse/default path,
# then inspect the resulting Python type.
sets_csv_path = "/lakehouse/default/Files/Legos/sets.csv"
pdf = pd.read_csv(sets_csv_path)
type(pdf)

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 6, Finished, Available, Finished)

pandas.core.frame.DataFrame

In [51]:
import pandas as pd

# Load the CSV at "/lakehouse/default/Files/Legos/sets.csv" into a pandas DataFrame
df = pd.read_csv(filepath_or_buffer="/lakehouse/default/Files/Legos/sets.csv")
display(df)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2baf1b3c-f640-4273-bf4d-9b6a0a68ee68)

## 5. Data Wrangler

- ### Load Samples
    - #### Titanic Dataset
    - #### Not enough data to see if door could hold two people
- ### Load Existing data frame
- ### Great way to learn syntax
- ### Works with Python Notebook too!

In [5]:
import pandas as pd

# Download the Titanic sample CSV into a pandas DataFrame.
titanic_csv_url = "https://aka.ms/wrangler/titanic.csv"
wrangler_sample_df = pd.read_csv(titanic_csv_url)
display(wrangler_sample_df)

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cd5371c2-6106-4a48-b846-3d1c5ae38498)

In [ ]:
# Code generated by Data Wrangler for pandas DataFrame

def clean_data(wrangler_sample_df):
    """Return only the rows whose 'Survived' column equals 1.

    The input frame is not modified; the filtered rows keep their
    original index labels.
    """
    survived_mask = wrangler_sample_df['Survived'].eq(1)
    return wrangler_sample_df.loc[survived_mask]

# Hand clean_data a copy so the originally loaded frame is left as-is.
wrangler_input_copy = wrangler_sample_df.copy()
wrangler_sample_df_clean = clean_data(wrangler_input_copy)
display(wrangler_sample_df_clean)

In [53]:
legosetsdf = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Legos/sets.csv")
)

# legosetsdf is now a Spark DataFrame holding the CSV data from "Files/Legos/sets.csv".
display(legosetsdf)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 55, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, eacd7f9f-318a-4aae-9bfd-1655318a570d)

### Code Generated from Data Wrangler

In [54]:
# Code generated by Data Wrangler for PySpark DataFrame

from pyspark.sql import types as T

def clean_data(legosetsdf):
    """Rename the set columns, cast 'year' to a 64-bit integer and keep rows from 2025 on.

    Steps, in order:
      1. 'set_num' -> 'Set Number' and 'name' -> 'Set Name'
      2. 'year' cast to LongType
      3. keep rows where year >= 2025
    """
    renamed = (
        legosetsdf
        .withColumnRenamed('set_num', 'Set Number')
        .withColumnRenamed('name', 'Set Name')
    )
    # Cast first so the comparison below runs against the LongType column.
    typed = renamed.withColumn('year', renamed['year'].cast(T.LongType()))
    return typed.filter(typed['year'] >= 2025)

legosetsdf_clean = clean_data(legosetsdf)
display(legosetsdf_clean)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 56, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 911e6abb-faf4-4fc3-adab-83969866fa6f)

## 6. Various Transformations

### Joining DataFrames

In [6]:
legosets = spark.table("Lakebeach.sets")
legothemes = spark.table("Lakebeach.themes")

# Left join: every set is kept and matched to the theme whose id equals its theme_id.
sets_with_themes = legosets.join(
    legothemes,
    on=legosets.theme_id == legothemes.id,
    how="left",
)
sets_with_themes.show(8)

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 27, Finished, Available, Finished)

+------------+--------------------+----+--------+---------+--------------------+---+--------------------+---------+
|     set_num|                name|year|theme_id|num_parts|             img_url| id|                name|parent_id|
+------------+--------------------+----+--------+---------+--------------------+---+--------------------+---------+
|0003977811-1|Ninjago: Book of ...|2022|     761|        1|https://cdn.rebri...|761|Activity Books wi...|      497|
|       001-1|               Gears|1965|     756|       43|https://cdn.rebri...|756|           Samsonite|      365|
|      0011-2|   Town Mini-Figures|1979|      67|       12|https://cdn.rebri...| 67|        Classic Town|       50|
|      0011-3|Castle 2 for 1 Bo...|1987|     199|        0|https://cdn.rebri...|199|        Lion Knights|      186|
|      0012-1|  Space Mini-Figures|1979|     143|       12|https://cdn.rebri...|143|        Supplemental|      126|
|      0013-1|  Space Mini-Figures|1979|     143|       12|https://cdn.r

### Join with renaming

In [56]:
from pyspark.sql.functions import col

legosets = spark.table("Lakebeach.sets")
legothemes = spark.table("Lakebeach.themes")

# Match each set to its theme (left join keeps every set).
theme_match = legosets.theme_id == legothemes.id
sets_joined = legosets.join(legothemes, on=theme_match, how="left")

# Keep every column from legosets, expose themes.name as theme_name,
# then drop the theme_id column.
sets_with_theme_name = sets_joined.select(
    legosets["*"],
    legothemes.name.alias("theme_name"),
).drop("theme_id")

sets_with_theme_name.show(8)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 58, Finished, Available, Finished)

+------------+--------------------+----+---------+--------------------+--------------------+
|     set_num|                name|year|num_parts|             img_url|          theme_name|
+------------+--------------------+----+---------+--------------------+--------------------+
|0003977811-1|Ninjago: Book of ...|2022|        1|https://cdn.rebri...|Activity Books wi...|
|       001-1|               Gears|1965|       43|https://cdn.rebri...|           Samsonite|
|      0011-2|   Town Mini-Figures|1979|       12|https://cdn.rebri...|        Classic Town|
|      0011-3|Castle 2 for 1 Bo...|1987|        0|https://cdn.rebri...|        Lion Knights|
|      0012-1|  Space Mini-Figures|1979|       12|https://cdn.rebri...|        Supplemental|
|      0013-1|  Space Mini-Figures|1979|       12|https://cdn.rebri...|        Supplemental|
|      0014-1|  Space Mini-Figures|1979|        2|https://cdn.rebri...|        Supplemental|
|      0015-1|  Space Mini-Figures|1979|       18|https://cdn.rebri...

### Read from CSV - apply GroupBy

In [57]:
from pyspark.sql.functions import count

legosets = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Legos/sets.csv")
)

# count("theme_id") tallies the rows with a non-null theme_id inside each
# name group; the result column is labelled "Theme Count".
theme_count = count("theme_id").alias("Theme Count")
legosets.groupBy("name").agg(theme_count).show(10)


StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 59, Finished, Available, Finished)

+--------------------+-----------+
|                name|Theme Count|
+--------------------+-----------+
| Castle Mini Figures|          4|
|Spider-Man Action...|          1|
|Friends Hearts Pe...|          3|
|Winnie the Pooh's...|          1|
|          Basic Pack|          1|
|    My First Tractor|          1|
|Mickey Mouse & Do...|          1|
|     Passenger Coach|          2|
|              Flower|          5|
|Island Xtreme Stu...|          1|
+--------------------+-----------+
only showing top 10 rows



### Alias example

In [58]:
from pyspark.sql.functions import col

# The alias applies to the "name" column only; "id" keeps its original name.
theme_id_and_name = legothemes.select(
    col("id"),
    col("name").alias("Theme Name"),
)
theme_id_and_name.show(10)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 60, Finished, Available, Finished)

+---+--------------------+
| id|          Theme Name|
+---+--------------------+
|  1|             Technic|
|  3|         Competition|
|  4|      Expert Builder|
| 16|          RoboRiders|
| 17|      Speed Slammers|
| 18|           Star Wars|
| 19|        Supplemental|
| 20|     Throwbot Slizer|
| 21|Universal Buildin...|
| 22|             Creator|
+---+--------------------+
only showing top 10 rows



### Count()

In [59]:
# count() returns the number of rows in legothemes.
theme_row_count = legothemes.count()
theme_row_count

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 61, Finished, Available, Finished)

480

### Filtering

In [60]:
# Keep only the theme rows whose name is exactly "Star Wars".
is_star_wars = legothemes["name"] == "Star Wars"
stawarslegos = legothemes.filter(is_star_wars)
display(stawarslegos)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 62, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e18d83d2-02dc-40c7-af76-5594bd1e5f37)

### Select name column

In [61]:
# Project just the name column and cap the result at three rows.
onlynames = (
    legothemes
    .select(legothemes["name"])
    .limit(3)
)

display(onlynames)

StatementMeta(, a47dabc6-6652-439d-8887-8d16c8a96f30, 63, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8db1fb2e-3c47-4aa4-9ce2-de6d07d2fae9)

### There are multiple ways to refer to columns
### - sets.select(sets.name)
### - sets.select(sets["name"])
### - sets.select("name")
### - sets.select(col("name"))
###      - requires importing col
###          - from pyspark.sql.functions import col

## 7. Lazy Evaluation

### Can be confusing
## Notice it gives info about the DataFrame
## but not the data itself - so <u>**lazy**</u>

In [7]:
# Evaluating the bare DataFrame name only returns a description of it
# (column names and types, as in the recorded output) — no rows are shown.

legothemes

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 28, Finished, Available, Finished)

DataFrame[id: int, name: string, parent_id: int]

In [8]:
legothemes = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Legos/themes.csv")
)
# legothemes is now a Spark DataFrame containing the CSV data from "Files/Legos/themes.csv".

# Print the first 5 rows with show()
legothemes.show(n=5)

# Print the same 5 rows, truncating each value to 10 characters
legothemes.show(n=5, truncate=10)

StatementMeta(, bc69e24c-20bf-489c-a9d6-c61b6392441f, 29, Finished, Available, Finished)

+---+--------------+---------+
| id|          name|parent_id|
+---+--------------+---------+
|  1|       Technic|     NULL|
|  3|   Competition|        1|
|  4|Expert Builder|        1|
| 16|    RoboRiders|        1|
| 17|Speed Slammers|        1|
+---+--------------+---------+
only showing top 5 rows

+---+----------+---------+
| id|      name|parent_id|
+---+----------+---------+
|  1|   Technic|     NULL|
|  3|Competi...|        1|
|  4|Expert ...|        1|
| 16|RoboRiders|        1|
| 17|Speed S...|        1|
+---+----------+---------+
only showing top 5 rows



### Action will "execute" the code
### Displaying the data is one of those actions
### <u>**Two Ways to display**</u>
- ### show() method (shown above)
    - ### can also truncate: show(5, truncate=25)
- ### display() function (shown below)

## 8. Add to Pipeline

- ### Notes in slides on adding libraries in pipeline
- ### Add to New or Existing pipeline
- ### Schedule