# Spark 101

All the basics! 

Slides: 

## Create Spark Session

In [2]:
#import spark for python! 
import pyspark
#create the spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/30 10:39:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/30 10:39:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Dataframe Basics
 - create dataframe: `spark.createdataframe`
 - see the results: `.show`
 - look at your data: `.describe`, `.dtypes`, `printSchema`
 - select & create columns: `.select`, `col`, `expr`

### create dataframe from a pandas dataframe

In [3]:
import pandas as pd
import numpy as np

In [13]:
np.random.seed(452)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)

pandas_dataframe.head()

Unnamed: 0,n,group
0,0,c
1,1,a
2,2,a
3,3,a
4,4,a


In [14]:
#create spark dataframe 
df = spark.createDataFrame(pandas_dataframe)

### See the results

<div class='alert alert-box alert-info'>
<b>Note:</b> Spark is lazy and won't display info unless you tell it to
</div>

In [15]:
#one way to pull in a spark df
df.show()

+---+-----+
|  n|group|
+---+-----+
|  0|    c|
|  1|    a|
|  2|    a|
|  3|    a|
|  4|    a|
|  5|    a|
|  6|    c|
|  7|    c|
|  8|    a|
|  9|    a|
| 10|    b|
| 11|    a|
| 12|    a|
| 13|    a|
| 14|    a|
| 15|    a|
| 16|    b|
| 17|    a|
| 18|    c|
| 19|    a|
+---+-----+



### create dataframe from the pydataset

In [16]:
from pydataset import data

In [17]:
df = spark.createDataFrame(data('mpg'))
df

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

### look at your data

In [20]:
#to see the entire df
df.show() #default is 20, can change by inputting an integer

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
|        audi|                a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|  auto(l5)|  4| 16| 25|  p|c

In [21]:
#describe
df.describe().show() #too long

23/06/30 10:42:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 14:>                                                       (0 + 16) / 16]

+-------+------------+-----------------+------------------+-----------------+-----------------+----------+---+------------------+-----------------+----+-------+
|summary|manufacturer|            model|             displ|             year|              cyl|     trans|drv|               cty|              hwy|  fl|  class|
+-------+------------+-----------------+------------------+-----------------+-----------------+----------+---+------------------+-----------------+----+-------+
|  count|         234|              234|               234|              234|              234|       234|234|               234|              234| 234|    234|
|   mean|        null|             null| 3.471794871794872|           2003.5|5.888888888888889|      null|4.0|16.858974358974358|23.44017094017094|null|   null|
| stddev|        null|             null|1.2919590310839348|4.509646313320452|1.611534484684289|      null|0.0| 4.255945678889394|5.954643441166446|null|   null|
|    min|        audi|      4runne

                                                                                

In [23]:
#describe vertical
df.describe().show(vertical=True) #works like transpose

-RECORD 0--------------------------
 summary      | count              
 manufacturer | 234                
 model        | 234                
 displ        | 234                
 year         | 234                
 cyl          | 234                
 trans        | 234                
 drv          | 234                
 cty          | 234                
 hwy          | 234                
 fl           | 234                
 class        | 234                
-RECORD 1--------------------------
 summary      | mean               
 manufacturer | null               
 model        | null               
 displ        | 3.471794871794872  
 year         | 2003.5             
 cyl          | 5.888888888888889  
 trans        | null               
 drv          | 4.0                
 cty          | 16.858974358974358 
 hwy          | 23.44017094017094  
 fl           | null               
 class        | null               
-RECORD 2--------------------------
 summary      | stddev      

In [25]:
#printSchema
df.printSchema().show() #gives column names and what datatype it is and if it can have null values

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



AttributeError: 'NoneType' object has no attribute 'show'

In [28]:
#dtypes
df.dtypes #gives dtype on its own

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [33]:
#number of rows
df.count() #how to get shape in spark

234

In [35]:
#number of columns
len(df.columns) #how to get shape in spark

11

### select columns

<div class='alert alert-box alert-info'>
<b>Reminder:</b> Spark is still lazy
</div>

In [40]:
#looking at a column
df.select('trans').show(5)

+----------+
|     trans|
+----------+
|  auto(l5)|
|manual(m5)|
|manual(m6)|
|  auto(av)|
|  auto(l5)|
+----------+
only showing top 5 rows



In [44]:
#another way
df.select(df.trans).show(5)

+----------+
|     trans|
+----------+
|  auto(l5)|
|manual(m5)|
|manual(m6)|
|  auto(av)|
|  auto(l5)|
+----------+
only showing top 5 rows



