# Prerequisites

This notebook uses Python 3.7 with pandas, PyArrow and PySpark.
PySpark could be replaced with Apache Beam.
The locations of the input CSV files, and the filename and path of the destination Parquet file, may need to be changed in the code below.

# Choosing Row Group

After an initial inspection of the data and of the questions posed, it appears that rows should be grouped so that each row group holds the readings for a single day. Because the number of readings varies from day to day, the row group boundaries are set per day as the Parquet file is written, rather than by a fixed row count. Each row group then corresponds to exactly one day, and the row groups end up of similar (but not identical) size. This is preferable to a fixed row group size, whose boundaries would not line up with days.

# Converting CSV data to Parquet 

We read both CSV files into separate DataFrames and concatenate them into a single DataFrame.
We then take a filtered subset of this DataFrame for each day of each month and store these per-day DataFrames in a list. The PyArrow library is used to write each subset as its own row group, producing a single Parquet file in which each row group holds one day's data (matching how we expect to select data in future).
The ScreenTemperature column contains many readings of -99 degrees, which appear to be a placeholder for missing values rather than real measurements, so these readings are filtered out and are not used in the analysis.
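As an alternative to looping over hard-coded day ranges, pandas can split the data by calendar date with `groupby`, which also copes with months of any length. The sketch below uses a tiny hypothetical DataFrame; `MISSING_TEMPERATURE` and `split_by_day` are illustrative names, not part of the original code, and the assumption is that -99 is the only sentinel value.

```python
import pandas as pd

MISSING_TEMPERATURE = -99  # assumed sentinel for missing readings in the source data


def split_by_day(df):
    """Drop sentinel temperatures, then return one DataFrame per calendar day in date order."""
    valid = df[df["ScreenTemperature"] != MISSING_TEMPERATURE]
    # Grouping on the normalized (midnight) timestamp sorts the groups chronologically
    return [day_df for _, day_df in valid.groupby(valid["ObservationDate"].dt.normalize())]


# Hypothetical readings spanning the end of February and the start of March 2016
df = pd.DataFrame({
    "ObservationDate": pd.to_datetime(
        ["2016-02-29", "2016-02-29", "2016-03-01", "2016-03-01", "2016-03-01"]),
    "ScreenTemperature": [4.5, -99.0, 6.1, 7.3, -99.0],
})

days = split_by_day(df)
print([len(d) for d in days])  # [1, 2]
```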

In [56]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Input CSV files and output Parquet file: change these paths for your environment
file1 = "C:/Samar/input/weather.20160201.csv"
file2 = "C:/Samar/input/weather.20160301.csv"
filename = "C:/Samar/input/converted-rowgroups.parquet"

febdays = range(1, 30)  # 1-29 February (2016 is a leap year)
mardays = range(1, 32)  # 1-31 March

MISSING_TEMPERATURE = -99  # placeholder value for missing temperature readings


def filter_day(day, month, df):
    """Return the rows observed on the given day and month, excluding -99 temperature readings."""
    return df[(df["ObservationDate"].dt.day == day)
              & (df["ObservationDate"].dt.month == month)
              & (df["ScreenTemperature"] != MISSING_TEMPERATURE)]


df1 = pd.read_csv(file1, parse_dates=["ObservationDate"], index_col=False)
df2 = pd.read_csv(file2, parse_dates=["ObservationDate"], index_col=False)

# DataFrame.append is deprecated (and removed in pandas 2.0); pd.concat works in all versions
df = pd.concat([df1, df2], ignore_index=True)

# One DataFrame per calendar day, in chronological order
df_list = ([filter_day(day, 2, df) for day in febdays]
           + [filter_day(day, 3, df) for day in mardays])

# Take the schema from the first day's data and use it for the whole file
schema = pa.Table.from_pandas(df_list[0], preserve_index=False).schema
writer = pq.ParquetWriter(filename, schema)

# Each write_table call writes its rows as a new row group, so each day gets its own row group
for day_df in df_list:
    writer.write_table(pa.Table.from_pandas(day_df, preserve_index=False))
writer.close()
print("Single Parquet File Created!")

Single Parquet File Created!


# Looking at Metadata

We can sanity-check the Parquet file by inspecting its metadata to confirm that the conversion succeeded.
We print the number of rows and the number of bytes in each row group. The row count of each row group can be checked against the per-day row counts in the original CSV files (after removing the -99 readings). The column data types could be examined in the same way.

In [59]:
import pyarrow.parquet as pq

pq_file = pq.ParquetFile(filename)
meta = pq_file.metadata

# File-level summary
data = [["columns:", meta.num_columns],
        ["rows:", meta.num_rows],
        ["row_groups:", meta.num_row_groups]]

# Column names and types, if needed:
# s = meta.schema
# data2 = [[s.column(i).name, s.column(i).physical_type, s.column(i).logical_type] for i in range(len(s))]

# Per row group: [index, number of rows, total uncompressed byte size]
data3 = []
for rg in range(meta.num_row_groups):
    rg_meta = meta.row_group(rg)
    data3.append([rg, rg_meta.num_rows, rg_meta.total_byte_size])

print(data3)

[[0, 3216, 29180], [1, 3197, 28590], [2, 3203, 26494], [3, 3187, 26257], [4, 3181, 26794], [5, 3187, 27514], [6, 3155, 27374], [7, 3152, 27500], [8, 3135, 25698], [9, 2774, 24106], [10, 3171, 25011], [11, 3225, 25357], [12, 3104, 26503], [13, 3216, 26317], [14, 3085, 24844], [15, 3062, 28160], [16, 3107, 26582], [17, 3211, 24722], [18, 3202, 26963], [19, 3226, 27357], [20, 3217, 27615], [21, 3211, 26345], [22, 3221, 24870], [23, 3233, 23940], [24, 3259, 23022], [25, 3240, 24054], [26, 3233, 23565], [27, 3225, 23288], [28, 3234, 25676], [29, 3189, 27046], [30, 3210, 27321], [31, 3195, 25846], [32, 3211, 25746], [33, 3213, 24716], [34, 3204, 23594], [35, 3193, 24062], [36, 3204, 25274], [37, 3199, 26382], [38, 3210, 23970], [39, 3215, 24315], [40, 3218, 24787], [41, 3217, 23584], [42, 3214, 23325], [43, 3215, 23700], [44, 3371, 24232], [45, 3213, 23389], [46, 3205, 23311], [47, 3207, 22249], [48, 3225, 23307], [49, 3221, 24654], [50, 3233, 22936], [51, 3196, 22749], [52, 3201, 25193], [5

# Which day was the hottest day?

The way this question is posed leaves real uncertainty about what we are trying to find. Temperature depends on location, and the data covers many different sites, so what do we mean by the hottest day? For example, a day might be very cold at some sites in Scotland while sites in the south of the country record high temperatures. Simply averaging all readings across all sites for each day does not answer the question adequately. Instead, we take the average of the (up to) 24 readings per site per day and then find the highest of these averages; that reading belongs to a particular site, in a particular region, on a particular day.
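The two-step calculation described above (average each site's readings for each day, then take the highest of those averages) can be sketched in pandas. The readings and site names below are hypothetical, with three readings per site per day for brevity; for contrast, the naive all-sites-per-day average is computed too, and on this data it picks a different day.

```python
import pandas as pd

# Hypothetical readings for two sites over two days
df = pd.DataFrame({
    "ObservationDate": pd.to_datetime(["2016-02-01"] * 6 + ["2016-02-02"] * 6),
    "SiteName": (["NORTH"] * 3 + ["SOUTH"] * 3) * 2,
    "ScreenTemperature": [1.0, 2.0, 3.0, 10.0, 11.0, 12.0,
                          6.0, 7.0, 8.0, 6.0, 7.0, 8.0],
})

# Step 1: mean temperature per site per day
site_day = (df.groupby(["ObservationDate", "SiteName"])["ScreenTemperature"]
              .mean()
              .rename("AverageTemperature")
              .reset_index())

# Step 2: the site-day with the highest average
hottest = site_day.loc[site_day["AverageTemperature"].idxmax()]
print(hottest["ObservationDate"].date(), hottest["SiteName"], hottest["AverageTemperature"])
# 2016-02-01 SOUTH 11.0

# Naive alternative: one average across all sites per day
naive_day = df.groupby("ObservationDate")["ScreenTemperature"].mean().idxmax()
print(naive_day.date())  # 2016-02-02
```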

The median might be considered a better measure of central tendency than the mean, because it is less sensitive to outliers. However, when we are looking for the hottest day, do we really want to disregard unusually high readings?
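A small numeric illustration with made-up readings for one site on one day: a single unusually warm reading raises the mean but leaves the median unchanged, which is exactly the kind of reading we would not want to ignore when looking for the hottest day.

```python
from statistics import mean, median

# Hypothetical readings for one site on one day; the last value is an unusually warm reading
readings = [8.0, 9.0, 9.0, 10.0, 18.0]

print(mean(readings))    # 10.8
print(median(readings))  # 9.0
```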

Which temperature we report is inextricably linked to both the location and the time of day. Since we are comparing locations, we average over the time of day and compare the daily average temperature at each site.

Each site belongs to a particular region, so the result also identifies a region.

# Using pyspark.sql

In [62]:


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Temperature") \
    .getOrCreate()

# Load the Parquet file written above and register it as a temporary SQL view
df_parquet = spark.read.format("parquet").load(filename)
df_parquet.createOrReplaceTempView("ParquetTable")

# Average temperature per site per day, hottest first
sparkSQL = spark.sql("""
    SELECT ObservationDate, AVG(ScreenTemperature) AS AverageTemperature, SiteName, Region
    FROM ParquetTable
    GROUP BY ObservationDate, SiteName, Region
    ORDER BY AverageTemperature DESC
""")

# Equivalent DataFrame API query (requires: from pyspark.sql.functions import mean):
# df_parquet.groupBy("ObservationDate", "SiteName", "Region") \
#     .agg(mean("ScreenTemperature").alias("AverageTemperature")) \
#     .orderBy("AverageTemperature", ascending=False)

sparkSQL.show(1)    # the hottest site-day
sparkSQL.show(200)  # the 200 hottest site-days, for context


+-------------------+------------------+--------------------+------------------+
|    ObservationDate|AverageTemperature|            SiteName|            Region|
+-------------------+------------------+--------------------+------------------+
|2016-02-21 00:00:00|12.666666666666664|EXETER AIRPORT (3...|South West England|
+-------------------+------------------+--------------------+------------------+
only showing top 1 row

+-------------------+------------------+--------------------+--------------------+
|    ObservationDate|AverageTemperature|            SiteName|              Region|
+-------------------+------------------+--------------------+--------------------+
|2016-02-21 00:00:00|12.666666666666664|EXETER AIRPORT (3...|  South West England|
|2016-02-21 00:00:00|           12.6625|    YEOVILTON (3853)|  South West England|
|2016-02-21 00:00:00| 12.64583333333333|     HEATHROW (3772)|London & South Ea...|
|2016-02-21 00:00:00|12.608695652173912|     NORTHOLT (3672)|London & Sou

We can see that the hottest day was 21 February 2016, at Exeter Airport in South West England, with an average temperature across the day of about 12.67 degrees. Examining the other records ranked by temperature shows that 1 February was also a hot day.
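To rank the dates themselves, without one date's many sites crowding the top of the list, the site-day averages can be reduced to one value per date (the highest site average recorded on that date) and sorted. A sketch in pandas, assuming a DataFrame with the same columns as the Spark query result; the dates, site names and values below are placeholders, not results from the real data.

```python
import pandas as pd

# Hypothetical site-day averages with the same columns as the Spark query result
site_day = pd.DataFrame({
    "ObservationDate": pd.to_datetime(["2016-02-10", "2016-02-10", "2016-02-11", "2016-03-05"]),
    "SiteName": ["SITE A", "SITE B", "SITE A", "SITE B"],
    "AverageTemperature": [12.0, 11.5, 11.0, 9.0],
})

# One row per date: the highest site average on that date, hottest date first
date_ranking = (site_day.groupby("ObservationDate")["AverageTemperature"]
                        .max()
                        .sort_values(ascending=False))
print(date_ranking)
```

In PySpark the same reduction would be a `groupBy("ObservationDate")` on the `sparkSQL` result with `pyspark.sql.functions.max` as the aggregate.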