In [1]:
import pyspark

# Entry point for all DataFrame work in this notebook. getOrCreate() hands back
# an already-running session when there is one instead of starting a second.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
import pandas as pd
import numpy as np

# Fix the legacy global RNG so the random group labels are reproducible.
np.random.seed(456)

# Small demo frame: n counts 0..19, group is a random pick of "a"/"b"/"c" per row.
pandas_dataframe = pd.DataFrame(
    {
        "n": np.arange(20),
        "group": np.random.choice(["a", "b", "c"], 20),
    }
)
pandas_dataframe.head()

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c


In [4]:
# Convert the pandas frame into a Spark DataFrame. Evaluating the bare name
# displays only the column names and types, not any rows.
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

In [5]:
# head() with no argument returns the first record as a single Row object.
df.head()

Row(n=0, group='b')

In [6]:
# show() prints the rows as a text table (at most 20 rows when no count is given).
df.show()

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
|  5|    c|
|  6|    a|
|  7|    b|
|  8|    a|
|  9|    b|
| 10|    b|
| 11|    a|
| 12|    b|
| 13|    a|
| 14|    b|
| 15|    b|
| 16|    c|
| 17|    c|
| 18|    a|
| 19|    c|
+---+-----+



# Read from Files

In [7]:
from pydataset import data

# Load the "mpg" sample dataset through pydataset, convert it to a Spark
# DataFrame, and preview the first 5 rows.
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



# Columns

In [10]:
# Attribute access on a DataFrame yields a Column object (an expression that
# names the column), not the column's values.
mpg.model

Column<b'model'>

**Select Columns**

In [11]:
# select() returns a new DataFrame holding just the requested columns; without
# an action such as show() only its schema is displayed.
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

In [12]:
# Same projection, followed by show(5) to print the first 5 rows.
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(5)

+---+---+-----+
|hwy|cty|model|
+---+---+-----+
| 29| 18|   a4|
| 29| 21|   a4|
| 31| 20|   a4|
| 30| 21|   a4|
| 26| 16|   a4|
+---+---+-----+
only showing top 5 rows



In [14]:
# Column objects support arithmetic; the derived column is auto-named "(hwy + 1)".
mpg.select(mpg.hwy, mpg.hwy +1).show(3)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
+---+---------+
only showing top 3 rows



In [16]:
# alias() gives each output column a readable name instead of the generated one.
mpg.select(
    mpg.hwy.alias("highway_mileage"),
    (mpg.hwy + 1).alias("highway_mileage_plus1"),
).show(3)

+---------------+---------------------+
|highway_mileage|highway_mileage_plus1|
+---------------+---------------------+
|             29|                   30|
|             29|                   30|
|             31|                   32|
+---------------+---------------------+
only showing top 3 rows



In [17]:
# Column expressions are ordinary Python values: build them once, store them in
# variables, and pass them to select() later.
col1 = mpg.hwy.alias('highway_mileage')
# Dividing the bigint hwy column by 2 produces fractional results (e.g. 14.5).
col2 = (mpg.hwy/2).alias('highway_mileage_halved')

mpg.select(col1, col2).show(3)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
+---------------+----------------------+
only showing top 3 rows



In [21]:
from pyspark.sql.functions import col, expr

# col() builds a Column from a name alone, without referencing a particular DataFrame.
col('hwy')

Column<b'hwy'>

In [22]:
# Attribute access on the DataFrame displays the same Column representation
# as building the column by name with col('hwy').
mpg.hwy

Column<b'hwy'>

In [23]:
# Row-wise mean of highway and city mileage, defined as a free-standing column
# expression; it is resolved against a DataFrame only when used in select().
avg_col = (col('hwy') + col('cty')) / 2

In [24]:
# The three ways of referring to columns can be mixed freely in one select():
mpg.select(
    col('hwy').alias('hwy_mileage'),  # by name via col()
    mpg.cty.alias('cty_mileage'),  # by attribute on the DataFrame
    avg_col.alias('avg_mileage')  # a previously built expression
).show(3)

