In [1]:
# **Imports and Constants**

In [2]:
# imports
from __future__ import print_function
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from mpl_toolkits.mplot3d import Axes3D
from pyspark.sql.functions import col
from functools import reduce
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import col, split, size, isnan, array_contains, array_min, when, count
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType
import pyspark.sql.functions as f
import matplotlib.pyplot as plt
import pandas as pd
import glob
import re
import pathlib
import json
import datetime as dt
import numpy as np
import os
import random
import statistics
import platform

In [3]:
# paths
# The dataset directory can be overridden with the IRISH_DATASET_PATH
# environment variable; the original local path remains the default.
# os.path.join(..., '') guarantees a trailing separator, because other cells
# build file names with DATASET_PATH + "<file name>".
DATASET_PATH = os.path.join(
    os.environ.get(
        'IRISH_DATASET_PATH',
        '/Users/Soroush/Desktop/Thesis/Code/Datasets/Irish/CER Electricity Revised March 2012/'),
    '')

# BASE_PATH=pathlib.Path().absolute()
# KMEANS_REL_PATH="kmeans models"
# DATASET_REL_PATH="dataset"
# DATASET_PATH=os.path.join(BASE_PATH,DATASET_REL_PATH)
# KMEANS_PATH=os.path.join(BASE_PATH,KMEANS_REL_PATH)

# from google.colab import drive
# drive.mount('/gdrive')

In [4]:
# env variables
# Choose the interpreter names exported to PySpark from the host OS reported
# by platform.system(); any OS other than Windows or Linux takes the else branch.
if platform.system() == 'Windows':
    %env PYSPARK_DRIVER_PYTHON = python
    %env PYSPARK_PYTHON = python
elif platform.system() == 'Linux':
    %env PYSPARK_DRIVER_PYTHON = python
    %env PYSPARK_PYTHON = python3
else:
    %env PYSPARK_DRIVER_PYTHON = python3.6
    %env PYSPARK_PYTHON = python3.6

# Workaround for an incompatibility with PyArrow: either install
# PyArrow 0.14.1 or lower, or set the environment variable
# ARROW_PRE_0_15_IPC_FORMAT=1 (done here).
%env ARROW_PRE_0_15_IPC_FORMAT = 1

env: PYSPARK_DRIVER_PYTHON=python3.6
env: PYSPARK_PYTHON=python3.6
env: ARROW_PRE_0_15_IPC_FORMAT=1


In [5]:
# create appropriate dataset
# load


def load_irish_dataset(dataset_path):
    """Load every ``File*.txt`` under ``dataset_path`` into one DataFrame.

    Each file is read as space-separated text without a header row, and its
    columns are named ``id``, ``date`` and ``power``.

    Parameters
    ----------
    dataset_path : str
        Directory containing the ``File*.txt`` files. A trailing path
        separator is optional.

    Returns
    -------
    pandas.DataFrame
        The rows of all matching files with a fresh ``RangeIndex``. Files are
        read in sorted name order so the row order is reproducible. When no
        file matches, an empty frame with the three typed columns is returned.
    """
    column_names = ["id", "date", "power"]

    # sorted(): glob yields matches in arbitrary, filesystem-dependent order.
    files = sorted(glob.glob(os.path.join(dataset_path, "File*.txt")))

    frames = []
    for file in files:
        data = pd.read_csv(file, sep=" ", header=None)
        data.columns = column_names
        frames.append(data)

    if not frames:
        return pd.DataFrame({'id': pd.Series([], dtype='int'),
                             'date': pd.Series([], dtype='int'),
                             'power': pd.Series([], dtype='float')})

    # Concatenate once at the end: calling DataFrame.append inside the loop
    # re-copied every previously loaded row for each file, and the method was
    # removed in pandas 2.0.
    return pd.concat(frames, ignore_index=True)


dataset = load_irish_dataset(DATASET_PATH)
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line to the output.
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 157992996 entries, 0 to 157992995
Data columns (total 3 columns):
 #   Column  Dtype  
---  ------  -----  
 0   id      int64  
 1   date    int64  
 2   power   float64
dtypes: float64(1), int64(2)
memory usage: 3.5 GB
None


