In [None]:
import numpy                 as np
import pandas                as pd
import glob                  as g
from   datetime              import date

from pyspark.sql             import SparkSession
from pyspark.sql.types       import StringType
from pyspark.sql.types       import TimestampType
from pyspark.sql             import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions   import *
from pyspark.sql             import Window
from pyspark.sql.types       import *

# Configuration for the local Spark session, applied to the builder in order.
# NOTE(review): with a local[*] master the tasks run inside the driver JVM, so
# 'spark.executor.memory' likely has no effect; 'viewsEnabled' looks like a
# BigQuery-connector option that nothing in this notebook uses — confirm both.
_spark_settings = [
    ('spark.driver.memory', '2g'),
    ('spark.executor.memory', '2g'),
    ('spark.sql.legacy.timeParserPolicy', 'LEGACY'),
    ('spark.sql.execution.arrow.pyspark.enabled', 'true'),
    ('spark.ui.showConsoleProgress', True),
    ('viewsEnabled', 'true'),
    ('spark.sql.debug.maxToStringFields', 1000),
]

# NOTE(review): the app name 'Churn' does not match this climate notebook —
# possibly carried over from another project.
_builder = SparkSession.builder.master('local[*]').appName('Churn')
for _key, _value in _spark_settings:
    _builder = _builder.config(_key, _value)

# getOrCreate() hands back an already-running session if one exists.
spark = _builder.getOrCreate()

In [None]:
# Build a daily, per-state (SIGLA_UF) climate table:
#   1. load the municipality weather CSVs,
#   2. attach each municipality's state from the IBGE lookup table,
#   3. average the numeric variables per state and date,
#   4. shift the temperature columns from kelvin to degrees Celsius.

RAW_DATA_DIR = '../Agro/raw_data'
DE_PARA_FILE = 'de_para_IBGE.csv'   # lookup table with CD_MUN and SIGLA_UF
KELVIN_OFFSET = 273.15              # K -> °C offset

# Numeric weather variables, in the column order of the final table.
WEATHER_COLS = [
    'temperature_2m', 'max_temperature_2m', 'min_temperature_2m',
    'humidity', 'surface_pressure', 'total_precipitation',
    'u_component_of_wind_10m', 'v_component_of_wind_10m',
]
# Temperature columns shifted to °C after averaging. The shift is a constant
# offset, so applying it after the mean equals applying it before.
KELVIN_COLS = ['temperature_2m', 'max_temperature_2m', 'min_temperature_2m']
# NOTE(review): the weather CSVs also carry 'dewpoint_temperature_2m', which is
# not aggregated here and therefore does not reach the output — confirm intent.

# The lookup table lives in the same folder as the weather files, so a bare
# '*.csv' glob would also read it as if it were weather data. List the weather
# files explicitly and leave the lookup table out.
weather_paths = sorted(
    path for path in g.glob(RAW_DATA_DIR + '/*.csv')
    if path.replace('\\', '/').rsplit('/', 1)[-1] != DE_PARA_FILE
)
assert weather_paths, 'no weather CSV files found in ' + RAW_DATA_DIR

# Without inferSchema every CSV column arrives as a string, so cast the weather
# variables to float while selecting them; RegionID becomes the join key CD_MUN.
weather = (
    spark.read.format('csv')
    .option('header', 'True')
    .load(weather_paths)
    .select(
        F.col('Date'),
        F.col('RegionID').alias('CD_MUN'),
        *[F.col(col_name).cast(FloatType()).alias(col_name) for col_name in WEATHER_COLS]
    )
)

d_para = (
    spark.read.format('csv')
    .option('header', 'True')
    .load(RAW_DATA_DIR + '/' + DE_PARA_FILE)
    .withColumn('CD_MUN', F.col('CD_MUN').cast(StringType()))
)

# Joining on the column name keeps a single CD_MUN column; an expression join
# keeps both sides' copies and makes later references to CD_MUN ambiguous.
# NOTE(review): every municipality weighs equally in its state's average
# (no area/population weighting) — confirm that is the intended aggregate.
df = (
    weather
    .join(d_para, on='CD_MUN', how='inner')
    .groupBy('SIGLA_UF', 'Date')
    .agg(*[F.mean(col_name).alias(col_name) for col_name in WEATHER_COLS])
)

for col_name in KELVIN_COLS:
    df = df.withColumn(col_name, F.col(col_name) - F.lit(KELVIN_OFFSET))

In [None]:
# Persist the per-state daily climate table as Parquet, replacing whatever a
# previous run left at this path.
df.write.mode("overwrite").parquet("../Agro/raw_data/clima_Agro.parquet")