In [12]:
import duckdb
import os
import pandas as pd
import numpy as np
import re

from duckdb.typing import *

## Quality assessment

In [13]:
input_root_dir = '../data/'
output_root_dir = '../clean_data/'

# Only the 2023 output directory is targeted for now.
output_files_dirs = [os.path.join(output_root_dir,'2023')]
# Alternative: mirror every entry of the input root.
# output_files_dirs = [os.path.join(output_root_dir,x) for x in os.listdir(input_root_dir)]

# Every sub-directory of the input root (presumably one per year).
# `os.listdir` returns entries in arbitrary order and also lists plain files,
# so keep directories only and sort them to make the processing order
# reproducible across runs and machines.
input_files_dirs = sorted(
    path
    for path in (os.path.join(input_root_dir, entry) for entry in os.listdir(input_root_dir))
    if os.path.isdir(path)
)
# Alternative: restrict the run to a single year.
# input_files_dirs = [os.path.join(input_root_dir,'2023')]

In [14]:
# Field names for the station metadata block.
metadata_headers = [
    'region',
    'federative_unit',
    'state',
    'code',
    'latitude',
    'longitude',
    'altitude',
    'foundation_date',
]

# Names assigned, in order, to the measurement columns when reading a station file.
column_names = [
    'date',
    'time',
    'total_precipitation',
    'avg_atmospheric_pressure',
    'max_atmospheric_pressure',
    'min_atmospheric_pressure',
    'global_radiation',
    'avg_air_temperature',
    'dew_point',
    'max_temperature',
    'min_temperature',
    'max_dew_point',
    'min_dew_point',
    'max_relative_air_humidity',
    'min_relative_air_humidity',
    'relative_air_humidity',
    'wind_direction',
    'max_wind_gust',
    'wind_speed',
]
# Unused alternatives, kept for reference:
# column_types = [VARCHAR,VARCHAR,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE]
# column_names = ['column00','column01','column02','column03','column04','column05','column06','column07','column08','column09','column10','column11','column12','column13','column14','column15','column16','column17','column18']


In [15]:
file_path = '../data/2022/INMET_CO_DF_A045_AGUAS EMENDADAS_01-01-2022_A_31-12-2022.CSV'

# Load one station file: ';'-separated, ISO-8859-1 encoded, ',' as the decimal
# mark, -9999 as the missing-value sentinel. The first 9 lines are skipped and
# the columns are named from `column_names`.
df = pd.read_csv(file_path,
                 encoding='ISO-8859-1',
                 delimiter=';',
                 skiprows=9,
                 decimal=',',
                 names=column_names,
                 # We have to specify `usecols`, because the files have a trailing `;`,
                 # which causes an extra column to be created.
                 usecols=column_names,
                 na_values=-9999,)

# `drop_duplicates` returns a new frame, so the result has to be assigned back;
# otherwise `df` still holds the duplicated (date, time) rows in later cells.
df = df.drop_duplicates(subset=['date', 'time'])
df


