## Dataframes

### Reading the Dataset
### Checking the schema of the data
### Selecting columns and indexing
### Adding columns and Dropping columns
### Handling missing values
### Filter operations
### Group by and Aggregate operations

In [143]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, mean, to_date, col, to_timestamp
#from pyspark.ml.feature import Imputer


In [52]:
# Build the SparkSession used by every cell below (getOrCreate returns the
# already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("Pyspark-DF")
    .getOrCreate()
)

In [53]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

## About Dataset

#### This dataset describes the power consumption of three different distribution networks in Tetouan, a city located in northern Morocco.

### Variable Name:

### DateTime : Timestamp of the reading (one reading every ten minutes)
### Temperature : Weather Temperature of Tetouan city
### Humidity : Weather Humidity of Tetouan city
### Wind Speed : Wind speed of Tetouan city
### general diffuse flows : general diffuse flows
### diffuse flows : diffuse flows
### Zone 1 : power consumption of zone 1 of Tetouan city
### Zone 2 : power consumption of zone 2 of Tetouan city
### Zone 3 : power consumption of zone 3 of Tetouan city

In [54]:
# Load the CSV, treating the first line as the header.
# inferSchema lets Spark derive the column types from the data; without it
# every column would be read as a string.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
power_consume_df = csv_reader.csv("power_consumption.csv")
power_consume_df.show(5)

+----------------+-----------+--------+----------+---------------------+-------------+-----------+-----------+-----------+
|        DateTime|Temperature|Humidity|Wind Speed|general diffuse flows|diffuse flows|     Zone 1|   Zone 2  |   Zone 3  |
+----------------+-----------+--------+----------+---------------------+-------------+-----------+-----------+-----------+
|01/01/2017 00:00|      6.559|    73.8|     0.083|                0.051|        0.119| 34055.6962|16128.87538|20240.96386|
|01/01/2017 00:10|      6.414|    74.5|     0.083|                 0.07|        0.085|29814.68354|19375.07599|20131.08434|
|01/01/2017 00:20|      6.313|    74.5|      0.08|                0.062|          0.1|29128.10127|19006.68693|19668.43373|
|01/01/2017 00:30|      6.121|    75.0|     0.083|                0.091|        0.096|28228.86076|18361.09422|18899.27711|
|01/01/2017 00:40|      5.921|    75.7|     0.081|                0.048|        0.085| 27335.6962|17872.34043|18442.40964|
+---------------

In [55]:
# Print the schema (column name, inferred type, nullability) as a tree.
power_consume_df.printSchema()

root
 |-- DateTime: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed: double (nullable = true)
 |-- general diffuse flows: double (nullable = true)
 |-- diffuse flows: double (nullable = true)
 |-- Zone 1: double (nullable = true)
 |-- Zone 2  : double (nullable = true)
 |-- Zone 3  : double (nullable = true)



In [56]:
# List of (column name, type name) pairs, one per column.
power_consume_df.dtypes

[('DateTime', 'string'),
 ('Temperature', 'double'),
 ('Humidity', 'double'),
 ('Wind Speed', 'double'),
 ('general diffuse flows', 'double'),
 ('diffuse flows', 'double'),
 ('Zone 1', 'double'),
 ('Zone 2  ', 'double'),
 ('Zone 3  ', 'double')]

In [57]:
# Python class of the object returned by spark.read.csv.
type(power_consume_df)

pyspark.sql.dataframe.DataFrame

In [58]:
# List all column names of the dataframe, in order.
# Note the trailing spaces in the raw "Zone 2  " and "Zone 3  " headers.
power_consume_df.columns

['DateTime',
 'Temperature',
 'Humidity',
 'Wind Speed',
 'general diffuse flows',
 'diffuse flows',
 'Zone 1',
 'Zone 2  ',
 'Zone 3  ']

In [59]:
# Normalise the header names: capitalise the two "diffuse flows" columns and
# strip the trailing spaces from the "Zone 2  " / "Zone 3  " headers.
column_renames = {
    "general diffuse flows": "General Diffuse Flows",
    "diffuse flows": "Diffuse Flows",
    "Zone 2  ": "Zone 2",
    "Zone 3  ": "Zone 3",
}
for old_name, new_name in column_renames.items():
    power_consume_df = power_consume_df.withColumnRenamed(old_name, new_name)

