In [112]:
import csv
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
# User Defined Functions
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# Stats
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from math import sqrt
import numpy as np
import pandas as pd

In [2]:
# Creates a new Spark session w/in Python
spark = SparkSession.builder.appName("Final Project").getOrCreate()

# Data Setup
Importing the data

In [165]:
# Reads the local csv stored on my computer, it does have a header
energy_df = spark.read.csv("C:\\Users\\Wes\\Desktop\\CS 490\\energy_dataset.csv", header = True, inferSchema=True)
weather_df = spark.read.csv("C:\\Users\\Wes\\Desktop\\CS 490\\weather_features.csv", header = True, inferSchema=True)
# Creates a temporary view from the dataframe we read from the file
# This is how we can read SQL from it
energy_df.createOrReplaceTempView("Energy")
weather_df.createOrReplaceTempView("Weather")

Creating the Combined dataframe

In [166]:
joined_df = energy_df.join(weather_df, energy_df.time == weather_df.dt_iso)
joined_df.show(1, vertical=True)

-RECORD 0----------------------------------------------------------
 time                                        | 2015-01-01 00:00:00 
 generation_biomass                          | 447.0               
 generation_fossil_brown_coal_lignite        | 329.0               
 generation_fossil_coal_derived_gas          | 0.0                 
 generation_fossil_gas                       | 4844.0              
 generation_fossil_hard_coal                 | 4821.0              
 generation_fossil_oil                       | 162.0               
 generation_fossil_oil_shale                 | 0.0                 
 generation_fossil_peat                      | 0.0                 
 generation_geothermal                       | 0.0                 
 generation_hydro_pumped_storage_aggregated  | null                
 generation_hydro_pumped_storage_consumption | 863.0               
 generation_hydro_run_of_river_and_poundage  | 1051.0              
 generation_hydro_water_reservoir            | 1

Creating columns for Farenheight temperatures and filling na values

In [167]:
# Convert Kelvin temps to Farhenheight
k_to_f_udf = udf(lambda kelvin: (float(kelvin) - 273.15) * (9/5) + 32, DoubleType())
joined_df = joined_df.withColumn("temp_f", k_to_f_udf(joined_df.temp))
joined_df = joined_df.withColumn("temp_min_f", k_to_f_udf(joined_df.temp_min))
joined_df = joined_df.withColumn("temp_max_f", k_to_f_udf(joined_df.temp_max))
joined_df.na.fill(0)

