# Setup

In [1]:
import duckdb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession,Window
import pyspark.sql.functions as F
import os
from pathlib import Path

# Root folder of the local data files. Set the R2_DATA_DIR environment variable to
# run the notebook on another machine; when it is unset, the original hard-coded
# location is used, so existing behaviour is unchanged.
initial_path = os.environ.get(
    'R2_DATA_DIR',
    'C:/Users/Mateus Santos Rochas/Desktop/Estudos/07. Doutorado - Matemática aplicada/Pesquisa/Dados R2',
)
# One sub-folder per data layer (bronze / prata / ouro).
bronze_path = os.path.join(initial_path,'1. bronze')
prata_path = os.path.join(initial_path,'2. prata')
ouro_path = os.path.join(initial_path,'3. ouro')

# Create (or reuse) the Spark session used by the satellite joins further down.
spark = (
    SparkSession.builder.appName("Mateus")
    # Shuffle / file-scan partitioning
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .config("spark.sql.files.openCostInBytes", "4MB")
    # Driver and executor sizing
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    # Arrow settings (both the older and the pyspark-specific key are set)
    .config("spark.sql.execution.arrow.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Dynamic allocation bounds
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "4")
    .getOrCreate()
)

# Open the "prata" DuckDB file as the main database and attach the "ouro" file under
# the alias r2_ouro, so both can be queried through the same connection.
# Both file names are relative, i.e. resolved against the current working directory.
conn = duckdb.connect('r2_prata.duckdb')
conn.execute("ATTACH 'r2_ouro.duckdb' AS r2_ouro")


def sql(query,conn=conn):
    """Execute `query` on a DuckDB connection and return the result as a DataFrame.

    Parameters
    ----------
    query : str
        SQL text to execute.
    conn : DuckDB connection, optional
        Connection to run the query on; defaults to the notebook-level `conn`
        (bound when this function is defined).

    Returns
    -------
    The result of `fetch_df()` on the executed query.
    """
    resultado = conn.execute(query)
    return resultado.fetch_df()

# Query listing the distinct table names registered under the 'r2_prata' catalog.
show_tables_prata_query = """ SELECT DISTINCT table_name FROM information_schema.tables WHERE table_catalog = 'r2_prata' """


# Same listing for the attached 'r2_ouro' catalog.
show_tables_ouro_query = """ SELECT DISTINCT table_name FROM information_schema.tables WHERE table_catalog = 'r2_ouro' """

# Protótipos

In [2]:
from pyspark.sql.types import TimestampType,LongType,DoubleType,FloatType,StringType,StructField,StructType,IntegerType

# Auxiliary functions
def equivalent_type(f):
    """Return the Spark SQL type instance that corresponds to the dtype `f`.

    `f` is compared with `==` against a fixed list of dtype names, in order;
    any value that matches none of them is mapped to StringType.
    """
    # (dtype name, Spark type class) pairs, checked in this order.
    dtype_para_spark = (
        ('datetime64[ns]', TimestampType),
        ('int64', LongType),
        ('int32', IntegerType),
        ('float64', DoubleType),
        ('float32', FloatType),
    )
    for nome_dtype, tipo_spark in dtype_para_spark:
        if f == nome_dtype:
            return tipo_spark()
    return StringType()

