#### **Trabalho de Processamento de Big Data | Licenciatura em Ciência de Dados 2023/24 | CDB1**

Docente: João Oliveira<br><br>

- David Franco, nº110733

- Felipe Pereira, nº110861

- João Dias, nº110305

- Samuel Ricardo, nº110884<br><br>

https://www.kaggle.com/datasets/PROPPG-PPG/hourly-weather-surface-brazil-southeast-region

*Version 9 (10.11 GB) | Created by John Holz | Data Update 2023/01/30*

# Data Import and Cleaning

## Problem formulation

Neste notebook, procedemos à realização das operações do processo ETL (extração, transformação e processamento). Este processo é essencial para fazer a integração dos dados, sendo organizado e concatenado num único conjunto de dados que esteja pronto para armazenamento e análise numa plataforma de big data.

No nosso caso, os dados serão armazenados localmente no formato parquet, sendo esta uma forma eficiente para leitura e escrita de grandes volumes de dados. Os principais pontos são:

- `Extração`: Importar os dados do dataset escolhido *(Climate Weather Surface of Brazil - Hourly)*.
- `Transformação`: Limpar os dados, tratar valores omissos, realizar ajustes para garantir a qualidade dos dados.
- `Processamento`: Organizar e armazenar os dados trabalhados no formato parquet de forma a poderem ser utilizados nos modelos de machine learning que iremos desenvolver nas etapas seguintes.

Este processo garante que os dados utilizados nos modelos de previsão de temperatura e precipitação sejam de alta qualidade e estejam prontos para análises detalhadas e precisas.

### Setup inicial

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, date_format, sum as spark_sum, when

import pandas as pd  
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Create the Spark session

spark = SparkSession.builder \
    .appName("WeatherBrazil") \
    .config("spark.sql.shuffle.partitions", 512) \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [3]:
spark

### Utils

In [4]:
def compute_nulls_and_uniques(df, cols):
    total = df.count()
    results = []
    for cl in cols:
        knulls = df.select(cl).filter(F.col(cl).isNull()).count()
        knullsperc = knulls / total
        knans = df.select(cl).filter(F.isnan(cl)).count()
        knansperc = knans / total
        kuniques = df.select(cl).distinct().count()
        kuniquesperc = kuniques / total
        results.append(Row(feature = cl, count_nulls = knulls, percentage_nulls = knullsperc,
                           count_nans = knans, percentage_nans = knansperc,
                           count_uniques = kuniques, percentage_uniques = kuniquesperc))

    return spark.createDataFrame(results)

Esta função calcula e retorna o número de valores nulos, NaNs e únicos para cada coluna num DataFrame.

### Coleção de dados inicial

In [5]:
pwd

'/home/jovyan/code/trabalho'

In [6]:
! head -n 3 ./datasets_originais/central_west.csv

"index","Data","Hora","PRECIPITAÇÃO TOTAL, HORÁRIO (mm)","PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)","PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)","PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)","RADIACAO GLOBAL (Kj/m²)","TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)","TEMPERATURA DO PONTO DE ORVALHO (°C)","TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)","UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)","UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)","UMIDADE RELATIVA DO AR, HORARIA (%)","VENTO, DIREÇÃO HORARIA (gr) (° (gr))","VENTO, RAJADA MAXIMA (m/s)","VENTO, VELOCIDADE HORARIA (m/s)","region","state","station","station_code","latitude","longitude","height"
138998,"2017-12-20","14:00",0.0,899.6,900.0,899.6,3391,26.5,17.7,26.5,24.4,18.3,16.5,65,57,59,39,9.6,3.9,CO,DF,PARANOA (COOPA-DF),A047,-16.01111111,-47.5575,1043.0
138999,"2017-12-20

In [7]:
! head -n 3 ./datasets_originais/north.csv

"index","Data","Hora","PRECIPITAÇÃO TOTAL, HORÁRIO (mm)","PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)","PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)","PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)","RADIACAO GLOBAL (Kj/m²)","TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)","TEMPERATURA DO PONTO DE ORVALHO (°C)","TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)","UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)","UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)","UMIDADE RELATIVA DO AR, HORARIA (%)","VENTO, DIREÇÃO HORARIA (gr) (° (gr))","VENTO, RAJADA MAXIMA (m/s)","VENTO, VELOCIDADE HORARIA (m/s)","region","state","station","station_code","latitude","longitude","height"
0,"2000-05-09","00:00",-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999,-9999,-9999,-9999.0,-9999.0,N,AM,MANAUS,A101,-3.10333333,-60.

In [8]:
! head -n 3 ./datasets_originais/northeast.csv

"index","Data","Hora","PRECIPITAÇÃO TOTAL, HORÁRIO (mm)","PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)","PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)","PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)","RADIACAO GLOBAL (Kj/m²)","TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)","TEMPERATURA DO PONTO DE ORVALHO (°C)","TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)","UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)","UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)","UMIDADE RELATIVA DO AR, HORARIA (%)","VENTO, DIREÇÃO HORARIA (gr) (° (gr))","VENTO, RAJADA MAXIMA (m/s)","VENTO, VELOCIDADE HORARIA (m/s)","region","state","station","station_code","latitude","longitude","height"
0,"2000-05-13","00:00",-9999.0,-9999,-9999.0,-9999.0,-9999,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999,-9999,-9999,-9999.0,-9999.0,NE,BA,SALVADOR,A401,-13.01666666,-3

In [9]:
! head -n 3 ./datasets_originais/south.csv

"index","Data","Hora","PRECIPITAÇÃO TOTAL, HORÁRIO (mm)","PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)","PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)","PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)","RADIACAO GLOBAL (Kj/m²)","TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)","TEMPERATURA DO PONTO DE ORVALHO (°C)","TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)","UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)","UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)","UMIDADE RELATIVA DO AR, HORARIA (%)","VENTO, DIREÇÃO HORARIA (gr) (° (gr))","VENTO, RAJADA MAXIMA (m/s)","VENTO, VELOCIDADE HORARIA (m/s)","region","state","station","station_code","latitude","longitude","height"
0,"2000-09-22","00:00",-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999,-9999,-9999,-9999.0,-9999.0,S,RS,PORTO ALEGRE,A801,-30.05,-51

In [10]:
! head -n 3 ./datasets_originais/southeast.csv

"index","Data","Hora","PRECIPITAÇÃO TOTAL, HORÁRIO (mm)","PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)","PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)","PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)","RADIACAO GLOBAL (Kj/m²)","TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)","TEMPERATURA DO PONTO DE ORVALHO (°C)","TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)","TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)","UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)","UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)","UMIDADE RELATIVA DO AR, HORARIA (%)","VENTO, DIREÇÃO HORARIA (gr) (° (gr))","VENTO, RAJADA MAXIMA (m/s)","VENTO, VELOCIDADE HORARIA (m/s)","region","state","station","station_code","latitude","longitude","height"
0,"2000-05-07","00:00",-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999.0,-9999,-9999,-9999,-9999,-9999.0,-9999.0,SE,RJ,ECOLOGIA AGRICOLA,A601,-22.

In [11]:
! head -n 3 ./datasets_originais/stations.csv

"station","region","state","station_code","first date","height","longitude","latitude"
QUEIMADAS,NE,BA,A436,"2008-05-23",315.0,-39.61694443,-10.984645
MACAU,NE,RN,A317,"2007-01-06",32.0,-36.57305554,-5.15111111


Estes comandos exibem as 3 primeiras linhas de cada arquivo. Isto é útil para visualizar a estrutura e os dados iniciais.

### Leitura dos datasets

In [12]:
data_dir = data_dir = "./datasets_originais/"

In [13]:
file_path = data_dir + "central_west.csv"
df_central_west = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

In [14]:
file_path = data_dir + "north.csv"
df_north = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

In [15]:
file_path = data_dir + "northeast.csv"
df_northeast = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

In [16]:
file_path = data_dir + "south.csv"
df_south = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

In [17]:
file_path = data_dir + "southeast.csv"
df_southeast = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

In [18]:
file_path = data_dir + "stations.csv"
df_stations = spark.read.csv(file_path, header="true", inferSchema="true", sep=',')

Leram-se os arquivos com as informações meteorológicas relativas às regiões do Brasil.

### Verificar os dados e a estrutura

#### Tratamento do Central West

In [19]:
# Schema, show e count do DataFrame de Central West 

df_central_west.printSchema()
df_central_west.show(5, truncate=False) 
df_central_west.count()