DataFrame[time: string, generation_biomass: double, generation_fossil_brown_coal_lignite: double, generation_fossil_coal_derived_gas: double, generation_fossil_gas: double, generation_fossil_hard_coal: double, generation_fossil_oil: double, generation_fossil_oil_shale: double, generation_fossil_peat: double, generation_geothermal: double, generation_hydro_pumped_storage_aggregated: string, generation_hydro_pumped_storage_consumption: double, generation_hydro_run_of_river_and_poundage: double, generation_hydro_water_reservoir: double, generation_marine: double, generation_nuclear: double, generation_other: double, generation_other_renewable: double, generation_solar: double, generation_waste: double, generation_wind_offshore: double, generation_wind_onshore: double, forecast_solar_day_ahead: double, forecast_wind_offshore_eday_ahead: string, forecast_wind_onshore_day_ahead: double, total_load_forecast: double, total_load_actual: double, price_day_ahead: double, price_actual: double, dt_

# Data Aggregation

## By City

In [168]:
joined_df.groupBy("city_name", "weather_main")\
    .agg({"time" : "count", "price_actual" : "avg"})\
    .withColumnRenamed('count(time)', 'count')\
    .withColumnRenamed('avg(price_actual)', 'avg_price')\
    .filter("count > 100")\
    .orderBy("city_name", "weather_main")\
    .show(100, False)

+----------+------------+------------------+-----+
|city_name |weather_main|avg_price         |count|
+----------+------------+------------------+-----+
| Barcelona|clear       |59.30181183960987 |14764|
| Barcelona|clouds      |57.059017502482575|16112|
| Barcelona|drizzle     |56.17348837209302 |301  |
| Barcelona|mist        |55.9887133182844  |443  |
| Barcelona|rain        |55.62026551226556 |3465 |
| Barcelona|thunderstorm|62.42364820846908 |307  |
|Bilbao    |clear       |60.56761825922418 |8456 |
|Bilbao    |clouds      |57.37178233815964 |16714|
|Bilbao    |drizzle     |60.67769072164944 |485  |
|Bilbao    |fog         |62.91078397212544 |1148 |
|Bilbao    |mist        |59.97895819508959 |1507 |
|Bilbao    |rain        |54.971499514495804|7209 |
|Bilbao    |snow        |55.17807228915661 |166  |
|Bilbao    |thunderstorm|58.42384615384614 |208  |
|Madrid    |clear       |58.671028877320325|20362|
|Madrid    |clouds      |56.2445749178017  |10645|
|Madrid    |drizzle     |57.666

## By Wind Speed

In [38]:
joined_df.groupBy("wind_speed")\
    .agg({"time" : "count", "price_actual" : "avg"})\
    .withColumnRenamed('count(time)', 'count')\
    .withColumnRenamed('avg(price_actual)', 'avg_price')\
    .filter("count > 100")\
    .sort("wind_speed")\
    .show(100, False)

+----------+------------------+-----+
|wind_speed|avg_price         |count|
+----------+------------------+-----+
|0         |58.48113213667815 |18496|
|1         |59.43090143599589 |55223|
|10        |43.998986866791746|533  |
|11        |45.793608562691134|327  |
|12        |48.36364532019704 |203  |
|2         |59.01943278830858 |34555|
|3         |58.0480008785592  |25041|
|4         |57.06761207885105 |18313|
|5         |55.98622442865706 |11683|
|6         |54.275993523697394|6794 |
|7         |51.87357766604919 |3779 |
|8         |48.61905970850964 |2127 |
|9         |46.72045927209707 |1154 |
+----------+------------------+-----+



## By Month and Day

In [50]:
joined_df.groupBy(f.month('time'), f.dayofmonth('time'))\
    .agg({"time" : "count", "price_actual" : "avg"})\
    .withColumnRenamed('dayofmonth(time)', 'day_of_month')\
    .withColumnRenamed('month(time)', 'month')\
    .withColumnRenamed('count(time)', 'count')\
    .withColumnRenamed('avg(price_actual)', 'avg_price')\
    .orderBy('month', 'day_of_month')\
    .filter("count > 100")\
    .show(100, False)

+-----+------------+------------------+-----+
|month|day_of_month|avg_price         |count|
+-----+------------+------------------+-----+
|1    |1           |46.19534161490682 |483  |
|1    |2           |56.790477178423245|482  |
|1    |3           |54.05941414141414 |495  |
|1    |4           |55.018605577689236|502  |
|1    |5           |60.3349007936508  |504  |
|1    |6           |55.52820512820514 |507  |
|1    |7           |59.728073217726376|519  |
|1    |8           |65.08919087136927 |482  |
|1    |9           |64.61051792828685 |502  |
|1    |10          |56.60917647058824 |510  |
|1    |11          |58.63578313253013 |498  |
|1    |12          |66.3585655737705  |488  |
|1    |13          |63.68467065868262 |501  |
|1    |14          |62.59476861167003 |497  |
|1    |15          |60.28846790890269 |483  |
|1    |16          |61.2948065173116  |491  |
|1    |17          |61.06637113402061 |485  |
|1    |18          |64.53259336099586 |482  |
|1    |19          |70.24232179226

# Correlation

In [188]:
def correlation_matrix(df, corr_columns, method='pearson'):
    vector_col = "corr_features"
    assembler = VectorAssembler(inputCols=corr_columns, outputCol=vector_col)
    df_vector = assembler.setHandleInvalid("keep").transform(df).select(vector_col)
    np.nan_to_num(df_vector)
    matrix = Correlation.corr(df_vector, vector_col, method)

    result = matrix.collect()[0]["pearson({})".format(vector_col)].values
    return pd.DataFrame(result.reshape(-1, len(corr_columns)), columns=corr_columns, index=corr_columns)

## Temperature, price, and actual load by year and month

In [198]:
test_agg = joined_df.groupBy(f.year('time'), f.month('time'))\
    .agg({"price_actual" : "avg", \
          "total_load_actual" : "avg",\
          "temp_f": "avg",\
          "temp_min_f": "avg",\
          "temp_max_f": "avg"})\
    .withColumnRenamed('year(time)', 'year')\
    .withColumnRenamed('month(time)', 'month')\
    .withColumnRenamed('avg(price_actual)', 'avg_price')\
    .withColumnRenamed('avg(total_load_actual)', 'avg_total_load_actual')\
    .orderBy('year','month')

In [200]:
correlation_matrix(test_agg, test_agg.columns)

Unnamed: 0,year,month,avg(temp_f),avg(temp_max_f),avg(temp_min_f),avg_total_load_actual,avg_price
year,1.0,0.0,-0.024369,-0.061868,0.012814,0.304222,0.201002
month,0.0,1.0,0.276436,0.28641,0.260331,-0.210858,0.389448
avg(temp_f),-0.024369,0.276436,1.0,0.989586,0.989929,-0.104184,0.023399
avg(temp_max_f),-0.061868,0.28641,0.989586,1.0,0.959802,-0.124596,-0.008685
avg(temp_min_f),0.012814,0.260331,0.989929,0.959802,1.0,-0.084801,0.04778
avg_total_load_actual,0.304222,-0.210858,-0.104184,-0.124596,-0.084801,1.0,0.283745
avg_price,0.201002,0.389448,0.023399,-0.008685,0.04778,0.283745,1.0


## Generation_ by Year

In [194]:
test_agg = joined_df.groupBy(f.year('time'))\
    .agg({"price_actual" : "avg", \
          "total_load_actual" : "avg",\
          "generation_biomass" : "avg",\
          "generation_fossil_brown_coal_lignite" : "avg",\
          "generation_fossil_coal_derived_gas" : "avg",\
          "generation_fossil_gas" : "avg",\
          "generation_fossil_hard_coal" : "avg",\
          "generation_fossil_oil" : "avg",\
          "generation_fossil_oil_shale" : "avg",\
          "generation_fossil_peat" : "avg",\
          "generation_geothermal" : "avg",\
          "generation_hydro_pumped_storage_aggregated" : "avg",\
          "generation_hydro_pumped_storage_consumption" : "avg",\
          "generation_hydro_run_of_river_and_poundage" : "avg",\
          "generation_hydro_water_reservoir" : "avg",\
          "generation_marine" : "avg",\
          "generation_nuclear" : "avg",\
          "generation_other" : "avg",\
          "generation_other_renewable" : "avg",\
          "generation_solar" : "avg",\
          "generation_waste" : "avg",\
          "generation_wind_offshore" : "avg",\
          "generation_wind_onshore" : "avg",\
         })\
    .withColumnRenamed('year(time)', 'year')\
    .orderBy('year')

In [195]:
correlation_matrix(test_agg, test_agg.columns)

Unnamed: 0,year,avg(generation_fossil_gas),avg(generation_fossil_brown_coal_lignite),avg(generation_hydro_pumped_storage_aggregated),avg(generation_fossil_peat),avg(generation_wind_offshore),avg(generation_hydro_water_reservoir),avg(total_load_actual),avg(generation_solar),avg(generation_geothermal),...,avg(generation_hydro_pumped_storage_consumption),avg(generation_biomass),avg(generation_marine),avg(generation_waste),avg(generation_fossil_hard_coal),avg(generation_fossil_coal_derived_gas),avg(generation_hydro_run_of_river_and_poundage),avg(generation_fossil_oil_shale),avg(generation_wind_onshore),avg(generation_fossil_oil)
year,1.0,0.707407,-0.44356,,,,-0.112402,0.98803,-0.376027,,...,-0.870629,-0.863918,,0.950379,-0.701993,,0.522804,,0.514543,-0.796692
avg(generation_fossil_gas),0.707407,1.0,0.296033,,,,-0.750713,0.77263,0.386652,,...,-0.819886,-0.678732,,0.855207,-0.241981,,-0.230797,,-0.067438,-0.424401
avg(generation_fossil_brown_coal_lignite),-0.44356,0.296033,1.0,,,,-0.840437,-0.32071,0.982195,,...,0.040494,0.420684,,-0.24019,0.771253,,-0.948702,,-0.608724,0.661934
avg(generation_hydro_pumped_storage_aggregated),,,,1.0,,,,,,,...,,,,,,,,,,
avg(generation_fossil_peat),,,,,1.0,,,,,,...,,,,,,,,,,
avg(generation_wind_offshore),,,,,,1.0,,,,,...,,,,,,,,,,
avg(generation_hydro_water_reservoir),-0.112402,-0.750713,-0.840437,,,,1.0,-0.243325,-0.857189,,...,0.488484,0.04326,,-0.302046,-0.442907,,0.728486,,0.345629,-0.264612
avg(total_load_actual),0.98803,0.77263,-0.32071,,,,-0.243325,1.0,-0.266476,,...,-0.935592,-0.807328,,0.946351,-0.586183,,0.435614,,0.513215,-0.701659
avg(generation_solar),-0.376027,0.386652,0.982195,,,,-0.857189,-0.266476,1.0,,...,0.030115,0.274732,,-0.131752,0.644033,,-0.976062,,-0.710615,0.525526
avg(generation_geothermal),,,,,,,,,,1.0,...,,,,,,,,,,


## Generation_ By Month

In [196]:
test_agg = joined_df.groupBy(f.month('time'))\
    .agg({"price_actual" : "avg", \
          "total_load_actual" : "avg",\
          "generation_biomass" : "avg",\
          "generation_fossil_brown_coal_lignite" : "avg",\
          "generation_fossil_coal_derived_gas" : "avg",\
          "generation_fossil_gas" : "avg",\
          "generation_fossil_hard_coal" : "avg",\
          "generation_fossil_oil" : "avg",\
          "generation_fossil_oil_shale" : "avg",\
          "generation_fossil_peat" : "avg",\
          "generation_geothermal" : "avg",\
          "generation_hydro_pumped_storage_aggregated" : "avg",\
          "generation_hydro_pumped_storage_consumption" : "avg",\
          "generation_hydro_run_of_river_and_poundage" : "avg",\
          "generation_hydro_water_reservoir" : "avg",\
          "generation_marine" : "avg",\
          "generation_nuclear" : "avg",\
          "generation_other" : "avg",\
          "generation_other_renewable" : "avg",\
          "generation_solar" : "avg",\
          "generation_waste" : "avg",\
          "generation_wind_offshore" : "avg",\
          "generation_wind_onshore" : "avg",\
         })\
    .withColumnRenamed('month(time)', 'month')\
    .orderBy('month')

In [197]:
correlation_matrix(test_agg, test_agg.columns)

Unnamed: 0,month,avg(generation_fossil_gas),avg(generation_fossil_brown_coal_lignite),avg(generation_hydro_pumped_storage_aggregated),avg(generation_fossil_peat),avg(generation_wind_offshore),avg(generation_hydro_water_reservoir),avg(total_load_actual),avg(generation_solar),avg(generation_geothermal),...,avg(generation_hydro_pumped_storage_consumption),avg(generation_biomass),avg(generation_marine),avg(generation_waste),avg(generation_fossil_hard_coal),avg(generation_fossil_coal_derived_gas),avg(generation_hydro_run_of_river_and_poundage),avg(generation_fossil_oil_shale),avg(generation_wind_onshore),avg(generation_fossil_oil)
month,1.0,0.721932,0.627535,,,,-0.801728,-0.250626,-0.152388,,...,-0.496155,-0.007149,,0.534335,0.566349,,-0.707855,,-0.610899,-0.00934
avg(generation_fossil_gas),0.721932,1.0,0.893011,,,,-0.815705,0.326877,-0.12717,,...,-0.462019,0.408114,,0.713947,0.915238,,-0.766604,,-0.501046,0.339675
avg(generation_fossil_brown_coal_lignite),0.627535,0.893011,1.0,,,,-0.859472,0.35137,-0.162395,,...,-0.438496,0.545007,,0.804039,0.976464,,-0.872877,,-0.542033,0.429313
avg(generation_hydro_pumped_storage_aggregated),,,,1.0,,,,,,,...,,,,,,,,,,
avg(generation_fossil_peat),,,,,1.0,,,,,,...,,,,,,,,,,
avg(generation_wind_offshore),,,,,,1.0,,,,,...,,,,,,,,,,
avg(generation_hydro_water_reservoir),-0.801728,-0.815705,-0.859472,,,,1.0,-0.044412,-0.047851,,...,0.65048,-0.430973,,-0.850827,-0.83556,,0.960015,,0.751825,-0.349102
avg(total_load_actual),-0.250626,0.326877,0.35137,,,,-0.044412,1.0,0.01462,,...,0.061434,0.664148,,0.289135,0.46499,,-0.062771,,0.218531,0.626397
avg(generation_solar),-0.152388,-0.12717,-0.162395,,,,-0.047851,0.01462,1.0,,...,-0.725989,0.067916,,0.155315,-0.12825,,0.047322,,-0.554381,0.294699
avg(generation_geothermal),,,,,,,,,,1.0,...,,,,,,,,,,
