## PySpark-Basic

**PySpark** is the Python API for **Spark**, an open-source, general-purpose distributed cluster-computing framework. The core abstraction in **Spark** is the **Resilient Distributed Dataset (RDD)**. An **RDD** has four main features:

1. Distributed collection of data.
2. Fault tolerance.
3. Parallel operation.
4. Ability to use many data sources.

**Spark** has two main kinds of operations: transformations and actions.

Transformations are recipes to follow: they describe a computation without running it. Actions carry out the recipe and return a result.
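
As a rough analogy in plain Python (this is not the Spark API), generator expressions behave like transformations: they only describe the work, and nothing runs until something consumes them, which plays the role of an action. The `double` helper and the `calls` list are hypothetical names used only for this sketch.

```python
calls = []  # records every element that is actually processed


def double(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Transformations": build the recipe, nothing is computed yet
doubled = (double(x) for x in data)
large = (y for y in doubled if y > 4)
evaluated_before_action = len(calls)

# "Action": consuming the pipeline forces the computation
result = list(large)

print(evaluated_before_action, result, len(calls))
```

Spark applies the same idea to a distributed dataset: calls such as `select` or `filter` build up a plan, and calls such as `show` or `collect` trigger it.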

In the notebook below we will demonstrate some uses of **PySpark**.

In [1]:
#Let's import PySpark
from pyspark.sql import SparkSession

In [2]:
#Let's start a Spark session
spark = SparkSession.builder.appName('basic').getOrCreate()

Load Seoul bike data

https://archive.ics.uci.edu/ml/datasets/Seoul+Bike+Sharing+Demand

In [3]:
df = spark.read.csv('../data/SeoulBikeData/SeoulBikeData.csv',
                    header=True)

In [4]:
#Show the data
df.show()

+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|      Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|01/12/2017|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                      0|           0|            0| Winter|No Holiday|            Yes|
|01/12/2017|              204|   1|           -5.5|         38|             0.8|            2000|                    -17.6|                      0|           0|

In [5]:
#printSchema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- Temperature(�C): string (nullable = true)
 |-- Humidity(%): string (nullable = true)
 |-- Wind speed (m/s): string (nullable = true)
 |-- Visibility (10m): string (nullable = true)
 |-- Dew point temperature(�C): string (nullable = true)
 |-- Solar Radiation (MJ/m2): string (nullable = true)
 |-- Rainfall(mm): string (nullable = true)
 |-- Snowfall (cm): string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)



Note that Spark read every column as a string, because `inferSchema` was not set. Below we will convert the numeric columns to numeric types.

In [6]:
#Get the column names
print(df.columns)

#Rename some Columns
df = df.withColumnRenamed('Temperature(�C)','Temperature') \
        .withColumnRenamed('Humidity(%)','Humidity') \
        .withColumnRenamed('Dew point temperature(�C)', 'Dew point temperature')

['Date', 'Rented Bike Count', 'Hour', 'Temperature(�C)', 'Humidity(%)', 'Wind speed (m/s)', 'Visibility (10m)', 'Dew point temperature(�C)', 'Solar Radiation (MJ/m2)', 'Rainfall(mm)', 'Snowfall (cm)', 'Seasons', 'Holiday', 'Functioning Day']


In [7]:
print(df.columns)

['Date', 'Rented Bike Count', 'Hour', 'Temperature', 'Humidity', 'Wind speed (m/s)', 'Visibility (10m)', 'Dew point temperature', 'Solar Radiation (MJ/m2)', 'Rainfall(mm)', 'Snowfall (cm)', 'Seasons', 'Holiday', 'Functioning Day']


In [8]:
#Convert the data to the types we want
from pyspark.sql.types import (StructField, 
                               StringType, 
                               IntegerType,
                               DateType,
                               DoubleType,
                               StructType)

In [9]:
data_schema = [StructField('Date', StringType(), True), 
               StructField('Rented Bike Count', IntegerType(), True),
               StructField('Hour', IntegerType(), True),
               StructField('Temperature', DoubleType(), True),
               StructField('Humidity', DoubleType(), True),
               StructField('Wind speed (m/s)', DoubleType(), True),
               StructField('Visibility (10m)', DoubleType(), True),
               StructField('Dew point temperature', DoubleType(), True),
               StructField('Solar Radiation (MJ/m2)', DoubleType(), True),
               StructField('Rainfall(mm)', DoubleType(), True),
               StructField('Snowfall (cm)', DoubleType(), True),
               StructField('Seasons', StringType(), True),
               StructField('Holiday', StringType(), True),
               StructField('Functioning Day',  StringType(), True)
              ]
final_struct = StructType(fields=data_schema)

Reload the data with the correct data types specified in the schema

In [10]:
df = spark.read.csv('../data/SeoulBikeData/SeoulBikeData.csv', header=True, schema=final_struct)

In [11]:
#Show the dataframe
df.show()

+----------+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+
|      Date|Rented Bike Count|Hour|Temperature|Humidity|Wind speed (m/s)|Visibility (10m)|Dew point temperature|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+----------+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+
|01/12/2017|              254|   0|       -5.2|    37.0|             2.2|          2000.0|                -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|01/12/2017|              204|   1|       -5.5|    38.0|             0.8|          2000.0|                -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|01/1

In [12]:
#Describe the dataframe (summary statistics for numeric and string columns)
df.describe().show()

+-------+----------+-----------------+-----------------+------------------+------------------+------------------+-----------------+---------------------+-----------------------+------------------+-------------------+-------+----------+---------------+
|summary|      Date|Rented Bike Count|             Hour|       Temperature|          Humidity|  Wind speed (m/s)| Visibility (10m)|Dew point temperature|Solar Radiation (MJ/m2)|      Rainfall(mm)|      Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+-------+----------+-----------------+-----------------+------------------+------------------+------------------+-----------------+---------------------+-----------------------+------------------+-------------------+-------+----------+---------------+
|  count|      8760|             8760|             8760|              8760|              8760|              8760|             8760|                 8760|                   8760|              8760|               8760|   8760|      8760|         

In [13]:
#Double check the schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind speed (m/s): double (nullable = true)
 |-- Visibility (10m): double (nullable = true)
 |-- Dew point temperature: double (nullable = true)
 |-- Solar Radiation (MJ/m2): double (nullable = true)
 |-- Rainfall(mm): double (nullable = true)
 |-- Snowfall (cm): double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)



In [14]:
#Select the Rented Bike Count
df.select('Rented Bike Count').show()

+-----------------+
|Rented Bike Count|
+-----------------+
|              254|
|              204|
|              173|
|              107|
|               78|
|              100|
|              181|
|              460|
|              930|
|              490|
|              339|
|              360|
|              449|
|              451|
|              447|
|              463|
|              484|
|              555|
|              862|
|              600|
+-----------------+
only showing top 20 rows



In [15]:
#Select two columns Rented Bike Count and Hour
df.select(['Rented Bike Count', 'Hour']).show()

+-----------------+----+
|Rented Bike Count|Hour|
+-----------------+----+
|              254|   0|
|              204|   1|
|              173|   2|
|              107|   3|
|               78|   4|
|              100|   5|
|              181|   6|
|              460|   7|
|              930|   8|
|              490|   9|
|              339|  10|
|              360|  11|
|              449|  12|
|              451|  13|
|              447|  14|
|              463|  15|
|              484|  16|
|              555|  17|
|              862|  18|
|              600|  19|
+-----------------+----+
only showing top 20 rows



In [16]:
#Select top 2 rows
df.head(2)

[Row(Date='01/12/2017', Rented Bike Count=254, Hour=0, Temperature=-5.2, Humidity=37.0, Wind speed (m/s)=2.2, Visibility (10m)=2000.0, Dew point temperature=-17.6, Solar Radiation (MJ/m2)=0.0, Rainfall(mm)=0.0, Snowfall (cm)=0.0, Seasons='Winter', Holiday='No Holiday', Functioning Day='Yes'),
 Row(Date='01/12/2017', Rented Bike Count=204, Hour=1, Temperature=-5.5, Humidity=38.0, Wind speed (m/s)=0.8, Visibility (10m)=2000.0, Dew point temperature=-17.6, Solar Radiation (MJ/m2)=0.0, Rainfall(mm)=0.0, Snowfall (cm)=0.0, Seasons='Winter', Holiday='No Holiday', Functioning Day='Yes')]

In [17]:
#Create a new column
df.withColumn('2xHumidity', df['Humidity']*2).select(['Humidity','2xHumidity']).show()

+--------+----------+
|Humidity|2xHumidity|
+--------+----------+
|    37.0|      74.0|
|    38.0|      76.0|
|    39.0|      78.0|
|    40.0|      80.0|
|    36.0|      72.0|
|    37.0|      74.0|
|    35.0|      70.0|
|    38.0|      76.0|
|    37.0|      74.0|
|    27.0|      54.0|
|    24.0|      48.0|
|    21.0|      42.0|
|    23.0|      46.0|
|    25.0|      50.0|
|    26.0|      52.0|
|    36.0|      72.0|
|    54.0|     108.0|
|    58.0|     116.0|
|    66.0|     132.0|
|    77.0|     154.0|
+--------+----------+
only showing top 20 rows



In [18]:
#You can also use sql queries in Spark
df.createOrReplaceTempView("sqlTest")

In [19]:
#Here we run a SQL query against the Spark dataframe
results = spark.sql("SELECT Hour from sqlTest")
results.show()

+----+
|Hour|
+----+
|   0|
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
+----+
only showing top 20 rows



In [20]:
#Use filter to keep only the rows where Hour=0
df.filter("Hour=0").show()

+----------+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+
|      Date|Rented Bike Count|Hour|Temperature|Humidity|Wind speed (m/s)|Visibility (10m)|Dew point temperature|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+----------+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+
|01/12/2017|              254|   0|       -5.2|    37.0|             2.2|          2000.0|                -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|02/12/2017|              328|   0|       -1.8|    87.0|             1.1|           994.0|                 -3.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|03/1

In [21]:
#Select Rented Bike Count when Hour=0
df.filter("Hour=0").select(['Rented Bike Count']).show()

+-----------------+
|Rented Bike Count|
+-----------------+
|              254|
|              328|
|              342|
|              285|
|              216|
|              145|
|              198|
|              233|
|              267|
|              326|
|              125|
|              133|
|              168|
|              171|
|              227|
|              217|
|              175|
|              151|
|              122|
|              142|
+-----------------+
only showing top 20 rows



In [22]:
#Select Rented Bike Count and Hour when Hour>20
result = df.filter("Hour>20").select(['Hour','Rented Bike Count']).collect()
print(result[0])
print(type(result[0]))

#PySpark row can also be converted to a dictionary
result[0].asDict()

Row(Hour=21, Rented Bike Count=405)
<class 'pyspark.sql.types.Row'>


{'Hour': 21, 'Rented Bike Count': 405}

In [23]:
#groupby in pySpark
#mean
df.select(['Hour','Rented Bike Count']).groupBy('Hour').mean().show()
#min
df.select(['Hour','Rented Bike Count']).groupBy('Hour').min().show()
#max
df.select(['Hour','Rented Bike Count']).groupBy('Hour').max().show()
#count
df.select(['Hour','Rented Bike Count']).groupBy('Hour').count().show()

+----+---------+----------------------+
|Hour|avg(Hour)|avg(Rented Bike Count)|
+----+---------+----------------------+
|  12|     12.0|     699.4410958904109|
|  22|     22.0|     922.7972602739726|
|   1|      1.0|     426.1835616438356|
|  13|     13.0|     733.2465753424658|
|   6|      6.0|     287.5643835616438|
|  16|     16.0|     930.6219178082192|
|   3|      3.0|    203.33150684931508|
|  20|     20.0|     1068.964383561644|
|   5|      5.0|    139.08219178082192|
|  19|     19.0|    1195.1479452054793|
|  15|     15.0|      829.186301369863|
|   9|      9.0|     645.9835616438356|
|  17|     17.0|     1138.509589041096|
|   4|      4.0|     132.5917808219178|
|   8|      8.0|    1015.7013698630137|
|  23|     23.0|     671.1260273972603|
|   7|      7.0|     606.0054794520548|
|  10|     10.0|     527.8219178082192|
|  21|     21.0|    1031.4493150684932|
|  11|     11.0|     600.8520547945205|
+----+---------+----------------------+
only showing top 20 rows

+----+--------

In [24]:
#using pyspark functions
from pyspark.sql.functions import countDistinct, avg, stddev

In [25]:
#Count Distinct
df.select(countDistinct("Hour")).show()

+--------------------+
|count(DISTINCT Hour)|
+--------------------+
|                  24|
+--------------------+



In [26]:
#Avg
df.select(avg("Temperature")).show()

+------------------+
|  avg(Temperature)|
+------------------+
|12.882922374429233|
+------------------+



In [27]:
#stddev
df.select(stddev("Temperature")).show()

+------------------------+
|stddev_samp(Temperature)|
+------------------------+
|      11.944825230027929|
+------------------------+
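
The output column name `stddev_samp(Temperature)` indicates that `stddev` computes the sample standard deviation, which divides by n - 1 rather than n. A minimal plain-Python sketch of the difference, using only the first five Humidity values shown earlier as an illustrative subset:

```python
import statistics

# First five Humidity values from the dataframe output above
humidity_sample = [37.0, 38.0, 39.0, 40.0, 36.0]

sample_std = statistics.stdev(humidity_sample)       # divides by n - 1
population_std = statistics.pstdev(humidity_sample)  # divides by n

print(round(sample_std, 4), round(population_std, 4))
```

On 8760 rows the two versions are nearly identical. If the population version is needed, `pyspark.sql.functions` also provides `stddev_pop`.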



In [28]:
#using aliases
df.select(stddev("Temperature").alias("STDDev Temperature")).show()

+------------------+
|STDDev Temperature|
+------------------+
|11.944825230027929|
+------------------+



In [29]:
#formatting numbers
from pyspark.sql.functions import format_number
humidity_std = df.select(stddev("Humidity"))
humidity_std.show()
humidity_std.select(format_number('stddev_samp(Humidity)',2).alias('std')).show()

+---------------------+
|stddev_samp(Humidity)|
+---------------------+
|   20.362413301565603|
+---------------------+

+-----+
|  std|
+-----+
|20.36|
+-----+



In [30]:
#Order by
df.select(['Hour','Rented Bike Count']) \
    .groupBy('Hour').sum() \
    .orderBy('sum(Rented Bike Count)') \
    .select(['Hour','sum(Rented Bike Count)']).show(24)

+----+----------------------+
|Hour|sum(Rented Bike Count)|
+----+----------------------+
|   4|                 48396|
|   5|                 50765|
|   3|                 74216|
|   6|                104961|
|   2|                110095|
|   1|                155557|
|  10|                192655|
|   0|                197633|
|  11|                219311|
|   7|                221192|
|   9|                235784|
|  23|                244961|
|  12|                255296|
|  13|                267635|
|  14|                276971|
|  15|                302653|
|  22|                336821|
|  16|                339677|
|   8|                370731|
|  21|                376479|
|  20|                390172|
|  17|                415556|
|  19|                436229|
|  18|                548568|
+----+----------------------+



In [31]:
#Order By Desc
res = df.select(['Hour','Rented Bike Count']) \
    .groupBy('Hour').sum()

res.orderBy(res['sum(Rented Bike Count)'].desc()) \
    .select(['Hour','sum(Rented Bike Count)']).show(24)

+----+----------------------+
|Hour|sum(Rented Bike Count)|
+----+----------------------+
|  18|                548568|
|  19|                436229|
|  17|                415556|
|  20|                390172|
|  21|                376479|
|   8|                370731|
|  16|                339677|
|  22|                336821|
|  15|                302653|
|  14|                276971|
|  13|                267635|
|  12|                255296|
|  23|                244961|
|   9|                235784|
|   7|                221192|
|  11|                219311|
|   0|                197633|
|  10|                192655|
|   1|                155557|
|   2|                110095|
|   6|                104961|
|   3|                 74216|
|   5|                 50765|
|   4|                 48396|
+----+----------------------+



In [32]:
#Check for NaN values in each column (note: isnan does not detect nulls)
from pyspark.sql.functions import isnan, when, count
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
#No nans in the data

+----+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+-------+---------------+
|Date|Rented Bike Count|Hour|Temperature|Humidity|Wind speed (m/s)|Visibility (10m)|Dew point temperature|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|Holiday|Functioning Day|
+----+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+-------+---------------+
|   0|                0|   0|          0|       0|               0|               0|                    0|                      0|           0|            0|      0|      0|              0|
+----+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+-------+---------------+
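
NaN and null are different things: `isnan` flags only floating-point NaN values, while a missing value (null) has to be tested separately. The distinction can be sketched in plain Python, where `float("nan")` stands in for NaN and `None` for null. The `values` list is a hypothetical column, not data from this dataset.

```python
import math

values = [1.5, float("nan"), None, 3.0, None]  # hypothetical column

# NaN is a float value; None marks a missing value
nan_count = sum(1 for v in values if isinstance(v, float) and math.isnan(v))
null_count = sum(1 for v in values if v is None)

print(nan_count, null_count)
```

In PySpark the corresponding null test is `Column.isNull()`, which can be used inside `when` in the same way as `isnan`.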



In [33]:
#Convert Date to dateType in PySpark
from pyspark.sql.functions import to_date
df = df.withColumn('New_date', to_date(df['Date'],format='dd/MM/yyyy'))

In [34]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind speed (m/s): double (nullable = true)
 |-- Visibility (10m): double (nullable = true)
 |-- Dew point temperature: double (nullable = true)
 |-- Solar Radiation (MJ/m2): double (nullable = true)
 |-- Rainfall(mm): double (nullable = true)
 |-- Snowfall (cm): double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)
 |-- New_date: date (nullable = true)
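
The pattern `dd/MM/yyyy` tells Spark that the day comes first, so `01/12/2017` is 1 December 2017, not 12 January. As a quick sanity check outside Spark, the same parse can be sketched with the standard library, assuming `%d/%m/%Y` is the matching `strptime` pattern:

```python
from datetime import datetime

raw = "01/12/2017"  # first Date value in the dataset
parsed = datetime.strptime(raw, "%d/%m/%Y").date()

print(parsed.isoformat())
```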



In [35]:
#Drop the old Date column
df = df.drop(df['Date'])
#Rename New_date to Date
df = df.withColumnRenamed('New_date', 'Date')
df.show()

+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+----------+
|Rented Bike Count|Hour|Temperature|Humidity|Wind speed (m/s)|Visibility (10m)|Dew point temperature|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|      Date|
+-----------------+----+-----------+--------+----------------+----------------+---------------------+-----------------------+------------+-------------+-------+----------+---------------+----------+
|              254|   0|       -5.2|    37.0|             2.2|          2000.0|                -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|2017-12-01|
|              204|   1|       -5.5|    38.0|             0.8|          2000.0|                -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|2017-12-01|
|    

In [36]:
#Some datetime operations
from pyspark.sql.functions import (dayofmonth, 
                                   dayofyear,
                                   month,
                                   year,
                                   weekofyear)

#Day of month
df.select(dayofmonth('Date').alias('DOM')).show()
df.select(dayofmonth('Date').alias('DOM')).distinct().show()

#Day of year
df.select(dayofyear('Date').alias('DOY')).distinct().show()

#month
df.select(month('Date').alias('Month')).distinct().show()

#year
df.select(year('Date').alias('Year')).distinct().show()

#weekofyear
df.select(weekofyear('Date').alias('WOY')).distinct().show()

+---+
|DOM|
+---+
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
+---+
only showing top 20 rows

+---+
|DOM|
+---+
| 31|
| 28|
| 26|
| 27|
| 12|
| 22|
|  1|
| 13|
|  6|
| 16|
|  3|
| 20|
|  5|
| 19|
| 15|
|  9|
| 17|
|  4|
|  8|
| 23|
+---+
only showing top 20 rows

+---+
|DOY|
+---+
|148|
|243|
| 31|
| 85|
|137|
|251|
| 65|
| 53|
|255|
|133|
|296|
| 78|
|322|
|362|
|321|
|108|
|155|
| 34|
|193|
|211|
+---+
only showing top 20 rows

+-----+
|Month|
+-----+
|   12|
|    1|
|    6|
|    3|
|    5|
|    9|
|    4|
|    8|
|    7|
|   10|
|   11|
|    2|
+-----+

+----+
|Year|
+----+
|2018|
|2017|
+----+

+---+
|WOY|
+---+
| 31|
| 34|
| 28|
| 26|
| 27|
| 44|
| 12|
| 22|
| 47|
|  1|
| 52|
| 13|
|  6|
| 16|
|  3|
| 20|
| 40|
| 48|
|  5|
| 19|
+---+
only showing top 20 rows
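
To make these date parts concrete, the same quantities can be computed for a single date with the standard library. This sketch uses the first date in the dataset and assumes that Spark's `weekofyear` follows ISO 8601 week numbering (weeks start on Monday), which is what `isocalendar()` returns.

```python
from datetime import date

d = date(2017, 12, 1)  # first date in the dataset

day_of_month = d.day
day_of_year = d.timetuple().tm_yday
month = d.month
year = d.year
week_of_year = d.isocalendar()[1]  # ISO 8601 week number

print(day_of_month, day_of_year, month, year, week_of_year)
```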

