# Header splitters

In [13]:
import re
def head_split(header):
    """Map each column name in a fixed-width header line to its character span.

    The header is tokenised on whitespace and stand-alone ``#`` tokens are
    ignored. Every remaining token is treated as a column name whose span
    ``[start, end]`` is meant to be used as the slice ``row[start:end]``:

    * the first column always starts at position 0, so values that begin to
      the left of the header label are still captured;
    * every other column starts where its name starts in the header;
    * a column ends one character before the next column starts, and the
      last column ends at ``len(header)``.

    Args:
        header: The header line to analyse.

    Returns:
        A dict mapping column name to a two-element list ``[start, end]``.
        The dict is empty when the header contains no column names. If a
        name occurs more than once, the last occurrence wins.
    """
    columns = [
        (match.start(), match.group())
        for match in re.finditer(r"\S+", header)
        if match.group() != "#"
    ]
    if not columns:
        # Nothing but whitespace / comment markers: no columns to describe.
        return {}

    # Column 0 is anchored at the start of the line; the others at their label.
    starts = [0] + [position for position, _ in columns[1:]]
    # Each column stops just before the next one; the last runs to the end.
    ends = [next_start - 1 for next_start in starts[1:]] + [len(header)]

    return {
        name: [start, end]
        for (_, name), start, end in zip(columns, starts, ends)
    }

In [14]:
from enum import Enum
class ColumnTypes(Enum):
    """Target types that a raw column string can be converted to.

    ``cast_string_value_to_type`` compares its ``column_type`` argument
    against these members to choose the conversion.
    """

    # Converted with datetime.strptime using the "%Y-%m-%d %H:%M:%S" format.
    TimeStampType = 0
    # Converted with str().
    String = 1
    # Converted with int().
    Integer = 2
    # Converted with float().
    Float = 3

# Row splitters

In [15]:
from datetime import datetime as dt
from numba import jit

def cast_string_value_to_type(var_value, column_type):
    """Convert a raw cell string to the Python type selected by ``column_type``.

    Args:
        var_value: The text of one cell.
        column_type: A ``ColumnTypes`` member naming the target type.

    Returns:
        The converted value, or ``None`` when ``var_value`` is the empty
        string or ``column_type`` matches none of the ``ColumnTypes``
        members.

    Raises:
        ValueError: Propagated from ``strptime`` / ``float`` / ``int`` when
            the text cannot be converted.
    """
    # Empty cells carry no value, whatever the requested type.
    if var_value == "":
        return None

    if column_type == ColumnTypes.Integer:
        return int(var_value)
    if column_type == ColumnTypes.String:
        return str(var_value)
    if column_type == ColumnTypes.Float:
        return float(var_value)
    if column_type == ColumnTypes.TimeStampType:
        return dt.strptime(var_value, "%Y-%m-%d %H:%M:%S")

    # Unrecognised type: no conversion is attempted.
    return None


def row_split(r, item_positions, column_names, column_types):
    """Extract and convert the requested columns from one fixed-width row.

    Args:
        r: The text of the row.
        item_positions: Mapping of column name to a ``(start, end)`` pair;
            the cell text is ``r[start:end]`` with surrounding whitespace
            stripped.
        column_names: Names of the columns to extract, in output order.
        column_types: Mapping of column name to the type handed to
            ``cast_string_value_to_type``.

    Returns:
        A list with one converted value per requested column that is present
        in ``item_positions``; requested columns that are absent are skipped.
    """
    return [
        cast_string_value_to_type(
            r[item_positions[name][0] : item_positions[name][1]].strip(),
            column_types[name],
        )
        for name in column_names
        if name in item_positions
    ]

# File parser