In [60]:
# Add an "Index" column to the data frame.
# monotonically_increasing_id() produces unique, increasing 64-bit ids, but
# they are not guaranteed to be consecutive.
row_id = monotonically_increasing_id()
power_consume_df = power_consume_df.withColumn("Index", row_id)

# The string below is only a reference snippet (it is not executed). To run it,
# `Window` must also be imported: from pyspark.sql import Window
'''
For sequential indexing:

from pyspark.sql.functions import row_number

# Define a window specification
window_spec = Window.orderBy("DateTime")

# Add sequential index using row_number
power_consume_df = power_consume_df.withColumn("index", row_number().over(window_spec))
'''

'\nFor sequential indexing:\n\nfrom pyspark.sql.functions import row_number\n\n# Define a window specification\nwindow_spec = Window.orderBy("DateTime")\n\n# Add sequential index using row_number\npower_consume_df = power_consume_df.withColumn("index", row_number().over(window_spec))\n'

In [61]:
# Move "Index" to the front; all other columns keep their current order.
remaining_columns = [name for name in power_consume_df.columns if name != 'Index']
columns_order = ['Index', *remaining_columns]

power_consume_df = power_consume_df.select(columns_order)

In [62]:
# head(n) returns the first n rows to the driver as a list of Row objects.
power_consume_df.head(1)

[Row(Index=0, DateTime='01/01/2017 00:00', Temperature=6.559, Humidity=73.8, Wind Speed=0.083, General Diffuse Flows=0.051, Diffuse Flows=0.119, Zone 1=34055.6962, Zone 2=16128.87538, Zone 3=20240.96386)]

In [63]:
# Selecting specific columns of the dataframe.
# Column names are spelled exactly as they exist after the rename step
# ("Diffuse Flows"), so the cell does not depend on Spark's default
# case-insensitive column resolution.
power_consume_df.select(['DateTime', 'Humidity', 'Diffuse Flows', 'Zone 1']).show(5)

+----------------+--------+-------------+-----------+
|        DateTime|Humidity|Diffuse flows|     Zone 1|
+----------------+--------+-------------+-----------+
|01/01/2017 00:00|    73.8|        0.119| 34055.6962|
|01/01/2017 00:10|    74.5|        0.085|29814.68354|
|01/01/2017 00:20|    74.5|          0.1|29128.10127|
|01/01/2017 00:30|    75.0|        0.096|28228.86076|
|01/01/2017 00:40|    75.7|        0.085| 27335.6962|
+----------------+--------+-------------+-----------+
only showing top 5 rows



In [64]:
# Summary statistics (count, mean, stddev, min, max) for selected numeric
# columns, similar to pandas' describe().
stats_columns = ["Temperature", "Diffuse Flows", "Zone 1", "Zone 2", "Zone 3"]
power_consume_df.select(stats_columns).describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+
|summary|       Temperature|     Diffuse Flows|           Zone 1|            Zone 2|            Zone 3|
+-------+------------------+------------------+-----------------+------------------+------------------+
|  count|             52416|             52417|            52416|             52416|             52417|
|   mean|18.810023962149028| 75.02659457809085|32344.97056358616|21042.509082321798|17835.277931633376|
| stddev| 5.815475838908468|124.21019434184817|7130.562564198581| 5201.465892178914| 6622.167063631066|
|    min|             3.247|             0.011|       13895.6962|       8560.081466|        5935.17407|
|    max|             40.01|             936.0|      52204.39512|       37408.86076|       47598.32636|
+-------+------------------+------------------+-----------------+------------------+------------------+



In [65]:
# Add a new column holding the combined consumption of the three zones.
total_consumption = col("Zone 1") + col("Zone 2") + col("Zone 3")
power_consume_df = power_consume_df.withColumn("Total power consumption", total_consumption)

In [66]:
# Show the per-zone readings next to the derived total.
preview_columns = ['Index', 'Zone 1', 'Zone 2', 'Zone 3', 'Total power consumption']
power_consume_df.select(preview_columns).show(5)

