In [1]:
pip install cassandra-driver

Note: you may need to restart the kernel to use updated packages.


# Importando Bibliotecas

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.window import Window
import pandas as pd

In [3]:
from cassandra.cluster import Cluster
from cassandra.connection import EndPoint
cluster = Cluster(['34.151.220.189'])
                               

session = cluster.connect('irregularidades')

query1 = "SELECT * FROM irreg_2014_2021"

saida = session.execute(query1)
dados = []
for i in saida:
    dados.append(i)
 
df_irreg = pd.DataFrame(dados)
cluster.shutdown()
df_irreg.drop(columns=['chave'],inplace=True)
df_irreg

Unnamed: 0,ano,categoria,instituicao_financeira,irregularidade,periodo,qtd_reclamacoes_nao_reguladas,qtd_reclamacoes_reguladas_outras,qtd_reclamacoes_reguladas_procedentes,qtd_total_reclamacoes,tipo
0,2019,Menos de quatro milhões de clientes,BANCO CATERPILLAR S.A.,Irregularidades relacionadas ao fornecimento d...,4º trimestre,0,1,0,1,Banco/financeira
1,2019,Menos de quatro milhões de clientes,BANCO DA AMAZONIA S.A.,Restrições cadastrais,4º trimestre,1,0,0,1,Banco/financeira
2,2021,Grupo Secundário,INDUSTRIAL DO BRASIL (conglomerado),"Irregularidades relativas a integridade, confi...",1º trimestre,0,5,7,12,Conglomerado
3,2014,Mais de dois milhões de clientes,BANRISUL (conglomerado),Saque ou depósito divergente em terminais de a...,2º semestre,0,1,0,1,Conglomerado
4,2016,Mais de quatro milhões de clientes,BANRISUL (conglomerado),Insatisfação com a resposta recebida da instit...,2º semestre,0,22,3,25,Conglomerado
...,...,...,...,...,...,...,...,...,...,...
52113,2018,Mais de quatro milhões de clientes,BRADESCO (conglomerado),Utilização incorreta da taxa de desconto no cá...,2º trimestre,0,1,0,1,Conglomerado
52114,2021,Demais bancos e financeiras,GENIAL (conglomerado),"Irregularidades relativas a integridade, confi...",3º trimestre,0,0,2,2,Conglomerado
52115,2021,Grupo Secundário,"PEFISA S.A. - CRÉDITO, FINANCIAMENTO E INVESTI...","Irregularidades relativas a integridade, confi...",1º trimestre,0,8,11,19,Banco/financeira
52116,2015,Menos de dois milhões de clientes,ALFA (conglomerado),Descumprimento contratual,2º semestre,2,0,0,2,Conglomerado


# Configurando a Spark Session

In [4]:
spark = (SparkSession
        .builder
        .master('local')
        .appName('Irregularidades_Pyspark')
        .config('spark.ui.port', '4050')
        .getOrCreate()
        )

In [5]:
df = spark.createDataFrame(df_irreg)
df.show(5,truncate=False)

22/01/24 21:47:15 WARN org.apache.spark.scheduler.TaskSetManager: Stage 0 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 1) / 1]

+----+-----------------------------------+-----------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------+--------------------------------+-------------------------------------+---------------------+----------------+
|ano |categoria                          |instituicao_financeira             |irregularidade                                                                                                                                                 |periodo     |qtd_reclamacoes_nao_reguladas|qtd_reclamacoes_reguladas_outras|qtd_reclamacoes_reguladas_procedentes|qtd_total_reclamacoes|tipo            |
+----+-----------------------------------+-----------------------------------+------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [6]:
df.printSchema()

root
 |-- ano: long (nullable = true)
 |-- categoria: string (nullable = true)
 |-- instituicao_financeira: string (nullable = true)
 |-- irregularidade: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- qtd_reclamacoes_nao_reguladas: long (nullable = true)
 |-- qtd_reclamacoes_reguladas_outras: long (nullable = true)
 |-- qtd_reclamacoes_reguladas_procedentes: long (nullable = true)
 |-- qtd_total_reclamacoes: long (nullable = true)
 |-- tipo: string (nullable = true)



In [7]:
#Vizualização do número de vezes que cada categoria de irregularidades aparece,
# ordenada de forma decrescente
df.groupBy('categoria').count().sort("count", ascending=False).show()

22/01/24 21:47:18 WARN org.apache.spark.scheduler.TaskSetManager: Stage 1 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+-----+
|           categoria|count|
+--------------------+-----+
|Menos de quatro m...|20088|
|Mais de quatro mi...|13209|
|Menos de dois mil...| 5615|
|Mais de dois milh...| 3688|
|Demais bancos e f...| 3293|
|    Grupo Secundário| 3117|
|Top 10 - Bancos e...| 1633|
|              Top 10| 1475|
+--------------------+-----+



                                                                                

In [8]:
#Numero de irregularidades (total) de cada tipo "Conglomerado" e "Banco/Financeira"
df.groupBy(F.col('tipo')).count().sort("count", ascending=False).show()

22/01/24 21:47:22 WARN org.apache.spark.scheduler.TaskSetManager: Stage 4 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.


+----------------+-----+
|            tipo|count|
+----------------+-----+
|    Conglomerado|41582|
|Banco/financeira|10536|
+----------------+-----+



In [9]:
#Exibindo "Instituição Financeira", "Ano", "Período" e a quantidade de cada tipo de irregularidade
#com exibição particionada pelo nome da Instituição Financeira e ordenada por ano

