In [None]:
import basedosdados as bd

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Build the notebook's Spark session (getOrCreate reuses one if it already exists).
spark = (
    SparkSession.builder
    # Make Spark SQL resolve identifiers case-sensitively.
    .config("spark.sql.caseSensitive", "true")
    # NOTE(review): presumably meant to stop the _SUCCESS marker file from being
    # written next to job output — confirm it takes effect when set here.
    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    # Explicitly pin the commit protocol class used by Spark SQL data source writes.
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
    )
    .appName("ipca_etl")
    .getOrCreate()
)

In [None]:
# Dataset identifier, passed as `dataset_id` when the source table is read.
SOURCE_DATASET = "br_ibge_ipca"
# Table identifier, passed as `table_id` when the source table is read.
SOURCE_TABLE = "mes_brasil"
# GCP project passed as `billing_project_id` for the read.
GCP_PROJECT_ID = "puc-gcp-sources"

In [None]:
# Fetch the source table through basedosdados, billed to GCP_PROJECT_ID.
# NOTE(review): raw_table is presumably a pandas DataFrame — confirm against
# the basedosdados version in use.
raw_table = bd.read_table(
    billing_project_id=GCP_PROJECT_ID,
    dataset_id=SOURCE_DATASET,
    table_id=SOURCE_TABLE,
)

# Hand the fetched table to Spark so the cells below can query it.
df = spark.createDataFrame(raw_table)

In [None]:
# Register the DataFrame as the session-scoped view `stg_ipca` so the SQL
# cells below can query it by name.
# createOrReplaceTempView keeps this cell re-runnable: plain createTempView
# raises once a view with this name already exists in the session.
df.createOrReplaceTempView("stg_ipca")

In [None]:
# Every column of the rows whose `ano` is 2022, ordered by `mes`.
query_current_year = """
  SELECT *
  FROM stg_ipca
  WHERE ano = 2022
  ORDER BY mes;
"""
df_ipca_2022 = spark.sql(query_current_year)

In [None]:
# Print the 2022 slice to the cell output, using show()'s default arguments.
df_ipca_2022.show()

In [None]:
# Top 10 rows by `variacao_mensal` (descending), each labelled "ano-mes".
# The WHERE clause discards rows whose value compares equal to 'NaN'.
# NOTE(review): presumably these are missing months that reached Spark as NaN,
# which Spark would otherwise rank above every real number — confirm.
highest_month_inflation_rates_query = """
    SELECT
        CONCAT(ano, "-", mes) AS mes,
        variacao_mensal
    FROM stg_ipca
    WHERE variacao_mensal <> 'NaN'
    ORDER BY variacao_mensal DESC
    LIMIT 10;
"""

# Bring the ten rows to pandas and draw them as a bar chart.
(
    spark.sql(highest_month_inflation_rates_query)
    .toPandas()
    .plot
    .bar(x="mes", y="variacao_mensal")
)

In [None]:
# `variacao_anual` taken from the month-12 row of each year from 2000 to 2011
# (BETWEEN is inclusive), newest year first.
# NOTE(review): presumably the December row carries the full calendar-year
# figure — confirm against the dataset documentation.
anual_variation_historic = """
    SELECT
      ano,
      variacao_anual
    FROM stg_ipca
    WHERE mes = 12
        AND ano BETWEEN 2000 AND 2011
    ORDER BY ano DESC;
"""

# Bring the result to pandas and draw it as a line chart over the years.
(
    spark.sql(anual_variation_historic)
    .toPandas()
    .plot
    .line(x="ano", y="variacao_anual")
)

In [None]:
# Persist the full table as CSV at the path "ibge_ipca_brasil".
# repartition(1) collapses the data into a single partition before the write,
# and mode("overwrite") replaces whatever already exists at that path.
# NOTE(review): no `header` option is set, so column names are presumably not
# written to the file — confirm that is intended.
(
    df.repartition(1)
    .write
    .mode("overwrite")
    .format("csv")
    .save("ibge_ipca_brasil")
)

In [None]:
# Bottom 10 rows by `variacao_mensal` (ascending), each labelled "ano-mes".
# Rows whose value compares equal to 'NaN' are discarded by the WHERE clause.
lowest_inflation = """
  SELECT 
      CONCAT(ano, "-", mes) AS mes,
      variacao_mensal
    FROM stg_ipca
    WHERE variacao_mensal <> 'NaN'
    ORDER BY variacao_mensal ASC
    LIMIT 10;
"""
df_ipca_low_inflation = spark.sql(lowest_inflation)

In [None]:
# Print the ten lowest monthly variations to the cell output.
df_ipca_low_inflation.show()

In [None]:
# Top 10 rows by `variacao_mensal` (descending), each labelled "ano-mes".
# This is the same ranking the earlier bar-chart cell plots; here it is kept as
# a Spark DataFrame so it can be shown as a table.
# Rows whose value compares equal to 'NaN' are discarded by the WHERE clause.
high_inflation = '''
    SELECT
        CONCAT(ano, "-", mes) AS mes,
        variacao_mensal
    FROM stg_ipca
    WHERE variacao_mensal <> 'NaN'
    ORDER BY variacao_mensal DESC
    LIMIT 10;
'''

hi_inflation = spark.sql(high_inflation)

In [None]:
# Print the ten highest monthly variations to the cell output.
hi_inflation.show()