+-----+-----------+-----------+-----------+-----------------------+
|Index|     Zone 1|     Zone 2|     Zone 3|Total power consumption|
+-----+-----------+-----------+-----------+-----------------------+
|    0| 34055.6962|16128.87538|20240.96386|      70425.53543999999|
|    1|29814.68354|19375.07599|20131.08434|      69320.84387000001|
|    2|29128.10127|19006.68693|19668.43373|            67803.22193|
|    3|28228.86076|18361.09422|18899.27711|      65489.23208999999|
|    4| 27335.6962|17872.34043|18442.40964|            63650.44627|
+-----+-----------+-----------+-----------+-----------------------+
only showing top 5 rows



In [67]:
# Drop the per-zone columns; their sum is kept in "Total power consumption".
zone_columns = ['Zone 1', 'Zone 2', 'Zone 3']
power_consume_df = power_consume_df.drop(*zone_columns)
power_consume_df.printSchema()

root
 |-- Index: long (nullable = false)
 |-- DateTime: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed: double (nullable = true)
 |-- General Diffuse Flows: double (nullable = true)
 |-- Diffuse Flows: double (nullable = true)
 |-- Total power consumption: double (nullable = true)



## Handling Missing values

In [68]:
# Total number of rows, including rows that contain null values.
power_consume_df.count()

52417

In [69]:
# Rows remaining after dropping every row that contains at least one null.
power_consume_df.dropna().count()

52416

In [70]:
# `how` controls when a row is dropped:
#   how="all" -> drop the row only if every column in it is null
#   how="any" -> drop the row if any column is null (this is the default):
#                power_consume_df.na.drop(how="any").count()
power_consume_df.dropna(how="all").count()

52417

In [71]:
# thresh is a minimum number of NON-null values: a row is kept only if it has
# at least 4 non-null columns, and dropped otherwise.
# `how` is not passed because thresh overrides it when both are given.
power_consume_df.na.drop(thresh=4).count()

52416

In [72]:
# subset restricts the null check to the listed columns: a row is dropped only
# if "Wind Speed" is null. The name is spelled exactly as in the schema so the
# cell does not depend on case-insensitive column resolution.
power_consume_df.na.drop(how="any", subset=['Wind Speed']).count()

52416

In [73]:
# Preview only (result is not assigned): replace nulls in Temperature and
# Humidity with 0 and look at the last row.
power_consume_df.fillna(0, subset=['Temperature', 'Humidity']).tail(1)

[Row(Index=8589935416, DateTime='12/30/2017 23:50', Temperature=0.0, Humidity=0.0, Wind Speed=None, General Diffuse Flows=None, Diffuse Flows=0.211, Total power consumption=None)]

In [74]:
# Compute the per-column means that will be used to fill null values.
# All five means are requested in a single agg() call so the data is
# aggregated once instead of running five separate Spark jobs.
column_means = power_consume_df.agg(
    mean("Temperature").alias("mean_temperature"),
    mean("Humidity").alias("mean_humidity"),
    mean("Wind Speed").alias("mean_wspeed"),
    mean("General Diffuse Flows").alias("mean_gdflow"),
    mean("Total power consumption").alias("mean_tpc"),
).first()

# Unpack the single result row into the names used by the fill step.
mean_temp = column_means["mean_temperature"]
mean_hum = column_means["mean_humidity"]
mean_wspeed = column_means["mean_wspeed"]
mean_gdflow = column_means["mean_gdflow"]
mean_tpc = column_means["mean_tpc"]

In [80]:
# Replace nulls in each listed column with that column's mean value.
mean_fill_values = {
    "Temperature": mean_temp,
    "Humidity": mean_hum,
    "Wind Speed": mean_wspeed,
    "General Diffuse Flows": mean_gdflow,
    "Total power consumption": mean_tpc,
}
power_consume_df = power_consume_df.na.fill(mean_fill_values)
power_consume_df.tail(1)

[Row(Index=8589935416, DateTime='12/30/2017 23:50', Temperature=18.810023962149028, Humidity=68.25951846764373, Wind Speed=1.9594888583639225, General Diffuse Flows=182.69661376298254, Diffuse Flows=0.211, Total power consumption=71222.88586428417)]