In [45]:
#select two columns
df.select('trans', 'hwy').show(5)

+----------+---+
|     trans|hwy|
+----------+---+
|  auto(l5)| 29|
|manual(m5)| 29|
|manual(m6)| 31|
|  auto(av)| 30|
|  auto(l5)| 26|
+----------+---+
only showing top 5 rows



#### save our selected columns

<div class='alert alert-box alert-info'>
<b>Note:</b> You can not save a display output
</div>

In [50]:
#save this as a variable and do something with it
df_cols = df.select('model','year').show() #this df did not save the DF inside the VARIABLE because of the .show()

+------------------+----+
|             model|year|
+------------------+----+
|                a4|1999|
|                a4|1999|
|                a4|2008|
|                a4|2008|
|                a4|1999|
|                a4|1999|
|                a4|2008|
|        a4 quattro|1999|
|        a4 quattro|1999|
|        a4 quattro|2008|
|        a4 quattro|2008|
|        a4 quattro|1999|
|        a4 quattro|1999|
|        a4 quattro|2008|
|        a4 quattro|2008|
|        a6 quattro|1999|
|        a6 quattro|2008|
|        a6 quattro|2008|
|c1500 suburban 2wd|2008|
|c1500 suburban 2wd|2008|
+------------------+----+
only showing top 20 rows



In [51]:
#trying to look at the variable
df_cols.show()

AttributeError: 'NoneType' object has no attribute 'show'

In [54]:
#type
type(df_cols) #because of that .show() -- spark only visualizes

NoneType

In [57]:
#remove the .show()
df_cols = df.select('model','year')
df_cols.show() #now it is saved into a variable

+------------------+----+
|             model|year|
+------------------+----+
|                a4|1999|
|                a4|1999|
|                a4|2008|
|                a4|2008|
|                a4|1999|
|                a4|1999|
|                a4|2008|
|        a4 quattro|1999|
|        a4 quattro|1999|
|        a4 quattro|2008|
|        a4 quattro|2008|
|        a4 quattro|1999|
|        a4 quattro|1999|
|        a4 quattro|2008|
|        a4 quattro|2008|
|        a6 quattro|1999|
|        a6 quattro|2008|
|        a6 quattro|2008|
|c1500 suburban 2wd|2008|
|c1500 suburban 2wd|2008|
+------------------+----+
only showing top 20 rows



### create columns
 - use basic math operators
 - change column names: `alias`

In [55]:
df.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



#### half the highway mileage

In [58]:
#create transformation
df.hwy / 2

Column<'(hwy / 2)'>

In [61]:
#create action
df.select(df.hwy / 2).show(5)

+---------+
|(hwy / 2)|
+---------+
|     14.5|
|     14.5|
|     15.5|
|     15.0|
|     13.0|
+---------+
only showing top 5 rows



In [64]:
#create alias for column name
df.select(df.hwy, (df.hwy/2).alias('half_hwy')).show(5)

+---+--------+
|hwy|half_hwy|
+---+--------+
| 29|    14.5|
| 29|    14.5|
| 31|    15.5|
| 30|    15.0|
| 26|    13.0|
+---+--------+
only showing top 5 rows



### select & create columns: `col`

In [65]:
from pyspark.sql.functions import col

In [76]:
df.select(df.hwy,
          col('hwy'),
          col('hwy') + 1,
          (col('hwy') + col('cty')).alias('hwy_and_city')).show(5)

+---+---+---------+------------+
|hwy|hwy|(hwy + 1)|hwy_and_city|
+---+---+---------+------------+
| 29| 29|       30|          47|
| 29| 29|       30|          50|
| 31| 31|       32|          51|
| 30| 30|       31|          51|
| 26| 26|       27|          42|
+---+---+---------+------------+
only showing top 5 rows



### select & create columns: `expr`

In [77]:
from pyspark.sql.functions import expr

In [79]:
df.select(
    expr('hwy'),
    expr('hwy + 1')
         
).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



### create column: `withColumn`

In [84]:
#keeps the original
df.withColumn(
    'hwy_plus_one', #new column name
              col('hwy') + 1).show(5) #how the new column is made

+------------+-----+-----+----+---+----------+---+---+---+---+-------+------------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|hwy_plus_one|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+------------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|          30|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|          30|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|          32|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|          31|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|          27|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+------------+
only showing top 5 rows



## Transforming columns

### built-in functions - math

In [86]:
from pyspark.sql.functions import min, max, sum, count, mean, avg
#import pyspark.sql.functions as F #also common import! call as F.min()

