# Purpose of this notebook is to learn Spark

In [2]:
# Import libraries
import pyspark

In [17]:
# This is how you Create a new spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [12]:
spark

### If you want to convert a Pandas Dataframe to Spark

In [7]:
import pandas as pd
import numpy as np

np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe.head(3)

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c


In [8]:
df = spark.createDataFrame(pandas_dataframe)

DataFrame[n: bigint, group: string]

In [11]:
df

DataFrame[n: bigint, group: string]

    Notice that, while we do see the column names, we don't see the data in the dataframe
    like we would with a pandas dataframe. This is because spark is lazy, in that it won't
    show us values until it has to. For the purposes of looking at the first few rows of
    our data, we can use the .show method.

### .show()

In [9]:
df.show(5)

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



### .describe()

In [13]:
df.describe()

DataFrame[summary: string, n: string, group: string]

    Which, also like pandas, returns another dataframe. 
    However, since this is a spark dataframe, we have to explicitly show it.

In [15]:
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



### Lets use a more robust dataset

In [18]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [19]:
mpg.hwy

Column<'hwy'>

    While this expression would produce a Series of values from a pandas dataframe,
    for a spark dataframe this produces a Column object, which is an object that 
    represents a vertical slice of a dataframe, but does not contain the data itself.

    One way to use our column objects is to use them in combination with the .select
    method. .select is very powerful, and lets us specify what data we want to see 
    in the resulting dataframe.

### .select()

In [23]:
mpg.select(mpg.hwy).show(5)

+---+
|hwy|
+---+
| 29|
| 29|
| 31|
| 30|
| 26|
+---+
only showing top 5 rows



### Arithmetic Operators

In [24]:
mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



    Our column objects support a numer of operations,
    including the arithmetic operators:

### .alias()

In [28]:
mpg.select(mpg.hwy, (mpg.hwy + 5).alias("highway_mileage")).show(5)

+---+---------------+
|hwy|highway_mileage|
+---+---------------+
| 29|             34|
| 29|             34|
| 31|             36|
| 30|             35|
| 26|             31|
+---+---------------+
only showing top 5 rows



    here I am selecting hwy and hwy + 5
    I am also only renaming hwy + 5 with .alias as highway_mileage
    finally showing only 5 records

### Storing column objects in variables and reference them

In [29]:
col1 = mpg.hwy.alias("highway_mileage")
col2 = (mpg.hwy / 2).alias("highway_mileage_halved")
mpg.select(col1, col2).show(5)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
|             30|                  15.0|
|             26|                  13.0|
+---------------+----------------------+
only showing top 5 rows



# Other ways to create columns

In [30]:
from pyspark.sql.functions import col, expr

### col

In [36]:
# Equal to df["hwy"] instead of using df.hwy
col("hwy") 

Column<'hwy'>

In [34]:
avg_column = (col("hwy") + col("cty")) / 2

mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



### expr

In [35]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



    The expr function is more powerful than col. It does everything
    col does and more. expr returns the same type of column object,
    but allows us to express manipulations to the column within the
    string that defines the column.

### Spark SQL

### In order to start using spark SQL, we'll first "register" the table with spark:

In [41]:
# register table with spark
mpg.createOrReplaceTempView("mpg")

In [46]:
spark.sql(
"""
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
"""
)

DataFrame[hwy: bigint, cty: bigint, avg: double]

In [45]:
spark.sql(
"""
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
"""
).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



# Type Casting

In [47]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [48]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



### To convert from one type to another, we can use the .cast method on a column.

### .cast()

In [50]:
mpg.select(mpg.hwy.cast("string")).printSchema()

root
 |-- hwy: string (nullable = true)



In [51]:
mpg.select(mpg.hwy.cast("string")).dtypes

[('hwy', 'string')]

## Warning

In [52]:
mpg.select(mpg.model, mpg.model.cast("int")).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



    Note that if a value is not able to be converted,
    it will be replaced with null:

# Basic Built-in Functions

In [53]:
# Note: The pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

### sum()