In [79]:
# Count the rows that contain no nulls (run after the mean fill to verify it).
power_consume_df.dropna().count()

52417

## Filter operations

In [103]:
# Rows where the temperature is above 40.
power_consume_df.filter(col("Temperature") > 40).show()

+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|Index|       DateTime|Temperature|Humidity|Wind Speed|General Diffuse Flows|Diffuse Flows|Total power consumption|
+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|29034|7/21/2017 15:00|      40.01|   14.54|      0.07|                798.0|         85.4|      94051.40112000001|
+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+



In [107]:
# Rows where the wind speed is above 4. Using a column expression avoids the
# backtick quoting that a SQL string needs for a column name containing a space.
power_consume_df.filter(col("Wind Speed") > 4).show()

+-----+----------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|Index|        DateTime|Temperature|Humidity|Wind Speed|General Diffuse Flows|Diffuse Flows|Total power consumption|
+-----+----------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|   62|01/01/2017 10:20|      5.996|   69.85|      4.93|                282.7|        31.96|            57125.30575|
|   63|01/01/2017 10:30|       6.22|   68.81|     4.924|                307.0|        32.42|            58220.10518|
|   64|01/01/2017 10:40|      6.703|   68.01|     4.923|                327.6|        33.22|             59553.8367|
|   65|01/01/2017 10:50|      6.993|   66.14|     4.918|                349.6|        33.41|            60558.95241|
|   66|01/01/2017 11:00|       7.54|   64.21|     4.916|                371.1|        33.43|     61813.860790000006|
|   67|01/01/2017 11:10|       8.22|    61.9|     4.916|        

In [108]:
# Filter first, then keep only the columns of interest.
# Backticks quote the column name because it contains a space.
windy_rows = power_consume_df.filter("`Wind Speed`>4")
windy_rows.select(['DateTime', 'Wind Speed', 'Total power consumption']).show()

+----------------+----------+-----------------------+
|        DateTime|Wind Speed|Total power consumption|
+----------------+----------+-----------------------+
|01/01/2017 10:20|      4.93|            57125.30575|
|01/01/2017 10:30|     4.924|            58220.10518|
|01/01/2017 10:40|     4.923|             59553.8367|
|01/01/2017 10:50|     4.918|            60558.95241|
|01/01/2017 11:00|     4.916|     61813.860790000006|
|01/01/2017 11:10|     4.916|            62338.32489|
|01/06/2017 13:20|     4.708|            71650.33965|
|  1/20/2017 6:10|     4.917|     54742.979340000005|
|  1/20/2017 6:20|     4.918|     54432.780399999996|
|  1/20/2017 6:30|     4.917|             54697.2089|
|  1/20/2017 6:40|     4.916|            55113.11039|
|  1/20/2017 6:50|     4.914|            54497.62508|
|  1/20/2017 7:00|     4.923|            53470.90623|
|  1/20/2017 7:10|     4.918|     53426.883350000004|
|  1/20/2017 7:20|     4.915|            52547.31064|
|  1/20/2017 7:30|     4.916

In [111]:
# Filter with OR: `|` combines two boolean column expressions.
is_hot = col('Temperature') > 40
is_windy = col('Wind Speed') > 4
power_consume_df.filter(is_hot | is_windy).show()

+-----+----------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|Index|        DateTime|Temperature|Humidity|Wind Speed|General Diffuse Flows|Diffuse Flows|Total power consumption|
+-----+----------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|   62|01/01/2017 10:20|      5.996|   69.85|      4.93|                282.7|        31.96|            57125.30575|
|   63|01/01/2017 10:30|       6.22|   68.81|     4.924|                307.0|        32.42|            58220.10518|
|   64|01/01/2017 10:40|      6.703|   68.01|     4.923|                327.6|        33.22|             59553.8367|
|   65|01/01/2017 10:50|      6.993|   66.14|     4.918|                349.6|        33.41|            60558.95241|
|   66|01/01/2017 11:00|       7.54|   64.21|     4.916|                371.1|        33.43|     61813.860790000006|
|   67|01/01/2017 11:10|       8.22|    61.9|     4.916|        

