## Trabalho de Conclusão de Curso
## Pós Graduação Ciência de Dados e Inteligência Artificial - PUCRS

### UTILIZANDO TÉCNICAS DE MACHINE LEARNING NA PRECIFICAÇÃO DE TARIFAS DE ARRECADAÇÃO TRIBUTÁRIA NO BRASIL

## Michel Dourado

### Parte 1 - Engenharia de dados


In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.10.9


In [2]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
!pip install -q -U watermark

In [3]:
# https://pypi.org/project/findspark/
!pip install -q findspark

In [4]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [5]:
# Imports
import pyspark
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
#from sklearn import linear_model

from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

from sklearn import metrics 
from sklearn import preprocessing
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import warnings
warnings.filterwarnings("ignore")
pd.set_option('display.float_format', lambda x : '%.2f' % x)
%matplotlib inline

In [6]:
# Formatação das saídas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)
from matplotlib.axes._axes import _log as matplotlib_axes_logger
matplotlib_axes_logger.setLevel('ERROR')

In [7]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Michel Dourado" --iversions

Author: Michel Dourado

pyspark   : 3.4.1
sklearn   : 1.2.1
sys       : 3.10.9 | packaged by Anaconda, Inc. | (main, Mar  1 2023, 18:18:15) [MSC v.1916 64 bit (AMD64)]
numpy     : 1.23.5
pandas    : 1.5.3
matplotlib: 3.7.0
seaborn   : 0.12.2
findspark : 2.0.1



## Preparando o Ambiente Spark

Conectando em um cluster Spark.

In [8]:
# Definindo semente aleatória (seed) para reprodutibilidade do notebook
rnd_seed = 33
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

In [9]:
# Criando o Spark Context
sc = SparkContext(appName = "rate_predictor")

In [10]:
sc.setLogLevel("ERROR")

In [11]:
# Criando as sessões Spark
spark_session_arrec = SparkSession.Builder().getOrCreate()

In [12]:
# Criando as sessões Spark
spark_session_mc = SparkSession.Builder().getOrCreate()

In [13]:
# Visualizando os objetos spark_session
spark_session_arrec

In [14]:
# Visualizando os objetos spark_session
spark_session_mc

## Carregando os Dados

As fontes de dados são 2 datasets contendo dados de séries históricas sobre guias arrecadadas entre 2019 e 2023 por entes públicos no Brasil através de uma entidade financeira, o primeiro dataset, chamado de dados_arrec contêm, dentre outras variáveis, a quantidade de guias arrecadadas por cliente e o segundo dataset, chamado de dados_mc, contêm, dentre outros recursos, o volume em reais de tarifas arrecadadas por cliente, a junção dos 2 datasets irá compor um dataset final com o valor de tarifa praticada calculado.

Como o tamanho dos datasets é um pouco considerável, aproxidamente 1,5 milhão de linhas e 400 mil linhas respectivamente, a biblioteca Pandas não se mostrou eficiente para o carregamento dos dados, então foi utilizada a biblioteca pySpark, que se mostrou bem mais adequada e robusta para realizar as tarefas de carregamento, data wrangling (que consiste nas tarefas de tratamento e limpeza dos dados, como a remoção de valores nulos dentre outras atividades semelhantes), além de engenharia de atributos e o Pandas foi utilizado para as atividades de análise exploratória, por ser mais adequado e amigável para essa finalidade.

### Dataset arrecadacao

In [15]:
# Carrega os dados a partir da sessão Spark
df_spark_arrec = spark_session_arrec.read.csv('dados/dataset_arrecadacao_vf.csv', header = 'true', sep=',', inferSchema = 'true') 

In [16]:
# Tipo do objeto
type(df_spark_arrec)

pyspark.sql.dataframe.DataFrame

In [17]:
# Visualiza os dados
df_spark_arrec.show()

+-------+---------+---+-------+---------+----------+-------------+
|Periodo|       ID| UF| Esfera|    Canal|Quantidade|       Volume|
+-------+---------+---+-------+---------+----------+-------------+
|07/2019|416383829| BR|FEDERAL|    Gefin|         2|       786,55|
|07/2019|416383829| BR|FEDERAL|    Caixa|   306.480|14658804861,3|
|07/2019|416383829| BR|FEDERAL|      TAA|   189.035|  154809808,6|
|07/2019|416383829| BR|FEDERAL|      TAA|   114.248|  78916937,81|
|07/2019|416383829| BR|FEDERAL|      TAA|    71.002|  90899116,33|
|07/2019|416383829| BR|FEDERAL|    Gefin|   357.123|2529176259,41|
|07/2019|416383829| BR|FEDERAL|B. Postal|    65.285|  13261196,42|
|07/2019|416383829| BR|FEDERAL|    Coban|    46.934|  10207974,77|
|07/2019|416383829| BR|FEDERAL|   Outros|         2|       572,39|
|07/2019|416383829| BR|FEDERAL|   Outros|     2.210| 196659057,73|
|07/2019|416383829| BR|FEDERAL|   Outros|        93|  11309766,62|
|07/2019|416383829| BR|FEDERAL|   Outros|     2.045|  17502660

In [18]:
# Verifica o número de linhas
df_spark_arrec.count()

1422112

In [19]:
# Visualiza os metadados (schema)
df_spark_arrec.printSchema()

root
 |-- Periodo: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Canal: string (nullable = true)
 |-- Quantidade: string (nullable = true)
 |-- Volume: string (nullable = true)



### Dataset MC

In [130]:
# Carrega os dados a partir da sessão Spark
df_spark_mc = spark_session_mc.read.csv('dados/dataset_mc_arrec_vf.csv', header = 'true', sep=',', inferSchema = 'true') 

In [131]:
type(df_spark_mc)

pyspark.sql.dataframe.DataFrame

In [132]:
df_spark_mc.show()

+----------+----+---+---------+-------+-------+-----------------------+-----------------+
|   PERIODO| ANO|MES|       ID| MARGEM| TARIFA|COD_TIPO_CARTEIRA_RELAC|SG_UF_MUN_END_CLI|
+----------+----+---+---------+-------+-------+-----------------------+-----------------+
|01/01/2022|2022|  1|200084156|  618,5| 638,73|                    406|               SP|
|01/02/2022|2022|  2|200084156| 843,65| 854,46|                    406|               SP|
|01/03/2022|2022|  3|200084156|5941,75|6254,47|                    406|               SP|
|01/04/2022|2022|  4|200084156|2695,41|2843,33|                    406|               SP|
|01/05/2022|2022|  5|200084156|2328,03|2461,86|                    406|               SP|
|01/06/2022|2022|  6|200084156|2483,55| 2623,3|                    406|               SP|
|01/07/2022|2022|  7|200084156|2446,43|2524,35|                    406|               SP|
|01/08/2022|2022|  8|200084156| 2257,1|2375,84|                    406|               SP|
|01/09/202

In [133]:
df_spark_mc.count()

275719

In [134]:
df_spark_mc.printSchema()