+-----------+-----------+-----------+
|hwy_mileage|cty_mileage|avg_mileage|
+-----------+-----------+-----------+
|         29|         18|       23.5|
|         29|         21|       25.0|
|         31|         20|       25.5|
+-----------+-----------+-----------+
only showing top 3 rows



In [26]:
# expr() parses a SQL expression string into a Column.
mpg.select(
    expr('hwy'), # the same as col('hwy')
    expr('hwy + 1'), # arithmetic expression, same as col('hwy') + 1
    expr('hwy AS hwy_mileage'), # alias in SQL, same as col('hwy').alias('hwy_mileage')
    expr('hwy + 1 AS hwy_incremented'), # a combination of the two above
).show(3)

+---+---------+-----------+---------------+
|hwy|(hwy + 1)|hwy_mileage|hwy_incremented|
+---+---------+-----------+---------------+
| 29|       30|         29|             30|
| 29|       30|         29|             30|
| 31|       32|         31|             32|
+---+---------+-----------+---------------+
only showing top 3 rows



In [28]:
# Four equivalent spellings of "hwy renamed to highway". The result has four
# identically named columns, which select() accepts.
mpg.select(
    mpg.hwy.alias('highway'),
    col('hwy').alias('highway'),
    expr('hwy').alias('highway'),
    expr('hwy AS highway')
).show(3)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
+-------+-------+-------+-------+
only showing top 3 rows



# Spark SQL

In [29]:
# Register mpg under the name "mpg_view" so SQL passed to spark.sql() can refer to it.
mpg.createOrReplaceTempView('mpg_view')

In [30]:
# spark.sql() runs a SQL query against the registered view and returns a
# DataFrame, so the usual actions such as show() apply to the result.
spark.sql(
    """
    SELECT hwy, cty, (hwy + cty)/2 AS avg
    FROM mpg_view
    """
).show(3)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
+---+---+----+
only showing top 3 rows



# Type Casting

In [31]:
# dtypes is a list of (column name, type name) pairs.
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [32]:
# printSchema() prints the same information as a tree, including nullability.
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [33]:
# cast() converts a column to another type; here the bigint hwy becomes a string.
mpg.select(mpg.hwy.cast('string')).printSchema()

root
 |-- hwy: string (nullable = true)



In [34]:
# Casting text that is not a number (e.g. "a4") to int yields null rather than
# raising an error. The cast column keeps the name "model".
mpg.select(mpg.model, mpg.model.cast('int')).show(3)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 3 rows



# Basic Built-in Functions

In [39]:
# NOTE: this rebinds round, sum, min and max to the Spark column functions,
# shadowing the Python builtins of the same name for every cell that follows.
from pyspark.sql.functions import round, concat, sum, min, max, count, avg, mean

In [40]:
# Selecting only aggregate expressions collapses the frame to a single summary
# row. sum/min/max/round here are the Spark functions imported from
# pyspark.sql.functions, not the Python builtins.
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias('avg1'),  # mean computed by hand
    round(avg(mpg.hwy), 2).alias('avg2'),  # built-in average, rounded to 2 places
    mean(mpg.hwy).alias('avg3'),  # gives the same value as avg1
    min(mpg.hwy),
    max(mpg.hwy)
).show(3)

+-----------------+-----+-----------------+--------+--------+
|             avg1| avg2|             avg3|min(hwy)|max(hwy)|
+-----------------+-----+-----------------+--------+--------+
|23.44017094017094|23.44|23.44017094017094|      12|      44|
+-----------------+-----+-----------------+--------+--------+



In [41]:
# concat() joins column values end to end with no separator ("audi" + "a4" -> "audia4").
mpg.select(concat(mpg.manufacturer, mpg.model)).show(3)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 3 rows



In [42]:
from pyspark.sql.functions import lit

In [43]:
# lit() wraps a Python constant as a Column so it can be combined with real
# columns; the numeric cyl value is rendered as text in the result ("4cylinders").
mpg.select(concat(mpg.cyl, lit('cylinders')).alias('cylinders')).show(3)