In [114]:
# Filter with AND: `&` requires both boolean column expressions to hold.
temperature_at_least_40 = col('Temperature') >= 40
wind_speed_non_negative = col('Wind Speed') >= 0
power_consume_df.filter(temperature_at_least_40 & wind_speed_non_negative).show()

+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|Index|       DateTime|Temperature|Humidity|Wind Speed|General Diffuse Flows|Diffuse Flows|Total power consumption|
+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+
|29034|7/21/2017 15:00|      40.01|   14.54|      0.07|                798.0|         85.4|      94051.40112000001|
+-----+---------------+-----------+--------+----------+---------------------+-------------+-----------------------+



In [122]:
# Filter with NOT: `~` negates a boolean column expression.
wind_speed_not_at_least_006 = ~(col('Wind Speed') >= 0.06)
selected_columns = ['DateTime', 'Wind Speed', 'Total power consumption']
power_consume_df.filter(wind_speed_not_at_least_006).select(selected_columns).show(5)

+----------------+----------+-----------------------+
|        DateTime|Wind Speed|Total power consumption|
+----------------+----------+-----------------------+
|04/08/2017 10:00|     0.058|            68446.70147|
|04/08/2017 10:10|     0.057|      69104.12329999999|
|04/08/2017 10:20|     0.053|            70349.10411|
| 4/29/2017 19:40|     0.058|      96610.44357999999|
| 4/29/2017 21:10|     0.059|            93901.35606|
+----------------+----------+-----------------------+
only showing top 5 rows



## GroupBy and Aggregate functions

In [151]:
# Convert the DateTime string to a date-only "Date" column.
# The file mixes zero-padded ("01/01/2017 00:00") and unpadded
# ("1/20/2017 6:10") values. Single-letter pattern fields (M, d, H) accept
# one- or two-digit numbers when parsing, so this one pattern handles both
# forms without switching Spark to the legacy time parser.
power_consume_df = power_consume_df.withColumn("Date", to_date(col("DateTime"), "M/d/yyyy H:mm"))
power_consume_df.select(['Index','Date','DateTime','Temperature']).show(2)

+-----+----------+----------------+-----------+
|Index|      Date|        DateTime|Temperature|
+-----+----------+----------------+-----------+
|    0|2017-01-01|01/01/2017 00:00|      6.559|
|    1|2017-01-01|01/01/2017 00:10|      6.414|
+-----+----------+----------------+-----------+
only showing top 2 rows



In [153]:
# Disabled setting: switches Spark's datetime parsing back to the pre-3.0
# (legacy) parser for this session.
# NOTE(review): presumably tried as a workaround for DateTime values that the
# date pattern could not parse — confirm whether it is still needed.
#spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [156]:
# Total power consumption per calendar date.
rows_by_date = power_consume_df.groupBy("Date")
rows_by_date.sum("Total power consumption").show()

+----------+----------------------------+
|      Date|sum(Total power consumption)|
+----------+----------------------------+
|2017-08-11|        1.2848104319480004E7|
|2017-09-11|        1.0002833790764997E7|
|2017-01-06|               9839964.61365|
|2017-01-27|        1.0110594112469997E7|
|2017-02-26|           9259658.997878002|
|2017-09-28|        1.0143342140150003E7|
|2017-01-24|           9969140.384879995|
|2017-06-29|        1.0773404923849994E7|
|2017-09-29|           9964752.171531996|
|2017-02-16|           9945513.795460006|
|2017-07-31|        1.3495833568249997E7|
|2017-08-14|            1.241407166163E7|
|2017-08-18|        1.2853806430820003E7|
|2017-10-23|           9741455.553555999|
|2017-12-02|           8655025.729212001|
|2017-04-09|           9158089.713423997|
|2017-12-25|           9464500.883191997|
|2017-02-28|               9664520.51121|
|2017-03-28|               9542848.38802|
|2017-09-21|               9847875.17363|
+----------+----------------------

In [158]:
# Average total power consumption per calendar date.
rows_by_date = power_consume_df.groupBy("Date")
rows_by_date.avg("Total power consumption").show()

