<a href="https://colab.research.google.com/github/leticiafaria7/alura-courses/blob/main/apache-spark/manipulando_dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0. Configuração do ambiente

In [1]:
# instalar as dependências

# pyspark (não pode ter espaço antes e depois do ==)
!pip install pyspark==3.3.1

# findspark - torna o pyspark uma biblioteca possível de ser importada regularmente
!pip install -q findspark

# instalar um kit de desenvolvimento java (JDK) que vai permitir rodar código na
# linguagem Scala e nas máquinas virtuais Java (JVM), que é como o spark foi construído
# VERIFICAR SE O ARQUIVO EXISTE NO SITE
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz # baixar, através da ferramenta wget, os arquivos do Spark na máquina virtual do Google

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.3.1)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845494 sha256=c29a757243abebf1d39e3e0ec37345ce6c4e76ef8941f51328f0fddd37ad17cc
  Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py

In [2]:
# descompactar o arquivo que fizemos download

!tar xf spark-3.3.4-bin-hadoop3.tgz

In [11]:
# bibliotecas

import findspark
from pyspark.sql import SparkSession
from google.colab import drive
import zipfile
import pandas as pd

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [4]:
# usar o findspark para permitir a importação dos pacotes necessários para utilizar o PySpark
findspark.init()

# criar a seção Spark
spark = SparkSession.builder\
  .master('local[*]')\
    .appName('Iniciando com Spark')\
      .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
# montar o drive

drive.mount('/content/drive')

Mounted at /content/drive


# 1. Ler os dados

In [6]:
path_empresas = '/content/drive/MyDrive/5. Cursos/programming/alura-courses/apache-spark/dados/empresas'
empresas = spark.read.csv(path_empresas, sep = ';', inferSchema = True)

path_estabelecimentos = '/content/drive/MyDrive/5. Cursos/programming/alura-courses/apache-spark/dados/estabelecimentos'
estabelecimentos = spark.read.csv(path_estabelecimentos, sep = ';', inferSchema = True)

path_socios = '/content/drive/MyDrive/5. Cursos/programming/alura-courses/apache-spark/dados/socios'
socios = spark.read.csv(path_socios, sep = ';', inferSchema = True)

In [None]:
empresas.count()

4585679

# 2. Manipulando os dados

In [None]:
# operações básicas

empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [7]:
# renomear colunas

colunas_empresas = ['cnpj_basico', 'razao_social', 'natureza_juridica', 'qualificacao_responsavel', 'capital_social', 'porte', 'ente_federativo_responsavel']

colunas_estabelecimentos = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial',
                            'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral',
                            'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal',
                            'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento',
                            'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax',
                            'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

colunas_socios = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social',
                  'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade',
                  'pais', 'representante_legal', 'nome_do_representante',
                  'qualificacao_do_representante_legal', 'faixa_etaria']


for index, colName in enumerate(colunas_empresas):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

for index, colName in enumerate(colunas_estabelecimentos):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

for index, colName in enumerate(colunas_socios):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

In [9]:
empresas.columns

['cnpj_basico',
 'razao_social',
 'natureza_juridica',
 'qualificacao_responsavel',
 'capital_social',
 'porte',
 'ente_federativo_responsavel']

In [None]:
# socios

socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [None]:
# estabelecimentos

estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [None]:
# ver tipos das variáveis

empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: string (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [12]:
# alterar o tipo da coluna de capital social da empresa

empresas = empresas.withColumn('capital_social', f.regexp_replace('capital_social', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social,natureza_juridica,qualificacao_responsavel,capital_social,porte,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [13]:
empresas = empresas.withColumn('capital_social', f.col('capital_social').cast(DoubleType()))
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_responsavel: integer (nullable = true)
 |-- capital_social: double (nullable = true)
 |-- porte: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Converter string para data

In [14]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [15]:
df.printSchema()

root
 |-- data: long (nullable = true)



In [16]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))
df.printSchema()

root
 |-- data: date (nullable = true)



In [17]:
estabelecimentos = estabelecimentos\
.withColumn(
    'data_situacao_cadastral',
    f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
)\
.withColumn(
    'data_de_inicio_atividade',
    f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
)\
.withColumn(
    'data_da_situacao_especial',
    f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
)

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [18]:
estabelecimentos.limit(2).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,


In [19]:
sorted(socios.columns)

['cnpj_basico',
 'cnpj_ou_cpf_do_socio',
 'data_de_entrada_sociedade',
 'faixa_etaria',
 'identificador_de_socio',
 'nome_do_representante',
 'nome_do_socio_ou_razao_social',
 'pais',
 'qualificacao_do_representante_legal',
 'qualificacao_do_socio',
 'representante_legal']

In [20]:
socios = socios\
.withColumn(
    'data_de_entrada_sociedade',
    f.to_date(f.col('data_de_entrada_sociedade').cast(StringType()), 'yyyyMMdd')
)

socios.limit(2).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