In [6]:
# date column
def create_date_column(dataset):
    """Convert the 5-digit codes in ``dataset['date']`` to timestamps.

    A code ``DDDTT`` is split into a 3-digit day number ``DDD`` (day 1 is
    2009-01-01) and a 2-digit half-hour slot ``TT``. The resulting timestamp
    is ``2009-01-01 00:00:00 + (DDD - 1) days + TT * 30 minutes``, so slot 48
    lands on 00:00:00 of the following day.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Frame with a ``date`` column holding the integer codes.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in; its ``date`` column is replaced
        in place by timezone-naive ``datetime64`` values.

    Raises
    ------
    ValueError
        If a code does not have exactly five digits.
    """
    codes = dataset['date']

    # Already converted (e.g. the cell was run twice): nothing left to decode.
    if pd.api.types.is_datetime64_any_dtype(codes):
        return dataset

    codes = codes.astype('int64')
    if not codes.between(10000, 99999).all():
        raise ValueError("date codes must have exactly five digits (DDDTT)")

    day_code = codes // 100   # first three digits
    time_code = codes % 100   # last two digits

    # Vectorised replacement for the per-row int -> string -> datetime round
    # trip; unix time 1230768000 is 2009-01-01 00:00:00.
    offset_seconds = (day_code - 1) * 86400 + time_code * 1800
    dataset['date'] = pd.Timestamp('2009-01-01') + pd.to_timedelta(
        offset_seconds, unit='s')
    return dataset


dataset = create_date_column(dataset)
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line to the output.
dataset.info()
dataset.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 157992996 entries, 0 to 157992995
Data columns (total 3 columns):
 #   Column  Dtype         
---  ------  -----         
 0   id      int64         
 1   date    datetime64[ns]
 2   power   float64       
dtypes: datetime64[ns](1), float64(1), int64(1)
memory usage: 3.5 GB
None


Unnamed: 0,id,date,power
0,2113,2009-07-15 15:00:00,0.038
1,2113,2009-07-15 15:30:00,0.125
2,2113,2009-07-14 00:30:00,0.189
3,2113,2009-07-14 01:00:00,0.139
4,2113,2009-07-14 01:30:00,0.149


In [None]:
# aggregate data
def agg_by_date(temp):
    """Aggregate readings to one row per id and day, holding a list of hourly means.

    Steps, in order:
      1. index the frame by its 'date' column;
      2. per id, resample to 60-minute bins and take the mean;
      3. drop the 'id' index level created by the groupby;
      4. per id, resample to daily bins and collect each day's 'power'
         values into a Python list;
      5. move the index back into columns and cast 'id' to int.

    NOTE(review): step 4 reads temp['id'] as a column after the index level of
    the same name was dropped in step 3, so it relies on the resampled frame
    from step 2 still carrying an 'id' column -- confirm this holds for the
    installed pandas version.

    Parameters
    ----------
    temp : DataFrame with 'id', 'date' and 'power' columns; 'date' presumably
        already holds datetimes, since it is used as the resampling index --
        verify against the caller.

    Returns
    -------
    The aggregated frame after reset_index(), with 'id' cast to int.
    """
    temp = temp.set_index('date')
    temp = temp.groupby(temp['id']).resample(
        '60T').mean()  # to 1 hour sampling rate
    # drop only the 'id' index level; 'date' stays as the index for the next resample
    temp = temp.reset_index(level='id', drop=True)
    temp = temp.groupby(temp['id']).resample('D').aggregate(
        {'power': lambda x: x.tolist()})  # to 1 day list
    temp = temp.reset_index()
    temp.id = temp.id.astype(int)
    return temp


aggregated_dataset = agg_by_date(dataset)
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line to the output.
aggregated_dataset.info()
aggregated_dataset.head(10)

In [None]:
# save the aggregated frame as a pickle inside the dataset directory
pickle_path = DATASET_PATH+"irish_aggregated_dataset.pkl"
aggregated_dataset.to_pickle(pickle_path)

#load
#aggregated_dataset=pd.read_pickle(DATASET_PATH+"irish_aggregated_dataset.pkl")

In [15]:
# other

# len(aggregated_dataset.iloc[10][2])
# aggregated_dataset.id.nunique()#.nunique()
# a.to_csv(DATASET_PATH+"irish_ids.csv")
# m=aggregated_dataset
# m.id = m.id.astype(int)
# aggregated_dataset.tail()

Unnamed: 0,id,date,power
3307944,7444,2010-10-14,"[0.7715000000000001, 0.2835, 0.229499999999999..."
3307945,7444,2010-10-15,"[0.35650000000000004, 0.3285, 0.2275, 0.208500..."
3307946,7444,2010-10-16,"[0.491, 0.5035000000000001, 0.4835000000000000..."
3307947,7444,2010-10-17,"[0.3305, 0.38549999999999995, 0.382, 0.241, 0...."
3307948,7444,2010-10-18,[1.003]