root
 |-- PERIODO: string (nullable = true)
 |-- ANO: integer (nullable = true)
 |-- MES: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- MARGEM: string (nullable = true)
 |-- TARIFA: string (nullable = true)
 |-- COD_TIPO_CARTEIRA_RELAC: integer (nullable = true)
 |-- SG_UF_MUN_END_CLI: string (nullable = true)



## Data Wrangling com SparkSQL
Limpeza e tratamento dos dados, como remoção de valores nulos e outras atividades semelhantes.

### DATATSET DF_SPARK_ARREC

In [135]:
# Cria uma tabela temporária a partir do dataframe
# As tabelas temporárias são úteis quando você deseja que o conjunto de resultados fique visível 
# para todas as outras sessões Spark
df_spark_arrec.createOrReplaceTempView('dados_arrec') 

In [21]:
# Executa uma consulta SQL
df_temp_arrec = spark_session_arrec.sql("select * from dados_arrec");

In [22]:
type(df_temp_arrec)

pyspark.sql.dataframe.DataFrame

In [23]:
df_temp_arrec.printSchema()

root
 |-- Periodo: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Canal: string (nullable = true)
 |-- Quantidade: string (nullable = true)
 |-- Volume: string (nullable = true)



In [24]:
# Visualiza os dados
df_temp_arrec.show()

+-------+---------+---+-------+---------+----------+-------------+
|Periodo|       ID| UF| Esfera|    Canal|Quantidade|       Volume|
+-------+---------+---+-------+---------+----------+-------------+
|07/2019|416383829| BR|FEDERAL|    Gefin|         2|       786,55|
|07/2019|416383829| BR|FEDERAL|    Caixa|   306.480|14658804861,3|
|07/2019|416383829| BR|FEDERAL|      TAA|   189.035|  154809808,6|
|07/2019|416383829| BR|FEDERAL|      TAA|   114.248|  78916937,81|
|07/2019|416383829| BR|FEDERAL|      TAA|    71.002|  90899116,33|
|07/2019|416383829| BR|FEDERAL|    Gefin|   357.123|2529176259,41|
|07/2019|416383829| BR|FEDERAL|B. Postal|    65.285|  13261196,42|
|07/2019|416383829| BR|FEDERAL|    Coban|    46.934|  10207974,77|
|07/2019|416383829| BR|FEDERAL|   Outros|         2|       572,39|
|07/2019|416383829| BR|FEDERAL|   Outros|     2.210| 196659057,73|
|07/2019|416383829| BR|FEDERAL|   Outros|        93|  11309766,62|
|07/2019|416383829| BR|FEDERAL|   Outros|     2.045|  17502660

In [25]:
# Vamos renomear algumas colunas para facilitar a manipulação dos dados
df_temp_arrec = df_temp_arrec.withColumnRenamed("Periodo", "mes_ano")

In [26]:
df_temp_arrec[['mes_ano','ID','Quantidade']].show()

+-------+---------+----------+
|mes_ano|       ID|Quantidade|
+-------+---------+----------+
|07/2019|416383829|         2|
|07/2019|416383829|   306.480|
|07/2019|416383829|   189.035|
|07/2019|416383829|   114.248|
|07/2019|416383829|    71.002|
|07/2019|416383829|   357.123|
|07/2019|416383829|    65.285|
|07/2019|416383829|    46.934|
|07/2019|416383829|         2|
|07/2019|416383829|     2.210|
|07/2019|416383829|        93|
|07/2019|416383829|     2.045|
|07/2019|416383829|         6|
|07/2019|416383829|         1|
|07/2019|416383829|       417|
|07/2019|416383829|        44|
|07/2019|416383829|   210.446|
|07/2019|416383829|       141|
|07/2019|416383829|       193|
|07/2019|416383829|         3|
+-------+---------+----------+
only showing top 20 rows



In [27]:
# Vamos dividir o dataframe extraindo o mês
df_data_arrec = df_temp_arrec.withColumn("mes", split(col("mes_ano"),"/").getItem(0))

In [28]:
# Vamos dividir o dataframe extraindo o ano
df_data_arrec = df_data_arrec.withColumn("ano", split(col("mes_ano"),"/").getItem(1))

In [29]:
# Schema
df_data_arrec.printSchema()

root
 |-- mes_ano: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Canal: string (nullable = true)
 |-- Quantidade: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- ano: string (nullable = true)



In [30]:
df_data_arrec = df_data_arrec.withColumn("Periodo", concat(lit('01-'),df_data_arrec["mes"], lit('-'),df_data_arrec["ano"]))
df_data_arrec = df_data_arrec.withColumn("Periodo",to_date(col("Periodo"),"MM-dd-yyyy"))

In [31]:
# Visualiza os dados
df_data_arrec[['Periodo', 'mes', 'ano']].show()

+----------+---+----+
|   Periodo|mes| ano|
+----------+---+----+
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
|2019-01-07| 07|2019|
+----------+---+----+
only showing top 20 rows



In [32]:
df_data_arrec.printSchema()

root
 |-- mes_ano: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Canal: string (nullable = true)
 |-- Quantidade: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- Periodo: date (nullable = true)



In [33]:
df_data_arrec= df_data_arrec.withColumn("Quantidade",round(df_data_arrec.Quantidade.cast(DoubleType()),2))

In [34]:
df_data_arrec= df_data_arrec.withColumn("Volume",round(df_data_arrec.Volume.cast(DoubleType()),2))

In [35]:
df_data_arrec.printSchema()

root
 |-- mes_ano: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Canal: string (nullable = true)
 |-- Quantidade: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- mes: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- Periodo: date (nullable = true)



In [36]:
#Agrupando dados por ID
df_data_arrec.createOrReplaceTempView('dados_arrec') 

In [37]:
spark_session_arrec.sql("SHOW COLUMNS FROM dados_arrec").show()

+----------+
|  col_name|
+----------+
|   mes_ano|
|        ID|
|        UF|
|    Esfera|
|     Canal|
|Quantidade|
|    Volume|
|       mes|
|       ano|
|   Periodo|
+----------+



In [38]:
spark_session_arrec.sql("SELECT * FROM dados_arrec LIMIT 5").show()

+-------+---------+---+-------+-----+----------+------+---+----+----------+
|mes_ano|       ID| UF| Esfera|Canal|Quantidade|Volume|mes| ano|   Periodo|
+-------+---------+---+-------+-----+----------+------+---+----+----------+
|07/2019|416383829| BR|FEDERAL|Gefin|       2.0|  null| 07|2019|2019-01-07|
|07/2019|416383829| BR|FEDERAL|Caixa|    306.48|  null| 07|2019|2019-01-07|
|07/2019|416383829| BR|FEDERAL|  TAA|    189.04|  null| 07|2019|2019-01-07|
|07/2019|416383829| BR|FEDERAL|  TAA|    114.25|  null| 07|2019|2019-01-07|
|07/2019|416383829| BR|FEDERAL|  TAA|      71.0|  null| 07|2019|2019-01-07|
+-------+---------+---+-------+-----+----------+------+---+----+----------+