+----------+----------------------------+
|      Date|avg(Total power consumption)|
+----------+----------------------------+
|2017-08-11|           89222.94666305558|
|2017-09-11|           69464.12354697914|
|2017-01-06|           68333.08759479166|
|2017-01-27|           70212.45911437499|
|2017-02-26|            64303.1874852639|
|2017-09-28|           70439.87597326392|
|2017-01-24|           69230.14156166663|
|2017-06-29|           74815.31197118052|
|2017-09-29|           69199.66785786109|
|2017-02-16|           69066.06802402782|
|2017-07-31|           93721.06644618053|
|2017-08-14|           86208.83098354167|
|2017-08-18|           89262.54465847224|
|2017-10-23|           67648.99689969444|
|2017-12-02|          60104.345341750006|
|2017-04-09|           63597.84523211109|
|2017-12-25|           65725.70057772221|
|2017-02-28|           67114.72577229167|
|2017-03-28|            66269.7804723611|
|2017-09-21|           68388.02203909721|
+----------+----------------------

In [161]:
# Number of rows recorded for each distinct Temperature value.
power_consume_df.groupBy("Temperature").count().show()

+-----------+-----+
|Temperature|count|
+-----------+-----+
|      10.65|   15|
|      4.917|    1|
|      3.681|    1|
|       14.9|   33|
|       8.51|   12|
|       9.13|   11|
|      12.32|   21|
|       13.4|   33|
|      17.95|   30|
|      17.56|   27|
|       15.5|   18|
|      17.52|   22|
|      23.04|   24|
|      19.98|   34|
|      26.72|   15|
|       26.7|   18|
|      30.49|    5|
|      35.17|    1|
|       15.4|   26|
|      16.75|   38|
+-----------+-----+
only showing top 20 rows



In [163]:
# Mean of the total power consumption over the whole dataframe
# (an empty groupBy() aggregates all rows as a single group).
power_consume_df.groupBy().agg({"Total power consumption": "mean"}).show()

+----------------------------+
|avg(Total power consumption)|
+----------------------------+
|           71222.88586428417|
+----------------------------+



In [165]:
# Lowest total power consumption observed for each Temperature value.
rows_by_temperature = power_consume_df.groupBy("Temperature")
rows_by_temperature.min("Total power consumption").show()

+-----------+----------------------------+
|Temperature|min(Total power consumption)|
+-----------+----------------------------+
|      10.65|          46376.578420000005|
|      4.917|                 54121.87833|
|      3.681|          51626.732780000006|
|       14.9|          40261.433582000005|
|       8.51|           45998.82797300001|
|       9.13|                 46051.37935|
|      12.32|                  42504.5549|
|       13.4|                 48382.66485|
|      17.95|                 43940.77385|
|      17.56|                 46909.80901|
|       15.5|                49809.499618|
|      17.52|           49507.34366300001|
|      23.04|                 53204.56939|
|      19.98|                 42391.51917|
|      26.72|                 64252.93414|
|       26.7|                 68236.78355|
|      30.49|           97792.96624000001|
|      35.17|                 99269.91935|
|       15.4|                 38841.52845|
|      16.75|          41078.848118999995|
+----------

In [166]:
# Highest total power consumption observed for each Temperature value.
rows_by_temperature = power_consume_df.groupBy("Temperature")
rows_by_temperature.max("Total power consumption").show()

+-----------+----------------------------+
|Temperature|max(Total power consumption)|
+-----------+----------------------------+
|      10.65|           91501.23148000002|
|      4.917|                 54121.87833|
|      3.681|          51626.732780000006|
|       14.9|                  94563.4659|
|       8.51|                  77609.2501|
|       9.13|                 71105.22051|
|      12.32|           98380.32831000001|
|       13.4|                 94711.46412|
|      17.95|                101016.03083|
|      17.56|           92848.35106999999|
|       15.5|                 93721.00999|
|      17.52|                 96310.31974|
|      23.04|                 93738.36103|
|      19.98|                 88705.29548|
|      26.72|          121382.31991999998|
|       26.7|          124074.15114999999|
|      30.49|          122461.40011000002|
|      35.17|                 99269.91935|
|       15.4|                 93041.08353|
|      16.75|                 94017.12197|
+----------