## DESAFIO

1. Conversão do formato dos arquivos: Converta o arquivo CSV dados.csv, para um formato colunar de alta performance de leitura de sua escolha e justificar.

2. Realize a deduplicação dos dados após a conversão;

3. Remova os registros onde a coluna Instituição Financeira estiver vazia ou nula;

3. Defina o tipo correto para pelo menos 4 colunas.


#### Notas gerais
- Todas as operações devem ser realizadas utilizando Spark. 
 O serviço de execução fica a seu critério, podendo utilizar tanto serviços locais como serviços em cloud, desde que seja possível acessarmos para validação. 

In [None]:
#instalando spark no google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
#setando variaveis de ambiente no colab
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

from pyspark.sql import functions as F
from pyspark.sql.types import *

from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

In [None]:
#INSERIR ARQUIVO CSV REFERENTE AO DESAFIO
from google.colab import files 
  
  
uploaded = files.upload()

Saving Bacen - IfData - Ativos.csv to Bacen - IfData - Ativos.csv


In [None]:
arquivo = list(uploaded.items())
arquivo = arquivo[0][0]

In [None]:
resultado = spark.read.csv(arquivo, sep=';', header=True)

In [None]:
resultado.printSchema()

root
 |-- Instituição financeira: string (nullable = true)
 |-- Código: string (nullable = true)
 |-- Conglomerado: string (nullable = true)
 |-- Conglomerado Financeiro: string (nullable = true)
 |-- Conglomerado Prudencial: string (nullable = true)
 |-- TCB: string (nullable = true)
 |-- TC: string (nullable = true)
 |-- TI: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- Disponibilidades (a): string (nullable = true)
 |-- Aplicações Interfinanceiras de Liquidez (b): string (nullable = true)
 |-- TVM e Instrumentos Financeiros Derivativos (c): string (nullable = true)
 |-- Operações de Crédito: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- Arrendamento Mercantil: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- Outros

In [None]:
resultado.show(5)

+----------------------+--------+------------+-----------------------+-----------------------+----+----+----+---------+----+-------+--------------------+-------------------------------------------+----------------------------------------------+--------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+-----------------------------------------+-----------------------------+-----------------------+------------------------------------------------------------------------+----------------------------------------------+---------------------------+----+
|Instituição financeira|  Código|Conglomerado|Conglomerado Financeiro|Conglomerado Prudencial| TCB|  TC|  TI|   Cidade|  UF|   Data|Disponibilidades (a)|Aplicações Interfinanceiras de Liquidez (b)|TVM e Instrumentos Financeiros Derivativos (c)|Operações de Crédito|                _c15|                _c16|Arrendamento Mercantil|      

In [None]:
#ajustando nomenclatura de colunas
resultado = (resultado
        .withColumnRenamed("Instituição financeira","instituicao_financeira")
        .withColumnRenamed("Código","codigo")
        .withColumnRenamed("Conglomerado","conglomerado")
        .withColumnRenamed("Conglomerado Financeiro","conglomerado_financeiro")
        .withColumnRenamed("Conglomerado Prudencial","conglomerado_prudencial")
        .withColumnRenamed("TCB","tcb")
        .withColumnRenamed("TC","tc")
        .withColumnRenamed("TI","ti")
        .withColumnRenamed("Cidade","cidade")
        .withColumnRenamed("UF","uf")
        .withColumnRenamed("Data","data")
        .withColumnRenamed("Disponibilidades (a)","disponibilidade_a")
        .withColumnRenamed("Aplicações Interfinanceiras de Liquidez (b)","aplicacoes_interfinanceiras_liquidez_b")
        .withColumnRenamed("TVM e Instrumentos Financeiros Derivativos (c)","tvm_instrumentos_financeiros_derivativos_c")
        .withColumnRenamed("Operações de Crédito","operacoes_credito_d1")
        .withColumnRenamed("_c15","provisao_sobre_operacao_credito_d2")
        .withColumnRenamed("_c16","operacoes_credito_liquidas_provisao_d")
        .withColumnRenamed("Arrendamento Mercantil","arredondamento_mercantil_e1")
        .withColumnRenamed("_c18","imobilizado_arrendamento_e2")
        .withColumnRenamed("_c19","credores_antecipacao_valor_residual_e3")
        .withColumnRenamed("_c20","provisao_sobre_arrendamento_mercantil_para_cl_e4")
        .withColumnRenamed("_c21","arrendamento_mercantil_liquido_provisao_e")
        .withColumnRenamed("Outros Créditos - Líquido de Provisão (f)","outros_creditos_liquido_provisao_f")
        .withColumnRenamed("Outros Ativos Realizáveis (g)","outrs_ativos_realizaveis_g")
        .withColumnRenamed("Permanente Ajustado (h)","permanente_ajustado_h")
        .withColumnRenamed("Ativo Total Ajustado (i) = (a) + (b) + (c) + (d) + (e) + (f) + (g) + (h)","ativo_total_ajustado_i")
        .withColumnRenamed("Credores por Antecipação de Valor Residual (j)","credores_antecipacao_valor_residual_j")
        .withColumnRenamed("Ativo Total (k) = (i) - (j)","ativo_total_k_i-j")
        .withColumnRenamed("_c28","c28")
      
     )

