In [20]:
# create a spark session
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [21]:
import pandas as pd
import numpy as np

In [22]:
# spark can convert any pandas dataframe into a spark dataframe
# here we will start with a pandas dataset
np.random.seed(456)
pandas_dataframe = pd.DataFrame(dict(n=np.arange(20), group = np.random.choice(list('abc'), 20))
                               )
pandas_dataframe

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c
5,5,c
6,6,a
7,7,b
8,8,a
9,9,b


In [23]:
# convert the dataframe to a spark dataframe
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

In [24]:
# look at the first five rows of our data (the default is 20 rows)
df.show(5)

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



In [25]:
# describe the spark dataframe
df.describe()

DataFrame[summary: string, n: string, group: string]

In [26]:
# show the 'describe'
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [27]:
### Lets use a larger dataset

from pydataset import data

mpg = spark.createDataFrame(data('mpg'))
mpg.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [28]:
# a difference with spark and pandas df is that spark will give the 
# slice of the dataframe but5 the contain the data itself like pandas would

mpg.hwy

Column<'hwy'>

In [29]:
# or
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

In [30]:
# to see that data, we would have to use the .show at the end

mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

+---+---+----------+
|hwy|cty|     model|
+---+---+----------+
| 29| 18|        a4|
| 29| 21|        a4|
| 31| 20|        a4|
| 30| 21|        a4|
| 26| 16|        a4|
| 26| 18|        a4|
| 27| 18|        a4|
| 26| 18|a4 quattro|
| 25| 16|a4 quattro|
| 28| 20|a4 quattro|
+---+---+----------+
only showing top 10 rows



In [31]:
# we can also return a column taht represents the values from the 
# original + 1 added like so:

mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



In [32]:
# we can use alias to change the name of the column

mpg.select(mpg.hwy.alias('highway_mailage')).show(5)

+---------------+
|highway_mailage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



In [33]:
# store column objects in variables and reference them

col1 = mpg.hwy.alias('highway_milage')
col2 = (mpg.hwy / 2).alias('highway_milage_halved')
mpg.select(col1, col2).show(5)

+--------------+---------------------+
|highway_milage|highway_milage_halved|
+--------------+---------------------+
|            29|                 14.5|
|            29|                 14.5|
|            31|                 15.5|
|            30|                 15.0|
|            26|                 13.0|
+--------------+---------------------+
only showing top 5 rows



### We can also create columns with the <code>col</code>  and <code>expr</code> functions from <code>pyspark.sql.functions</code> module 

In [34]:
from pyspark.sql.functions import col, expr

### col

In [36]:
# test
col('hwy')

Column<'hwy'>

#### mix and match the syntax we use, and the column object produced by the <code>col</code> function is the same as the previous column object we saw

In [37]:
avg_column = (col('hwy') + col('cty')) / 2

mpg.select(
    col('hwy').alias('highway_milage'),
    mpg.cty.alias('city_milage'),
    avg_column.alias('avg_milage'),
).show(5)

+--------------+-----------+----------+
|highway_milage|city_milage|avg_milage|
+--------------+-----------+----------+
|            29|         18|      23.5|
|            29|         21|      25.0|
|            31|         20|      25.5|
|            30|         21|      25.5|
|            26|         16|      21.0|
+--------------+-----------+----------+
only showing top 5 rows



### expr

In [39]:
# returns the same type of column object, but alows us to express 
# manipulations to the column within the string that defines the column.

mpg.select(
    expr('hwy'), # is the same as 'col'
    expr('hwy + 1'), # an arithmetic expression
    expr('hwy AS highway_milage'), # using an alias
    expr('hwy + 1 AS highway_incremented'), #a combination of the above
).show(5)

+---+---------+--------------+-------------------+
|hwy|(hwy + 1)|highway_milage|highway_incremented|
+---+---------+--------------+-------------------+
| 29|       30|            29|                 30|
| 29|       30|            29|                 30|
| 31|       32|            31|                 32|
| 30|       31|            30|                 31|
| 26|       27|            26|                 27|
+---+---------+--------------+-------------------+
only showing top 5 rows



In [40]:
# example: all columns created below are identical, and which syntax to use
# is just a style choice.

mpg.select(
    mpg.hwy.alias('highway'),
    # or
    col('hwy').alias('highway'),
    # or
    expr('hwy').alias('highway'),
    # or
    expr('hwy AS highway')
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



## Spark SQL

In [41]:
# 'register' table with spark

mpg.createOrReplaceTempView('mpg')

In [42]:
# now we can write a sql query against the mpg table:

spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
    """

)

DataFrame[hwy: bigint, cty: bigint, avg: double]

In [43]:
# show it

spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
    """

).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



#### Type Casting

In [44]:
# we can view the types of the column in our dataframe in one of two ways:

# dtypes

mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [45]:
# printSchema

mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [46]:
# we can also convert from one to another using the .cast method

mpg.select(mpg.hwy.cast('string')).printSchema()

root
 |-- hwy: string (nullable = true)



In [47]:
# if its not able to be converted, it will be replaced with null