Unnamed: 0,date,time,total_precipitation,avg_atmospheric_pressure,max_atmospheric_pressure,min_atmospheric_pressure,global_radiation,avg_air_temperature,dew_point,max_temperature,min_temperature,max_dew_point,min_dew_point,max_relative_air_humidity,min_relative_air_humidity,relative_air_humidity,wind_direction,max_wind_gust,wind_speed
0,2022/01/01,0000 UTC,0.0,897.1,897.1,896.3,,19.7,18.4,19.8,19.6,18.4,18.1,92.0,90.0,92.0,24.0,4.1,2.0
1,2022/01/01,0100 UTC,0.0,897.8,897.9,897.2,,19.6,18.0,19.8,19.5,18.4,18.0,92.0,90.0,90.0,21.0,4.3,2.0
2,2022/01/01,0200 UTC,0.0,897.7,897.9,897.7,,18.9,17.6,19.6,18.9,18.0,17.6,92.0,90.0,92.0,31.0,3.9,1.7
3,2022/01/01,0300 UTC,0.0,897.3,897.7,897.3,,18.6,17.5,18.9,18.6,17.6,17.4,93.0,92.0,93.0,25.0,4.2,2.1
4,2022/01/01,0400 UTC,0.0,896.7,897.3,896.7,,18.8,17.7,18.8,18.6,17.7,17.5,94.0,93.0,93.0,26.0,4.2,1.8
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8755,2022/12/31,1900 UTC,0.0,898.7,899.7,898.6,1088.3,25.7,18.6,25.7,25.2,20.6,18.4,75.0,64.0,65.0,98.0,3.3,1.0
8756,2022/12/31,2000 UTC,0.0,898.5,898.9,898.5,1001.6,25.8,18.1,26.4,25.6,19.0,17.2,67.0,57.0,63.0,46.0,3.0,0.8
8757,2022/12/31,2100 UTC,0.0,899.3,899.3,898.5,334.9,23.8,19.2,26.1,23.8,19.3,17.2,75.0,60.0,75.0,5.0,2.8,0.7
8758,2022/12/31,2200 UTC,0.0,899.8,899.8,899.2,43.4,21.3,18.1,23.8,21.3,19.0,17.0,84.0,70.0,82.0,160.0,1.7,0.4


In [24]:
def parse_time(time: str) -> str:
    """Normalise a time-of-day string to ``HH:MM:00``.

    Args:
        time: Either ``'HHMM UTC'`` (e.g. ``'0100 UTC'``) or any other string,
            presumably already ``'HH:MM'`` — TODO confirm against older files.

    Returns:
        ``'HH:MM:00'`` for a valid ``'HHMM UTC'`` input. Any other input is
        returned unchanged with ``':00'`` appended.
    """
    # Raw string: '\d' in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    if re.match(r'^([01]\d|2[0-3])([0-5]\d) UTC$', time) is not None:
        time = time[:2] + ':' + time[2:4]

    return time + ':00'


def parse_date(date: str) -> str:
    """Normalise a date string to ``YYYY-MM-DD``.

    Args:
        date: ``'YYYY/MM/DD'``, ``'DD/MM/YY'``, or any other string.

    Returns:
        ``'YYYY-MM-DD'`` for the two recognised formats; two-digit years are
        prefixed with ``'20'``. Any other input is returned unchanged.
    """
    # Raw strings avoid the invalid '\d' escape of a normal string literal.
    if re.match(r'^\d{4}/\d{2}/\d{2}$', date):
        return date.replace('/', '-')
    if re.match(r'^\d{2}/\d{2}/\d{2}$', date):
        # Unpack into new names instead of rebinding the `str` parameter to a list.
        day, month, year = date.split('/')
        return '-'.join(['20' + year, month, day])

    return date


def parse_float(value: str) -> float:
    """Parse a decimal number written with ``,`` (or ``.``) as decimal mark.

    Args:
        value: Text such as ``'-15,789444'`` or ``'1159.54'``.

    Returns:
        The parsed float, or ``np.nan`` when the text is not a number.
    """
    # Cheap pre-filter: the text must start with a digit, '-' or ','.
    if not re.match(r'[\d\-,]', value):
        return np.nan
    try:
        return float(value.replace(',', '.'))
    except ValueError:
        # The pre-filter only inspects the first character, so inputs such as
        # '-' or '1,2,3' get here; report them as missing instead of raising.
        return np.nan


def read_csv(file_path: str) -> pd.DataFrame:
    """Read the measurement table of one station file into a DataFrame.

    The file is parsed as ISO-8859-1 text with ``;`` as separator and ``,`` as
    decimal mark. The first 9 lines are skipped, columns are named from the
    module-level ``column_names`` and ``-9999`` is treated as missing.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A DataFrame with one column per entry of ``column_names``.
    """
    reader_options = dict(
        encoding='ISO-8859-1',
        delimiter=';',
        skiprows=9,
        decimal=',',
        names=column_names,
        # `usecols` is required: the files have a trailing `;`, which would
        # otherwise create an extra column.
        usecols=column_names,
        na_values=-9999,
    )
    return pd.read_csv(file_path, **reader_options)