In [54]:
mpg.select(
    sum(mpg.hwy) / count(mpg.hwy).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+------------------------------------+-----------------+--------+--------+
|(sum(hwy) / count(hwy) AS average_1)|        average_2|min(hwy)|max(hwy)|
+------------------------------------+-----------------+--------+--------+
|                   23.44017094017094|23.44017094017094|      12|      44|
+------------------------------------+-----------------+--------+--------+



### concat()

In [55]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In [56]:
mpg.select(mpg.manufacturer).show(5)

+------------+
|manufacturer|
+------------+
|        audi|
|        audi|
|        audi|
|        audi|
|        audi|
+------------+
only showing top 5 rows



In [57]:
mpg.select(mpg.model).show(5)

+-----+
|model|
+-----+
|   a4|
|   a4|
|   a4|
|   a4|
|   a4|
+-----+
only showing top 5 rows



# lit()

    In order to use a string literal as part of our select
    we'll need to use the lit function, otherwise spark will
    try to resolve our string as a column.

In [59]:
from pyspark.sql.functions import lit

In [60]:
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



### More pyspark Functions for String Manipulation

In [61]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [62]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



### regexp_extract()

    The regexp_extract function lets us specify a regular 
    expression with at least one capture group,
    and create a new column based on the contents of a 
    capture group.

In [63]:
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



### regexp_replace()

    In the example below, we obtain just the city, 
    state, and zip code of the address by replacing
    everything up to the first comma with an empty string.

In [64]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



### .filter and .where

In [67]:
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----+-----+----+---+----------+---+---+---+---+----------+
|       honda|civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
+------------+-----+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



# When and Otherwise

In [68]:
from pyspark.sql.functions import when

### .when()

In [69]:
mpg.select(mpg.hwy, when(mpg.hwy > 25, "good_mileage").alias("mpg_desc")).show(
    12
)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



### .otherwise()

    Notice below that if the condition we specified is false,
    null will be produced. Instead of null, we can specify a
    value to use if our condition is false with the .otherwise
    method.

In [70]:
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage")
    .otherwise("bad_mileage")
    .alias("mpg_desc"),
).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



### Multiple .when() statements

In [71]:
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



    Notice here that a car with a displ of 1.8 matches 
    both conditions we specified, but small is produced 
    because it is associated with the first matching condition. 
    For any value between 2 and 3, medium will be produced, 
    and anything larger than 3 will produce large.

# Sorting and Ordering

In [72]:
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|  land rover|        range rover|  4.0|1999|  8|  auto(l4)|  4| 11| 15|  p|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+

### Ascending and Descending

In [73]:
from pyspark.sql.functions import asc, desc

In [74]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



### Sorting by multiple columns

In [75]:
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|        jeep|gr

# Grouping and Aggregating

In [76]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

<pyspark.sql.group.GroupedData at 0x7ffb439de0a0>

    Once the data is grouped, we need to specify an 
    aggregation. We can use one of the aggregate 
    functions we imported earlier, alond with a column:

In [77]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
|  5|              20.5|            28.75|
+---+------------------+-----------------+



In [78]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  6|    pickup|              14.5|              17.9|
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|    pickup|              11.8|              15.8|
|  8|   midsize|              16.0|              24.0|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  6|   minivan|              15.6|              22.2|
|  4|   compact|            21.375|          29.46875|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   minivan|              18.0|              24.0|
|  6|       suv|              14.5|              18.5|
|  6|subcompact|              17.0|24.714285714285715|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|subcompact|              14.8|              21.6|
|  4|     

### .groupBy()

    In addition to .groupBy, we can use .rollup,
    which will do the same aggregations,
    but will also include the overall total:

In [79]:
mpg.rollup("cyl").count().sort("cyl").show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



In [80]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



    And in the example above, the null row represents
    the overall average highway mileage.

# Crosstabs and Pivot Tables

### .crosstabs()

In [81]:
mpg.crosstab("class", "cyl").show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



### .pivot()

In [82]:
mpg.groupby("class").pivot("cyl").mean("hwy").show()

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



    More to learn later