## DATA ENGINEER PYTHON TEST

Convert the weather data into Parquet format. Set the row groups in whatever way you consider appropriate for this data.
The converted data should be able to answer the following questions:
- Which date was the hottest day?
- What was the temperature on that day?
- In which region was the hottest day?

Please provide the source code, tests, documentation and any assumptions you have made.

Note: We are looking for the candidate’s “Data Engineering” ability, not just the Python programming skills.

The weather data is provided separately.

Copying the weather datasets into the local directory

In [0]:
dbutils.fs.cp("/FileStore/tables/weather_20160301.csv","file:/input/weather_20160301.csv")
dbutils.fs.cp("/FileStore/tables/weather_20160201.csv","file:/input/weather_20160201.csv")

Checking the directory to validate that the files have been copied

In [0]:
%sh

ls /input

Reading the files as a pandas dataframe to perform initial analysis

In [0]:
import glob
import pandas as pd

files = glob.glob("/input/weather_*.csv")

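def read_csv(filename):
    # Parse the observation date up front and map the Y/N flags to booleans
    return pd.read_csv(
        filename,
        parse_dates=["ObservationDate"],
        index_col=False,
        true_values=["Y"],
        false_values=["N"],
    )

dfs = list(map(read_csv, files))
df = pd.concat(dfs, ignore_index=True)  # combine both months with a fresh 0..n-1 index
df.describe()  # describe is a method; without () only the bound method is displayed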

Looking at the problem statement, we want each row group to hold a disjoint set of values, so that a reader can tell from the row-group metadata alone which values to expect in which row group, without reading the full data.
Since we only have two months of data, grouping rows by month would be too coarse to be useful.

The problem statement also asks us to find:
1) the hottest temperature
2) the day on which it occurred
3) the region in which it occurred

Given the granularity of the data, two sensible ways to define the row groups are by week or by region.
I chose the region level, because the problem statement also asks for the region of the hottest day. With predicate pushdown, a Parquet reader can compare a predicate against each row group's statistics (for example the min/max of ScreenTemperature) and read only the row groups that can contain the matching rows.
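The pruning decision itself only needs the per-row-group min/max statistics. Below is a minimal pure-Python sketch of that logic; the row-group ids, region names and temperatures are made-up placeholders, not values from the weather files, and the max-based shortcut assumes the statistics are exact values rather than just bounds.

```python
# Hypothetical per-row-group statistics: (row_group_id, region, min_temp, max_temp).
# Placeholder numbers for illustration only, not taken from the real files.
row_group_stats = [
    (0, "Region A", -5.2, 9.8),
    (1, "Region B", -8.1, 15.8),
    (2, "Region C", -6.0, 12.3),
]


def candidate_row_groups_for_max(stats):
    """Return ids of row groups that can contain the overall maximum.

    The overall maximum equals the largest per-group max statistic, so only
    groups whose max statistic reaches that value need to be read
    (assumes the statistics are exact, not loose bounds).
    """
    global_max = max(rg_max for _, _, _, rg_max in stats)
    return [rg_id for rg_id, _, _, rg_max in stats if rg_max == global_max]


def candidate_row_groups_for_threshold(stats, threshold):
    """Return ids of row groups that may hold values >= threshold (min/max pruning)."""
    return [rg_id for rg_id, _, _, rg_max in stats if rg_max >= threshold]


print(candidate_row_groups_for_max(row_group_stats))
print(candidate_row_groups_for_threshold(row_group_stats, 10.0))
```

The threshold form is what a generic `ScreenTemperature >= x` predicate pushdown does; the max form is the shortcut that applies specifically to a "hottest value" query.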

In [0]:
df['Region'].unique()

In [0]:
import pyarrow as pa
import pyarrow.parquet as pq

output_path = '/input/weatherdata_2016-rowgroups.parquet'

# Distinct regions in order of first appearance; each becomes one row group
regions = df['Region'].unique().tolist()

# Writer schema taken from the combined frame, so every region slice matches it
schema = pa.Table.from_pandas(df, preserve_index=False).schema

# One write_table call per region. As long as a region has fewer rows than the
# writer's row_group_size limit, each call produces exactly one row group.
# Reusing the frame loaded above avoids re-reading every CSV once per region.
with pq.ParquetWriter(output_path, schema) as writer:
    for region in regions:
        region_df = df[df['Region'] == region]
        writer.write_table(pa.Table.from_pandas(region_df, preserve_index=False))

print('Parquet file created')


We can list the local directory to confirm that the Parquet file weatherdata_2016-rowgroups.parquet has been created.

In [0]:
%sh

ls /input

The file metadata shows that 16 row groups have been created, one per region.

In [0]:
filename = "/input/weatherdata_2016-rowgroups.parquet"
pq_file = pq.ParquetFile(filename)

data = [["columns:", pq_file.metadata.num_columns],
        ["rows:", pq_file.metadata.num_rows],
        ["row_groups:", pq_file.metadata.num_row_groups]
        ]

print(data)

## Data checks

Checking the schema

In [0]:
s = pq_file.metadata.schema
data = [[s.column(i).name, s.column(i).physical_type,  s.column(i).logical_type] for i in range(len(s))]
data

In [0]:
def sizeof_fmt(num, suffix='B'): # format a byte count as a human-readable string (KiB, MiB, ...)
    for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)


Checking the number of rows and the size of the data in each row group

In [0]:
data = [["rowgroup", "rows", "size"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, rg_meta.num_rows, sizeof_fmt(rg_meta.total_byte_size)])
data

Checking the statistics of the Region column for each row group

In [0]:
column = pq_file.schema_arrow.get_field_index("Region")  # look up by name instead of hard-coding 13
data = [["rowgroup", "Region"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, str(rg_meta.column(column).statistics.min)])
data

Checking the statistics of the ScreenTemperature column for each row group

In [0]:
column = pq_file.schema_arrow.get_field_index("ScreenTemperature")  # look up by name instead of hard-coding 7
data = [["rowgroup", "min", "max"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, str(rg_meta.column(column).statistics.min), str(rg_meta.column(column).statistics.max)])
data

Inspecting the full column-chunk metadata and statistics of ScreenTemperature in row group 1, which holds the highest maximum temperature according to the statistics above.
From the Region statistics we also know that row group 1 is Highland & Eilean Siar.

In [0]:
rg_meta = pq_file.metadata.row_group(1)
rg_meta.column(column)

Finally, we read the Parquet file and query for the hottest day.

In [0]:
df = pd.read_parquet("/input/weatherdata_2016-rowgroups.parquet", engine="pyarrow")
df.loc[df['ScreenTemperature'].idxmax()]

Q - Which date was the hottest day?

In [0]:
df.loc[df['ScreenTemperature'].idxmax(), ['ObservationDate']]

Q - What was the temperature on that day?

In [0]:
df.loc[df['ScreenTemperature'].idxmax(), ['ScreenTemperature']]

Q - In which region was the hottest day?

In [0]:
df.loc[df['ScreenTemperature'].idxmax(), ['Region']]
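Each of the three cells above recomputes `idxmax`, and `idxmax` returns only the first row when several rows share the maximum. A sketch on made-up rows that fetches all three answers at once and also shows how to keep ties:

```python
import pandas as pd

# Illustrative rows; two observations share the top temperature
df = pd.DataFrame({
    "ObservationDate": pd.to_datetime(["2016-02-10", "2016-03-17", "2016-03-20"]),
    "ScreenTemperature": [9.0, 12.5, 12.5],
    "Region": ["Region A", "Region B", "Region C"],
})
answer_cols = ["ObservationDate", "ScreenTemperature", "Region"]

# idxmax keeps only the first maximum ...
first_only = df.loc[df["ScreenTemperature"].idxmax(), answer_cols]

# ... whereas a boolean mask keeps every tied row
all_hottest = df.loc[df["ScreenTemperature"] == df["ScreenTemperature"].max(), answer_cols]

print(first_only)
print(all_hottest)
```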

## Unit Testing


For testing, we read the raw CSV files independently in Spark and answer the same questions with SQL, as a cross-check of the Parquet results.

In [0]:
# File location and type
file_location1 = "/FileStore/tables/weather_20160201.csv"
file_location2 = "/FileStore/tables/weather_20160301.csv"

file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

df1 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location1)

df2 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location2)


In [0]:
df = df1.unionByName(df2) # stack both months, matching columns by name rather than by position
temp_table_name = "weather_2016_csv"

df.createOrReplaceTempView(temp_table_name)

Which date was the hottest day?

In [0]:
%sql

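-- ScreenTemperature is read as a string (inferSchema is false), so cast both sides before comparing
SELECT ObservationDate
FROM `weather_2016_csv`
WHERE CAST(ScreenTemperature AS DECIMAL(6,2)) = (SELECT MAX(CAST(ScreenTemperature AS DECIMAL(6,2))) FROM `weather_2016_csv`);

| ObservationDate |
|---|
| 2016-03-17T00:00:00 |
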
What was the temperature on that day?

In [0]:
%sql

SELECT MAX(CAST(ScreenTemperature AS DECIMAL(6,2))) AS max_temp FROM `weather_2016_csv`;

| max_temp |
|---|
| 15.8 |

In which region was the hottest day?

In [0]:
%sql

SELECT Region
FROM `weather_2016_csv`
WHERE CAST(ScreenTemperature AS DECIMAL(6,2)) = (SELECT MAX(CAST(ScreenTemperature AS DECIMAL(6,2))) FROM `weather_2016_csv`);

| Region |
|---|
| Highland & Eilean Siar |

## Assumptions Made

1) There are only two months of data, so row groups are not defined at the month level.

2) There are 16 distinct regions across all datasets.

3) ScreenTemperature represents the actual air temperature and is used to determine the hottest day.
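These assumptions can be checked automatically each time new files arrive. A minimal sketch; `check_assumptions` is a hypothetical helper whose defaults mirror assumptions 1 and 2, and assumption 3 can only be checked indirectly (here, that ScreenTemperature is numeric). The sample frame is synthetic.

```python
import pandas as pd


def check_assumptions(df, expected_regions=16, max_months=2):
    """Hypothetical check: return a list of violated assumptions (empty when all hold)."""
    problems = []

    # Assumption 1: only a couple of months of data
    months = df["ObservationDate"].dt.to_period("M").nunique()
    if months > max_months:
        problems.append(f"expected at most {max_months} months of data, found {months}")

    # Assumption 2: a fixed number of regions
    regions = df["Region"].nunique()
    if regions != expected_regions:
        problems.append(f"expected {expected_regions} regions, found {regions}")

    # Assumption 3 (indirectly): the temperature column must be numeric
    if not pd.api.types.is_numeric_dtype(df["ScreenTemperature"]):
        problems.append("ScreenTemperature is not numeric")

    return problems


sample = pd.DataFrame({
    "ObservationDate": pd.to_datetime(["2016-02-01", "2016-03-31"]),
    "ScreenTemperature": [1.0, 2.0],
    "Region": ["Region A", "Region B"],
})
print(check_assumptions(sample, expected_regions=2))  # → []
print(check_assumptions(sample))
```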