In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.rdd import RDD
import matplotlib.pyplot as plt
from pyspark.sql.types import *
from pyspark.sql.functions import col
import glob
import os

In [2]:
# Start (or reuse) the Spark session for this notebook and keep a handle on its
# SparkContext for the RDD-based cells below.
spark = (
    SparkSession.builder
    .appName("EnergyConsumption")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Input folders scanned by make_rdd(), one per energy type.
path_gas, path_electricity = r"./data/Gas/", r"./data/Electricity/"

In [19]:
def make_rdd(path):
    """Load every data file in ``path`` into one RDD of split rows tagged with a year.

    Each line is split on "," and the year taken from the file name (the four
    characters just before the extension, e.g. ``..._2020.csv`` -> ``"2020"``) is
    appended as the last field. Header lines are dropped from every file.

    Parameters
    ----------
    path : str
        Directory containing the per-year CSV files.

    Returns
    -------
    pyspark.RDD
        RDD whose elements are lists of strings (the CSV fields plus the year).

    Raises
    ------
    ValueError
        If ``path`` contains no data files.
    """
    def _split_with_year(year):
        # Bind ``year`` now: Spark runs the lambda lazily, so a lambda closing over
        # the loop variable directly could tag every file with the last year seen.
        return lambda line: line.split(",") + [year]

    per_file = []
    # sorted() makes the file order (and therefore which header is used) deterministic.
    for filename in sorted(os.listdir(path)):
        full_path = os.path.join(path, filename)
        # Skip sub-directories and hidden files such as .ipynb_checkpoints or .DS_Store.
        if filename.startswith(".") or not os.path.isfile(full_path):
            continue
        year = os.path.splitext(filename)[0][-4:]
        per_file.append(sc.textFile(full_path).map(_split_with_year(year)))

    if not per_file:
        raise ValueError("no data files found in %r" % path)

    # union() returns a NEW RDD; discarding its result would silently keep only
    # the first file.
    rdd_full = sc.union(per_file)

    # Each file's header line got its own year appended, so compare only the CSV
    # fields when dropping headers (assumes all files share the first file's header).
    header_fields = per_file[0].first()[:-1]
    return rdd_full.filter(lambda fields: fields[:-1] != header_fields)

In [20]:
# Build the gas RDD from the files in path_gas and peek at the first data row.
rdd_gas = make_rdd(path_gas)
rdd_gas.first()

['Coteq Netbeheer BV',
 'GAS Gastransport Services (GASUNIE)',
 'Margrietstraat',
 '4175GA',
 '7165BD',
 'HAAFTEN',
 '18',
 '100.0',
 '83.33',
 '78',
 'G4',
 '3457',
 '0.0',
 '0.0',
 '2020']

In [21]:
# Build the electricity RDD from the files in path_electricity and peek at the first data row.
rdd_electricity = make_rdd(path_electricity)
rdd_electricity.first()

['Coteq Netbeheer BV',
 'Netbeheerder Centraal Overijssel B.V.',
 'Dorpsstraat',
 '7468CP',
 '7471AA',
 'ENTER',
 '19',
 '89.47',
 '94.74',
 '89',
 '1x35',
 '4122',
 '89.47',
 '0.0',
 '2020']

In [23]:
# Column names for the 15 fields of each row (the split CSV columns followed by the
# year that make_rdd appends). The RDD holds split text, so every column is a string.
heading = [
    "net_manager", "purchase_area", "street",
    "zipcode_from", "zipcode_to", "city",
    "num_connections", "delivery_perc", "perc_of_active_connections",
    "type_conn_perc", "type_of_connection", "annual_consume",
    "annual_consume_lowtarif_perc", "smartmeter_perc", "year",
]
df_electricity = rdd_electricity.toDF(heading)

# Vertical layout keeps these wide rows readable.
df_electricity.show(2, vertical=True)

-RECORD 0--------------------------------------------
 net_manager                  | Coteq Netbeheer BV   
 purchase_area                | Netbeheerder Cent... 
 street                       | Dorpsstraat          
 zipcode_from                 | 7468CP               
 zipcode_to                   | 7471AA               
 city                         | ENTER                
 num_connections              | 19                   
 delivery_perc                | 89.47                
 perc_of_active_connections   | 94.74                
 type_conn_perc               | 89                   
 type_of_connection           | 1x35                 
 annual_consume               | 4122                 
 annual_consume_lowtarif_perc | 89.47                
 smartmeter_perc              | 0.0                  
 year                         | 2020                 
-RECORD 1--------------------------------------------
 net_manager                  | Coteq Netbeheer BV   
 purchase_area              

In [59]:
# Electricity supply-and-consumption export ("Elektriciteit aanbod en verbruik"),
# ";"-separated with a header row. Defined once so both reads stay in sync.
ELECTRICITY_PRODUCTION_CSV = "./data/Elektriciteit__aanbod_en_verbruik_12102021_102309.csv"

# Typed DataFrame via Spark's CSV reader (header row gives the names, types inferred) ...
df_electricity_production = spark.read.csv(
    ELECTRICITY_PRODUCTION_CSV, sep=";", header=True, inferSchema=True
)
# ... and the same file as raw text lines, parsed by hand in a later cell.
electricity_production_raw = sc.textFile(ELECTRICITY_PRODUCTION_CSV)

In [None]:
# Inspect the CSV-reader DataFrame: sample rows first, then the inferred column types.
df_electricity_production.show()
df_electricity_production.printSchema()

In [None]:
# NOTE(review): `frame_gas` is not assigned in any visible cell, so this cell raised
# NameError on a fresh kernel. The gas rows have the same 15-field layout as the
# electricity rows (compare the two `.first()` outputs), so this presumably meant
# the gas DataFrame built with the same column names — confirm.
frame_gas = rdd_gas.toDF(heading)
frame_gas

In [None]:
# NOTE(review): `frame_electricity` is not assigned in any visible cell, so this cell
# raised NameError on a fresh kernel. Presumably it refers to the electricity
# DataFrame built earlier from rdd_electricity — confirm.
frame_electricity = df_electricity
frame_electricity

In [None]:
# Parse the raw text lines (loaded as `electricity_production_raw`) by hand and
# convert each one to a Row, so that a DataFrame holding only the columns of
# interest can be made.

# 0-based position of each selected column in a ";"-separated line.
PRODUCTION_COLUMN_INDEX = {
    "periode": 0,
    "netto_productie": 3,
    "netto_productie_brandstoffen": 5,
    "netto_productie_windenergie": 12,
    "netto_productie_zonnestroom": 15,
}
# A line needs at least this many fields to contain every selected column.
PRODUCTION_MIN_FIELDS = max(PRODUCTION_COLUMN_INDEX.values()) + 1


def production_fields_to_row(fields):
    """Build a Row with the selected columns of one split CSV line.

    Surrounding whitespace and double quotes are stripped from each value. This is
    a no-op for unquoted fields and keeps quoted numbers (if the export quotes
    them) castable to int later.
    """
    return Row(**{name: fields[index].strip().strip('"')
                  for name, index in PRODUCTION_COLUMN_INDEX.items()})


electricity_production_parts = (
    electricity_production_raw
    .map(lambda line: line.split(";"))
    # Blank or footer lines would otherwise raise IndexError once the job runs.
    .filter(lambda fields: len(fields) >= PRODUCTION_MIN_FIELDS)
)
electricity_production_rows = electricity_production_parts.map(production_fields_to_row)

In [24]:
# Peek at the first two parsed Rows; the first one is the CSV header line, which
# the next cell filters out.
electricity_production_rows.take(2)


NameError: name 'electricity_production_rows' is not defined

In [None]:

# The first parsed Row holds the CSV column titles rather than data; drop every
# Row equal to it before building the DataFrame.
header = electricity_production_rows.first()
df_electricity_production = spark.createDataFrame(
    electricity_production_rows.filter(lambda row: not row == header)
)

In [None]:
# Show the DataFrame built from the parsed Rows ...
df_electricity_production.show()

# ... then keep `periode` as text and give the production columns an integer type.
df2_electricity_production = df_electricity_production.select(
    [col("periode").cast("string").alias("periode")]
    + [
        col(name).cast("int").alias(name)
        for name in (
            "netto_productie",
            "netto_productie_brandstoffen",
            "netto_productie_windenergie",
            "netto_productie_zonnestroom",
        )
    ]
)
df2_electricity_production.printSchema()

In [None]:
# Collect the typed result to the driver as a pandas DataFrame for plotting
# (assumes it is small enough to fit in driver memory). Leave it as the last
# expression so Jupyter renders a rich table instead of print()'s plain text.
df_pandas_electricity_production = df2_electricity_production.toPandas()
df_pandas_electricity_production

In [None]:
# Bar chart of net electricity production per period, with a title and axis labels
# so the figure stands on its own; plt.show() avoids printing the Axes repr.
fig, ax = plt.subplots()
df_pandas_electricity_production.plot(x="periode", y="netto_productie", kind="bar", ax=ax)
ax.set(title="Net electricity production per period", xlabel="periode", ylabel="netto_productie")
plt.show()