def exclude_null_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows in which every column other than ``date``/``time`` is null.

    Returns the same set of rows as the former DuckDB ``EXCEPT`` query, which
    removed the all-null rows and, being a set operation, exact duplicate rows.
    Unlike that query, which gave no ordering guarantee, the surviving rows
    keep their original relative order.

    Args:
        df: Frame to filter; it is not modified.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    measurement_columns = [column for column in df.columns if column not in ('date', 'time')]
    if not measurement_columns:
        # Nothing to test for nulls; only the de-duplication applies.
        return df.drop_duplicates().reset_index(drop=True)
    return (
        df.dropna(how='all', subset=measurement_columns)
        .drop_duplicates()
        .reset_index(drop=True)
    )


In [17]:
# Expose the Python parsers to DuckDB SQL as VARCHAR -> VARCHAR functions.
for udf_name, udf in (('parse_time', parse_time), ('parse_date', parse_date)):
    try:
        duckdb.create_function(udf_name, udf, [VARCHAR], VARCHAR)
    except duckdb.NotImplementedException:
        # Deliberately ignored — presumably raised when the cell is re-run and
        # the function is already registered; TODO confirm.
        pass

In [18]:
# Preview the rows whose measurement columns are all NULL (the COLUMNS(...)
# conditions in WHERE are combined with AND), with the time normalised.
null_rows_query = "SELECT parse_time(time) AS time,COLUMNS(* EXCLUDE time) FROM df WHERE COLUMNS(* EXCLUDE (date,time)) IS NULL ORDER BY date, time"
duckdb.sql(null_rows_query)

┌──────────┬────────────┬─────────────────────┬───┬──────────────────────┬────────────────┬───────────────┬────────────┐
│   time   │    date    │ total_precipitation │ … │ relative_air_humid…  │ wind_direction │ max_wind_gust │ wind_speed │
│ varchar  │  varchar   │       double        │   │        double        │     double     │    double     │   double   │
├──────────┼────────────┼─────────────────────┼───┼──────────────────────┼────────────────┼───────────────┼────────────┤
│ 23:00:00 │ 2022/12/08 │                NULL │ … │                 NULL │           NULL │          NULL │       NULL │
│ 00:00:00 │ 2022/12/09 │                NULL │ … │                 NULL │           NULL │          NULL │       NULL │
│ 01:00:00 │ 2022/12/09 │                NULL │ … │                 NULL │           NULL │          NULL │       NULL │
│ 02:00:00 │ 2022/12/09 │                NULL │ … │                 NULL │           NULL │          NULL │       NULL │
│ 01:00:00 │ 2022/12/10 │       

In [19]:
# Remove the rows in which every measurement is null. Uses the shared helper
# rather than a second copy of its query, so both code paths stay in sync.
df = exclude_null_rows(df)

In [20]:
# Merge date and time into a single DATETIME column, cast the humidity columns
# to UTINYINT and the wind direction to USMALLINT, and move those columns plus
# wind_speed to the end of the frame.
typed_measurements_query = """
           SELECT concat(parse_date(date),' ',parse_time(time))::DATETIME AS datetime,
           COLUMNS(* EXCLUDE (wind_direction,max_relative_air_humidity,min_relative_air_humidity,relative_air_humidity,time,date,wind_speed)),
           max_relative_air_humidity::UTINYINT AS max_relative_air_humidity,
           min_relative_air_humidity::UTINYINT AS min_relative_air_humidity,
           relative_air_humidity::UTINYINT AS relative_air_humidity,
           wind_direction::USMALLINT AS wind_direction,
           wind_speed
           FROM df"""
df = duckdb.sql(typed_measurements_query).to_df()