def define_structure(string, format_type):
    """Build the Spark StructField for one column.

    Parameters
    ----------
    string : str
        Column name.
    format_type :
        dtype of the column, translated with `equivalent_type`.

    Returns
    -------
    StructField
        Field named `string`; its type falls back to StringType when the dtype
        cannot be translated.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Only ordinary errors trigger the fallback; a bare `except:` would also
        # swallow KeyboardInterrupt / SystemExit.
        typo = StringType()
    return StructField(string, typo)

# Convert a pandas DataFrame into a Spark DataFrame using an explicit schema.
def pandas_to_spark(pandas_df):
    """Create a Spark DataFrame from `pandas_df`.

    The schema is built column by column with `define_structure`, pairing each
    column name with its pandas dtype, and is passed to `spark.createDataFrame`.
    """
    campos = [
        define_structure(nome_coluna, dtype_coluna)
        for nome_coluna, dtype_coluna in zip(list(pandas_df.columns), list(pandas_df.dtypes))
    ]
    return spark.createDataFrame(pandas_df, StructType(campos))

# Helpers that fill missing values using neighbouring stations
def preencher_com_vizinha(row,medida,cols_vizinhas):
    """Return the row's own value for `medida`, or the first non-null neighbour value.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must expose `vl_<medida>` and every name in `cols_vizinhas`.
    medida : str
        Measurement name, used to build the `vl_<medida>` key.
    cols_vizinhas : iterable of str
        Neighbour column names, tried in the given order.

    Returns
    -------
    The station's own value when it is not null; otherwise the first non-null
    neighbour value; `np.nan` when every candidate is null.
    """
    valor_proprio = row[f'vl_{medida}']
    if pd.notnull(valor_proprio):
        return valor_proprio
    # Walk the neighbours in order and stop at the first usable value.
    for coluna in cols_vizinhas:
        candidato = row[coluna]
        if pd.notnull(candidato):
            return candidato
    return np.nan

def fill_na_estacoes_vizinhas(abt,medida,n_vizinhas=12):
    """Fill missing `vl_<medida>` values with the first non-null neighbour value.

    For every row the candidates are `vl_<medida>` followed by
    `vl_<medida>_vizinha_1` ... `vl_<medida>_vizinha_<n_vizinhas>`; the first
    non-null candidate, in that order, is returned. Rows where every candidate
    is null stay null.

    Parameters
    ----------
    abt : pandas.DataFrame
        Must contain `vl_<medida>` and all the neighbour columns.
    medida : str
        Measurement name (e.g. 'temperatura_media').
    n_vizinhas : int, default 12
        Number of neighbour columns to consider.

    Returns
    -------
    pandas.Series
        Filled values, indexed like `abt`. `abt` itself is not modified.

    Raises
    ------
    KeyError
        If any of the expected columns is missing from `abt`.
    """
    cols_vizinhas = [f'vl_{medida}_vizinha_{i}' for i in range(1, n_vizinhas + 1)]
    candidatos = abt[[f'vl_{medida}'] + cols_vizinhas]
    # A backward fill along the columns moves the first non-null candidate of each
    # row into the first column. This is vectorised, unlike a Python-level
    # apply(axis=1), which is very slow on frames with millions of rows.
    return candidatos.bfill(axis=1).iloc[:, 0]

def construir_abt_primaria(n_estacoes_vizinhas):
    """Build the primary ABT as a pandas DataFrame.

    Steps: load `r2_ouro.abt_bruto`, turn -9999 into NaN, fill missing
    temperature/humidity values from neighbouring stations, drop the auxiliary
    neighbour columns, attach each station's lat/lon and keep a fixed column order.

    Parameters
    ----------
    n_estacoes_vizinhas : int
        How many neighbour stations' id / precipitation column pairs to keep
        in the output.

    Returns
    -------
    pandas.DataFrame
        Columns: station id, date, station lat/lon, the five filled
        measurements, the neighbour id/precipitation pairs and `vl_precipitacao`.
    """
    medidas = ['temperatura_minima','temperatura_media','temperatura_maxima','umidade_relativa_minima','umidade_relativa_media']

    # Load the raw ABT; -9999 marks a measurement error, so it becomes NaN.
    abt = sql("SELECT * FROM r2_ouro.abt_bruto").replace(-9999,np.nan)

    # Fill missing temperature and humidity values from the closest neighbour available.
    for medida in medidas:
        abt[f'vl_{medida}'] = fill_na_estacoes_vizinhas(abt,medida)

    # Drop the neighbours' auxiliary temperature/humidity columns, plus every
    # column whose name mentions the maximum relative humidity.
    colunas_auxiliares = [f'vl_{medida}_vizinha_{i}' for i in range(1,13) for medida in medidas]
    colunas_umidade_maxima = [c for c in abt.columns if 'umidade_relativa_maxima' in c]
    abt = abt.drop(columns=colunas_auxiliares + colunas_umidade_maxima)

    # Attach the station coordinates.
    dim_estacoes = sql("SELECT id_estacao,lat AS lat_estacao, lon AS lon_estacao FROM r2_prata.dim_estacoes")
    abt = abt.merge(dim_estacoes,on='id_estacao',how='inner')

    # Final column order: keys, measurements, neighbour pairs, target.
    colunas_chave = ['id_estacao','dt_medicao','lat_estacao','lon_estacao']
    colunas_medidas = [f'vl_{medida}' for medida in medidas]
    colunas_vizinhas = []
    for i in range(1, n_estacoes_vizinhas+1):
        colunas_vizinhas += [f'id_estacao_vizinha_{i}', f'vl_precipitacao_vizinha_{i}']
    abt = abt[colunas_chave + colunas_medidas + colunas_vizinhas + ['vl_precipitacao']]

    # Workaround for duplicated column names: keep only the first occurrence.
    return abt.loc[:, ~abt.columns.duplicated()]

In [4]:
# Build the primary ABT, keeping the id/precipitation columns of 12 neighbouring stations.
abt_pandas = construir_abt_primaria(n_estacoes_vizinhas=12)

In [8]:
# Persist the primary ABT in the "ouro" folder; the Spark section reads this file back.
caminho_abt_primaria = os.path.join(ouro_path,'abt_primaria_12_estacoes.csv')
abt_pandas.to_csv(caminho_abt_primaria,index=False)

# ABT com Satélite

## Importando a ABT no Spark

In [3]:
# Load the primary ABT CSV into Spark, reading the header row and inferring column types.
abt = (
    spark.read.format('csv')
    .option('header','true')
    .option('inferSchema','true')
    .load(os.path.join(ouro_path,'abt_primaria_12_estacoes.csv'))
)

## Join com satélites

In [4]:
def get_single_csv(satelite, folder,db):
    """Return the absolute path of the CSV file stored inside `<folder>/<db>_fato_<satelite>.csv`.

    `<db>_fato_<satelite>.csv` is expected to be a directory (presumably a Spark
    CSV output folder - confirm) holding the actual data file next to any
    non-.csv files.

    Parameters
    ----------
    satelite : str
        Satellite / product name used in the directory name.
    folder : str
        Folder that contains the `<db>_fato_<satelite>.csv` directory.
    db : str
        Prefix of the directory name (e.g. 'prata').

    Returns
    -------
    str
        Resolved absolute path of the first `.csv` file, in alphabetical order.

    Raises
    ------
    FileNotFoundError
        If the directory does not exist or holds no `.csv` file.
    """
    csv_dir = os.path.join(f'{folder}',f'{db}_fato_{satelite}.csv')
    # os.listdir returns entries in arbitrary order; sorting makes the pick deterministic.
    csv_files = sorted(f for f in os.listdir(csv_dir) if f.endswith('.csv'))
    if not csv_files:
        raise FileNotFoundError(f"No .csv file found inside '{csv_dir}'")
    return str(Path(csv_dir, csv_files[0]).resolve())

# Satellite / reanalysis products joined to the ABT, in processing order.
satelites = [
    'AgCFSR',
    'AgMERRA',
    'CHIRPS',
    'CPC',
    'GL',
    'GPM_Final_Run',
    'GPM_Late_Run',
    'PERSIANN_CDR',
    'POWER',
    'TRMM',
]

In [5]:
# Load every satellite fact CSV into a Spark DataFrame, keyed by satellite name.
# The folder comes from `prata_path` (instead of the relative '2. prata'), so the
# read no longer depends on the current working directory and matches how the
# rest of the notebook locates its data folders.
satelite_dict = {satelite:spark.read.format('csv').option('header','true').option('inferSchema','true').load(get_single_csv(satelite,prata_path,'prata')) for satelite in satelites}

# Suffix every non-key column with the satellite name so the columns of different
# satellites stay distinguishable once they are joined to the ABT.
for satelite_name,df in satelite_dict.items():
    for column in df.columns:
        if column not in ['dt_medicao','lat','lon']:
            df = df.withColumnRenamed(column,f'{column}_{satelite_name}')
    # Re-assigning an existing key does not change the dict size, so it is safe
    # while iterating over .items().
    satelite_dict[satelite_name] = df

In [31]:
# Per-satellite delta, in the same unit as lat/lon; the join below accepts a
# satellite point when it lies within delta/4 of the station on both axes.
# NOTE(review): presumably each product's grid resolution in degrees - confirm.
delta_dict = dict(
    AgCFSR=0.5,
    AgMERRA=0.5,
    CHIRPS=0.1,
    CPC=0.1,
    GL=0.2,
    GPM_Final_Run=0.2,
    GPM_Late_Run=0.2,
    PERSIANN_CDR=0.5,
    POWER=1,
    TRMM=0.5,
)

# Keep exactly one satellite row per (station, date). Ordering by lat AND lon makes
# the pick deterministic when several satellite points fall inside the tolerance
# box (ordering by lat alone leaves ties between equal latitudes undefined).
# NOTE(review): this assumes the ABT has a single row per (id_estacao, dt_medicao);
# duplicated ABT rows would be collapsed by the rank filter - confirm.
window_spec = Window.partitionBy("id_estacao", "dt_medicao").orderBy("lat", "lon")

# One joined DataFrame per satellite: ABT rows left-joined to the satellite rows of
# the same date whose coordinates lie within delta/4 of the station on both axes.
joined_df = {}
for satelite_name, df in satelite_dict.items():
    # Rename the satellite date column so it does not clash with the ABT's dt_medicao.
    alias_df = df.withColumnRenamed('dt_medicao','dt_medicao_satelite')
    tolerancia = delta_dict[satelite_name]/4

    joined_df[satelite_name] = (
        abt.join(
            alias_df,
            on=[
                (abt.dt_medicao == alias_df.dt_medicao_satelite) &
                (F.abs(abt.lat_estacao-alias_df.lat)<=tolerancia) &
                (F.abs(abt.lon_estacao-alias_df.lon)<=tolerancia)
            ],
            how='left')
        .withColumn("rank", F.row_number().over(window_spec))
        .filter(F.col("rank") == 1)
        .drop("rank")
    )

In [32]:
# Export each joined ABT as a single-part CSV folder under the "ouro" path,
# without the satellite-side join helper columns.
for satelite_name in satelite_dict:
    destino = os.path.join(ouro_path, f'ouro_ABT_{satelite_name}.csv')
    (
        joined_df[satelite_name]
        .drop('dt_medicao_satelite','lat','lon')
        .coalesce(1)  # one partition, hence one part file in the output folder
        .write
        .option("header",True)
        .option("delimiter",",")
        .mode("overwrite")
        .csv(destino)
    )

## Salvando no DB

In [33]:
def get_single_csv_abt(satelite, folder,db):
    """Return the absolute path of the CSV file stored inside `<folder>/<db>_ABT_<satelite>.csv`.

    `<db>_ABT_<satelite>.csv` is a directory (the notebook writes it with Spark's
    CSV writer) holding the actual data file next to any non-.csv files.

    Parameters
    ----------
    satelite : str
        Satellite / product name used in the directory name.
    folder : str
        Folder that contains the `<db>_ABT_<satelite>.csv` directory.
    db : str
        Prefix of the directory name (e.g. 'ouro').

    Returns
    -------
    str
        Resolved absolute path of the first `.csv` file, in alphabetical order.

    Raises
    ------
    FileNotFoundError
        If the directory does not exist or holds no `.csv` file.
    """
    csv_dir = os.path.join(f'{folder}',f'{db}_ABT_{satelite}.csv')
    # os.listdir returns entries in arbitrary order; sorting makes the pick deterministic.
    csv_files = sorted(f for f in os.listdir(csv_dir) if f.endswith('.csv'))
    if not csv_files:
        raise FileNotFoundError(f"No .csv file found inside '{csv_dir}'")
    return str(Path(csv_dir, csv_files[0]).resolve())

# Load each exported ABT CSV into the r2_ouro database as ABT_satelite_<name>.
# The files are looked up under `ouro_path`, the same folder they were written to,
# instead of the relative '3. ouro', which only matched when the working
# directory happened to be the data root.
for satelite in satelite_dict.keys():
    # Double any single quote so the path stays a valid SQL string literal.
    csv_abt = get_single_csv_abt(satelite, ouro_path, 'ouro').replace("'", "''")
    conn.execute(f"""
    CREATE OR REPLACE TABLE
                r2_ouro.ABT_satelite_{satelite} AS
    SELECT * FROM read_csv_auto(
    '{csv_abt}'
    )""")

In [34]:
# Pull every per-satellite ABT table back from DuckDB, keyed by satellite name.
abt_satelite = {}
for satelite in satelites:
    abt_satelite[satelite] = sql(f"SELECT * FROM r2_ouro.ABT_satelite_{satelite}")

In [35]:
# Sanity check: print the row count of the Spark ABT and of each per-satellite
# table so they can be compared by eye.
n_linhas_abt = abt.count()
print(f"ABT: {n_linhas_abt}")
for satelite,df in abt_satelite.items():
    n_linhas = len(df)
    print(f"ABT_{satelite}: {n_linhas}")

ABT: 2268306
ABT_AgCFSR: 2268306
ABT_AgMERRA: 2268306
ABT_CHIRPS: 2268306
ABT_CPC: 2268306
ABT_GL: 2268306
ABT_GPM_Final_Run: 2268306
ABT_GPM_Late_Run: 2268306
ABT_PERSIANN_CDR: 2268306
ABT_POWER: 2268306
ABT_TRMM: 2268306


## Join final dos satélites

In [36]:
# List the tables whose name matches both '%satelite%' and '%ABT%'.
sql("SELECT table_name FROM information_schema.tables WHERE table_name LIKE '%satelite%' AND table_name LIKE '%ABT%'")

Unnamed: 0,table_name
0,ABT_satelite_AgCFSR
1,ABT_satelite_AgMERRA
2,ABT_satelite_all
3,ABT_satelite_CHIRPS
4,ABT_satelite_CPC
5,ABT_satelite_GL
6,ABT_satelite_GPM_Final_Run
7,ABT_satelite_GPM_Late_Run
8,ABT_satelite_PERSIANN_CDR
9,ABT_satelite_POWER


In [37]:
satelites = ['AgCFSR','AgMERRA','CHIRPS','CPC','GL','GPM_Final_Run', 'GPM_Late_Run','PERSIANN_CDR','POWER','TRMM']
# For every satellite except AgCFSR (whose table is selected whole), build a
# ",<sat>.<col>,<sat>.<col>..." fragment with the columns from position 34 onwards.
# The first 34 columns are the ABT columns built for 12 neighbour stations
# (4 keys + 5 measurements + 12 id/precipitation pairs + vl_precipitacao).
# A list (not a set) is used so the fragments keep the order of `satelites`: a set
# of strings iterates in hash order, which changes between interpreter runs and
# would shuffle the column order of the final table.
string_columns_satelite = [
    ','+','.join([f'{satelite}.{col}' for col in list(sql(f"SELECT * FROM r2_ouro.ABT_satelite_{satelite} LIMIT 1").columns[34:])])
    for satelite in satelites
    if satelite != 'AgCFSR']

In [38]:
# One JOIN clause per satellite other than AgCFSR, matching on station id and date.
# Built from a list (not a set) so the generated SQL text is identical on every
# run; a set of strings iterates in hash order, which varies between runs.
join_clauses = ' '.join([
    f"JOIN r2_ouro.ABT_satelite_{satelite} AS {satelite} ON AgCFSR.id_estacao = {satelite}.id_estacao AND AgCFSR.dt_medicao = {satelite}.dt_medicao"
    for satelite in satelites
    if satelite != 'AgCFSR'])

# AgCFSR contributes all of its columns; the other satellites contribute only the
# column fragments prepared in `string_columns_satelite`.
abt_satelite_join_all = sql(
f"""
SELECT
    AgCFSR.*
    {''.join(string_columns_satelite)}
FROM r2_ouro.ABT_satelite_AgCFSR AS AgCFSR

{join_clauses}
"""
)

In [39]:
# Persist the combined result as r2_ouro.ABT_satelite_all.
# NOTE(review): `abt_satelite_join_all` is the DataFrame built in the previous cell,
# not a database table; presumably DuckDB resolves the name to that Python
# variable - confirm.
sql("CREATE OR REPLACE TABLE r2_ouro.ABT_satelite_all AS (SELECT * FROM abt_satelite_join_all)")

Unnamed: 0,Count
0,2268306


In [40]:
# Close the DuckDB connection opened in the setup cell; `sql()` cannot be used
# after this until the setup cell is run again.
conn.close()