In [None]:
resultado.printSchema()

root
 |-- instituicao_financeira: string (nullable = true)
 |-- codigo: string (nullable = true)
 |-- conglomerado: string (nullable = true)
 |-- conglomerado_financeiro: string (nullable = true)
 |-- conglomerado_prudencial: string (nullable = true)
 |-- tcb: string (nullable = true)
 |-- tc: string (nullable = true)
 |-- ti: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- data: string (nullable = true)
 |-- disponibilidade_a: string (nullable = true)
 |-- aplicacoes_interfinanceiras_liquidez_b: string (nullable = true)
 |-- tvm_instrumentos_financeiros_derivativos_c: string (nullable = true)
 |-- operacoes_credito_d1: string (nullable = true)
 |-- provisao_sobre_operacao_credito_d2: string (nullable = true)
 |-- operacoes_credito_liquidas_provisao_d: string (nullable = true)
 |-- arredondamento_mercantil_e1: string (nullable = true)
 |-- imobilizado_arrendamento_e2: string (nullable = true)
 |-- credores_antecipacao_valor_residual

#### 1- Conversão do formato dos arquivos: Converta o arquivo CSV dados.csv, para um formato colunar de alta performance de leitura de sua escolha e justificar.

In [None]:
#MONTAR DRIVE PARA SALVAR ARQUIVO PARQUET
from google.colab import drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#SALVANDO ARQUIVOS PARQUET
resultado.write.parquet('/content/gdrive/My Drive/resultado',mode='overwrite')

#### JUSTIFICATIVA - ARQUIVO PARQUET

- O tipo de arquivo escolhido foi o parquet. Pois ele é uma eficiente forma de armazenar dados para análises. Foi desenhado para interoperabilidade, ser eficiente em tamanho em disco, rápidas consultas.

In [None]:
#LEITURA DE ARQUIVO PARQUET

resultado = spark.read.parquet('/content/gdrive/My Drive/resultado')
resultado.show(15,False)

+----------------------------------------------------+--------+---------------+-----------------------+-----------------------+----+----+----+--------------+----+-------+-----------------+--------------------------------------+------------------------------------------+-------------------------+----------------------------------------+---------------------------------------------+-------------------------------------+--------------------------------+-----------------------------------------------+------------------------------------------------+----------------------------------------------+----------------------------------+--------------------------+---------------------+----------------------+-------------------------------------+-----------------+----+
|instituicao_financeira                              |codigo  |conglomerado   |conglomerado_financeiro|conglomerado_prudencial|tcb |tc  |ti  |cidade        |uf  |data   |disponibilidade_a|aplicacoes_interfinanceiras_liquidez_b|tvm_in

#### 2-Realize a deduplicação dos dados após a conversão;

In [None]:
resultado.count()

1534

In [None]:
#DEDUPLICAÇÃO DE DUPLICADOS
resultado = resultado.dropDuplicates()
resultado.count()

1531

#### 3- Remova os registros onde a coluna Instituição Financeira estiver vazia ou nula;

In [None]:
resultado.createOrReplaceTempView('df')

