In [2]:
import os

# Point Spark tooling at the local Spark installation.
# A raw string keeps the Windows backslashes literal; in a normal string
# "\s" is an invalid escape sequence that Python only tolerates with a warning.
os.environ["SPARK_HOME"] = r"C:\spark\spark-3.2.1-bin-hadoop3.2"

In [3]:
# os.unsetenv('PYSPARK_SUBMIT_ARGS')

In [4]:
import findspark
# NOTE(review): presumably this locates the Spark install through SPARK_HOME
# (set in an earlier cell) and makes `pyspark` importable - confirm against
# the findspark docs. The pyspark imports in later cells rely on it having run.
findspark.init()

In [5]:
from pyspark.sql import SparkSession

In [6]:
from pyspark.sql import SparkSession

# Create the session (or reuse one that already exists) with a local master.
# The Spark UI port is set to 4050 explicitly.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Initializing with Spark")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [7]:
# Expose the Spark UI (port 4050) through an ngrok tunnel.
# The auth token is read from the NGROK_AUTHTOKEN environment variable so that
# no credential is stored in the notebook. The token that used to be hardcoded
# here has been published with the notebook and should be revoked.
ngrok_token = os.environ.get("NGROK_AUTHTOKEN")
if ngrok_token:
    get_ipython().system_raw(f'./ngrok authtoken {ngrok_token}')
else:
    # Keep the cell best-effort: still try to start the tunnel, but say why
    # it may be rejected.
    print("NGROK_AUTHTOKEN is not set; starting ngrok without registering an auth token.")
get_ipython().system_raw('./ngrok http 4050 &')

In [8]:
# List the tunnels known to the local ngrok agent.
# NOTE(review): 4040 is presumably ngrok's own local API port, separate from
# the Spark UI that was moved to 4050 - confirm.
!curl -s http://localhost:4040/api/tunnels

In [9]:
# Echo the session object so the notebook renders its representation.
spark

## Dataframes

In [10]:
# Sample records: one tuple per row. Ages are given as strings here.
data = [
    ('Neil', '38'),
    ('Buzz', '39'),
]
colNames = ['Name', 'Age']

# Build the DataFrame from the rows, naming the columns via `schema`.
df = spark.createDataFrame(data, schema=colNames)
df

DataFrame[Name: string, Age: string]

In [11]:
# Print the DataFrame rows as a plain-text table.
df.show()

+----+---+
|Name|Age|
+----+---+
|Neil| 38|
|Buzz| 39|
+----+---+



To display the data as a pandas DataFrame instead, use the `toPandas()` method:

In [12]:
# Convert to a pandas DataFrame so the notebook renders it as a rich table.
# NOTE(review): toPandas() collects the data to the driver - fine for this
# two-row frame; limit larger frames first.
df.toPandas()

Unnamed: 0,Name,Age
0,Neil,38
1,Buzz,39


## Loading the Data

In [15]:
# Locations of the three CSV extracts, relative to the notebook's working
# directory. Forward slashes are used on purpose: the previous backslash
# paths relied on invalid escape sequences ("\c", "\e", "\p") and only
# resolved on Windows.
pathC = 'data/companies/part-00000-58983ad4-8444-4405-aec6-9cd3e5413d1b-c000.csv'
pathE = 'data/establishments/part-00000-701c2cc9-d9db-469f-be12-341c24a77308-c000.csv'
pathP = 'data/partners/part-00000-1ab6459b-5a3c-4294-83b8-316c171f0202-c000.csv'

# The files are ';'-separated and read without a header row; column types are
# inferred from the data.
companies = spark.read.csv(pathC, sep=';', inferSchema=True)
establishments = spark.read.csv(pathE, sep=';', inferSchema=True)
partners = spark.read.csv(pathP, sep=';', inferSchema=True)

In [17]:
# Row counts, one per line, in the order: companies, establishments, partners.
print(
    companies.count(),
    establishments.count(),
    partners.count(),
    sep='\n',
)

458549
484305
204373


## Manipulating the Data

### Basic Operations

In [19]:
# Preview the first five rows as a pandas DataFrame; limiting first keeps the
# conversion to five rows.
companies.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


The auto-generated column names (`_c0`, `_c1`, …) say nothing about their contents, so we rename them.

### Renaming columns

In [30]:
# Descriptive names for the positional columns (_c0, _c1, ...) that Spark
# assigned when reading the CSV, listed in column order.
companiesColNames = [
    'cnpj_basico',
    'razao_social_nome_empresarial',
    'natureza_juridica',
    'qualificacao_do_responsavel',
    'capital_social_da_empresa',
    'porte_da_empresa',
    'ente_federativo_responsavel',
]

# Rename every column in one step. Unlike a loop of withColumnRenamed calls,
# which silently skips any `_c{i}` column that does not exist, toDF raises when
# the number of names does not match the number of columns. It also leaves no
# loop variables behind in the notebook namespace, and re-running the cell
# yields the same result.
companies = companies.toDF(*companiesColNames)

companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [31]:
# Descriptive names for the positional columns (_c0, _c1, ...) that Spark
# assigned when reading the CSV, listed in column order.
establishmentsColNames = [
    'cnpj_basico',
    'cnpj_ordem',
    'cnpj_dv',
    'identificador_matriz_filial',
    'nome_fantasia',
    'situacao_cadastral',
    'data_situacao_cadastral',
    'motivo_situacao_cadastral',
    'nome_da_cidade_no_exterior',
    'pais',
    'data_de_inicio_atividade',
    'cnae_fiscal_principal',
    'cnae_fiscal_secundaria',
    'tipo_de_logradouro',
    'logradouro',
    'numero',
    'complemento',
    'bairro',
    'cep',
    'uf',
    'municipio',
    'ddd_1',
    'telefone_1',
    'ddd_2',
    'telefone_2',
    'ddd_do_fax',
    'fax',
    'correio_eletronico',
    'situacao_especial',
    'data_da_situacao_especial',
]

# Rename every column in one step. Unlike a loop of withColumnRenamed calls,
# which silently skips any `_c{i}` column that does not exist, toDF raises when
# the number of names does not match the number of columns. It also leaves no
# loop variables behind in the notebook namespace, and re-running the cell
# yields the same result.
establishments = establishments.toDF(*establishmentsColNames)

establishments.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [34]:
# Descriptive names for the positional columns (_c0, _c1, ...) that Spark
# assigned when reading the CSV, listed in column order.
partnersColNames = [
    'cnpj_basico',
    'identificador_de_socio',
    'nome_do_socio_ou_razao_social',
    'cnpj_ou_cpf_do_socio',
    'qualificacao_do_socio',
    'data_de_entrada_sociedade',
    'pais',
    'representante_legal',
    'nome_do_representante',
    'qualificacao_do_representante_legal',
    'faixa_etaria',
]

# Rename every column in one step. Unlike a loop of withColumnRenamed calls,
# which silently skips any `_c{i}` column that does not exist, toDF raises when
# the number of names does not match the number of columns. It also leaves no
# loop variables behind in the notebook namespace, and re-running the cell
# yields the same result.
partners = partners.toDF(*partnersColNames)

partners.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7