+----------+
| cylinders|
+----------+
|4cylinders|
|4cylinders|
|4cylinders|
+----------+
only showing top 3 rows



# String Manipulation PySpark Functions

In [44]:
from pyspark.sql.functions import regexp_extract, regexp_replace


In [45]:
# One-column frame of free-form street addresses used by the regex examples.
textdf = spark.createDataFrame(
    pd.DataFrame(
        dict(
            address=[
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        )
    )
)

# truncate=False prints each address in full instead of cutting long strings.
textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



In [47]:
# Pull two pieces out of each address with capture group 1 of a regex:
#   street_no - the run of digits at the very start
#   street    - the text after that number, up to the first comma
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)
              


+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



In [48]:
# Strip everything up to and including the first comma (plus following spaces),
# leaving "city, state zip".
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



In [51]:
# Row filtering: chained filter()/where() calls combine, keeping rows that meet
# both conditions. "class" is read with mpg["class"] because `class` is a Python
# keyword and cannot be used as an attribute name.
mpg.filter(mpg.cyl==4).where(mpg["class"]=="subcompact").show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----+-----+----+---+----------+---+---+---+---+----------+
|       honda|civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
+------------+-----+-----+----+---+----------+---+---+---+---+----------+
only showing top 3 rows



In [52]:
from pyspark.sql.functions import when


In [54]:
# when(...).otherwise(...) is a column-level if/else: rows with hwy strictly
# above 25 are labelled good_mileage, every other row bad_mileage.
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage").otherwise("bad_mileage").alias("mpg_class"),
).show(12)

+---+------------+
|hwy|   mpg_class|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