In [23]:
class FileParser:
    """Parse one fixed-width text file into an RDD of typed rows.

    The file is expected to begin with a block of lines that start with
    ``header_symbol``; the last line of that block names the columns.
    Every other non-empty line is treated as a data row.
    """

    def __init__(
        self,
        file_path,
        spark_context,
        header_symbol,
        filter_value,
        header_estimated_length,
        column_names,
        column_types,
        min_partitions=100,
    ):
        """Open the file as an RDD and pre-fetch the rows holding the header.

        Args:
            file_path: Path handed to ``spark_context.textFile``.
            spark_context: Object providing ``textFile``.
            header_symbol: Prefix that marks a header/comment line.
            filter_value: Stored on the instance; ``parse`` does not use it.
            header_estimated_length: Number of leading rows fetched to look
                for the header.
            column_names: Names of the columns to extract.
            column_types: Mapping of column name to ``ColumnTypes`` member.
            min_partitions: ``minPartitions`` passed to ``textFile``.
        """
        self.rdd = spark_context.textFile(file_path, minPartitions=min_partitions)
        self.file_path = file_path

        self.header_symbol = header_symbol
        self.header_estimated_length = header_estimated_length
        self.column_names = column_names
        self.column_types = column_types
        self.filter_value = filter_value

        self.first_rows = self.rdd.take(self.header_estimated_length)

    def get_temporal_extend(self):
        """Call ``self.header_extract_period_function`` on the pre-fetched rows.

        NOTE(review): ``header_extract_period_function`` is not defined in
        this class, so this raises AttributeError unless the attribute is
        attached elsewhere. The call's result is discarded.
        """
        self.header_extract_period_function(self.first_rows)

    def parse(self):
        """Build the RDD of converted rows for the requested columns.

        Returns:
            ``self.rdd`` with header lines and empty lines removed and every
            remaining line mapped through ``row_split``.

        Raises:
            ValueError: If no rows could be read, if the file does not start
                with a header line, if the header may extend past the
                pre-fetched rows, or if a requested column is missing from
                the header.
        """
        header_symbol = self.header_symbol
        first_rows = self.first_rows

        if not first_rows:
            raise ValueError("No rows could be read from " + self.file_path)

        # Count the header lines at the top of the file; startswith() also
        # copes with empty rows, which have no first character to index.
        num_header_rows = 0
        for row in first_rows:
            if not row.startswith(header_symbol):
                break
            num_header_rows += 1

        if num_header_rows == 0:
            raise ValueError("No header row found in " + self.file_path)

        # If every fetched row is a header line and the fetch was not cut
        # short by the end of the file, the header may continue further down.
        if (
            num_header_rows == len(first_rows)
            and len(first_rows) >= self.header_estimated_length
        ):
            raise ValueError(
                "Estimated length of the header is too small, please increase it"
            )

        # The last header line is the one that names the columns.
        header = first_rows[num_header_rows - 1]
        item_positions = head_split(header)

        if any(item not in item_positions for item in self.column_names):
            raise ValueError(
                "Not all required columns are found for " + self.file_path
            )

        # Bind to locals so the lambdas below reference plain values rather
        # than ``self`` (presumably to keep the parser and its RDD out of
        # the closures shipped to the workers).
        column_names = self.column_names
        column_types = self.column_types
        return self.rdd.filter(
            lambda line: bool(line) and not line.startswith(header_symbol)
        ).map(lambda x: row_split(x, item_positions, column_names, column_types))

# Utilities

In [24]:
def pairwise_union(dataframes_list):
    """Union all dataframes into one, removing duplicate rows.

    The frames are combined two at a time in rounds: neighbours are unioned
    and de-duplicated, and when a round has an odd number of frames the
    unpaired last one is carried into the next round unchanged, so no frame
    is ever dropped.

    Args:
        dataframes_list: Non-empty sequence of objects exposing
            ``union(other)`` and ``distinct()``. The sequence itself is not
            modified.

    Returns:
        A single de-duplicated frame containing the rows of every input.

    Raises:
        ValueError: If ``dataframes_list`` is empty.
    """
    if not dataframes_list:
        raise ValueError("pairwise_union requires at least one dataframe")

    current = list(dataframes_list)
    if len(current) == 1:
        # No pairing round will run, so de-duplicate the lone frame here to
        # keep the result consistent with the multi-frame case.
        return current[0].distinct()

    while len(current) > 1:
        merged = [
            left.union(right).distinct()
            for left, right in zip(current[::2], current[1::2])
        ]
        if len(current) % 2 == 1:
            # zip() leaves the last frame of an odd-length round unpaired.
            merged.append(current[-1])
        current = merged
    return current[0]