In [None]:
#REMOVENDO RESULTADOS COM COLUNA INSTITUIÇÃO FINANCEIRA VAZIA OU NULA
resultado = spark.sql("""
    SELECT *
    FROM df
    where 
        trim(instituicao_financeira) is not null and
        (instituicao_financeira) <> 'null' and
        trim(instituicao_financeira) <> ''
""")

In [None]:
resultado.count()

1528

#### 4- Defina o tipo correto para pelo menos 4 colunas.

In [None]:
resultado.printSchema()

root
 |-- instituicao_financeira: string (nullable = true)
 |-- codigo: string (nullable = true)
 |-- conglomerado: string (nullable = true)
 |-- conglomerado_financeiro: string (nullable = true)
 |-- conglomerado_prudencial: string (nullable = true)
 |-- tcb: string (nullable = true)
 |-- tc: string (nullable = true)
 |-- ti: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- data: string (nullable = true)
 |-- disponibilidade_a: string (nullable = true)
 |-- aplicacoes_interfinanceiras_liquidez_b: string (nullable = true)
 |-- tvm_instrumentos_financeiros_derivativos_c: string (nullable = true)
 |-- operacoes_credito_d1: string (nullable = true)
 |-- provisao_sobre_operacao_credito_d2: string (nullable = true)
 |-- operacoes_credito_liquidas_provisao_d: string (nullable = true)
 |-- arredondamento_mercantil_e1: string (nullable = true)
 |-- imobilizado_arrendamento_e2: string (nullable = true)
 |-- credores_antecipacao_valor_residual

In [None]:
#DEFININDO TIPO CORRETO DAS COLUNAS

resultado = (resultado
      .withColumn('tc',F.col('tc').cast(IntegerType()))
      .withColumn('ti',F.col('ti').cast(IntegerType()))
      .withColumn('disponibilidade_a',F.col('disponibilidade_a').cast(FloatType()))
      .withColumn('aplicacoes_interfinanceiras_liquidez_b',F.col('aplicacoes_interfinanceiras_liquidez_b').cast(FloatType()))
      .withColumn('tvm_instrumentos_financeiros_derivativos_c',F.col('tvm_instrumentos_financeiros_derivativos_c').cast(FloatType()))
      .withColumn('provisao_sobre_operacao_credito_d2',F.col('provisao_sobre_operacao_credito_d2').cast(FloatType()))
      .withColumn('operacoes_credito_liquidas_provisao_d',F.col('operacoes_credito_liquidas_provisao_d').cast(FloatType()))
      .withColumn('outros_creditos_liquido_provisao_f',F.col('outros_creditos_liquido_provisao_f').cast(FloatType()))
      .withColumn('outrs_ativos_realizaveis_g',F.col('outrs_ativos_realizaveis_g').cast(FloatType()))
      .withColumn('permanente_ajustado_h',F.col('permanente_ajustado_h').cast(FloatType()))
      .withColumn('ativo_total_ajustado_i',F.col('ativo_total_ajustado_i').cast(FloatType()))
      .withColumn('credores_antecipacao_valor_residual_j',F.col('credores_antecipacao_valor_residual_j').cast(FloatType()))     
      
     )     

In [None]:
resultado.printSchema()

root
 |-- instituicao_financeira: string (nullable = true)
 |-- codigo: string (nullable = true)
 |-- conglomerado: string (nullable = true)
 |-- conglomerado_financeiro: string (nullable = true)
 |-- conglomerado_prudencial: string (nullable = true)
 |-- tcb: string (nullable = true)
 |-- tc: integer (nullable = true)
 |-- ti: integer (nullable = true)
 |-- cidade: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- data: string (nullable = true)
 |-- disponibilidade_a: float (nullable = true)
 |-- aplicacoes_interfinanceiras_liquidez_b: float (nullable = true)
 |-- tvm_instrumentos_financeiros_derivativos_c: float (nullable = true)
 |-- operacoes_credito_d1: string (nullable = true)
 |-- provisao_sobre_operacao_credito_d2: float (nullable = true)
 |-- operacoes_credito_liquidas_provisao_d: float (nullable = true)
 |-- arredondamento_mercantil_e1: string (nullable = true)
 |-- imobilizado_arrendamento_e2: string (nullable = true)
 |-- credores_antecipacao_valor_residual_e3