root
 |-- index: integer (nullable = true)
 |-- Data: date (nullable = true)
 |-- Hora: timestamp (nullable = true)
 |-- PRECIPITAÇÃO TOTAL, HORÁRIO (mm): double (nullable = true)
 |-- PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- RADIACAO GLOBAL (Kj/m²): integer (nullable = true)
 |-- TEMPERATURA DO AR - BULBO SECO, HORARIA (°C): double (nullable = true)
 |-- TEMPERATURA DO PONTO DE ORVALHO (°C): double (nullable = true)
 |-- TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- UMIDADE REL. MAX. NA HORA ANT. (AUT) (%): integer (nullable = t

11427120

Estes comandos são usados para exibir a estrutura, as primeiras 5 linhas e o total de linhas do dataframe, proporcionando uma visão rápida da estrutura, dos dados iniciais e da quantidade de linhas do dataframe.

In [20]:
# renomear as colunas
new_column_names = [
    "Index",
    "Date",
    "Hour",
    "TotalHourlyPrecipitationMm",
    "HourlyStationLevelAtmosphericPressureMb",
    "LastHourMaxAtmosphericPressureMb",
    "LastHourMinAtmosphericPressureMb",
    "GlobalRadiationKjM2",
    "HourlyDryBulbAirTemperatureC",
    "DewPointTemperatureC",
    "LastHourMaxTemperatureC",
    "LastHourMinTemperatureC",
    "LastHourMaxDewPointTemperatureC",
    "LastHourMinDewPointTemperatureC",
    "LastHourMaxRelativeHumidityPercentage",
    "LastHourMinRelativeHumidityPercentage",
    "HourlyRelativeHumidityPercentage",
    "HourlyWindDirectionRadiusDegrees",
    "MaximumWindGustMs",
    "HourlyWindSpeedMs",
    "Region",
    "State",
    "Station",
    "StationCode",
    "Latitude",
    "Longitude",
    "Height"
]

df_central_west = df_central_west.toDF(*new_column_names)

Foram renomeadas as colunas de forma a obedecer a uma estrutura que não implique com a leitura das mesmas.

In [21]:
df_central_west = df_central_west.withColumn("Hour", date_format("Hour", "HH:mm"))

Este comando ajusta o tipo de dados da coluna "Hora" , removendo a parte da data que foi adicionada durante a importação para o Spark e mantendo apenas o formato de hora no estilo "HH:mm".

In [23]:
cols_to_check = [col for col in df_central_west.columns if col not in ["Index", "Date", "Hour"]]

df_central_west_nulls_uniques = compute_nulls_and_uniques(df_central_west, cols_to_check)

Este chunk de código cria uma lista de colunas `cols_to_check`, excluindo as colunas "Index", "Date" e "Hour", e em seguida, chama a função `compute_nulls_and_uniques` para calcular valores nulos e únicos nas colunas selecionadas.

In [24]:
df_central_west_nulls_uniques.show(n=30)

+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|             feature|count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|  percentage_uniques|
+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|TotalHourlyPrecip...|          0|             0.0|         0|            0.0|          370| 3.23791121472427E-5|
|HourlyStationLeve...|          0|             0.0|         0|            0.0|         1605|1.404553378279041...|
|LastHourMaxAtmosp...|          0|             0.0|         0|            0.0|         1701|1.488564047634049...|
|LastHourMinAtmosp...|          0|             0.0|         0|            0.0|         1692|1.480688047382017...|
| GlobalRadiationKjM2|          0|             0.0|         0|            0.0|        29575|0.002588141193931...|
|HourlyDryBulbAirT...|          0|             0.0|         0|            0.0|          

Exibe as primeiras 30 linhas do dataframe `df_central_west_nulls_uniques`, fornecendo informações sobre valores nulos e únicos para as colunas selecionadas.

In [15]:
# verificar as instâncias duplicadas
df_central_west.dropDuplicates().count()

11427120

Uma vez que este valor se manteve igual, não foram encontradas instâncias duplicadas.

In [22]:
# summary stats para verificar outliers
cols_num = [
 'TotalHourlyPrecipitationMm',
 'HourlyStationLevelAtmosphericPressureMb',
 'LastHourMaxAtmosphericPressureMb',
 'LastHourMinAtmosphericPressureMb',
 'GlobalRadiationKjM2',
 'HourlyDryBulbAirTemperatureC',
 'DewPointTemperatureC',
 'LastHourMaxTemperatureC',
 'LastHourMinTemperatureC',
 'LastHourMaxDewPointTemperatureC',
 'LastHourMinDewPointTemperatureC',
 'LastHourMaxRelativeHumidityPercentage',
 'LastHourMinRelativeHumidityPercentage',
 'HourlyRelativeHumidityPercentage',
 'HourlyWindDirectionRadiusDegrees',
 'MaximumWindGustMs',
 'HourlyWindSpeedMs',
 'Height']
summary_stats = df_central_west.select(cols_num).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,11427120,-1783.1188566495225,3827.688372063957,-9999.0,0.0,0.0,0.0,96.0
HourlyStationLevelAtmosphericPressureMb,11427120,-530.6894289287202,3750.987412483837,-9999.0,923.3,955.3,974.1,1028.8
LastHourMaxAtmosphericPressureMb,11427120,-534.5709728873072,3755.4394535228616,-9999.0,923.4,955.6,974.4,1030.6
LastHourMinAtmosphericPressureMb,11427120,-536.0151633744968,3756.223400667947,-9999.0,922.8,954.9,973.8,1028.1
GlobalRadiationKjM2,11427120,-4589.726086800523,5970.79602161803,-9999.0,-9999.0,-9999.0,1125.0,48898.0
HourlyDryBulbAirTemperatureC,11427120,-1281.0137210425646,3373.2470075854244,-9999.0,19.2,22.9,26.9,45.0
DewPointTemperatureC,11427120,-1463.8416488844114,3555.3765012405474,-9999.0,10.6,17.3,20.5,44.8
LastHourMaxTemperatureC,11427120,-1286.5708290540117,3380.2724200282432,-9999.0,19.7,23.5,27.9,45.0
LastHourMinTemperatureC,11427120,-1288.5510757040934,3380.611551346321,-9999.0,18.6,22.4,26.0,45.0
LastHourMaxDewPointTemperatureC,11427120,-1465.9391088830844,3558.14952559601,-9999.0,11.3,17.9,21.0,44.9


Aqui são calculadas as *summary stats* para as colunas numéricas, com o objetivo de verificar a presença de outliers.

In [24]:
# Inicializa uma lista para armazenar os resultados
resultados = []

# Itera sobre todas as colunas e conta as ocorrências de -9999
for coluna in cols_num:
    contagem = df_central_west.select(spark_sum(when(col(coluna) == -9999, 1).otherwise(0)).alias(coluna)).collect()[0][coluna]
    resultados.append((coluna, contagem))

# Cria um DataFrame a partir dos resultados
resultado_df = spark.createDataFrame(resultados, ["Coluna", "Contagem_de_-9999"])

# Mostra o DataFrame de resultados
resultado_df.show()

+--------------------+-----------------+
|              Coluna|Contagem_de_-9999|
+--------------------+-----------------+
|TotalHourlyPrecip...|          2037944|
|HourlyStationLeve...|          1550070|
|LastHourMaxAtmosp...|          1554352|
|LastHourMinAtmosp...|          1555323|
| GlobalRadiationKjM2|          6074754|
|HourlyDryBulbAirT...|          1488024|
|DewPointTemperatureC|          1689639|
|LastHourMaxTemper...|          1495074|
|LastHourMinTemper...|          1495926|
|LastHourMaxDewPoi...|          1692594|
|LastHourMinDewPoi...|          1699292|
|LastHourMaxRelati...|          1683063|
|LastHourMinRelati...|          1688184|
|HourlyRelativeHum...|          1678001|
|HourlyWindDirecti...|          1698790|
|   MaximumWindGustMs|          1622867|
|   HourlyWindSpeedMs|          1610863|
|              Height|                0|
+--------------------+-----------------+



Verificámos que havia muitas observações com o valor -9999, que foram usadas sobre os valores omissos. No entanto, para garantir que esses valores não influenciem negativamente a estrutura e o padrão da nossa regressão, decidimos removê-los. No nosso caso, vamos proceder à eliminação da coluna `GlobalRadiation` e, nas outras colunas, vamos eliminar todas as observações que contêm o valor -9999. Esta abordagem assegura que a qualidade dos dados não comprometerá a precisão e a robustez dos nossos modelos preditivos.

In [40]:
# drop da coluna GlobalRadiationKjM2 por ter muitos missings
df_central_west = df_central_west.drop("GlobalRadiationKjM2")

In [47]:
# remover linhas com valor -9999 em qualquer coluna
df_central_west = df_central_west.filter(~col("TotalHourlyPrecipitationMm").eqNullSafe(-9999) &
                                                  ~col("HourlyStationLevelAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMinAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("HourlyDryBulbAirTemperatureC").eqNullSafe(-9999) &
                                                  ~col("DewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("LastHourMinRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyWindDirectionRadiusDegrees").eqNullSafe(-9999) &
                                                  ~col("MaximumWindGustMs").eqNullSafe(-9999) &
                                                  ~col("HourlyWindSpeedMs").eqNullSafe(-9999))

In [48]:
df_central_west.printSchema()
df_central_west.show()
df_central_west.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximum

8888790

O número de instâncias reduziu de 11427120 para 8888790 após a eliminação das observações com o valor -9999.

In [49]:
# summary stats após transformações
cols_num_2 = [
 'TotalHourlyPrecipitationMm',
 'HourlyStationLevelAtmosphericPressureMb',
 'LastHourMaxAtmosphericPressureMb',
 'LastHourMinAtmosphericPressureMb',
 'HourlyDryBulbAirTemperatureC',
 'DewPointTemperatureC',
 'LastHourMaxTemperatureC',
 'LastHourMinTemperatureC',
 'LastHourMaxDewPointTemperatureC',
 'LastHourMinDewPointTemperatureC',
 'LastHourMaxRelativeHumidityPercentage',
 'LastHourMinRelativeHumidityPercentage',
 'HourlyRelativeHumidityPercentage',
 'HourlyWindDirectionRadiusDegrees',
 'MaximumWindGustMs',
 'HourlyWindSpeedMs',
 'Height']
summary_stats = df_central_west.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,8888790,0.1558783366464602,1.3538619632991884,0.0,0.0,0.0,0.0,96.0
HourlyStationLevelAtmosphericPressureMb,8888790,954.4505846352528,29.181205537167283,855.7,934.6,960.2,976.1,1028.3
LastHourMaxAtmosphericPressureMb,8888790,954.7528316452548,29.197534953225965,865.3,934.9,960.5,976.4,1029.2
LastHourMinAtmosphericPressureMb,8888790,954.1539553190022,29.164794959954367,855.4,934.3,959.9,975.8,1028.1
HourlyDryBulbAirTemperatureC,8888790,24.150351510160405,5.025169092192316,-2.4,21.0,23.7,27.6,44.7
DewPointTemperatureC,8888790,17.05682287465459,4.8678836396229,-10.0,14.1,18.4,20.8,42.7
LastHourMaxTemperatureC,8888790,24.875164414953872,5.159634604214579,-2.0,21.5,24.4,28.6,45.0
LastHourMinTemperatureC,8888790,23.45057701891924,4.874272823640853,-9.5,20.5,23.1,26.6,43.6
LastHourMaxDewPointTemperatureC,8888790,17.635270942389038,4.809922302073821,-9.1,14.7,18.9,21.3,43.0
LastHourMinDewPointTemperatureC,8888790,16.503919667356143,4.936963214102215,-10.0,13.5,17.8,20.3,38.5


#### Tratamento do North

In [26]:
# Schema, show e count do DataFrame de North

df_north.printSchema()
df_north.show(5, truncate=False) 
df_north.count()

root
 |-- index: integer (nullable = true)
 |-- Data: date (nullable = true)
 |-- Hora: timestamp (nullable = true)
 |-- PRECIPITAÇÃO TOTAL, HORÁRIO (mm): double (nullable = true)
 |-- PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- RADIACAO GLOBAL (Kj/m²): integer (nullable = true)
 |-- TEMPERATURA DO AR - BULBO SECO, HORARIA (°C): double (nullable = true)
 |-- TEMPERATURA DO PONTO DE ORVALHO (°C): double (nullable = true)
 |-- TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- UMIDADE REL. MAX. NA HORA ANT. (AUT) (%): integer (nullable = t

8392320

Estes comandos são usados para exibir a estrutura, as primeiras 5 linhas e o total de linhas do dataframe, proporcionando uma visão rápida da estrutura, dos dados iniciais e da quantidade de linhas do dataframe.

In [27]:
# renomear as colunas
df_north = df_north.toDF(*new_column_names)

# corrigir o data type de hora que ao ser importado para o Spark adiciona a data 
df_north = df_north.withColumn("Hour", date_format("Hour", "HH:mm"))

Foram renomeadas as colunas e foi ajustado o tipo de dados da coluna "Hora", removendo a parte da data que foi adicionada durante a importação para o Spark e mantendo apenas o formato de hora no estilo "HH:mm".

In [21]:
cols_to_check = [col for col in df_north.columns if col not in ["Index", "Date", "Hour"]]
df_north_nulls_uniques = compute_nulls_and_uniques(df_north, cols_to_check)

Este chunk de código cria uma lista de colunas `cols_to_check`, excluindo as colunas "Index", "Date" e "Hour", e em seguida, chama a função `compute_nulls_and_uniques` para calcular valores nulos e únicos nas colunas selecionadas.

In [22]:
df_north_nulls_uniques.show(n=30)

+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|             feature|count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|  percentage_uniques|
+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|TotalHourlyPrecip...|          0|             0.0|         0|            0.0|          386|4.599443300541448E-5|
|HourlyStationLeve...|          0|             0.0|         0|            0.0|         1489|1.774241211011973E-4|
|LastHourMaxAtmosp...|          0|             0.0|         0|            0.0|         1494|1.780199039121482...|
|LastHourMinAtmosp...|          0|             0.0|         0|            0.0|         1501|1.788539998474796E-4|
| GlobalRadiationKjM2|          0|             0.0|         0|            0.0|        23702|0.002824248837031953|
|HourlyDryBulbAirT...|          0|             0.0|         0|            0.0|          

Exibe as primeiras 30 linhas do dataframe `df_north_nulls_uniques`, fornecendo informações sobre valores nulos e únicos para as colunas selecionadas.

In [23]:
# verificar as instâncias duplicadas
df_north.dropDuplicates().count()

8392320

Uma vez que este valor se manteve igual, não foram encontradas instâncias duplicadas.


In [28]:
# summary stats para verificar outliers
summary_stats = df_north.select(cols_num).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,8392320,-2368.6857935585504,4251.53525509746,-9999.0,0.0,0.0,0.0,97.2
HourlyStationLevelAtmosphericPressureMb,8392320,-1091.1952161738325,4308.732662564336,-9999.0,974.4,991.5,1004.0,1050.0
LastHourMaxAtmosphericPressureMb,8392320,-1099.6736151624286,4315.760881254735,-9999.0,974.5,991.7,1004.3,1049.8
LastHourMinAtmosphericPressureMb,8392320,-1100.1618348204067,4315.489035326974,-9999.0,973.8,991.1,1003.7,1050.0
GlobalRadiationKjM2,8392320,-4753.662215215816,5871.869395941868,-9999.0,-9999.0,-9999.0,953.0,45305.0
HourlyDryBulbAirTemperatureC,8392320,-1826.681974162092,3891.5006837930287,-9999.0,22.1,24.7,28.1,42.2
DewPointTemperatureC,8392320,-2016.0256888798265,4032.835306610167,-9999.0,14.9,21.6,22.9,43.5
LastHourMaxTemperatureC,8392320,-1834.73801164637,3898.670436457656,-9999.0,22.4,25.1,29.0,45.0
LastHourMinTemperatureC,8392320,-1835.5571699839809,3898.0808105383735,-9999.0,21.8,24.3,27.3,45.0
LastHourMaxDewPointTemperatureC,8392320,-2024.6788539045183,4039.704257783332,-9999.0,15.5,22.0,23.3,44.4


Aqui são calculadas as *summary stats* para as colunas numéricas selecionadas, com o objetivo de verificar a presença de outliers.

In [29]:
# Inicializa uma lista para armazenar os resultados
resultados = []

# Itera sobre todas as colunas e conta as ocorrências de -9999
for coluna in cols_num:
    contagem = df_north.select(spark_sum(when(col(coluna) == -9999, 1).otherwise(0)).alias(coluna)).collect()[0][coluna]
    resultados.append((coluna, contagem))

# Cria um DataFrame a partir dos resultados
resultado_df = spark.createDataFrame(resultados, ["Coluna", "Contagem_de_-9999"])

# Mostra o DataFrame de resultados
resultado_df.show()

+--------------------+-----------------+
|              Coluna|Contagem_de_-9999|
+--------------------+-----------------+
|TotalHourlyPrecip...|          1988221|
|HourlyStationLeve...|          1591222|
|LastHourMaxAtmosp...|          1597892|
|LastHourMinAtmosp...|          1597871|
| GlobalRadiationKjM2|          4540804|
|HourlyDryBulbAirT...|          1551208|
|DewPointTemperatureC|          1706311|
|LastHourMaxTemper...|          1558370|
|LastHourMinTemper...|          1558241|
|LastHourMaxDewPoi...|          1713899|
|LastHourMinDewPoi...|          1715709|
|LastHourMaxRelati...|          1707478|
|LastHourMinRelati...|          1711836|
|HourlyRelativeHum...|          1706495|
|HourlyWindDirecti...|          1847536|
|   MaximumWindGustMs|          1810415|
|   HourlyWindSpeedMs|          1797836|
|              Height|           273960|
+--------------------+-----------------+



Verificámos que havia muitas observações com o valor -9999, que foram usadas sobre os valores omissos. No entanto, para garantir que esses valores não influenciem negativamente a estrutura e o padrão da nossa regressão, decidimos removê-los. No nosso caso, vamos proceder à eliminação da coluna `GlobalRadiation` e, nas outras colunas, vamos eliminar todas as observações que contêm o valor -9999. Esta abordagem assegura que a qualidade dos dados não comprometerá a precisão e a robustez dos nossos modelos preditivos.

In [51]:
# drop da coluna GlobalRadiationKjM2 por ter muitos missings
df_north = df_north.drop("GlobalRadiationKjM2")

In [55]:
# remover linhas com valor -9999 em qualquer coluna
df_north = df_north.filter(~col("TotalHourlyPrecipitationMm").eqNullSafe(-9999) &
                                                  ~col("HourlyStationLevelAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMinAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("HourlyDryBulbAirTemperatureC").eqNullSafe(-9999) &
                                                  ~col("DewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("LastHourMinRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyWindDirectionRadiusDegrees").eqNullSafe(-9999) &
                                                  ~col("MaximumWindGustMs").eqNullSafe(-9999) &
                                                  ~col("HourlyWindSpeedMs").eqNullSafe(-9999) &
                                                  ~col("Height").eqNullSafe(-9999))

In [56]:
df_north.printSchema()
df_north.show()
df_north.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximum

5864479

O número de instâncias reduziu de 8392320 para 5864479 após a eliminação das observações com o valor -9999.

In [57]:
# summary stats após transformações
summary_stats = df_north.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,5864479,0.2259871337248828,1.7118436663346557,0.0,0.0,0.0,0.0,97.2
HourlyStationLevelAtmosphericPressureMb,5864479,992.990236012442,17.80273933444465,887.2,985.9,997.5,1005.6,1049.2
LastHourMaxAtmosphericPressureMb,5864479,993.3103363146214,17.8098707677178,904.9,986.3,997.9,1005.9,1049.8
LastHourMinAtmosphericPressureMb,5864479,992.6755037915613,17.793643229124495,839.5,985.6,997.2,1005.3,1048.0
HourlyDryBulbAirTemperatureC,5864479,26.41689324831757,3.689271322863084,6.0,23.7,25.7,29.0,42.2
DewPointTemperatureC,5864479,21.270238055247628,3.236428171335968,-10.0,20.7,22.2,23.1,37.2
LastHourMaxTemperatureC,5864479,27.034341123908888,3.884501086326331,6.2,24.1,26.3,30.0,45.0
LastHourMinTemperatureC,5864479,25.834302160515904,3.479671628972961,-5.6,23.4,25.2,28.1,41.3
LastHourMaxDewPointTemperatureC,5864479,21.784918268101794,3.171992041502342,-9.8,21.2,22.6,23.6,39.7
LastHourMinDewPointTemperatureC,5864479,20.786230371018547,3.3164380388523367,-10.0,20.2,21.8,22.7,33.9


### Tratamento do Northeast

In [30]:
# Schema, show e count do DataFrame de Northeast

df_northeast.printSchema()
df_northeast.show(5, truncate=False) 
df_northeast.count()

root
 |-- index: integer (nullable = true)
 |-- Data: date (nullable = true)
 |-- Hora: timestamp (nullable = true)
 |-- PRECIPITAÇÃO TOTAL, HORÁRIO (mm): double (nullable = true)
 |-- PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB): integer (nullable = true)
 |-- PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- RADIACAO GLOBAL (Kj/m²): integer (nullable = true)
 |-- TEMPERATURA DO AR - BULBO SECO, HORARIA (°C): double (nullable = true)
 |-- TEMPERATURA DO PONTO DE ORVALHO (°C): double (nullable = true)
 |-- TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- UMIDADE REL. MAX. NA HORA ANT. (AUT) (%): integer (nullable = 

16260936

Estes comandos são usados para exibir a estrutura, as primeiras 5 linhas e o total de linhas do dataframe, proporcionando uma visão rápida da estrutura, dos dados iniciais e da quantidade de linhas do dataframe.

In [31]:
# renomear as colunas
df_northeast = df_northeast.toDF(*new_column_names)

# corrigir o data type de hora que ao ser importado para o Spark adiciona a data 
df_northeast = df_northeast.withColumn("Hour", date_format("Hour", "HH:mm"))

Foram renomeadas as colunas e foi ajustado o tipo de dados da coluna "Hora", removendo a parte da data que foi adicionada durante a importação para o Spark e mantendo apenas o formato de hora no estilo "HH:mm".

In [27]:
cols_to_check = [col for col in df_northeast.columns if col not in ["Index", "Date", "Hour"]]
df_northeast_nulls_uniques = compute_nulls_and_uniques(df_northeast, cols_to_check)

Este chunk de código cria uma lista de colunas `cols_to_check`, excluindo as colunas "Index", "Date" e "Hour" do dataframe `df_northeast`, e em seguida, chama a função `compute_nulls_and_uniques` para calcular valores nulos e únicos nas colunas selecionadas.

In [28]:
df_northeast_nulls_uniques.show(n=30)

+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|             feature|count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|  percentage_uniques|
+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|TotalHourlyPrecip...|          0|             0.0|         0|            0.0|          375| 2.30614031074226E-5|
|HourlyStationLeve...|          0|             0.0|         0|            0.0|          169|1.039300566707845E-5|
|LastHourMaxAtmosp...|          0|             0.0|         0|            0.0|         1605|9.870280529976873E-5|
|LastHourMinAtmosp...|          0|             0.0|         0|            0.0|         1613|9.919478189939374E-5|
| GlobalRadiationKjM2|          0|             0.0|         0|            0.0|        42475|0.002612088258634...|
|HourlyDryBulbAirT...|          0|             0.0|         0|            0.0|          

Exibe as primeiras 30 linhas do dataframe `df_northeast_nulls_uniques`, fornecendo informações sobre valores nulos e únicos para as colunas selecionadas.

In [29]:
# verificar as instâncias duplicadas
df_northeast.dropDuplicates().count()

16260936

Uma vez que este valor se manteve igual, não foram encontradas instâncias duplicadas.

In [32]:
# summary stats para verificar outliers
summary_stats = df_northeast.select(cols_num).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,16260936,-1566.483971562102,3634.595764188936,-9999.0,0.0,0.0,0.0,97.6
HourlyStationLevelAtmosphericPressureMb,16260936,-266.7612432027283,3484.407567701511,-9999.0,956.0,980.0,1000.0,1050.0
LastHourMaxAtmosphericPressureMb,16260936,-271.93856585499816,3491.025856150667,-9999.0,956.1,979.7,1000.6,1050.0
LastHourMinAtmosphericPressureMb,16260936,-272.3187664227925,3490.7173440381625,-9999.0,955.5,979.2,1000.0,1050.0
GlobalRadiationKjM2,16260936,-4245.189333381547,6251.677538169608,-9999.0,-9999.0,-9999.0,1360.0,51588.0
HourlyDryBulbAirTemperatureC,16260936,-1110.7853815487615,3178.5290798786423,-9999.0,21.7,24.8,28.3,45.0
DewPointTemperatureC,16260936,-1286.8160757474195,3372.4417909626536,-9999.0,14.4,18.7,21.1,45.0
LastHourMaxTemperatureC,16260936,-1108.9471762080525,3177.1899589644763,-9999.0,22.1,25.4,29.1,45.0
LastHourMinTemperatureC,16260936,-1108.4590956449206,3174.831241616291,-9999.0,21.3,24.3,27.5,45.0
LastHourMaxDewPointTemperatureC,16260936,-1283.3920284232072,3369.330955444254,-9999.0,15.1,19.2,21.6,45.0


Aqui são calculadas as *summary stats* para as colunas numéricas, com o objetivo de verificar a presença de outliers.


In [33]:
# Inicializa uma lista para armazenar os resultados
resultados = []

# Itera sobre todas as colunas e conta as ocorrências de -9999
for coluna in cols_num:
    contagem = df_northeast.select(spark_sum(when(col(coluna) == -9999, 1).otherwise(0)).alias(coluna)).collect()[0][coluna]
    resultados.append((coluna, contagem))

# Cria um DataFrame a partir dos resultados
resultado_df = spark.createDataFrame(resultados, ["Coluna", "Contagem_de_-9999"])

# Mostra o DataFrame de resultados
resultado_df.show()

+--------------------+-----------------+
|              Coluna|Contagem_de_-9999|
+--------------------+-----------------+
|TotalHourlyPrecip...|          2547648|
|HourlyStationLeve...|          1847478|
|LastHourMaxAtmosp...|          1855451|
|LastHourMinAtmosp...|          1855289|
| GlobalRadiationKjM2|          8305221|
|HourlyDryBulbAirT...|          1843754|
|DewPointTemperatureC|          2119052|
|LastHourMaxTemper...|          1841701|
|LastHourMinTemper...|          1839098|
|LastHourMaxDewPoi...|          2114207|
|LastHourMinDewPoi...|          2123267|
|LastHourMaxRelati...|          2114644|
|LastHourMinRelati...|          2128352|
|HourlyRelativeHum...|          2112360|
|HourlyWindDirecti...|          2270606|
|   MaximumWindGustMs|          2200138|
|   HourlyWindSpeedMs|          2201550|
|              Height|            18264|
+--------------------+-----------------+



Verificámos que havia muitas observações com o valor -9999, que foram usadas sobre os valores omissos. No entanto, para garantir que esses valores não influenciem negativamente a estrutura e o padrão da nossa regressão, decidimos removê-los. No nosso caso, vamos proceder à eliminação da coluna `GlobalRadiation` e, nas outras colunas, vamos eliminar todas as observações que contêm o valor -9999. Esta abordagem assegura que a qualidade dos dados não comprometerá a precisão e a robustez dos nossos modelos preditivos.

In [58]:
# drop da coluna GlobalRadiationKjM2 por ter muitos missings
df_northeast = df_northeast.drop("GlobalRadiationKjM2")

In [59]:
# remover linhas com valor -9999 em qualquer coluna
df_northeast = df_northeast.filter(~col("TotalHourlyPrecipitationMm").eqNullSafe(-9999) &
                                                  ~col("HourlyStationLevelAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMinAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("HourlyDryBulbAirTemperatureC").eqNullSafe(-9999) &
                                                  ~col("DewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("LastHourMinRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyWindDirectionRadiusDegrees").eqNullSafe(-9999) &
                                                  ~col("MaximumWindGustMs").eqNullSafe(-9999) &
                                                  ~col("HourlyWindSpeedMs").eqNullSafe(-9999) &
                                                  ~col("Height").eqNullSafe(-9999))

In [60]:
df_northeast.printSchema()
df_northeast.show()
df_northeast.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: integer (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximu

12934432

O número de instâncias reduziu de 16260936 para 12934432 após a eliminação das observações com o valor -9999.

In [61]:
# summary stats após transformações
summary_stats = df_northeast.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,12934432,0.1006954770027583,0.9943471991509556,0.0,0.0,0.0,0.0,97.6
HourlyStationLevelAtmosphericPressureMb,12934432,979.8653690397848,26.114924718914704,865.0,965.0,983.0,1002.0,1050.0
LastHourMaxAtmosphericPressureMb,12934432,980.0964460905658,26.11880519769385,864.6,964.8,983.4,1002.2,1050.0
LastHourMinAtmosphericPressureMb,12934432,979.541730537534,26.108905719377745,864.5,964.2,982.8,1001.6,1049.6
HourlyDryBulbAirTemperatureC,12934432,25.912948059875884,4.287781801370853,0.2,22.9,25.5,28.8,45.0
DewPointTemperatureC,12934432,18.552767558714606,3.794941641025548,-10.0,16.5,19.3,21.3,44.7
LastHourMaxTemperatureC,12934432,26.566760364892566,4.4587584206909465,8.7,23.4,26.2,29.7,45.0
LastHourMinTemperatureC,12934432,25.295624523751734,4.107442798547635,-0.2,22.5,25.0,28.0,44.9
LastHourMaxDewPointTemperatureC,12934432,19.067260131716587,3.7218634155189942,-9.8,17.0,19.8,21.8,44.7
LastHourMinDewPointTemperatureC,12934432,18.05691530946243,3.880074513594841,-10.0,15.9,18.8,20.9,44.6


### Tratamento de South

In [34]:
# Schema, show e count do DataFrame de South

df_south.printSchema()
df_south.show(5, truncate=False) 
df_south.count()

root
 |-- index: integer (nullable = true)
 |-- Data: date (nullable = true)
 |-- Hora: timestamp (nullable = true)
 |-- PRECIPITAÇÃO TOTAL, HORÁRIO (mm): double (nullable = true)
 |-- PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- RADIACAO GLOBAL (Kj/m²): integer (nullable = true)
 |-- TEMPERATURA DO AR - BULBO SECO, HORARIA (°C): double (nullable = true)
 |-- TEMPERATURA DO PONTO DE ORVALHO (°C): double (nullable = true)
 |-- TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- UMIDADE REL. MAX. NA HORA ANT. (AUT) (%): integer (nullable = t

10284888

Estes comandos são usados para exibir a estrutura, as primeiras 5 linhas e o total de linhas do dataframe, proporcionando uma visão rápida da estrutura, dos dados iniciais e da quantidade de linhas do dataframe.

In [35]:
# renomear as colunas
df_south = df_south.toDF(*new_column_names)

# corrigir o data type de hora que ao ser importado para o Spark adiciona a data 
df_south = df_south.withColumn("Hour", date_format("Hour", "HH:mm"))

Foram renomeadas as colunas e foi ajustado o tipo de dados da coluna "Hora", removendo a parte da data que foi adicionada durante a importação para o Spark e mantendo apenas o formato de hora no estilo "HH:mm".

In [33]:
cols_to_check = [col for col in df_south.columns if col not in ["Index", "Date", "Hour"]]
df_south_nulls_uniques = compute_nulls_and_uniques(df_south, cols_to_check)

Este chunk de código cria uma lista de colunas `cols_to_check`, excluindo as colunas "Index", "Date" e "Hour", e em seguida, chama a função `compute_nulls_and_uniques` para calcular valores nulos e únicos nas colunas selecionadas.

In [34]:
df_south_nulls_uniques.show(n=30)

+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|             feature|count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|  percentage_uniques|
+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|TotalHourlyPrecip...|          0|             0.0|         0|            0.0|          332|3.228037096757884E-5|
|HourlyStationLeve...|          0|             0.0|         0|            0.0|         2249|2.186703442954361...|
|LastHourMaxAtmosp...|          0|             0.0|         0|            0.0|         2275|2.211983251543429...|
|LastHourMinAtmosp...|          0|             0.0|         0|            0.0|         2269|2.206149449561336...|
| GlobalRadiationKjM2|          0|             0.0|         0|            0.0|         5434|5.283479995115163E-4|
|HourlyDryBulbAirT...|          0|             0.0|         0|            0.0|          

Exibe as primeiras 30 linhas do dataframe `df_south_nulls_uniques`, fornecendo informações sobre valores nulos e únicos para as colunas selecionadas.

In [35]:
# verificar as instâncias duplicadas
df_south.dropDuplicates().count()

10284888

Uma vez que este valor se manteve igual, não foram encontradas instâncias duplicadas.

In [36]:
# summary stats para verificar outliers
summary_stats = df_south.select(cols_num).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,10284888,-1003.1757941943388,3004.349113108417,-9999.0,0.0,0.0,0.0,94.4
HourlyStationLevelAtmosphericPressureMb,10284888,136.11775491380934,2891.0813475500845,-9999.0,918.2,958.5,1001.2,1049.9
LastHourMaxAtmosphericPressureMb,10284888,131.34381513925746,2899.225736685094,-9999.0,918.4,958.7,1001.5,1049.9
LastHourMinAtmosphericPressureMb,10284888,130.85825540346087,2899.0481747071203,-9999.0,917.9,958.2,1000.9,1049.5
GlobalRadiationKjM2,10284888,-4277.233784947391,5682.026341777482,-9999.0,-9999.0,0.0,1014.0,8259.0
HourlyDryBulbAirTemperatureC,10284888,-741.401004707103,2653.1419806747035,-9999.0,13.9,18.5,22.6,44.4
DewPointTemperatureC,10284888,-922.4772457901424,2915.8609065892,-9999.0,9.7,14.2,17.6,43.5
LastHourMaxTemperatureC,10284888,-742.3154842814149,2655.65014467017,-9999.0,14.3,19.0,23.2,44.7
LastHourMinTemperatureC,10284888,-743.5768328833522,2655.6239937520822,-9999.0,13.4,18.0,21.9,44.9
LastHourMaxDewPointTemperatureC,10284888,-923.1730407856724,2917.66445792524,-9999.0,10.3,14.7,18.1,43.5


Aqui são calculadas as *summary stats* para as colunas numéricas, com o objetivo de verificar a presença de outliers.


In [37]:
# Inicializa uma lista para armazenar os resultados
resultados = []

# Itera sobre todas as colunas e conta as ocorrências de -9999
for coluna in cols_num:
    contagem = df_south.select(spark_sum(when(col(coluna) == -9999, 1).otherwise(0)).alias(coluna)).collect()[0][coluna]
    resultados.append((coluna, contagem))

# Cria um DataFrame a partir dos resultados
resultado_df = spark.createDataFrame(resultados, ["Coluna", "Contagem_de_-9999"])

# Mostra o DataFrame de resultados
resultado_df.show()

+--------------------+-----------------+
|              Coluna|Contagem_de_-9999|
+--------------------+-----------------+
|TotalHourlyPrecip...|          1032035|
|HourlyStationLeve...|           773754|
|LastHourMaxAtmosp...|           778468|
|LastHourMinAtmosp...|           778449|
| GlobalRadiationKjM2|          5055598|
|HourlyDryBulbAirT...|           780622|
|DewPointTemperatureC|           962138|
|LastHourMaxTemper...|           782129|
|LastHourMinTemper...|           782312|
|LastHourMaxDewPoi...|           963351|
|LastHourMinDewPoi...|           971713|
|LastHourMaxRelati...|           963029|
|LastHourMinRelati...|           981391|
|HourlyRelativeHum...|           972192|
|HourlyWindDirecti...|           982663|
|   MaximumWindGustMs|           968521|
|   HourlyWindSpeedMs|           964049|
|              Height|                0|
+--------------------+-----------------+



Verificámos que havia muitas observações com o valor -9999, que foram usadas sobre os valores omissos. No entanto, para garantir que esses valores não influenciem negativamente a estrutura e o padrão da nossa regressão, decidimos removê-los. No nosso caso, vamos proceder à eliminação da coluna `GlobalRadiation` e, nas outras colunas, vamos eliminar todas as observações que contêm o valor -9999. Esta abordagem assegura que a qualidade dos dados não comprometerá a precisão e a robustez dos nossos modelos preditivos.

In [62]:
# drop da coluna GlobalRadiationKjM2 por ter muitos missings
df_south = df_south.drop("GlobalRadiationKjM2")

In [63]:
# remover linhas com valor -9999 em qualquer coluna
df_south = df_south.filter(~col("TotalHourlyPrecipitationMm").eqNullSafe(-9999) &
                                                  ~col("HourlyStationLevelAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMinAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("HourlyDryBulbAirTemperatureC").eqNullSafe(-9999) &
                                                  ~col("DewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("LastHourMinRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyWindDirectionRadiusDegrees").eqNullSafe(-9999) &
                                                  ~col("MaximumWindGustMs").eqNullSafe(-9999) &
                                                  ~col("HourlyWindSpeedMs").eqNullSafe(-9999))

In [64]:
df_south.printSchema()
df_south.show()
df_south.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximum

8796918

O número de instâncias reduziu de 10284888 para 8796918 após a eliminação das observações com o valor -9999.

In [65]:
# summary stats após transformações
summary_stats = df_south.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,8796918,0.1866027624674363,1.247617554987454,0.0,0.0,0.0,0.0,94.4
HourlyStationLevelAtmosphericPressureMb,8796918,961.0452742540124,42.82285323426238,803.9,926.7,963.4,1002.5,1049.9
LastHourMaxAtmosphericPressureMb,8796918,961.3195431286294,42.8362673886135,804.5,926.9,963.7,1002.8,1049.9
LastHourMinAtmosphericPressureMb,8796918,960.7721982176078,42.810302553026176,803.1,926.4,963.1,1002.2,1048.0
HourlyDryBulbAirTemperatureC,8796918,19.027324638015177,5.932462471913904,-7.7,15.2,19.2,23.0,44.2
DewPointTemperatureC,8796918,14.31168588817134,4.972600679615887,-27.5,11.3,14.9,18.0,38.1
LastHourMaxTemperatureC,8796918,19.632101788376204,6.046407248221974,-7.6,15.7,19.7,23.7,44.7
LastHourMinTemperatureC,8796918,18.45229506515794,5.81633607203578,-7.8,14.8,18.7,22.3,41.5
LastHourMaxDewPointTemperatureC,8796918,14.85478681283606,4.922473779220011,-25.1,11.9,15.4,18.5,38.1
LastHourMinDewPointTemperatureC,8796918,13.796836767149498,5.049267278314168,-29.8,10.8,14.4,17.5,31.7


### Tratamento de Southeast

In [37]:
# Schema, show e count do DataFrame de Southeast

df_southeast.printSchema()
df_southeast.show(5, truncate=False) 
df_southeast.count()

root
 |-- index: integer (nullable = true)
 |-- Data: date (nullable = true)
 |-- Hora: timestamp (nullable = true)
 |-- PRECIPITAÇÃO TOTAL, HORÁRIO (mm): double (nullable = true)
 |-- PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB): double (nullable = true)
 |-- RADIACAO GLOBAL (Kj/m²): integer (nullable = true)
 |-- TEMPERATURA DO AR - BULBO SECO, HORARIA (°C): double (nullable = true)
 |-- TEMPERATURA DO PONTO DE ORVALHO (°C): double (nullable = true)
 |-- TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C): double (nullable = true)
 |-- UMIDADE REL. MAX. NA HORA ANT. (AUT) (%): integer (nullable = t

15345216

Estes comandos são usados para exibir a estrutura, as primeiras 5 linhas e o total de linhas do dataframe, proporcionando uma visão rápida da estrutura, dos dados iniciais e da quantidade de linhas do dataframe.

In [38]:
# renomear as colunas
df_southeast = df_southeast.toDF(*new_column_names)

# corrigir o data type de hora que ao ser importado para o Spark adiciona a data 
df_southeast = df_southeast.withColumn("Hour", date_format("Hour", "HH:mm"))

Foram renomeadas as colunas e foi ajustado o tipo de dados da coluna "Hora", removendo a parte da data que foi adicionada durante a importação para o Spark e mantendo apenas o formato de hora no estilo "HH:mm".

In [39]:
cols_to_check = [col for col in df_southeast.columns if col not in ["Index", "Date", "Hour"]]
df_southeast_nulls_uniques = compute_nulls_and_uniques(df_southeast, cols_to_check)


Este chunk de código cria uma lista de colunas `cols_to_check`, excluindo as colunas "Index", "Date" e "Hour", e em seguida, chama a função `compute_nulls_and_uniques` para calcular valores nulos e únicos nas colunas selecionadas.


In [40]:
df_southeast_nulls_uniques.show(n=30)

+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|             feature|count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|  percentage_uniques|
+--------------------+-----------+----------------+----------+---------------+-------------+--------------------+
|TotalHourlyPrecip...|          0|             0.0|         0|            0.0|          386|2.515441946206557...|
|HourlyStationLeve...|          0|             0.0|         0|            0.0|         3251|2.118575587336144...|
|LastHourMaxAtmosp...|          0|             0.0|         0|            0.0|         3248|2.116620580642201...|
|LastHourMinAtmosp...|          0|             0.0|         0|            0.0|         3236|2.108800553866429...|
| GlobalRadiationKjM2|          0|             0.0|         0|            0.0|        33430|0.002178529125950394|
|HourlyDryBulbAirT...|          0|             0.0|         0|            0.0|          


Exibe as primeiras 30 linhas do dataframe `df_southeast_uniques`, fornecendo informações sobre valores nulos e únicos para as colunas selecionadas.


In [41]:
df_southeast.dropDuplicates().count()

15345216


Uma vez que este valor se manteve igual, não foram encontradas instâncias duplicadas.


In [42]:
# summary stats para verificar outliers
summary_stats = df_southeast.select(cols_num).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,15345216,-862.9100386074799,2808.022521392235,-9999.0,0.0,0.0,0.0,100.0
HourlyStationLevelAtmosphericPressureMb,15345216,153.35224378073508,2841.400819412801,-9999.0,911.7,944.0,971.9,1050.0
LastHourMaxAtmosphericPressureMb,15345216,147.8214425264539,2850.938764422795,-9999.0,911.9,944.2,972.1,1050.0
LastHourMinAtmosphericPressureMb,15345216,147.38702663422904,2850.7293383761826,-9999.0,911.4,943.7,971.5,1050.0
GlobalRadiationKjM2,15345216,-3847.645457385546,5917.597607164481,-9999.0,-9999.0,1.0,1245.0,51279.0
HourlyDryBulbAirTemperatureC,15345216,-651.4797423900707,2509.008008323635,-9999.0,17.8,21.5,25.0,44.9
DewPointTemperatureC,15345216,-778.0925969761441,2705.903320766113,-9999.0,12.1,16.2,18.9,42.7
LastHourMaxTemperatureC,15345216,-655.5416456894435,2517.2881262770165,-9999.0,18.3,22.0,25.9,45.0
LastHourMinTemperatureC,15345216,-656.7468354045982,2516.8849129417736,-9999.0,17.3,20.9,24.3,45.0
LastHourMaxDewPointTemperatureC,15345216,-783.0376560160536,2714.538036114629,-9999.0,12.7,16.7,19.5,44.8



Aqui são calculadas as *summary stats* para as colunas numéricas, com o objetivo de verificar a presença de outliers.


In [39]:
# Inicializa uma lista para armazenar os resultados
resultados = []

# Itera sobre todas as colunas e conta as ocorrências de -9999
for coluna in cols_num:
    contagem = df_southeast.select(spark_sum(when(col(coluna) == -9999, 1).otherwise(0)).alias(coluna)).collect()[0][coluna]
    resultados.append((coluna, contagem))

# Cria um DataFrame a partir dos resultados
resultado_df = spark.createDataFrame(resultados, ["Coluna", "Contagem_de_-9999"])

# Mostra o DataFrame de resultados
resultado_df.show()

+--------------------+-----------------+
|              Coluna|Contagem_de_-9999|
+--------------------+-----------------+
|TotalHourlyPrecip...|          1324496|
|HourlyStationLeve...|          1114464|
|LastHourMaxAtmosp...|          1122546|
|LastHourMinAtmosp...|          1122482|
| GlobalRadiationKjM2|          7083207|
|HourlyDryBulbAirT...|          1031262|
|DewPointTemperatureC|          1216672|
|LastHourMaxTemper...|          1038460|
|LastHourMinTemper...|          1038400|
|LastHourMaxDewPoi...|          1225038|
|LastHourMinDewPoi...|          1237026|
|LastHourMaxRelati...|          1190074|
|LastHourMinRelati...|          1201616|
|HourlyRelativeHum...|          1189363|
|HourlyWindDirecti...|          1430352|
|   MaximumWindGustMs|          1405538|
|   HourlyWindSpeedMs|          1387730|
|              Height|            18264|
+--------------------+-----------------+




Verificámos que havia muitas observações com o valor -9999, que foram usadas sobre os valores omissos. No entanto, para garantir que esses valores não influenciem negativamente a estrutura e o padrão da nossa regressão, decidimos removê-los. No nosso caso, vamos proceder à eliminação da coluna `GlobalRadiation` e, nas outras colunas, vamos eliminar todas as observações que contêm o valor -9999. Esta abordagem assegura que a qualidade dos dados não comprometerá a precisão e a robustez dos nossos modelos preditivos.


In [66]:
# drop da coluna GlobalRadiationKjM2 por ter muitos missings
df_southeast = df_southeast.drop("GlobalRadiationKjM2")

In [67]:
# remover linhas com valor -9999 em qualquer coluna
df_southeast = df_southeast.filter(~col("TotalHourlyPrecipitationMm").eqNullSafe(-9999) &
                                                  ~col("HourlyStationLevelAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("LastHourMinAtmosphericPressureMb").eqNullSafe(-9999) &
                                                  ~col("HourlyDryBulbAirTemperatureC").eqNullSafe(-9999) &
                                                  ~col("DewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMinDewPointTemperatureC").eqNullSafe(-9999) &
                                                  ~col("LastHourMaxRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("LastHourMinRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyRelativeHumidityPercentage").eqNullSafe(-9999) &
                                                  ~col("HourlyWindDirectionRadiusDegrees").eqNullSafe(-9999) &
                                                  ~col("MaximumWindGustMs").eqNullSafe(-9999) &
                                                  ~col("HourlyWindSpeedMs").eqNullSafe(-9999) &
                                                  ~col("Height").eqNullSafe(-9999))

In [68]:
df_southeast.printSchema()
df_southeast.show()
df_southeast.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximum

13274895


O número de instâncias reduziu de 15345216 para 13274895 após a eliminação das observações com o valor -9999.

In [69]:
# summary stats após transformações
summary_stats = df_southeast.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,13274895,0.146647623201494,1.2125863121400342,0.0,0.0,0.0,0.0,100.0
HourlyStationLevelAtmosphericPressureMb,13274895,948.9438967238514,41.99265628887496,720.1,917.6,948.2,978.4,1050.0
LastHourMaxAtmosphericPressureMb,13274895,949.2043552886902,41.98374050659263,720.3,917.8,948.5,978.7,1050.0
LastHourMinAtmosphericPressureMb,13274895,948.6860503981272,42.0014069569358,720.0,917.3,947.9,978.1,1047.6
HourlyDryBulbAirTemperatureC,13274895,22.06339189123537,5.126604849432286,-7.2,18.8,21.9,25.4,44.8
DewPointTemperatureC,13274895,16.010340119450998,4.239247932834802,-24.9,13.4,16.7,19.2,37.7
LastHourMaxTemperatureC,13274895,22.752046023716147,5.25853719357768,-6.0,19.3,22.5,26.3,45.0
LastHourMinTemperatureC,13274895,21.41178068828419,4.987961374291914,-7.7,18.3,21.4,24.6,42.3
LastHourMaxDewPointTemperatureC,13274895,16.572962611003668,4.174733386944118,-22.1,13.9,17.2,19.7,42.2
LastHourMinDewPointTemperatureC,13274895,15.472800726484229,4.327098227821037,-25.0,12.8,16.2,18.7,36.9


### Estações

Esta informação já está associada a todos os dataframes, ainda assim vamos analisá-la brevemente e fazer o pré processamento caso seja necessário.

In [43]:
# Schema, show e count do DataFrame de Stations

df_stations.printSchema()
df_stations.show(5, truncate=False) 
df_stations.count()

root
 |-- station: string (nullable = true)
 |-- region: string (nullable = true)
 |-- state: string (nullable = true)
 |-- station_code: string (nullable = true)
 |-- first date: date (nullable = true)
 |-- height: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)

+---------------------------+------+-----+------------+----------+------+------------+------------+
|station                    |region|state|station_code|first date|height|longitude   |latitude    |
+---------------------------+------+-----+------------+----------+------+------------+------------+
|QUEIMADAS                  |NE    |BA   |A436        |2008-05-23|315.0 |-39.61694443|-10.984645  |
|MACAU                      |NE    |RN   |A317        |2007-01-06|32.0  |-36.57305554|-5.15111111 |
|SAQUAREMA - SAMPAIO CORREIA|SE    |RJ   |A667        |2019-01-01|26.0  |-42.60888888|-22.8711111 |
|SANTANA DO LIVRAMENTO      |S     |RS   |A804        |2001-11-22|328.0 |-55.

622

Este código exibe o esquema, as primeiras 5 linhas e conta o número total de linhas no dataframe `df_stations`, fornecendo uma visão geral da estrutura e do tamanho dos dados do datadrame das estações.

In [44]:
# há códigos repetidos porque 611 != 622
df_stations.select("station_code").distinct().count()

611

Este comando conta o número de códigos de estação únicos no DataFrame `df_stations`, indicando que há códigos de estação repetidos, pois o número de códigos distintos é menor do que o número total de registos (611 < 622).

In [45]:
# summary stats das stations
summary_stats = df_stations.summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
station,622,,,ABROLHOS,,,,ZE DOCA
region,622,,,CO,,,,SE
state,622,,,AC,,,,TO
station_code,622,,,A001,,,,S717
height,622,153.67808681672022,1686.8247749419163,-9999.0,109.62,345.0,625.0,2450.0
longitude,622,-48.31052870813508,7.355138695678336,-79.49416666,-52.85888888,-48.17333332,-42.7186111,-29.31666666
latitude,622,-15.821456649581986,8.720537558753788,-84.0,-22.35,-16.388977,-8.77666666,4.47749999


Este bloco de código calcula as *summary stats* para todas as colunas do dataframe `df_stations`.

In [46]:
# Verificar as estações repetidas
# Agrupar por 'station_code' e contar as ocorrências
station_counts = df_stations.groupBy("station_code").agg(count("station_code").alias("count"))

# Filtrar as estações com contagem maior que 1
stations_with_more_than_one = station_counts.filter(col("count") > 1)
stations_with_more_than_one.show()

+------------+-----+
|station_code|count|
+------------+-----+
|        A618|    2|
|        A624|    2|
|        A652|    2|
|        A603|    2|
|        A667|    2|
|        A927|    2|
|        A602|    2|
|        A620|    2|
|        A621|    2|
|        A601|    2|
|        A607|    2|
+------------+-----+



Este chunk de código verifica as estações repetidas ao agrupá-las pela coluna "station_code" e ao contar as suas ocorrências. De seguida, filtra as estações com contagem maior que 1 e exibe os resultados, identificando quais as estações que estão duplicadas.

In [47]:
# verificar com algumas dessas estações porque aparecem duplicada (eram estações antigas que foram reformuladas e a localização (station) mudou ligeiramente, mas como representam a mesma estação vamos substituir os registos antigos pelos novos)
filtered_df = df_stations.filter(col("station_code") == "A927")
filtered_df.show()

+--------------------+------+-----+------------+----------+------+------------+------------+
|             station|region|state|station_code|first date|height|   longitude|    latitude|
+--------------------+------+-----+------------+----------+------+------------+------------+
|BRASNORTE (NOVO M...|    CO|   MT|        A927|2019-01-01| 426.2|-58.23138888|-12.52194443|
|BRASNORTE (MUNDO ...|    CO|   MT|        A927|2008-02-29| 431.0|-58.38722221|-12.87194443|
+--------------------+------+-----+------------+----------+------+------------+------------+



Exibe registos específicos da estação com o código "A927", permitindo uma verificação detalhada dos registos duplicados e fornecendo informações sobre as diferenças nas localizações das estações antigas e as novas.

In [48]:
# verificar os dados na tabela de metereologia para a estação acima (os dados na tabela não estão atualizados em algumas instâncias e têm aindas os valores antigos)
vef = df_central_west.filter(col("station_code") == "A927")
vef.show()

+------+----------+-----+--------------------------+---------------------------------------+--------------------------------+--------------------------------+-------------------+----------------------------+--------------------+-----------------------+-----------------------+-------------------------------+-------------------------------+-------------------------------------+-------------------------------------+--------------------------------+--------------------------------+-----------------+-----------------+------+-----+--------------------+-----------+------------+------------+------+
| Index|      Date| Hour|TotalHourlyPrecipitationMm|HourlyStationLevelAtmosphericPressureMb|LastHourMaxAtmosphericPressureMb|LastHourMinAtmosphericPressureMb|GlobalRadiationKjM2|HourlyDryBulbAirTemperatureC|DewPointTemperatureC|LastHourMaxTemperatureC|LastHourMinTemperatureC|LastHourMaxDewPointTemperatureC|LastHourMinDewPointTemperatureC|LastHourMaxRelativeHumidityPercentage|LastHourMinRelativeHumid

Exibe os dados meteorológicos associados à estação com o código "A927", permitindo uma verificação dos valores presentes na tabela de meteorologia e identificando possíveis inconsistências nos dados, como valores desatualizados.

### Concatenação dos DFs

In [70]:
# concatenação dos dataframes
concatenated_df = df_central_west.union(df_south).union(df_north).union(df_southeast).union(df_northeast)
concatenated_df.printSchema()
concatenated_df.show()
concatenated_df.count()

root
 |-- Index: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- TotalHourlyPrecipitationMm: double (nullable = true)
 |-- HourlyStationLevelAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMaxAtmosphericPressureMb: double (nullable = true)
 |-- LastHourMinAtmosphericPressureMb: double (nullable = true)
 |-- HourlyDryBulbAirTemperatureC: double (nullable = true)
 |-- DewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxTemperatureC: double (nullable = true)
 |-- LastHourMinTemperatureC: double (nullable = true)
 |-- LastHourMaxDewPointTemperatureC: double (nullable = true)
 |-- LastHourMinDewPointTemperatureC: double (nullable = true)
 |-- LastHourMaxRelativeHumidityPercentage: integer (nullable = true)
 |-- LastHourMinRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyRelativeHumidityPercentage: integer (nullable = true)
 |-- HourlyWindDirectionRadiusDegrees: integer (nullable = true)
 |-- Maximum

49759514

Este conjunto de comandos realiza a concatenação dos dataframes `df_central_west`, `df_south`, `df_north`, `df_southeast` e `df_northeast` num único dataframe `concatenated_df`. De seguida, exibe estrutura, as primeiras linhas e devolve o número total de linhas no dataframe concatenado.

In [72]:
# summary stats do df final
summary_stats = concatenated_df.select(cols_num_2).summary().toPandas().transpose()
summary_stats.columns = summary_stats.iloc[0]
summary_stats = summary_stats[1:]
summary_stats

summary,count,mean,stddev,min,25%,50%,75%,max
TotalHourlyPrecipitationMm,49759514,0.152766090118929,1.2644366957327098,0.0,0.0,0.0,0.0,100.0
HourlyStationLevelAtmosphericPressureMb,49759514,965.2958074570424,37.44976884102456,720.1,939.8,969.6,997.0,1050.0
LastHourMaxAtmosphericPressureMb,49759514,965.5655641713112,37.45299070576685,720.3,940.0,969.9,997.3,1050.0
LastHourMinAtmosphericPressureMb,49759514,965.0045343790932,37.43667363657928,720.0,939.5,969.3,996.7,1049.6
HourlyDryBulbAirTemperatureC,49759514,23.413191065330665,5.549682278526599,-7.7,20.1,23.6,27.1,45.0
DewPointTemperatureC,49759514,17.1777642884535,4.760310674870038,-27.5,14.3,18.1,20.9,44.7
LastHourMaxTemperatureC,49759514,24.076027058865453,5.679706125437469,-7.6,20.7,24.1,27.9,45.0
LastHourMinTemperatureC,49759514,22.78356214049835,5.4138710682566415,-9.5,19.6,23.0,26.3,44.9
LastHourMaxDewPointTemperatureC,49759514,17.721601446911173,4.698454445750406,-25.1,14.9,18.6,21.3,44.7
LastHourMinDewPointTemperatureC,49759514,16.658636929211188,4.838334265145028,-29.8,13.7,17.6,20.4,44.6


Análise descritiva:

- A `precipitação` não é registada na maioria das horas, mas raros acontecimentos podem resultar em até 100.0 mm de precipitação numa única hora

- A `pressão atmosférica` média é cerca de 965.3 mb, com variações significativas que podem indicar mudanças climáticas

- A `temperatura` varia de -7.7 °C a 45.0 °C, o que sugere uma grande diversidade do clima ao longo do tempo

- A `humidade` relativa média é de aproximadamente 71.8%, variando de 6% a 100%

- A `velocidade do vento` média é de cerca de 2.05 m/s, com ampla variação entre ventos calmos e ventos fortes de até 29.5 m/s

- A `altitude` média registada é de 431.6 metros, sendo o valor máximo de 2450 metros. A altitude pode ser um fator influenciador das condições meteorológicas

In [71]:
# Guardar em formato parquet

output_all_stations = "all_stations.parquet"
concatenated_df.write.mode("overwrite").parquet(output_all_stations)

Este código guarda o dataframe concatenado em formato Parquet para ser utilizado nos notebooks seguintes.