In [39]:
spark_session_arrec.sql("DESCRIBE dados_arrec").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|   mes_ano|   string|   null|
|        ID|      int|   null|
|        UF|   string|   null|
|    Esfera|   string|   null|
|     Canal|   string|   null|
|Quantidade|   double|   null|
|    Volume|   double|   null|
|       mes|   string|   null|
|       ano|   string|   null|
|   Periodo|     date|   null|
+----------+---------+-------+



In [40]:
#Período de início dos registros: jul/2019
spark_session_arrec.sql("SELECT MIN(periodo) FROM dados_arrec").show()

+------------+
|min(periodo)|
+------------+
|  2019-01-07|
+------------+



In [41]:
#Período final dos registro: mai/2023 
spark_session_arrec.sql("SELECT MAX(periodo) FROM dados_arrec").show()

+------------+
|max(periodo)|
+------------+
|  2023-01-05|
+------------+



In [42]:
#Agregação por cliente (ID)
# optamos por agregar por ano e não por mês/ano para não gerar distorções no cálculo da tarifa posteriormente, pois nem todo
#mês os clientes possuem movimento de arrecadação. 

query="""

SELECT 
ID, 
ANY_VALUE(UF) as UF,
ANY_VALUE(Esfera) as Esfera,
SUM(Quantidade) AS Soma_de_quantidade, 
SUM(Volume) AS Soma_de_volume,
ano AS Ano
FROM dados_arrec
GROUP BY ID, ano 
ORDER BY Soma_de_quantidade DESC

"""
spark_session_arrec.sql(query).show()

+---------+---+--------+------------------+--------------+----+
|       ID| UF|  Esfera|Soma_de_quantidade|Soma_de_volume| Ano|
+---------+---+--------+------------------+--------------+----+
|501365928| PB|ESTADUAL| 224262.8099999999|     7294630.0|2022|
|501365928| PB|ESTADUAL| 219359.8099999999|   8.3089492E7|2021|
|500688950| RN|ESTADUAL|214686.16999999998|    2.569635E7|2022|
|501365928| PB|ESTADUAL|206907.94000000012|   8.9929732E7|2020|
|500688950| RN|ESTADUAL|204938.06000000006|   2.3276616E7|2021|
|501365928| PB|ESTADUAL|145612.02000000002|     1.41907E7|2019|
|503136705| PE|ESTADUAL|139307.64999999994|   9.8087321E7|2020|
|500688950| RN|ESTADUAL|         136724.33|   1.3106195E7|2020|
|803231607| BA|ESTADUAL|136071.33000000005|   6.9478165E7|2020|
|503136705| PE|ESTADUAL|         127700.46|     2909797.0|2021|
|803231607| BA|ESTADUAL|123541.00999999998|   1.4064254E7|2021|
|105492843| MG|ESTADUAL|121983.00000000006|  2.39741657E8|2022|
|803231607| BA|ESTADUAL|         121613.

In [43]:
#gravando a query em pySpark DataFrame
df_group_id_arrec=spark_session_arrec.sql(query)

In [44]:
df_group_id_arrec.count()

17861

In [45]:
type(df_group_id_arrec)

pyspark.sql.dataframe.DataFrame

In [46]:
df_group_id_arrec.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Soma_de_quantidade: double (nullable = true)
 |-- Soma_de_volume: double (nullable = true)
 |-- Ano: string (nullable = true)



In [47]:
#FAZER AGREGAÇÃO USANDO WINDOW, JANELA DE SOMA_QUANTIDADE E SOMA_VOLUME POR ANO
#criando tabela temporária para usar queries SQL
df_group_id_arrec.createOrReplaceTempView('dados_group_arrec') 

In [48]:
#fazendo a query com função window sql

query="""
SELECT 
ID,
UF,
Esfera,
CAST(Soma_de_quantidade AS DECIMAL(15,0)), 
CAST(Soma_de_volume AS DECIMAL(15,2)),
Ano,
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY Ano) AS Sequencial
FROM dados_group_arrec

"""
spark_session_arrec.sql(query).show()

+-----+---+---------+------------------+--------------+----+----------+
|   ID| UF|   Esfera|Soma_de_quantidade|Soma_de_volume| Ano|Sequencial|
+-----+---+---------+------------------+--------------+----+----------+
| 1198| PR|MUNICIPAL|              1341|         35.00|2022|         1|
| 1198| PR|MUNICIPAL|              2573|         70.00|2023|         2|
| 1821| PR|MUNICIPAL|              5235|          null|2019|         1|
| 1821| PR|MUNICIPAL|             12143|      28178.00|2020|         2|
| 1821| PR|MUNICIPAL|             13058|          null|2021|         3|
| 1821| PR|MUNICIPAL|              2257|       9000.00|2022|         4|
| 1821| PR|MUNICIPAL|               242|          null|2023|         5|
| 7219| PR| ESTADUAL|              2569|     216919.00|2021|         1|
| 7219| PR| ESTADUAL|             12435|    1582179.00|2022|         2|
| 7219| PR| ESTADUAL|              1119|     248557.00|2023|         3|
| 7607| PR|MUNICIPAL|              6169|      86780.00|2022|    

In [49]:
df_group_id_janelas_arrec=spark_session_arrec.sql(query)

In [50]:
df_group_id_janelas_arrec.count()

17861

In [52]:
#Agregação por cliente (ID) transformando anos em colunas (Pivot)
# Quantidade por ano
query="""
SELECT * FROM(
SELECT ID, CAST(SUM(Quantidade) AS DECIMAL(15,0)) AS q, Ano
FROM dados_arrec
GROUP BY ID, Ano
)
PIVOT(
    SUM(q)
    FOR Ano
    in ('2019','2020','2021','2022','2023')
)

ORDER BY ID DESC

"""
spark_session_arrec.sql(query).show()

+---------+-----+-----+-----+-----+----+
|       ID| 2019| 2020| 2021| 2022|2023|
+---------+-----+-----+-----+-----+----+
|937250328| null| null| null|  483|null|
|936565581| 1427| 3136| 3138| 3378|1516|
|935826298|  239|  517|  676|  851| 309|
|935791263|  678| 1309| 1994| 3093|1443|
|935402733| 7061|14228|14705|18889|8436|
|935298611|   99|  843| 1311| 1343| 304|
|935260072| 4555|10887|12724|14393|5873|
|934794039| 6733|10731|11259|14140|5710|
|934402481| 1183| 5639| 8241| 9136|6996|
|933975057| null| null|   10|   53|  27|
|933887049|  172|  489| 1110| 1797| 161|
|933241298| 6624|14758|17283|21272|7650|
|932213580| null| null| null|   13| 172|
|931866978| 2127| 3626| 5002| 5612|2326|
|931809113| null| null| null|    1|null|
|931808991| 2328| 3689| 4122| 4642|1495|
|930402979| 7185|10764|14058|15476|5415|
|930367871|15403|28251|29126|27725|9315|
|930281912| 1657| 3312| 4960| 6773|2793|
|929019479|   22|   68|   95|   73|  27|
+---------+-----+-----+-----+-----+----+
only showing top

In [53]:
#Salvando a query em pySpark Dataframe
df_group_id_anos_arrec=spark_session_arrec.sql(query)

In [54]:
df_group_id_anos_arrec.count()

4160

### DATASET DF_SPARK_MC

