# Import Libraries

In [9]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import glob
import dask.dataframe as dd

# Function to read CSV files
This helper can be extended with parameters if needed, rather than writing new code for each variation.

In [16]:
def read_csv(filename, **kwargs):
    """Load one CSV file into a pandas DataFrame.

    Parameters
    ----------
    filename : str or path-like or file-like
        Location of the CSV file (anything ``pd.read_csv`` accepts).
    **kwargs
        Optional keyword arguments forwarded unchanged to ``pd.read_csv``
        (for example ``usecols``, ``dtype`` or ``parse_dates``). With no
        extra arguments the call is identical to ``pd.read_csv(filename)``.

    Returns
    -------
    pandas.DataFrame
        The parsed contents of the file.
    """
    return pd.read_csv(filename, **kwargs)

# Start of main analysis

Reading the entire folder for CSV files

In [15]:
# Collect every CSV in the data folder. glob.glob makes no ordering guarantee,
# so sort the matches: later cells take dfs[0] to define the Parquet schema and
# write the frames in list order, and both should be reproducible across runs.
files = sorted(glob.glob("data/*.csv"))
files

['data\\weather.20160201.csv', 'data\\weather.20160301.csv']

Mapping all the CSV files to our function

In [18]:
# Parse each discovered CSV into its own DataFrame, preserving the order of `files`.
dfs = [read_csv(csv_path) for csv_path in files]
dfs

[       ForecastSiteCode  ObservationTime      ObservationDate  WindDirection  \
 0                  3002                0  2016-02-01T00:00:00             12   
 1                  3005                0  2016-02-01T00:00:00             10   
 2                  3008                0  2016-02-01T00:00:00              8   
 3                  3017                0  2016-02-01T00:00:00              6   
 4                  3023                0  2016-02-01T00:00:00             10   
 ...                 ...              ...                  ...            ...   
 93250              3797               23  2016-02-29T00:00:00              8   
 93251              3866               23  2016-02-29T00:00:00             11   
 93252              3872               23  2016-02-29T00:00:00             10   
 93253              3876               23  2016-02-29T00:00:00             11   
 93254              3882               23  2016-02-29T00:00:00             10   
 
        WindSpeed  WindGus

Use the first table to create schema for the writer

In [19]:
# Convert the first DataFrame to an Arrow table (dropping the pandas index)
# purely to obtain a schema, then open a Parquet writer bound to that schema.
# The writer stays open until a later cell closes it.
table = pa.Table.from_pandas(df=dfs[0], preserve_index=False)
writer = pq.ParquetWriter(where='weather-rowgroups.parquet', schema=table.schema)

In [21]:
# Display the Arrow table built from the first DataFrame; its schema is the one
# the Parquet writer was created with.
table

pyarrow.Table
ForecastSiteCode: int64
ObservationTime: int64
ObservationDate: string
WindDirection: int64
WindSpeed: int64
WindGust: double
Visibility: double
ScreenTemperature: double
Pressure: double
SignificantWeatherCode: int64
SiteName: string
Latitude: double
Longitude: double
Region: string
Country: string

Use the writer to convert each DataFrame to an Arrow table and append it to the Parquet file

In [22]:
# Append every DataFrame to the Parquet file through the already-open writer.
# The try/finally guarantees the writer is closed (and the file footer written)
# even if one of the conversions or writes raises.
try:
    for df in dfs:
        # Convert against the writer's schema instead of re-inferring it per
        # frame, so a file whose inferred dtypes differ (e.g. an all-integer
        # column that was float in the first file) is coerced rather than
        # rejected by write_table for a schema mismatch.
        table = pa.Table.from_pandas(df, schema=writer.schema, preserve_index=False)
        writer.write_table(table)
finally:
    writer.close()

Some analysis on the parquet file and its row groups to identify characteristics of our data structure

In [23]:
# Open the Parquet file that was just written so its metadata can be inspected.
filename = "weather-rowgroups.parquet"
pq_file = pq.ParquetFile(source=filename)

In [24]:
# Build one [row-group index, row count, total byte size] entry per row group.
# Kept as a plain loop on purpose: `rg_meta` remains bound to the last row
# group afterwards, and a later cell reads it.
file_meta = pq_file.metadata
data = []
for rg in range(file_meta.num_row_groups):
    rg_meta = file_meta.row_group(rg)
    summary_row = [rg, rg_meta.num_rows, rg_meta.total_byte_size]
    data.append(summary_row)
data

[[0, 93255, 537181], [1, 101442, 560608]]

In [26]:
# Total number of rows in the file, as recorded in the Parquet file metadata
# (the sum over all row groups).
pq_file.metadata.num_rows

194697

In [27]:
# Number of columns in the file, as recorded in the Parquet file metadata.
pq_file.metadata.num_columns

15

In [28]:
# Column-chunk metadata (statistics, compression, encodings, sizes) for column
# index 7 of the last row group. The row group is looked up explicitly rather
# than through the `rg_meta` variable left over from an earlier loop, so this
# cell does not depend on leaked loop state.
last_row_group = pq_file.metadata.num_row_groups - 1
pq_file.metadata.row_group(last_row_group).column(7)

<pyarrow._parquet.ColumnChunkMetaData object at 0x000001DD36E0C130>
  file_offset: 913160
  file_path: 
  physical_type: DOUBLE
  num_values: 101442
  path_in_schema: ScreenTemperature
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x000001DD36E0CD60>
      has_min_max: True
      min: -99.0
      max: 15.8
      null_count: 0
      distinct_count: 0
      num_values: 101442
      physical_type: DOUBLE
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 810485
  data_page_offset: 811425
  total_compressed_size: 102675
  total_uncompressed_size: 103687

Find min and max statistics of a column for each row group

In [29]:
# Tabulate the min/max statistics of one column for every row group.
# `column` is the positional index of the column within each row group.
column = 7
data = [["rowgroup", "min", "max"]]

for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    # Fetch the statistics object once and read both bounds from it.
    col_stats = rg_meta.column(column).statistics
    data.append([rg, str(col_stats.min), str(col_stats.max)])

print(data)


[['rowgroup', 'min', 'max'], [0, '-99.0', '15.6'], [1, '-99.0', '15.8']]


In [30]:
# Overall maximum of the chosen column: take the largest per-row-group maximum.
# Reading only the leftover `rg_meta` would report the last row group's maximum,
# which is the global maximum only when that row group happens to hold it.
max(
    pq_file.metadata.row_group(group_index).column(column).statistics.max
    for group_index in range(pq_file.metadata.num_row_groups)
)

15.8

Use the maximum temperature to filter our data, and select only the columns we need, to avoid fetching extra data and limit the load to what we really need.

In [10]:
# Lazily open the Parquet file with dask, requesting only the three columns
# needed to report the result.
df = dd.read_parquet(
    "weather-rowgroups.parquet",
    columns=["ObservationDate", "Region", "ScreenTemperature"],
)




In [11]:
# Keep only the rows whose temperature equals the maximum found in the
# row-group statistics (15.8).
df = df[df["ScreenTemperature"] == 15.8]

In [12]:
# Execute the lazy dask pipeline (column selection + filter) and display the
# matching rows.
df.compute()

Unnamed: 0,ObservationDate,Region,ScreenTemperature
147768,2016-03-17T00:00:00,Highland & Eilean Siar,15.8


# Result

Hottest day =  2016-03-17T00:00:00	

Temperature on that day = 15.8

Region = Highland & Eilean Siar	