In [21]:
# Station metadata collected so far, keyed by station code.
stations = dict()
# Not populated by any visible cell.
failed_files = list()
# Keys assigned, in order, to the first eight header lines of a station file.
header_keys = [
    'region',
    'federative_unit',
    'name',
    'code',
    'latitude',
    'longitude',
    'altitude',
    'foundation_year',
]

In [22]:
# Collect the metadata of every station that has at least one usable
# measurement row, keeping the first header seen for each station code.
for input_files_dir in input_files_dirs:
    for file_name in os.listdir(input_files_dir):
        file_path = os.path.join(input_files_dir, file_name)
        station_measurements = read_csv(file_path)
        # `drop_duplicates` returns a new frame; without the assignment the
        # call has no effect.
        station_measurements = station_measurements.drop_duplicates(subset=['date', 'time'])
        station_measurements = exclude_null_rows(station_measurements)
        if station_measurements.empty:
            # No usable measurements: this file does not register a station.
            continue

        # Each header line is split on ':;' and its last part is kept as the value.
        with open(file_path, 'r', encoding='iso-8859-1') as f:
            header_values = [f.readline().strip().split(':;')[-1] for _ in range(len(header_keys))]
        file_header = dict(zip(header_keys, header_values))

        station_code = file_header.pop('code')
        if station_code in stations:
            # Already recorded from an earlier file.
            continue

        for numeric_key in ('latitude', 'longitude', 'altitude'):
            file_header[numeric_key] = parse_float(file_header[numeric_key])
        file_header['foundation_year'] = parse_date(file_header['foundation_year'])
        stations[station_code] = file_header


processing ../data/2000/INMET_CO_DF_A001_BRASILIA_07-05-2000_A_31-12-2000.CSV
processing ../data/2000/INMET_NE_BA_A401_SALVADOR_13-05-2000_A_31-12-2000.CSV
processing ../data/2000/INMET_N_AM_A101_MANAUS_09-05-2000_A_31-12-2000.CSV
processing ../data/2000/INMET_SE_RJ_A601_ECOLOGIA AGRICOLA_07-05-2000_A_31-12-2000.CSV
processing ../data/2000/INMET_S_RS_A801_PORTO ALEGRE_22-09-2000_A_31-12-2000.CSV
processing ../data/2001/INMET_CO_DF_A001_BRASILIA_01-01-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_CO_GO_A002_GOIANIA_29-05-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_CO_GO_A003_MORRINHOS_25-05-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_CO_MS_A702_CAMPO GRANDE_10-09-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_CO_MS_A703_PONTA PORA_07-09-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_CO_MS_A704_TRES LAGOAS_03-09-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_NE_BA_A401_SALVADOR_01-01-2001_A_31-12-2001.CSV
processing ../data/2001/INMET_NE_BA_A402_BARRE

In [23]:
# One row per station code, one column per metadata field.
stations_table = pd.DataFrame.from_dict(stations, orient='index')
stations_table

Unnamed: 0,region,federative_unit,name,latitude,longitude,altitude,foundation_year
A001,CO,DF,BRASILIA,-15.789444,-47.925833,1159.54,2000-05-07
A401,NE,BA,SALVADOR,-13.016667,-38.516667,51.41,2000-05-13
A101,N,AM,MANAUS,-3.103333,-60.016389,61.25,2000-05-09
A601,SE,RJ,ECOLOGIA AGRICOLA,-22.800000,-43.683333,33.00,2000-05-07
A801,S,RS,PORTO ALEGRE,-30.050000,-51.166667,46.97,2000-09-22
...,...,...,...,...,...,...,...
A256,N,PA,SANTA MARIA DAS BARREIRAS,-8.729722,-49.856389,165.00,2020-12-18
A560,SE,MG,POMPEU,-19.232500,-44.964167,705.00,2015-08-21
A245,N,PA,PORTO DE MOZ,-1.821944,-52.111667,19.00,2022-04-07
A637,SE,RJ,Paty do Alferes - Avelar,-22.347222,-43.417778,508.00,2022-11-10