In [136]:
# Cria uma tabela temporária a partir do dataframe
# As tabelas temporárias são úteis quando você deseja que o conjunto de resultados fique visível 
# para todas as outras sessões Spark
df_spark_mc.createOrReplaceTempView('dados_mc') 

In [137]:
# Executa uma consulta SQL
df_temp_mc = spark_session_mc.sql("select * from dados_mc");

In [138]:
type(df_temp_mc)

pyspark.sql.dataframe.DataFrame

In [139]:
df_temp_mc.printSchema()

root
 |-- PERIODO: string (nullable = true)
 |-- ANO: integer (nullable = true)
 |-- MES: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- MARGEM: string (nullable = true)
 |-- TARIFA: string (nullable = true)
 |-- COD_TIPO_CARTEIRA_RELAC: integer (nullable = true)
 |-- SG_UF_MUN_END_CLI: string (nullable = true)



In [141]:
df_temp_mc.show()

+----------+----+---+---------+-------+-------+-----------------------+-----------------+
|   PERIODO| ANO|MES|       ID| MARGEM| TARIFA|COD_TIPO_CARTEIRA_RELAC|SG_UF_MUN_END_CLI|
+----------+----+---+---------+-------+-------+-----------------------+-----------------+
|01/01/2022|2022|  1|200084156|  618,5| 638,73|                    406|               SP|
|01/02/2022|2022|  2|200084156| 843,65| 854,46|                    406|               SP|
|01/03/2022|2022|  3|200084156|5941,75|6254,47|                    406|               SP|
|01/04/2022|2022|  4|200084156|2695,41|2843,33|                    406|               SP|
|01/05/2022|2022|  5|200084156|2328,03|2461,86|                    406|               SP|
|01/06/2022|2022|  6|200084156|2483,55| 2623,3|                    406|               SP|
|01/07/2022|2022|  7|200084156|2446,43|2524,35|                    406|               SP|
|01/08/2022|2022|  8|200084156| 2257,1|2375,84|                    406|               SP|
|01/09/202

In [142]:
df_temp_mc[['PERIODO']].show()

+----------+
|   PERIODO|
+----------+
|01/01/2022|
|01/02/2022|
|01/03/2022|
|01/04/2022|
|01/05/2022|
|01/06/2022|
|01/07/2022|
|01/08/2022|
|01/09/2022|
|01/10/2022|
|01/11/2022|
|01/12/2022|
|01/01/2022|
|01/02/2022|
|01/03/2022|
|01/04/2022|
|01/05/2022|
|01/06/2022|
|01/07/2022|
|01/08/2022|
+----------+
only showing top 20 rows



In [143]:
df_mes_mc = df_temp_mc.withColumn("MES_2DIG",
                                  when((col("MES")<10),concat(lit('0'),df_temp_mc["MES"]))
                                 .otherwise(df_temp_mc["MES"]))
df_mes_mc[["MES_2DIG"]].show()

+--------+
|MES_2DIG|
+--------+
|      01|
|      02|
|      03|
|      04|
|      05|
|      06|
|      07|
|      08|
|      09|
|      10|
|      11|
|      12|
|      01|
|      02|
|      03|
|      04|
|      05|
|      06|
|      07|
|      08|
+--------+
only showing top 20 rows



In [144]:
df_data_mc = df_mes_mc.withColumn("PERIODO", concat(lit('01-'),df_mes_mc["MES_2DIG"], lit('-'),df_mes_mc["ANO"]))
df_data_mc = df_data_mc.withColumn("PERIODO",to_date(col("PERIODO"),"MM-dd-yyyy"))

In [145]:
df_data_mc.printSchema()

root
 |-- PERIODO: date (nullable = true)
 |-- ANO: integer (nullable = true)
 |-- MES: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- MARGEM: string (nullable = true)
 |-- TARIFA: string (nullable = true)
 |-- COD_TIPO_CARTEIRA_RELAC: integer (nullable = true)
 |-- SG_UF_MUN_END_CLI: string (nullable = true)
 |-- MES_2DIG: string (nullable = true)



In [146]:
df_data_mc = df_data_mc.drop('MES_2DIG')

In [147]:
df_data_mc.printSchema()

root
 |-- PERIODO: date (nullable = true)
 |-- ANO: integer (nullable = true)
 |-- MES: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- MARGEM: string (nullable = true)
 |-- TARIFA: string (nullable = true)
 |-- COD_TIPO_CARTEIRA_RELAC: integer (nullable = true)
 |-- SG_UF_MUN_END_CLI: string (nullable = true)



In [148]:
# Visualiza os dados
df_data_mc[['PERIODO', 'MES', 'ANO']].show()

+----------+---+----+
|   PERIODO|MES| ANO|
+----------+---+----+
|2022-01-01|  1|2022|
|2022-01-02|  2|2022|
|2022-01-03|  3|2022|
|2022-01-04|  4|2022|
|2022-01-05|  5|2022|
|2022-01-06|  6|2022|
|2022-01-07|  7|2022|
|2022-01-08|  8|2022|
|2022-01-09|  9|2022|
|2022-01-10| 10|2022|
|2022-01-11| 11|2022|
|2022-01-12| 12|2022|
|2022-01-01|  1|2022|
|2022-01-02|  2|2022|
|2022-01-03|  3|2022|
|2022-01-04|  4|2022|
|2022-01-05|  5|2022|
|2022-01-06|  6|2022|
|2022-01-07|  7|2022|
|2022-01-08|  8|2022|
+----------+---+----+
only showing top 20 rows



In [149]:
#Agrupando dados por ID
df_data_mc.createOrReplaceTempView('dados_mc') 


In [150]:
spark_session_mc.sql("DESCRIBE dados_mc").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             PERIODO|     date|   null|
|                 ANO|      int|   null|
|                 MES|      int|   null|
|                  ID|      int|   null|
|              MARGEM|   string|   null|
|              TARIFA|   string|   null|
|COD_TIPO_CARTEIRA...|      int|   null|
|   SG_UF_MUN_END_CLI|   string|   null|
+--------------------+---------+-------+



In [151]:
spark_session_mc.sql("SELECT MIN(PERIODO) FROM dados_mc").show()

+------------+
|min(PERIODO)|
+------------+
|  2019-01-01|
+------------+



In [152]:
spark_session_mc.sql("SELECT MAX(PERIODO) FROM dados_mc").show()

+------------+
|max(PERIODO)|
+------------+
|  2023-01-05|
+------------+



In [153]:
spark_session_mc.sql("SELECT MIN(PERIODO) FROM dados_mc WHERE PERIODO >= '2019-01-07'").show()

+------------+
|min(PERIODO)|
+------------+
|  2019-01-07|
+------------+



In [154]:
df_1S2019 = spark_session_mc.sql("SELECT * FROM dados_mc WHERE PERIODO < '2019-01-07'")

In [155]:
df_1S2019.count()

26197

In [156]:
#quantidade de clientes com dados a partir do 1S2019
dropDisDF = df_1S2019.dropDuplicates(["ID"]).select("ID")
dropDisDF.show(truncate=False)