In [55]:
# Chained when() calls are checked in order, like if / elif / else:
# displ < 2 -> small, otherwise displ < 3 -> medium, everything else -> large.
mpg.select(
    mpg.displ,
    when(mpg.displ < 2, "small")
    .when(mpg.displ < 3, "medium")
    .otherwise("large")
    .alias("engine_size"),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



# Sorting and Ordering

In [56]:
# sort() orders ascending by default, so the lowest highway mileage comes first.
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|       dodge|  dakota pickup 4wd|  5.2|1999|  8|  auto(l4)|  4| 11| 15|  r|pickup|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
only showing top 8 rows

In [57]:
from pyspark.sql.functions import asc, desc

# Three equivalent ways to request a descending sort. The first two lines only
# build a sorted DataFrame and discard it; the last one is the only call that
# ends in show(), so it alone prints output.
mpg.sort(mpg.hwy.desc())
# is the same as...
mpg.sort(col('hwy').desc())
# is the same as ..
mpg.sort(desc('hwy')).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [62]:
# Sort on several keys, all descending: class first, then cyl within a class,
# then hwy within a (class, cyl) pair. The three keys deliberately use the three
# different ways of naming a column.
(
    mpg.select(mpg["class"], mpg.cyl, mpg.hwy)
    .sort(desc("class"), mpg.cyl.desc(), col("hwy").desc())
    .show()
)


+-----+---+---+
|class|cyl|hwy|
+-----+---+---+
|  suv|  8| 20|
|  suv|  8| 20|
|  suv|  8| 19|
|  suv|  8| 19|
|  suv|  8| 19|
|  suv|  8| 19|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 18|
|  suv|  8| 17|
|  suv|  8| 17|
|  suv|  8| 17|
|  suv|  8| 17|
|  suv|  8| 17|
|  suv|  8| 17|
+-----+---+---+
only showing top 20 rows



# Grouping and Aggregating

In [63]:
# Three equivalent ways to name the grouping column. groupBy() alone returns a
# GroupedData object and computes nothing; only the last expression in the cell
# is displayed.
mpg.groupBy(mpg.cyl)
mpg.groupBy(col('cyl'))
mpg.groupBy('cyl')

<pyspark.sql.group.GroupedData at 0x7fcd52282c10>

In [65]:
# agg() applies one or more aggregate expressions per group; result rows come
# back in no particular order.
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show(5)

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  5|              20.5|            28.75|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
+---+------------------+-----------------+



In [66]:
# Grouping by two columns yields one row per distinct (cyl, class) combination.
mpg.groupBy('cyl', 'class').agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  5|   compact|              21.0|              29.0|
|  5|subcompact|              20.0|              28.5|
|  6|subcompact|              17.0|24.714285714285715|
|  6|    pickup|              14.5|              17.9|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|    pickup|              11.8|              15.8|
|  8|   midsize|              16.0|              24.0|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  6|   minivan|              15.6|              22.2|
|  4|   compact|            21.375|          29.46875|
|  8|subcompact|              14.8|              21.6|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   mi

In [67]:
# rollup() behaves like groupBy() but adds a grand-total row, identified by a
# null in the grouping column (here: 234 cars overall).
mpg.rollup('cyl').count().sort('cyl').show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



In [69]:
# The aggregate can be given as a SQL string via expr() ...
mpg.rollup('cyl').agg(expr('avg(hwy)')).sort('cyl').show()

# ... or as a column function call; both print the same table.

mpg.rollup('cyl').agg(avg(mpg.hwy)).sort('cyl').show()


+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



In [70]:
# Two-level rollup: one row per (cyl, class), a subtotal row per cyl (class is
# null) and a grand-total row (both null). Sorting lists the null rows first.
(
    mpg.rollup("cyl", "class")
    .mean("hwy")
    .sort(col("cyl"), col("class"))
    .show()
)

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows



# Crosstabs and Pivot Tables

In [71]:
# crosstab() counts rows for every (class, cyl) pair: classes down the side,
# cylinder counts across the top, 0 where a pair never occurs.
mpg.crosstab('class', 'cyl').show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



In [72]:
# pivot() spreads the distinct cyl values into columns and fills each cell with
# the aggregate (mean hwy); combinations with no rows come out as null.
mpg.groupBy('class').pivot('cyl').mean('hwy').sort(col('class')).show()

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|   2seater|              null|null|              null|              24.8|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|   minivan|              24.0|null|              22.2|              null|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|       suv|             23.75|null|              18.5|16.789473684210527|
+----------+------------------+----+------------------+------------------+



In [74]:
# The same averages in long form: one row per (class, cyl) pair that actually
# occurs. Only class is sorted, so cyl order within a class is arbitrary.
mpg.groupBy('class', 'cyl').mean('hwy').sort(col('class')).show()

+----------+---+------------------+
|     class|cyl|          avg(hwy)|
+----------+---+------------------+
|   2seater|  8|              24.8|
|   compact|  6|25.307692307692307|
|   compact|  4|          29.46875|
|   compact|  5|              29.0|
|   midsize|  4|           29.1875|
|   midsize|  8|              24.0|
|   midsize|  6| 26.26086956521739|
|   minivan|  4|              24.0|
|   minivan|  6|              22.2|
|    pickup|  6|              17.9|
|    pickup|  4|20.666666666666668|
|    pickup|  8|              15.8|
|subcompact|  6|24.714285714285715|
|subcompact|  8|              21.6|
|subcompact|  4| 30.80952380952381|
|subcompact|  5|              28.5|
|       suv|  4|             23.75|
|       suv|  6|              18.5|
|       suv|  8|16.789473684210527|
+----------+---+------------------+



# Handling Missing Data

In [75]:
# Small frame with missing values (NaN) scattered across both columns.
# NOTE: this rebinds `df`, which an earlier cell bound to the (n, group) frame.
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



In [76]:
# Drop every row that has a missing value in any column.
df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [77]:
# Replace every missing value, in all columns, with 0.
df.na.fill(0).show()

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



In [78]:
# Give each column its own replacement in a single fill() call by passing a
# column -> value mapping: missing x becomes 0, missing y becomes -1.
df.na.fill({"x": 0, "y": -1}).show()

+---+----+
|  x|   y|
+---+----+
|1.0|-1.0|
|2.0| 0.0|
|0.0| 0.0|
|4.0| 3.0|
|5.0| 1.0|
|0.0|-1.0|
+---+----+



In [79]:
# Drop rows only when y is missing; rows where just x is missing are kept.
df.na.drop(subset='y').show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



# DataFrame Transformations

In [80]:
# explain() prints the physical plan Spark would run; for the untouched frame
# that is just a scan of the existing data.
mpg.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#13,model#14,displ#15,year#16L,cyl#17L,trans#18,drv#19,cty#20L,hwy#21L,fl#22,class#23]




In [81]:
# A select() shows up in the plan as a Project step on top of the scan.
mpg.select(mpg.cyl, mpg.hwy).explain()

== Physical Plan ==
*(1) Project [cyl#17L, hwy#21L]
+- *(1) Scan ExistingRDD[manufacturer#13,model#14,displ#15,year#16L,cyl#17L,trans#18,drv#19,cty#20L,hwy#21L,fl#22,class#23]




In [82]:
# A grouped aggregation appears as a partial aggregate, an Exchange
# (repartitioning by the group key) and a final aggregate.
# min/max are the Spark functions imported from pyspark.sql.functions.
mpg.groupBy(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[cyl#17L], functions=[min(hwy#21L), max(hwy#21L)])
+- Exchange hashpartitioning(cyl#17L, 200), true, [id=#857]
   +- *(1) HashAggregate(keys=[cyl#17L], functions=[partial_min(hwy#21L), partial_max(hwy#21L)])
      +- *(1) Project [cyl#17L, hwy#21L]
         +- *(1) Scan ExistingRDD[manufacturer#13,model#14,displ#15,year#16L,cyl#17L,trans#18,drv#19,cty#20L,hwy#21L,fl#22,class#23]




In [84]:
# Plan for a multi-step pipeline. The filter is written after the select and
# refers to `class`, which the select does not keep; the printed plan shows the
# Filter step placed beneath the Project step, i.e. evaluated against the
# original columns.
(
    mpg.select(col("cyl"), expr("(cty + hwy) / 2 AS avg_mpg"))
    .filter(expr('class == "compact"'))
    .groupby("cyl")
    .agg(min("avg_mpg"), avg("avg_mpg"), max("avg_mpg"))
    .explain()
)

== Physical Plan ==
*(2) HashAggregate(keys=[cyl#17L], functions=[min(avg_mpg#1758), avg(avg_mpg#1758), max(avg_mpg#1758)])
+- Exchange hashpartitioning(cyl#17L, 200), true, [id=#882]
   +- *(1) HashAggregate(keys=[cyl#17L], functions=[partial_min(avg_mpg#1758), partial_avg(avg_mpg#1758), partial_max(avg_mpg#1758)])
      +- *(1) Project [cyl#17L, (cast((cty#20L + hwy#21L) as double) / 2.0) AS avg_mpg#1758]
         +- *(1) Filter (isnotnull(class#23) AND (class#23 = compact))
            +- *(1) Scan ExistingRDD[manufacturer#13,model#14,displ#15,year#16L,cyl#17L,trans#18,drv#19,cty#20L,hwy#21L,fl#22,class#23]




In [86]:
# Import the vega_datasets loader under an alias so it does not rebind `data`,
# which an earlier cell imported from pydataset and calls as data("mpg").
# Re-running that cell after this one would otherwise fail.
from vega_datasets import data as vega_data

# Load the Seattle weather sample as pandas and convert `date` to plain strings
# before handing the frame to Spark.
weather_pdf = vega_data.seattle_weather().assign(
    date=lambda frame: frame.date.astype(str)
)
weather = spark.createDataFrame(weather_pdf)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [87]:
# first() returns the single aggregate Row, which unpacks like a tuple.
# min/max are the Spark aggregates; `date` was converted to strings before the
# frame was created, so the results are strings.
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

('2012-01-01', '2015-12-31')

In [88]:
from pyspark.sql.functions import month, year, quarter


In [89]:
# Total precipitation per calendar month, pooled across all years in the data.
(
    weather.withColumn("month", month("date"))  # month number (1-12) taken from the date string
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))  # Spark's sum, not the Python builtin
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+