mpg.select(mpg.model, mpg.model.cast('int')).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



#### Other Basic Built-in Functions

- <code> concat </code>: to concatenate strings
- <code> sum </code>: to sum a group
- <code> avg </code>: to take the average of a group
- <code> min </code>: to find the minimum
- <code> max </code>: to find the maximum

In [48]:
# import all pyspark.sql.functions rather than just the ones we will use

from pyspark.sql.functions import *

In [49]:
mpg.select(
    sum(mpg.hwy) / count(mpg.hwy).alias('average_1'),
    avg(mpg.hwy).alias('average_2'),
    min(mpg.hwy),
    max(mpg.hwy),

).show()

+--------------------------------------+-----------------+--------+--------+
|(sum(hwy) / count(hwy) AS `average_1`)|        average_2|min(hwy)|max(hwy)|
+--------------------------------------+-----------------+--------+--------+
|                     23.44017094017094|23.44017094017094|      12|      44|
+--------------------------------------+-----------------+--------+--------+



In [50]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In [51]:
# In order to use a string literal as part of our select, we'll need to use the 'lit'
# function, otherwise spark will try to resolve our string as a column

mpg.select(concat(mpg.cyl, lit('cylinders'))).show(5)

+----------------------+
|concat(cyl, cylinders)|
+----------------------+
|            4cylinders|
|            4cylinders|
|            4cylinders|
|            4cylinders|
|            6cylinders|
+----------------------+
only showing top 5 rows



#### <code>regexp_extract</code> and <code>regexp_replace</code>

In [52]:
# create a dataframe with some text data

textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            'address': [
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------+
|address                                |
+---------------------------------------+
|3130 Broadway St, San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215  |
|1255 SW Loop 410, San Antonio, TX 78227|
+---------------------------------------+



In [55]:
# The regexp_extract function lets you specify a regular expression with at
# least one capture group, and create a new column based on the contents of 
# a capture group.

textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------+---------+-----------+
|address                                |street_no|street     |
+---------------------------------------+---------+-----------+
|3130 Broadway St, San Antonio, TX 78209|3130     |Broadway St|
|303 Pearl Pkwy, San Antonio, TX 78215  |303      |Pearl Pkwy |
|1255 SW Loop 410, San Antonio, TX 78227|1255     |SW Loop 410|
+---------------------------------------+---------+-----------+



In [56]:
# regexp_replace lets us make substitutions based on a regular expression.

textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------+---------------------+
|address                                |city_state_zip       |
+---------------------------------------+---------------------+
|3130 Broadway St, San Antonio, TX 78209|San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215  |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227|San Antonio, TX 78227|
+---------------------------------------+---------------------+



#### <code>.filter</code> and <code>.where</code>

In [57]:
# both allow us to select a subset of the rows of our dataframe

mpg.filter(mpg.cyl == 4).where(mpg['class'] == 'subcompact').show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

#### When and Otherwise

In [58]:
# similar to an 'IF' in Excel, CASE...WHEN in SQL, or np.where in python, 
# 'when' is sparks version

mpg.select(mpg.hwy, when(mpg.hwy > 25, 'good_mileage').alias('mpg_desc')).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



In [59]:
# change the null value to say 'bad_mileage'

mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, 'good_mileage')
    .otherwise('bad_mileage')
    .alias('mpg_desc')

).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



In [60]:
# you can use .when to specify multiple conditions

mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, 'small')
        .when(mpg.displ < 3, 'medium')
        .otherwise('large')
        .alias('engine_size')
    
    )
).show(12)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
+-----+-----------+
only showing top 12 rows



### Sorting and Ordering

Spark lets us sort the rows in our dataframe by one or multiple columns with two methods: <code>.sort</code>, and <code>.orderBy</code>. <code>.sort</code> and <code>.orderBy</code> are aliases of each other and do the exact same thing. Like other methods we've seen, <code>.sort</code> takes in a Column object or a string that is the name of a column.

In [61]:
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|   chevrolet| c1500 suburban 2wd|  5.3|2008|  8|  auto(l4)|  r| 11| 15|  e|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+

#### Ascending and Descending orders

In [62]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# same as
mpg.sort(desc('hwy')).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [64]:
# to specify sorting by multiple columns, we provide each column 
# as a separate argument to .sort

mpg.sort(desc('class'), mpg.cyl.asc(), col('hwy').desc()).show()

#reverse alphabetically byt the vehicles class, then by the number of
# cylinders from lowest to highest, then by the vehicles highway mileage
# from greatest to smallest.

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|      toyota|  

### Grouping and Aggregating

To aggregate our data by group, we can use the <code>.groupBy</code> method. Like with <code>.select</code>, we can pass either Column objects or strings that are column names to <code>.groupBy</code>. All of the expressions below are equivalent.

In [66]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(col('cyl'))
mpg.groupBy('cyl')

<pyspark.sql.group.GroupedData at 0x7fd0e8b12c70>

In [68]:
# once grouped, I need to spicify an agregation. We can use one of
# the aggregate functions we imported earlier, along with a column:

mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  5|              20.5|            28.75|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
+---+------------------+-----------------+