df_1S2019.dropDuplicates(["ID"]).select("ID").count()

+---------+
|ID       |
+---------+
|200250241|
|200155586|
|501388607|
|204396945|
|205800274|
|914200901|
|901399041|
|201381021|
|104460857|
|104868416|
|104141805|
|419387   |
|105068133|
|600148970|
|602700564|
|200900570|
|250022   |
|103784086|
|200680618|
|556541   |
+---------+
only showing top 20 rows



3255

In [157]:
df_data_mc2 = spark_session_mc.sql("SELECT * FROM dados_mc WHERE PERIODO >= '2019-01-07'")

In [158]:
#Atualizando tabela temporária dados_mc com nova data de início dos registros a partir de jul/2023 
#para igualar com os dados do dataset_arrec

df_data_mc2.createOrReplaceTempView('dados_mc') 

In [159]:
spark_session_mc.sql("SELECT MIN(PERIODO) FROM dados_mc").show()

+------------+
|min(PERIODO)|
+------------+
|  2019-01-07|
+------------+



In [160]:
spark_session_mc.sql("DESCRIBE dados_mc").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             PERIODO|     date|   null|
|                 ANO|      int|   null|
|                 MES|      int|   null|
|                  ID|      int|   null|
|              MARGEM|   string|   null|
|              TARIFA|   string|   null|
|COD_TIPO_CARTEIRA...|      int|   null|
|   SG_UF_MUN_END_CLI|   string|   null|
+--------------------+---------+-------+



In [161]:
df_data_mc2.columns

['PERIODO',
 'ANO',
 'MES',
 'ID',
 'MARGEM',
 'TARIFA',
 'COD_TIPO_CARTEIRA_RELAC',
 'SG_UF_MUN_END_CLI']

In [162]:
#Agregação por cliente (ID)
# Optamos por agrupar por ano e não por período (mês/ano) devido nem todo mês os clientes possuírem guias arrecadadas, 
#o que resultaria em distroções no cálculo de tarifa a ser feito posteriormente.
query="""
SELECT 
ID,
ANY_VALUE(COD_TIPO_CARTEIRA_RELAC) AS Cod_Tipo_Cart_Rel,
ANY_VALUE(SG_UF_MUN_END_CLI) AS UF,
CAST(SUM(MARGEM) AS DECIMAL(15,2)) AS Soma_de_margem, 
CAST(SUM(TARIFA) AS DECIMAL(15,2)) AS Soma_de_tarifas,
ANO as Ano
FROM dados_mc
GROUP BY ID, ANO
ORDER BY SUM(TARIFA) DESC
"""
spark_session_mc.sql(query).show()

+---------+-----------------+---+--------------+---------------+----+
|       ID|Cod_Tipo_Cart_Rel| UF|Soma_de_margem|Soma_de_tarifas| Ano|
+---------+-----------------+---+--------------+---------------+----+
|509646522|              405| SP|          null|     3056576.00|2021|
|106229821|              403| MT|          null|     2769415.00|2019|
|200964047|              400| SP|     140560.00|     1752983.00|2020|
|104372038|              403| MA|          null|     1520186.00|2020|
|106229821|              403| MT|          null|     1398942.00|2020|
|509646522|              405| SP|          null|     1120136.00|2022|
|200964047|              400| SP|          null|      863308.00|2023|
|907132352|              400| SP|          null|      723595.00|2023|
|509646522|              405| SP|          null|      674168.00|2019|
|500019475|              403| CE|          null|      627668.00|2019|
|501466442|              403| PE|          null|      620961.00|2022|
|500688950|         

In [163]:
#Gravando query em pySpark DataFrame
df_group_id_mc = spark_session_mc.sql(query)

In [164]:
#Verificando quantidade de registros do dataset_mc agrupado por cliente (ID)
df_group_id_mc.count()

19481

In [165]:
#FAZER AGREGAÇÃO USANDO WINDOW, JANELA DE SOMA_QUANTIDADE E SOMA_VOLUME POR ANO
#criando tabela temporária para usar queries SQL
df_group_id_mc.createOrReplaceTempView('dados_group_mc') 

#fazendo a query com função window sql

query="""
SELECT 
ID,
Cod_Tipo_Cart_Rel,
UF,
CAST(Soma_de_tarifas AS DECIMAL(15,2)), 
CAST(Soma_de_margem AS DECIMAL(15,2)),
Ano,
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY Ano) AS Sequencial
FROM dados_group_mc

"""
spark_session_mc.sql(query).show(70)

+-----+-----------------+---+---------------+--------------+----+----------+
|   ID|Cod_Tipo_Cart_Rel| UF|Soma_de_tarifas|Soma_de_margem| Ano|Sequencial|
+-----+-----------------+---+---------------+--------------+----+----------+
| 1198|              407| PR|          21.00|          null|2022|         1|
| 1198|              407| PR|          42.00|          null|2023|         2|
| 1821|              406| PR|           null|          null|2019|         1|
| 1821|              406| PR|           null|          null|2020|         2|
| 1821|              406| PR|           null|          null|2021|         3|
| 1821|              406| PR|           null|          null|2022|         4|
| 1821|              406| PR|         455.00|          null|2023|         5|
| 7219|              405| PR|           0.00|          null|2021|         1|
| 7219|              405| PR|           null|          null|2022|         2|
| 7219|              405| PR|           null|          null|2023|         3|

In [166]:
df_group_id_janelas_mc=spark_session_mc.sql(query)

In [167]:
df_group_id_janelas_mc.count()

19481

In [168]:
#Agregação por cliente (ID) transformando anos em colunas (Pivot)
query="""
SELECT * FROM(
SELECT ID, CAST(SUM(TARIFA) AS DECIMAL(15,2)) AS t, ANO
FROM dados_mc
GROUP BY ID, ANO
)
PIVOT(
    SUM(t)
    FOR Ano
    in ('2019','2020','2021','2022','2023')
)

ORDER BY ID DESC

"""
spark_session_mc.sql(query).show()

+---------+-------+--------+--------+--------+-------+
|       ID|   2019|    2020|    2021|    2022|   2023|
+---------+-------+--------+--------+--------+-------+
|937250328|   null|    null|    null|    0.00|   null|
|936565581|2803.00| 3648.00|  817.00| 3116.00|2052.00|
|935826298|   null|    null|  878.00|  747.00| 708.00|
|935791263|   null| 2155.00|  831.00| 4381.00|   null|
|935626319|   0.00|    0.00|    0.00|    null|   null|
|935402733|   null| 2349.00| 3422.00| 4596.00|   0.00|
|935298611|   null|  173.00|  231.00|  326.00| 182.00|
|935260072|   null|    0.00|  522.00|    0.00|  27.00|
|934794039|   0.00|    0.00|    0.00|11643.00|   0.00|
|934402481|   null|    null|    null|    null|   null|
|933975057|   null|    null|    1.00|   60.00|  65.00|
|933887049|   0.00|  422.00|  873.00| 2061.00|   null|
|933241298|   null|  140.00|  224.00|  122.00| 131.00|
|932213580|   null|    null|    null|    6.00| 104.00|
|931866978| 654.00|   21.00| 1522.00|    0.00|   0.00|
|931809113