In [97]:
#use min, max, and calculate average highway mileage
df.select(
    min(df.hwy), max(df.hwy), avg(df.hwy), mean(df.hwy), count(col('hwy')) #these are all only one value
                                    #these operate the exact same way
).show(5)

+--------+--------+-----------------+-----------------+----------+
|min(hwy)|max(hwy)|         avg(hwy)|         avg(hwy)|count(hwy)|
+--------+--------+-----------------+-----------------+----------+
|      12|      44|23.44017094017094|23.44017094017094|       234|
+--------+--------+-----------------+-----------------+----------+



### built-in functions - strings
- `concat`: to concatenate strings
- `lit`: creates literal value of character

In [98]:
from pyspark.sql.functions import concat, lit

In [101]:
#combine manufacturer and model together
df.select(
    concat(df.manufacturer, df.model)

).show(5) #smushed together 

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In [104]:
#get a space
df.select(
    concat(df.manufacturer, lit(' '), df.model) #using lit to get a LITERAL space
).show(5)

+------------------------------+
|concat(manufacturer,  , model)|
+------------------------------+
|                       audi a4|
|                       audi a4|
|                       audi a4|
|                       audi a4|
|                       audi a4|
+------------------------------+
only showing top 5 rows



In [110]:
#combine city and highway together
df.select(
    df.cty, 
    df.hwy,
    concat(df.cty, lit(' '), df.hwy) #concat treats things as strings   #if you want to add use the operator

).show(5) 

+---+---+-------------------+
|cty|hwy|concat(cty,  , hwy)|
+---+---+-------------------+
| 18| 29|              18 29|
| 21| 29|              21 29|
| 20| 31|              20 31|
| 21| 30|              21 30|
| 16| 26|              16 26|
+---+---+-------------------+
only showing top 5 rows



### Regex! 

- `regexp_extract`: use regex to extract data
- `regexp_replace`: use regex to replace data

In [111]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [113]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410!!!!, San - Antonio, TX @78227@",
            ]
        }
    )
)

textdf.show(truncate=False) #allows us to see the full cell!

+-----------------------------------------------+
|address                                        |
+-----------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205  |
|3130 Broadway St, San Antonio, TX 78209        |
|303 Pearl Pkwy, San Antonio, TX 78215          |
|1255 SW Loop 410!!!!, San - Antonio, TX @78227@|
+-----------------------------------------------+



In [130]:
#cleaning the strings using regexp
textdf.select(
    'address',
    regexp_extract('address', r'\d+', 0).alias('street_num'),
    regexp_extract('address', r'(\d+)\s(\w+)', 1).alias('street_name'),
    #replace exclamation points
    regexp_replace('address', r'[^\w\s]', '').alias('clean_address')

).show(truncate=False)

+-----------------------------------------------+----------+-----------+-------------------------------------------+
|address                                        |street_num|street_name|clean_address                              |
+-----------------------------------------------+----------+-----------+-------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205  |600       |600        |600 Navarro St ste 600 San Antonio TX 78205|
|3130 Broadway St, San Antonio, TX 78209        |3130      |3130       |3130 Broadway St San Antonio TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215          |303       |303        |303 Pearl Pkwy San Antonio TX 78215        |
|1255 SW Loop 410!!!!, San - Antonio, TX @78227@|1255      |1255       |1255 SW Loop 410 San  Antonio TX 78227     |
+-----------------------------------------------+----------+-----------+-------------------------------------------+



### Filter and Where

In [None]:
df = spark.createDataFrame(data('mpg'))
df

In [None]:
df.show(5)

### When and Otherwise

In [None]:
from pyspark.sql.functions import when

In [None]:
df.select(

).show(5)

### sorting and ordering

### Grouping and Aggregating

In [None]:
#groupby/groupBy


In [None]:
#rollup


### Crosstabs and Pivot Tables

In [None]:
#crosstab


In [None]:
#groupby and pivot


### Handling Missing Data

- `.na.fill`: to replace missing values with a specified value
- `.na.drop`: to drop rows containing missing values

In [None]:
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

### More Dataframe Manipulation Examples

In [None]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(weather)
df.show(6)

#### shape of df

#### start and end date

#### Find the total rainfall per month

In [None]:
from pyspark.sql.functions import month, year, quarter

### Joins

In [None]:
(
    df.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

In [None]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

In [None]:
users.join(roles, on=users.role_id == roles.id).show()

In [None]:
users.join(roles, on=users.role_id == roles.id, how="left").show()