w0 = Window.partitionBy(F.col('instituicao_financeira')).orderBy('ano')
(df.withColumn("row_number", F.row_number().over(w0)).select(F.col("instituicao_financeira"),
                                                              F.col("ano"),
                                                              F.col("periodo"), 
                                                              F.col("qtd_reclamacoes_reguladas_procedentes"),
                                                              F.col("qtd_reclamacoes_reguladas_outras"),
                                                              F.col("qtd_reclamacoes_nao_reguladas"),
                                                              F.col("qtd_total_reclamacoes"))).show()


+----------------------+----+------------+-------------------------------------+--------------------------------+-----------------------------+---------------------+
|instituicao_financeira| ano|     periodo|qtd_reclamacoes_reguladas_procedentes|qtd_reclamacoes_reguladas_outras|qtd_reclamacoes_nao_reguladas|qtd_total_reclamacoes|
+----------------------+----+------------+-------------------------------------+--------------------------------+-----------------------------+---------------------+
|  BANCO CATERPILLAR...|2019|4º trimestre|                                    0|                               1|                            0|                    1|
|  BANCO DA AMAZONIA...|2019|4º trimestre|                                    0|                               0|                            1|                    1|
|  INDUSTRIAL DO BRA...|2021|1º trimestre|                                    7|                               5|                            0|                   12|
|  B

22/01/24 21:47:23 WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.


In [10]:
#Filtro por Conglomerados, Bancos e/ou Financeiras que começam com a letra B
df.filter("instituicao_financeira like 'B%' ").show()

+----+--------------------+----------------------+--------------------+------------+-----------------------------+--------------------------------+-------------------------------------+---------------------+----------------+
| ano|           categoria|instituicao_financeira|      irregularidade|     periodo|qtd_reclamacoes_nao_reguladas|qtd_reclamacoes_reguladas_outras|qtd_reclamacoes_reguladas_procedentes|qtd_total_reclamacoes|            tipo|
+----+--------------------+----------------------+--------------------+------------+-----------------------------+--------------------------------+-------------------------------------+---------------------+----------------+
|2019|Menos de quatro m...|  BANCO CATERPILLAR...|Irregularidades r...|4º trimestre|                            0|                               1|                                    0|                    1|Banco/financeira|
|2019|Menos de quatro m...|  BANCO DA AMAZONIA...|Restrições cadast...|4º trimestre|                

22/01/24 21:47:23 WARN org.apache.spark.scheduler.TaskSetManager: Stage 8 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.


In [11]:
#Número de irregularidades de cada tipo, ordenadas de maneira decrescente
df.groupBy(F.col("irregularidade")).count().sort("count", ascending=False).show(20)

22/01/24 21:47:24 WARN org.apache.spark.scheduler.TaskSetManager: Stage 9 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+-----+
|      irregularidade|count|
+--------------------+-----+
|Renegociação de d...| 1312|
|Insatisfação com ...| 1239|
|Reclamações relac...| 1190|
|Insatisfação com ...| 1117|
|Restrições cadast...| 1110|
|Irregularidades r...| 1101|
|Recusa de produto...| 1031|
|  Cobranças diversas|  982|
|Insatisfação com ...|  793|
|Descumprimento de...|  782|
|Irregularidades r...|  773|
|Renegociação de d...|  770|
|Cancelamento de c...|  756|
|Outros assuntos n...|  714|
|Oferta ou prestaç...|  687|
|Restrição à reali...|  686|
|Irregularidades r...|  685|
|Concessão de créd...|  624|
|Dificuldade na li...|  614|
|               Golpe|  610|
+--------------------+-----+
only showing top 20 rows



In [12]:
#Exibindo reclamações sobre "Renegociação de dívida (exceto cartão de crédito)", 
#referentes ao 2º semestre de 2014.

df.where((F.col("periodo") == "2º semestre") & (F.col("irregularidade") == "Renegociação de dívida (exceto cartão de crédito)")).show()

+----+--------------------+----------------------+--------------------+-----------+-----------------------------+--------------------------------+-------------------------------------+---------------------+----------------+
| ano|           categoria|instituicao_financeira|      irregularidade|    periodo|qtd_reclamacoes_nao_reguladas|qtd_reclamacoes_reguladas_outras|qtd_reclamacoes_reguladas_procedentes|qtd_total_reclamacoes|            tipo|
+----+--------------------+----------------------+--------------------+-----------+-----------------------------+--------------------------------+-------------------------------------+---------------------+----------------+
|2016|Menos de quatro m...|  MERCEDES-BENZ (co...|Renegociação de d...|2º semestre|                            2|                               0|                                    0|                    2|    Conglomerado|
|2016|Mais de quatro mi...|     BB (conglomerado)|Renegociação de d...|2º semestre|                     

22/01/24 21:47:25 WARN org.apache.spark.scheduler.TaskSetManager: Stage 12 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.


# Salvando o arquivo (em formato parquet)

In [15]:
(df.write.format("parquet")
.option("header", "true")
.option("inferschema", "true")
.save('gs://deloitte_g10/saída/Pyspark/BC-Irregularidades/Irregularidades_2014_2021.parquet')
)

22/01/24 21:49:44 WARN org.apache.spark.scheduler.TaskSetManager: Stage 14 contains a task of very large size (2333 KiB). The maximum recommended task size is 1000 KiB.
                                                                                