In [169]:
#Salvando a query em pySpark Dataframe
df_group_id_anos_mc=spark_session_mc.sql(query)

In [170]:
df_group_id_anos_mc.count()

4517

## Unificando os dois datasets

In [None]:
#Comparando a quantidade de linhas (clientes) dos 2 dataset, nota-se que o dataset_mc possui mais registros 
#Então vamos fazer inner_join, considerando apenas os dados constantes nos 2 datasets, pois para calcular o valor das tarifas, 
#que é o objetivo da junção dos datasets, são necessários o volume de tarifas em reais e os dados de quantidade de guias. 


In [171]:
df_group_id_janelas_arrec.count()

17861

In [172]:
df_group_id_janelas_mc.count()

19481

In [173]:
df_group_id_janelas_mc.count()-df_group_id_janelas_arrec.count()

1620

In [174]:
df_group_id_janelas_arrec.createOrReplaceTempView('dados_group_arrec')


In [175]:
df_group_id_janelas_mc.createOrReplaceTempView('dados_group_mc')

In [176]:
df_group_id_janelas_arrec.show()

+-----+---+---------+------------------+--------------+----+----------+
|   ID| UF|   Esfera|Soma_de_quantidade|Soma_de_volume| Ano|Sequencial|
+-----+---+---------+------------------+--------------+----+----------+
| 1198| PR|MUNICIPAL|              1341|         35.00|2022|         1|
| 1198| PR|MUNICIPAL|              2573|         70.00|2023|         2|
| 1821| PR|MUNICIPAL|              5235|          null|2019|         1|
| 1821| PR|MUNICIPAL|             12143|      28178.00|2020|         2|
| 1821| PR|MUNICIPAL|             13058|          null|2021|         3|
| 1821| PR|MUNICIPAL|              2257|       9000.00|2022|         4|
| 1821| PR|MUNICIPAL|               242|          null|2023|         5|
| 7219| PR| ESTADUAL|              2569|     216919.00|2021|         1|
| 7219| PR| ESTADUAL|             12435|    1582179.00|2022|         2|
| 7219| PR| ESTADUAL|              1119|     248557.00|2023|         3|
| 7607| PR|MUNICIPAL|              6169|      86780.00|2022|    

In [177]:
df_group_id_janelas_mc.show()

+-----+-----------------+---+---------------+--------------+----+----------+
|   ID|Cod_Tipo_Cart_Rel| UF|Soma_de_tarifas|Soma_de_margem| Ano|Sequencial|
+-----+-----------------+---+---------------+--------------+----+----------+
| 1198|              407| PR|          21.00|          null|2022|         1|
| 1198|              407| PR|          42.00|          null|2023|         2|
| 1821|              406| PR|           null|          null|2019|         1|
| 1821|              406| PR|           null|          null|2020|         2|
| 1821|              406| PR|           null|          null|2021|         3|
| 1821|              406| PR|           null|          null|2022|         4|
| 1821|              406| PR|         455.00|          null|2023|         5|
| 7219|              405| PR|           0.00|          null|2021|         1|
| 7219|              405| PR|           null|          null|2022|         2|
| 7219|              405| PR|           null|          null|2023|         3|

In [178]:
#fazendo o inner_join
query="""
SELECT 
tb_arrec.ID,
tb_mc.Cod_Tipo_Cart_Rel,
tb_arrec.UF,
tb_arrec.Esfera,
tb_arrec.Soma_de_quantidade,
tb_arrec.Soma_de_volume,
tb_mc.Soma_de_tarifas,
tb_mc.Soma_de_margem,
tb_arrec.Ano
FROM dados_group_arrec AS tb_arrec
INNER JOIN dados_group_mc AS tb_mc
ON tb_arrec.ID = tb_mc.ID AND tb_arrec.Ano = tb_mc.Ano
"""

spark_session_arrec.sql(query).show()

+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+
|       ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Soma_de_quantidade|Soma_de_volume|Soma_de_tarifas|Soma_de_margem| Ano|
+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+
|205012677|              406| SP|MUNICIPAL|             38653|     725072.00|           null|          null|2022|
|204724337|              406| SP|MUNICIPAL|              8641|          null|           0.00|          null|2021|
|200371172|              406| SP|MUNICIPAL|              5256|       1590.00|          80.00|        239.00|2021|
|201334555|              406| SP|MUNICIPAL|              1385|        600.00|           null|          null|2019|
|106447149|              406| MT|MUNICIPAL|               242|       2275.00|         738.00|          null|2021|
|105106107|              406| PA|MUNICIPAL|               879|       1490.00|          9

In [179]:
spark_session_arrec.sql(query).count()

17687

In [180]:
df_consolid = spark_session_arrec.sql(query)

In [181]:
df_consolid.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Cod_Tipo_Cart_Rel: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Soma_de_quantidade: decimal(15,0) (nullable = true)
 |-- Soma_de_volume: decimal(15,2) (nullable = true)
 |-- Soma_de_tarifas: decimal(15,2) (nullable = true)
 |-- Soma_de_margem: decimal(15,2) (nullable = true)
 |-- Ano: string (nullable = true)



In [182]:
#Transformando valores nulos por zero para possibilitar a realização de cálculos matemáticos
df_consolid = df_consolid.na.fill(value=0)

In [183]:
df_consolid.show()

+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+
|       ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Soma_de_quantidade|Soma_de_volume|Soma_de_tarifas|Soma_de_margem| Ano|
+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+
|205012677|              406| SP|MUNICIPAL|             38653|     725072.00|           0.00|          0.00|2022|
|204724337|              406| SP|MUNICIPAL|              8641|          0.00|           0.00|          0.00|2021|
|200371172|              406| SP|MUNICIPAL|              5256|       1590.00|          80.00|        239.00|2021|
|201334555|              406| SP|MUNICIPAL|              1385|        600.00|           0.00|          0.00|2019|
|106447149|              406| MT|MUNICIPAL|               242|       2275.00|         738.00|          0.00|2021|
|105106107|              406| PA|MUNICIPAL|               879|       1490.00|          9

In [184]:
#Criando a coluna tarifa dividindo a soma de tarifas pela soma de quantidade
df_consolid = df_consolid.withColumn("Tarifa",when(col("Soma_de_tarifas")==0,0).otherwise(col("Soma_de_tarifas")/col("Soma_de_quantidade")))

In [185]:
#Formatando coluna Tarifa para 2 casa decimais
df_consolid = df_consolid.withColumn("Tarifa",round(df_consolid.Tarifa.cast(DoubleType()),2))

In [188]:
df_consolid[['ID','Soma_de_tarifas', 'Soma_de_quantidade', 'Soma_de_volume', 'Soma_de_margem', 'Tarifa', 'Ano']].show()