# Test application

In [25]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import (
    FloatType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [26]:
# Obtain a Spark session (creating one if none exists yet) under the
# application name 'gdd'.
spark = SparkSession.builder.appName('gdd').getOrCreate()
# The lines below are disabled tuning / inspection experiments.
#spark.conf.set("spark.executor.memory", "10g")
#spark.conf.set("spark.executor.cores", "4")
#spark_context = spark.sparkContext
#n_workers =  len([executor.host() for executor in spark_context.statusTracker().getExecutorInfos() ]) -1
#print(spark.SparkConf())

In [27]:
spark = SparkSession.builder.appName("compute_heat_waves").getOrCreate()
spark_context = spark.sparkContext

# Columns to extract from every file. The two structures below pair each
# name, by position, with its parser type and its Spark SQL type.
column_names = ["DTG", "NAME", "TX_DRYB_10"]
column_types = dict(
    zip(
        column_names,
        [ColumnTypes.TimeStampType, ColumnTypes.String, ColumnTypes.Float],
    )
)
# Every field is declared nullable (third StructField argument).
data_frame_schema = [
    StructField(name, spark_type, True)
    for name, spark_type in zip(
        column_names, [TimestampType(), StringType(), FloatType()]
    )
]
schema = StructType(data_frame_schema)

In [28]:
from os.path import isfile, join
import os
from os import listdir

# Directory holding the input files, resolved against the current working
# directory.
dir_path = "../../data/uncompressed"
abs_dir_path = os.path.abspath(dir_path)
# Absolute paths of the directory entries that are regular files.
all_files_path = list(
    filter(isfile, (join(abs_dir_path, f) for f in listdir(abs_dir_path)))
)

In [29]:
from pyspark.sql.functions import count, to_date, countDistinct, col, row_number
from pyspark.sql.functions import max as pyspark_max
from pyspark.sql.functions import min as pyspark_min
from pyspark.sql.window import Window
# One DataFrame per successfully parsed file, restricted to rows whose NAME
# column equals 'De Bilt'.
dfs = []
for iteration, file_path in enumerate(all_files_path):
    try:
        file_parser = FileParser(
            file_path=file_path,
            spark_context=spark_context,
            header_symbol="#",
            filter_value = 'NAME',
            header_estimated_length=100,
            column_names=column_names,
            column_types=column_types,
        )
        df = file_parser.parse().toDF(schema=schema)
        df = df.filter("NAME=='De Bilt'")
    except ValueError:
        # A file that raises ValueError is reported and skipped.
        print("error found for file {}, iteration {}".format(file_path, iteration))
    else:
        print("Iteration {}".format(iteration))
        dfs.append(df)

Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9
Iteration 10
Iteration 11
Iteration 12
Iteration 13
Iteration 14
Iteration 15
Iteration 16
Iteration 17
Iteration 18
Iteration 19
Iteration 20
Iteration 21
Iteration 22
Iteration 23
Iteration 24
Iteration 25
Iteration 26
Iteration 27
Iteration 28
Iteration 29
Iteration 30
Iteration 31
Iteration 32
Iteration 33
Iteration 34
Iteration 35
Iteration 36
Iteration 37
Iteration 38
Iteration 39
Iteration 40
Iteration 41
Iteration 42
Iteration 43
Iteration 44
Iteration 45
Iteration 46
Iteration 47
Iteration 48
Iteration 49
Iteration 50
Iteration 51
Iteration 52
Iteration 53
Iteration 54
Iteration 55
Iteration 56
Iteration 57
Iteration 58
Iteration 59
Iteration 60
Iteration 61
Iteration 62
Iteration 63
Iteration 64
Iteration 65
Iteration 66
Iteration 67
Iteration 68
Iteration 69
Iteration 70
Iteration 71
Iteration 72
Iteration 73
Iteration 74
Iteration 75
Iteration 76
Iteration

# Set parameters

In [30]:
# temperature:     threshold the heat-wave loop compares 'last_days_min_temp' with.
# duration:        number of days subtracted from a period's first day to get
#                  its start date.
# max_temperature: not referenced by any code visible in this notebook.
temperature, duration, max_temperature = 5, 5, 16

## Perform the union of dataframes

In [31]:
# Combine the per-file DataFrames collected in `dfs` into a single one.
union_df = pairwise_union(dfs)

## Add a dates column, count the distinct timestamps within each day, find the maximum and minimum temperature, order by dates

In [32]:
union_df = (
    # Derive the calendar date from the DTG timestamp.
    union_df.withColumn("Dates", to_date(col("DTG")))
    # One output row per date.
    .groupBy("Dates")
    .agg(
        countDistinct("DTG"),
        pyspark_max("TX_DRYB_10"),
        pyspark_min("TX_DRYB_10"),
    )
    .orderBy("Dates")
)

In [33]:
#union_df_final.count()

## Convert to pandas

In [None]:
# Convert the aggregated Spark DataFrame to pandas and index it by date.
union_df_final_pd = union_df.toPandas().set_index("Dates")

## Save to csv

In [34]:
# Write the per-day table, including its header row, to a CSV file in the
# current working directory.
union_df_final_pd.to_csv("reduced_panda_df.csv", header=True)

## now print the dates

In [35]:
# Bare expression: in a notebook this renders the frame as the cell output.
union_df_final_pd

Unnamed: 0_level_0,count(DTG),max(TX_DRYB_10),min(TX_DRYB_10)
Dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2003-04-01,143,13.200000,-0.2
2003-04-02,144,10.000000,3.6
2003-04-03,144,9.300000,0.5
2003-04-04,144,10.800000,-2.2
2003-04-05,144,11.600000,5.3
...,...,...,...
2019-03-28,144,11.200000,3.2
2019-03-29,144,17.100000,0.4
2019-03-30,144,18.299999,4.0
2019-03-31,138,12.500000,3.7


In [20]:
from datetime import timedelta
# Walk the per-day frame in index order and record the periods during which
# 'last_days_min_temp' stays above `temperature`.
# NOTE(review): 'last_days_min_temp' and 'last_days_max_temp' are not created
# by any code visible in this notebook; they are presumably added to
# union_df_final_pd by a cell that is not shown. Confirm before running.

# Result accumulators, reset on every run of this cell.
# NOTE(review): heat_wave_start is printed but never filled by this loop.
heat_wave_start = []
heat_wave_end = []

is_high_temp = False
high_temp_days_start = None
high_temp_days_end = None
running_max_temp = -1000.0
for index, row in union_df_final_pd.iterrows():
    if row['last_days_min_temp'] > temperature:
        if not is_high_temp:
            # Entering a period: its start is back-dated by `duration` days.
            high_temp_days_start = index - timedelta(duration)
            high_temp_days_end = index
            is_high_temp = True
        # Track the maximum over every day of the period, not just the first.
        running_max_temp = max(running_max_temp, row['last_days_max_temp'])
    if row['last_days_min_temp'] < temperature and is_high_temp:
        # Leaving the period: keep it only if it got hot enough.
        high_temp_days_end = index
        # NOTE(review): 35 is hard-coded here although a `max_temperature`
        # parameter is defined in this notebook. Confirm which is intended.
        if running_max_temp > 35:
            heat_wave_end.append([high_temp_days_start, high_temp_days_end])
        running_max_temp = -1000.0
        high_temp_days_start = None
        high_temp_days_end = None
        is_high_temp = False

# If a period is still open when the data ends, close it at the last date.
# NOTE(review): this appends a single date (without the > 35 check), whereas
# the loop appends [start, end] pairs. Confirm the intended shape.
if is_high_temp:
    heat_wave_end.append(union_df_final_pd.index[-1])

print(heat_wave_start)
print(heat_wave_end)

[]
[datetime.date(2003, 6, 1)]
