<a href="https://colab.research.google.com/github/tallerzalan/Energinet/blob/main/SpotPrice/EL_Spot_Prices.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install pyspark
!pip install pyspark

# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Check Spark Session Information
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 44.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=e9a46997c2d9cf0d98fb9ebe0d60ba3afc954edecb67ba9f91153eff4e7bc6fc
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import subprocess
import os
import pandas as pd

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql.functions import col, to_timestamp

In [3]:
def add_date_columns(df, timestamp_column = 'timestamp'):
    """
    Derive 'year', 'month' and 'day' columns from the given timestamp column.

    Any existing 'day', 'month' or 'year' columns are dropped first and then
    rebuilt from `timestamp_column`. 'year' is produced by F.year, while
    'month' and 'day' are formatted with the 'MM' and 'dd' patterns, so their
    values are zero-padded, i.e.: '01', '02', '03' etc.

    Returns the DataFrame with the three columns appended in the order
    year, month, day.
    """
    # Discard pre-existing date-part columns so the rebuilt ones replace them
    stale_columns = ('day', 'month', 'year')
    without_stale = df.drop(*stale_columns)

    # Append the date parts one at a time to keep the column order explicit
    with_year = without_stale.withColumn('year', F.year(timestamp_column))
    with_month = with_year.withColumn('month', F.date_format(timestamp_column, 'MM'))
    with_day = with_month.withColumn('day', F.date_format(timestamp_column, 'dd'))

    return with_day

In [4]:
# Municipality numbers in DK1 (52 entries, kept as three-character strings)
dk1_list = [
    '751', '851', '461', '630', '561', '730', '791', '740', '621', '615',
    '657', '540', '860', '746', '813', '661', '479', '580', '760', '510',
    '430', '607', '573', '710', '766', '779', '706', '787', '575', '846',
    '756', '420', '410', '849', '550', '707', '820', '810', '450', '840',
    '480', '530', '440', '727', '671', '773', '665', '482', '492', '741',
    '563', '825',
]

# Municipality numbers in DK2 (46 entries, kept as three-character strings)
dk2_list = [
    '101', '147', '265', '370', '330', '157', '316', '159', '217', '259',
    '376', '230', '173', '167', '169', '219', '253', '151', '326', '250',
    '390', '240', '185', '175', '190', '210', '270', '360', '400', '320',
    '153', '329', '306', '260', '340', '163', '350', '165', '201', '223',
    '269', '183', '336', '161', '187', '155',
]

In [5]:
# Location of the EL Spot Prices export.
# NOTE(review): the path sits under /content/drive, so it presumably requires
# Google Drive to be mounted before this cell runs — confirm.
ELSPOT_PRICES_PATH = '/content/drive/MyDrive/Colab Notebooks/Energinet/Spot Price/Elspotprices.json'

# Layout of the output 'timestamp' column; the trailing 'Z' is a literal
# character in the pattern, not a time-zone conversion.
TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'"


def _hourly_spot_price(prices, price_area):
    """
    Sum 'SpotPriceDKK' per hour for one price area.

    prices:     DataFrame with 'PriceArea', 'converted' and 'SpotPriceDKK' columns.
    price_area: value of 'PriceArea' to keep, e.g. 'DK1' or 'DK2'.

    Returns a DataFrame with 'date' ('converted' truncated to the hour) and
    'value' (the summed 'SpotPriceDKK' for that hour).
    """
    return (
        prices
        .filter(F.col('PriceArea') == price_area)
        .groupBy(F.date_trunc('hour', F.col('converted')).alias('date'))
        .agg(F.sum('SpotPriceDKK').alias('value'))
    )


def _to_output_layout(hourly):
    """
    Turn an hourly ('date', 'value') frame into the final output layout.

    Adds 'year', 'month' and 'day' via add_date_columns, a 'timestamp' string
    formatted with TIMESTAMP_PATTERN and a constant 'category' of 'total',
    then drops the helper 'date' column.
    """
    return (
        add_date_columns(hourly, timestamp_column = 'date')
        .withColumn('timestamp', F.date_format('date', TIMESTAMP_PATTERN))
        .withColumn('category', F.lit('total'))
        .drop('date')
    )


# Loading the EL Spot Prices data.
# Duplicates are removed on the full rows before the unused columns are dropped.
df = (
    spark.read.json(ELSPOT_PRICES_PATH)
    .dropDuplicates()
    .drop('HourDK', 'SpotPriceEUR')
)

# Cleaning and manipulating the EL Spot Prices data:
# parse 'HourUTC' into a timestamp column and discard the original string.
df = (
    df
    .withColumn('converted', F.to_timestamp('HourUTC'))
    .drop('HourUTC')
)

# Hourly totals per price area (one shared pipeline instead of two copies)
dk_1 = _hourly_spot_price(df, 'DK1')
dk_2 = _hourly_spot_price(df, 'DK2')

# Final layout: value, year, month, day, timestamp, category
dk_1_clean = _to_output_layout(dk_1)
dk_2_clean = _to_output_layout(dk_2)

# Preview the first 25 rows of each area in timestamp order
dk_1_clean.sort('timestamp').show(25, truncate = False)
dk_2_clean.sort('timestamp').show(25, truncate = False)

+-----------+----+-----+---+--------------------+--------+
|value      |year|month|day|timestamp           |category|
+-----------+----+-----+---+--------------------+--------+
|478.950012 |2022|09   |30 |2022-09-30T22:00:00Z|total   |
|472.850006 |2022|09   |30 |2022-09-30T23:00:00Z|total   |
|371.799988 |2022|10   |01 |2022-10-01T00:00:00Z|total   |
|159.720001 |2022|10   |01 |2022-10-01T01:00:00Z|total   |
|125.739998 |2022|10   |01 |2022-10-01T02:00:00Z|total   |
|111.760002 |2022|10   |01 |2022-10-01T03:00:00Z|total   |
|111.690002 |2022|10   |01 |2022-10-01T04:00:00Z|total   |
|177.270004 |2022|10   |01 |2022-10-01T05:00:00Z|total   |
|334.769989 |2022|10   |01 |2022-10-01T06:00:00Z|total   |
|543.200012 |2022|10   |01 |2022-10-01T07:00:00Z|total   |
|816.840027 |2022|10   |01 |2022-10-01T08:00:00Z|total   |
|793.940002 |2022|10   |01 |2022-10-01T09:00:00Z|total   |
|669.23999  |2022|10   |01 |2022-10-01T10:00:00Z|total   |
|550.559998 |2022|10   |01 |2022-10-01T11:00:00Z|total  