+---------+---------------+------------------+--------------+--------------+------+----+
|       ID|Soma_de_tarifas|Soma_de_quantidade|Soma_de_volume|Soma_de_margem|Tarifa| Ano|
+---------+---------------+------------------+--------------+--------------+------+----+
|205012677|           0.00|             38653|     725072.00|          0.00|   0.0|2022|
|204724337|           0.00|              8641|          0.00|          0.00|   0.0|2021|
|200371172|          80.00|              5256|       1590.00|        239.00|  0.02|2021|
|201334555|           0.00|              1385|        600.00|          0.00|   0.0|2019|
|106447149|         738.00|               242|       2275.00|          0.00|  3.05|2021|
|105106107|          98.00|               879|       1490.00|          0.00|  0.11|2020|
|104487725|           0.00|              1590|       5084.00|          0.00|   0.0|2022|
|502884879|           0.00|              2116|        668.00|          0.00|   0.0|2023|
|105553637|        22

In [189]:
df_consolid.count()

17687

In [190]:
df_consolid.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Cod_Tipo_Cart_Rel: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Soma_de_quantidade: decimal(15,0) (nullable = true)
 |-- Soma_de_volume: decimal(15,2) (nullable = true)
 |-- Soma_de_tarifas: decimal(15,2) (nullable = true)
 |-- Soma_de_margem: decimal(15,2) (nullable = true)
 |-- Ano: string (nullable = true)
 |-- Tarifa: double (nullable = true)



## Excluindo clientes com tarifas negativas e tarifas zeradas em todo o período analisado 

In [191]:
#Vamos criar uma nova tabela temporária para fazer manipulações via sql

df_consolid.createOrReplaceTempView('dados_consolid_ano')

In [192]:
spark_session_arrec.sql("SELECT * FROM dados_consolid_ano").show()

+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+------+
|       ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Soma_de_quantidade|Soma_de_volume|Soma_de_tarifas|Soma_de_margem| Ano|Tarifa|
+---------+-----------------+---+---------+------------------+--------------+---------------+--------------+----+------+
|205012677|              406| SP|MUNICIPAL|             38653|     725072.00|           0.00|          0.00|2022|   0.0|
|204724337|              406| SP|MUNICIPAL|              8641|          0.00|           0.00|          0.00|2021|   0.0|
|200371172|              406| SP|MUNICIPAL|              5256|       1590.00|          80.00|        239.00|2021|  0.02|
|201334555|              406| SP|MUNICIPAL|              1385|        600.00|           0.00|          0.00|2019|   0.0|
|106447149|              406| MT|MUNICIPAL|               242|       2275.00|         738.00|          0.00|2021|  3.05|
|105106107|              406| PA

In [193]:
#Agregação por cliente (ID)
query="""
SELECT 
ID,
CAST(SUM(Soma_de_quantidade) AS NUMERIC(15)) AS Soma_de_quantidade, 
CAST(SUM(Soma_de_tarifas) AS DECIMAL(15,2)) AS Soma_de_tarifas
FROM dados_consolid_ano
GROUP BY ID

"""
spark_session_arrec.sql(query).show()

+---------+------------------+---------------+
|       ID|Soma_de_quantidade|Soma_de_tarifas|
+---------+------------------+---------------+
|205800274|             10012|         779.00|
|200155586|              4638|           0.00|
|518204891|              1716|         561.00|
|914200901|             98445|       77931.00|
|501388607|              6849|        2718.00|
|204396945|             50547|       19714.00|
|200250241|             18417|        4340.00|
|901119865|             36107|         978.00|
|104455819|               955|         255.00|
|906071860|             14388|           0.00|
|104099929|               604|         690.00|
|105187523|             23072|        4412.00|
|204579911|              5654|        1027.00|
|204795820|                93|           0.00|
|105551032|               104|         236.00|
|100991653|                14|           0.00|
|302951191|               629|         689.00|
|104481195|               309|           4.00|
|922621589|  

In [194]:
df_clientes_com_tfa = spark_session_arrec.sql(query)

In [195]:
df_clientes_com_tfa.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Soma_de_quantidade: decimal(15,0) (nullable = true)
 |-- Soma_de_tarifas: decimal(15,2) (nullable = true)



In [196]:
#Transformando valores nulos por zero para possibilitar a realização de cálculos matemáticos
df_clientes_com_tfa = df_clientes_com_tfa.na.fill(value=0)

In [197]:
#Criando a coluna tarifa dividindo a soma de tarifas pela soma de quantidade
df_clientes_com_tfa = df_clientes_com_tfa.withColumn("Tarifa",when(col("Soma_de_tarifas")==0,0).otherwise(col("Soma_de_tarifas")/col("Soma_de_quantidade")))

In [198]:
#Formatando coluna Tarifa para 2 casa decimais
df_clientes_com_tfa = df_clientes_com_tfa.withColumn("Tarifa",round(df_clientes_com_tfa.Tarifa.cast(DoubleType()),2))

In [199]:
df_clientes_com_tfa.show()

+---------+------------------+---------------+------+
|       ID|Soma_de_quantidade|Soma_de_tarifas|Tarifa|
+---------+------------------+---------------+------+
|205800274|             10012|         779.00|  0.08|
|200155586|              4638|           0.00|   0.0|
|518204891|              1716|         561.00|  0.33|
|914200901|             98445|       77931.00|  0.79|
|501388607|              6849|        2718.00|   0.4|
|204396945|             50547|       19714.00|  0.39|
|200250241|             18417|        4340.00|  0.24|
|901119865|             36107|         978.00|  0.03|
|104455819|               955|         255.00|  0.27|
|906071860|             14388|           0.00|   0.0|
|104099929|               604|         690.00|  1.14|
|105187523|             23072|        4412.00|  0.19|
|204579911|              5654|        1027.00|  0.18|
|204795820|                93|           0.00|   0.0|
|105551032|               104|         236.00|  2.27|
|100991653|                1

In [200]:
df_clientes_com_tfa.count()

4115

In [201]:
df_clientes_com_tfa.filter("Tarifa == 0").count()

710

In [202]:
df_clientes_com_tfa.filter("Tarifa < 0").count()

1

In [203]:
df_clientes_com_tfa.filter("Tarifa > 0").count()

3404

In [204]:
df_clientes_com_tfa = df_clientes_com_tfa.filter("Tarifa > 0")

In [205]:
df_clientes_com_tfa.count()

3404

In [206]:
df_clientes_com_tfa.show()

+---------+------------------+---------------+------+
|       ID|Soma_de_quantidade|Soma_de_tarifas|Tarifa|
+---------+------------------+---------------+------+
|205800274|             10012|         779.00|  0.08|
|518204891|              1716|         561.00|  0.33|
|914200901|             98445|       77931.00|  0.79|
|501388607|              6849|        2718.00|   0.4|
|204396945|             50547|       19714.00|  0.39|
|200250241|             18417|        4340.00|  0.24|
|901119865|             36107|         978.00|  0.03|
|104455819|               955|         255.00|  0.27|
|104099929|               604|         690.00|  1.14|
|105187523|             23072|        4412.00|  0.19|
|204579911|              5654|        1027.00|  0.18|
|105551032|               104|         236.00|  2.27|
|302951191|               629|         689.00|   1.1|
|104481195|               309|           4.00|  0.01|
|922621589|             84892|       19024.00|  0.22|
| 33199683|             6208

