In [117]:
import pyspark
from pyspark.sql.functions import *

from pydataset import data
from vega_datasets import data as vega

import pandas as pd
import numpy as np

In [4]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/09 12:58:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
#1
df = spark.createDataFrame(pd.DataFrame(
    {'language': ['Python',
                  'R',
                  'MATLAB']
    })
                          )
#shape
print((df.count(), len(df.columns)))

#schema
print(df.schema)

df.show()

                                                                                

(3, 1)
StructType(List(StructField(language,StringType,true)))
+--------+
|language|
+--------+
|  Python|
|       R|
|  MATLAB|
+--------+



In [118]:
#datasets
mpg = spark.createDataFrame(data('mpg'))
tips = spark.createDataFrame(data('tips'))
sea = spark.createDataFrame(
    vega.seattle_weather().assign(
        date=lambda df: df.date.astype(str)
    ))
sea = sea.withColumn('month', month('date'))\
            .withColumn('year', year('date'))\
            .withColumn('quarter', quarter('date'))

In [23]:
#2
mpg.select(concat(lit('The '),
                  mpg.year,
                  lit(' '),
                  mpg.manufacturer,
                  lit(' '),
                  mpg.model,
                  lit(' has a '),
                  mpg.cyl,
                  lit(' cylinder engine.')\
                 ).alias('Message')\
          ).show()

+--------------------+
|             Message|
+--------------------+
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a6 ...|
|The 2008 audi a6 ...|
|The 2008 audi a6 ...|
|The 2008 chevrole...|
|The 2008 chevrole...|
+--------------------+
only showing top 20 rows



In [25]:
#transform the trans column so that it only contains either manual or auto
mpg.select('trans',
           regexp_replace('trans', r'\(\w+\)$', '')\
           .alias('transmission')\
          ).show()

+----------+------------+
|     trans|transmission|
+----------+------------+
|  auto(l5)|        auto|
|manual(m5)|      manual|
|manual(m6)|      manual|
|  auto(av)|        auto|
|  auto(l5)|        auto|
|manual(m5)|      manual|
|  auto(av)|        auto|
|manual(m5)|      manual|
|  auto(l5)|        auto|
|manual(m6)|      manual|
|  auto(s6)|        auto|
|  auto(l5)|        auto|
|manual(m5)|      manual|
|  auto(s6)|        auto|
|manual(m6)|      manual|
|  auto(l5)|        auto|
|  auto(s6)|        auto|
|  auto(s6)|        auto|
|  auto(l4)|        auto|
|  auto(l4)|        auto|
+----------+------------+
only showing top 20 rows



In [48]:
#3
#percentage of observations smokers
print(tips.filter(tips.smoker=='Yes').count() / tips.count())
#column that contains the tip percentage
tips = tips.select('*',
                   (tips.tip/tips.total_bill)\
                   .alias('tip%'))
#average tip percentage for each combination of sex, smoker
tips.groupby('sex').pivot('smoker').mean('tip%').show()

0.38114754098360654
+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941035|
|  Male|0.1606687151291298| 0.1527711752024851|
+------+------------------+-------------------+



In [103]:
#4
#November Rain
(sea.groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("total_rainfall")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    7|              48.2|
|    6|             132.9|
|    8|             163.7|
|    5|             207.5|
|    9|235.49999999999997|
|    4|             375.4|
|    2|             422.0|
|    1|465.99999999999994|
|   10|             503.4|
|    3|             606.2|
|   12|             622.7|
|   11|             642.5|
+-----+------------------+



In [102]:
(sea.groupBy("year")
    .agg(sum("wind").alias("total_wind"))
    .sort("total_wind")
    .show()
)

+----+------------------+
|year|        total_wind|
+----+------------------+
|2013|1100.8000000000002|
|2015|            1153.3|
|2014|1236.5000000000005|
|2012|1244.6999999999998|
+----+------------------+



In [87]:
#A Foggy Day in Vietnam
sea.crosstab('month', 'weather').show()

+-------------+-------+---+----+----+---+
|month_weather|drizzle|fog|rain|snow|sun|
+-------------+-------+---+----+----+---+
|            5|      1| 25|  16|   0| 82|
|           10|      4| 55|  20|   0| 45|
|            1|     10| 38|  35|   8| 33|
|            6|      2| 14|  19|   0| 85|
|            9|      5| 40|   4|   0| 71|
|            2|      4| 36|  40|   3| 30|
|           12|      2| 54|  23|   5| 40|
|            7|      8| 13|  14|   0| 89|
|            3|      3| 36|  37|   6| 42|
|           11|      3| 50|  25|   0| 42|
|            8|      8| 16|   6|   0| 94|
|            4|      4| 34|  20|   1| 61|
+-------------+-------+---+----+----+---+



In [133]:
#percentage of days rainy q3 of 2015
r = sea.filter(expr('year == 2015 AND quarter == 3'))
r.where(r.weather=='rain').count() / r.count()

0.021739130434782608

In [132]:
#percentage of days it rained by year
for y in range(2012, 2016):
    print(y, sea.where(expr(f'year=={y}'))\
          .where(expr('precipitation>0')).count()/365)

2012 0.4849315068493151
2013 0.41643835616438357
2014 0.410958904109589
2015 0.39452054794520547