In [69]:
# To group by multiple columns, pass each of the columns a a separate 
# argument to .groupBy (Note that this is different from pandas, where
# we would need to pass a list).

mpg.groupBy('cyl', 'class').agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  5|   compact|              21.0|              29.0|
|  5|subcompact|              20.0|              28.5|
|  6|subcompact|              17.0|24.714285714285715|
|  6|    pickup|              14.5|              17.9|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|    pickup|              11.8|              15.8|
|  8|   midsize|              16.0|              24.0|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  6|   minivan|              15.6|              22.2|
|  4|   compact|            21.375|          29.46875|
|  8|subcompact|              14.8|              21.6|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   mi

In [70]:
# I can also use .rollup, which will do the same aggregations, but 
# will also include the overall total:

mpg.rollup('cyl').count().sort('cyl').show()

# here the null shows the total count

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



In [71]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()
# the null here will show the average highway milage

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



In [72]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows



### Crosstabs and Pivot Tables

In [73]:
# The spark .crosstab function calculates the number of
# occurances of each unique value from the two passed columns:

mpg.crosstab('class', 'cyl').show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



<code>.crosstab</code> simply does counts, if we want a different aggregation, we can use <code>.pivot</code>. For example, to find the average highway mileage for each combination of car class and number of cylinders, we could do the following:

In [75]:
mpg.groupby('class').pivot('cyl').mean('hwy').show()
# groupby class - pivot by cylinder - average by highway mileage

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



### Handle Missing Data

In [79]:
# First I'll create a dataframe that has a few missing values

df = spark.createDataFrame(
    pd.DataFrame(
        {'x': [1, 2, np.nan, 4, 5, np.nan], 'y': [np.nan, 0, 0, 3, 1, np.nan]}
    )

)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



In [80]:
# to handle the missing values, we can either .fill or .drop
# Both methods are accessed through the .na property.

df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [82]:
df.na.fill(0).show()
# fill with '0'

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



In [84]:
# or we can use both

df.na.drop(subset='y').show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



### DataFrame Transformations

The <code>.explain</code> method will show how spark is thinking about our dataframe.

In [85]:
# Basic example - one step
mpg.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [86]:
mpg.select(mpg.cyl, mpg.hwy).explain()

# Notice now that there is another step after "Scan ExistingRDD",
# a "Project" that contains the names of the columns we are looking for

== Physical Plan ==
*(1) Project [cyl#625L, hwy#629L]
+- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [87]:
# Here we will do advanced calculation, but this is still just a single step to spark

mpg.select(((mpg.cyl + mpg.hwy) / 2).alias('avg_mpg')).explain()

== Physical Plan ==
*(1) Project [(cast((cyl#625L + hwy#629L) as double) / 2.0) AS avg_mpg#2542]
+- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [88]:
# filter also in a single step in spark

mpg.filter(mpg.cyl == 6).explain()

== Physical Plan ==
*(1) Filter (isnotnull(cyl#625L) AND (cyl#625L = 6))
+- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




Even though we specified the transformations(<code>.select</code> and <code>.filter</code>) in a different order, we end up with the same output when we call <code>.explain</code>. This is because spark will look at our dataframe and transform it into the most efficient representation possible.

In [90]:
mpg.select('cyl', 'hwy').filter(expr('cyl = 6')).explain()
mpg.filter(expr('cyl = 6')).select('cyl', 'hwy').explain()

== Physical Plan ==
*(1) Project [cyl#625L, hwy#629L]
+- *(1) Filter (isnotnull(cyl#625L) AND (cyl#625L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]


== Physical Plan ==
*(1) Project [cyl#625L, hwy#629L]
+- *(1) Filter (isnotnull(cyl#625L) AND (cyl#625L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [91]:
# Here we have 2 seperate select statements, but spark will condense this down to a 
# single Project, as it is smart enough to realize that it doesn't actually need to 
# do all the arithmetic we specified in the first select, since we arent using that 
# value later on.

mpg.select(min(mpg.cyl)).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[min(cyl#625L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#918]
   +- *(1) HashAggregate(keys=[], functions=[partial_min(cyl#625L)])
      +- *(1) Project [cyl#625L]
         +- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [92]:
# to calculate a minimum, we have to look at all the rows in the dataset to find the smallest.

mpg.groupby(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[cyl#625L], functions=[min(hwy#629L), max(hwy#629L)])
+- Exchange hashpartitioning(cyl#625L, 200), ENSURE_REQUIREMENTS, [id=#939]
   +- *(1) HashAggregate(keys=[cyl#625L], functions=[partial_min(hwy#629L), partial_max(hwy#629L)])
      +- *(1) Project [cyl#625L, hwy#629L]
         +- *(1) Scan ExistingRDD[manufacturer#621,model#622,displ#623,year#624L,cyl#625L,trans#626,drv#627,cty#628L,hwy#629L,fl#630,class#631]




In [None]:
(
    mpg.select(col('cyl'), exp('(cty + hwy) / 2 AS avg_mpg'))
    .filter(expr('class == "compact"))
)