In [None]:
# Agora que descobrimos quais os clientes possuem tarifas vamos cruzar o dataset de clientes com tarifa com o 
#dataset consolidado para excluir os 710 registros com tarifa zero, deixando o dataset limpo com 3404 clientes.

In [207]:
df_clientes_com_tfa.createOrReplaceTempView('dados_clientes')

In [208]:
df_consolid.createOrReplaceTempView('dados_consolid')

In [209]:
df_consolid.columns

['ID',
 'Cod_Tipo_Cart_Rel',
 'UF',
 'Esfera',
 'Soma_de_quantidade',
 'Soma_de_volume',
 'Soma_de_tarifas',
 'Soma_de_margem',
 'Ano',
 'Tarifa']

In [210]:
#fazendo o inner_join
query="""
SELECT 
tb_clientes.ID,
tb_consolid.Cod_Tipo_Cart_Rel,
tb_consolid.UF,
tb_consolid.Esfera,
tb_consolid.Soma_de_quantidade,
tb_consolid.Soma_de_volume,
tb_consolid.Soma_de_tarifas,
tb_consolid.Soma_de_margem,
tb_consolid.Tarifa,
tb_consolid.Ano
FROM dados_consolid AS tb_consolid
INNER JOIN dados_clientes AS tb_clientes
ON tb_clientes.ID = tb_consolid.ID
"""

spark_session_arrec.sql(query).show()

+-----+-----------------+---+---------+------------------+--------------+---------------+--------------+------+----+
|   ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Soma_de_quantidade|Soma_de_volume|Soma_de_tarifas|Soma_de_margem|Tarifa| Ano|
+-----+-----------------+---+---------+------------------+--------------+---------------+--------------+------+----+
| 1198|              407| PR|MUNICIPAL|              1341|         35.00|          21.00|          0.00|  0.02|2022|
| 1198|              407| PR|MUNICIPAL|              2573|         70.00|          42.00|          0.00|  0.02|2023|
| 1821|              406| PR|MUNICIPAL|              5235|          0.00|           0.00|          0.00|   0.0|2019|
| 1821|              406| PR|MUNICIPAL|             12143|      28178.00|           0.00|          0.00|   0.0|2020|
| 1821|              406| PR|MUNICIPAL|             13058|          0.00|           0.00|          0.00|   0.0|2021|
| 1821|              406| PR|MUNICIPAL|              2257|      

In [211]:
df_consolid = spark_session_arrec.sql(query)

In [212]:
df_consolid.count()

15216

In [213]:
df_consolid.filter("Tarifa == 0").count()

6294

In [214]:
df_consolid.filter("Tarifa < 0").count()

2

In [215]:
df_consolid.filter("Tarifa > 0").count()

8920

In [216]:
df_consolid = df_consolid.filter("Tarifa > 0")

In [217]:
df_consolid.count()

8920

In [218]:
df_consolid.show()

+-----+-----------------+---+---------+------------------+--------------+---------------+--------------+------+----+
|   ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Soma_de_quantidade|Soma_de_volume|Soma_de_tarifas|Soma_de_margem|Tarifa| Ano|
+-----+-----------------+---+---------+------------------+--------------+---------------+--------------+------+----+
| 1198|              407| PR|MUNICIPAL|              1341|         35.00|          21.00|          0.00|  0.02|2022|
| 1198|              407| PR|MUNICIPAL|              2573|         70.00|          42.00|          0.00|  0.02|2023|
| 1821|              406| PR|MUNICIPAL|               242|          0.00|         455.00|          0.00|  1.88|2023|
| 7607|              406| PR|MUNICIPAL|              6169|      86780.00|        4871.00|          0.00|  0.79|2022|
|47893|              406| PR|MUNICIPAL|               190|       1880.00|         116.00|          0.00|  0.61|2020|
|47893|              406| PR|MUNICIPAL|               120|      

In [219]:
df_consolid.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Cod_Tipo_Cart_Rel: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Soma_de_quantidade: decimal(15,0) (nullable = true)
 |-- Soma_de_volume: decimal(15,2) (nullable = true)
 |-- Soma_de_tarifas: decimal(15,2) (nullable = true)
 |-- Soma_de_margem: decimal(15,2) (nullable = true)
 |-- Tarifa: double (nullable = true)
 |-- Ano: string (nullable = true)



In [220]:
df_consolid.createOrReplaceTempView('dados_consolid')

In [221]:
#Agregação por ID com tarifa média

query="""
SELECT 
ID,
ANY_VALUE(Cod_Tipo_Cart_Rel) AS Cod_Tipo_Cart_Rel,
ANY_VALUE(UF) AS UF,
ANY_VALUE(Esfera) Esfera,
CAST(AVG(Soma_de_quantidade) AS DECIMAL(15,2)) AS Media_de_quantidade, 
CAST(AVG(Soma_de_volume) AS DECIMAL(15,2)) AS Media_de_volume,
CAST(AVG(Soma_de_tarifas) AS DECIMAL(15,2)) AS Media_de_tarifas,
CAST(AVG(Soma_de_margem) AS DECIMAL(15,2)) AS Media_de_margem,
CAST(AVG(Tarifa) AS DECIMAL(15,2)) AS Tarifa_media
FROM dados_consolid
GROUP BY ID
ORDER BY (Tarifa_media) DESC
"""
spark_session_mc.sql(query).show()

+---------+-----------------+---+---------+-------------------+---------------+----------------+---------------+------------+
|       ID|Cod_Tipo_Cart_Rel| UF|   Esfera|Media_de_quantidade|Media_de_volume|Media_de_tarifas|Media_de_margem|Tarifa_media|
+---------+-----------------+---+---------+-------------------+---------------+----------------+---------------+------------+
|800809325|              406| MA|MUNICIPAL|             265.00|           0.00|        17757.00|           0.00|      370.91|
| 35086758|              403| PB| ESTADUAL|             285.00|         233.50|        21099.00|           0.00|       88.23|
|106229821|              403| MT| ESTADUAL|           42224.00|   120169566.50|      2084178.50|           0.00|       58.21|
|204823794|              406| SP|MUNICIPAL|             699.75|        1113.25|         1673.25|          22.50|       23.30|
|200964047|              400| SP| ESTADUAL|           46241.75|   263002381.00|       787680.75|      109422.00|      

In [222]:
df_consolid=spark_session_mc.sql(query)

In [223]:
df_consolid.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Cod_Tipo_Cart_Rel: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Esfera: string (nullable = true)
 |-- Media_de_quantidade: decimal(15,2) (nullable = true)
 |-- Media_de_volume: decimal(15,2) (nullable = true)
 |-- Media_de_tarifas: decimal(15,2) (nullable = true)
 |-- Media_de_margem: decimal(15,2) (nullable = true)
 |-- Tarifa_media: decimal(15,2) (nullable = true)



In [224]:
df_consolid.count()

3404

In [225]:
#Convertendo o dataframe do Spark em dataframe do Pandas para facilitar a análise exploratória.
df_pd_consolid = df_consolid.toPandas()

In [226]:
type(df_pd_consolid)

pandas.core.frame.DataFrame

In [227]:
df_pd_consolid.to_csv('df_pd_consolid_vf.csv', index=False)

# Fim