In [1]:
import os
os.environ["SPARK_HOME"] = "C:\\spark"
os.environ["HADOOP_HOME"] = "C:\\spark\\hadoop"

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [5]:
spark

In [6]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

In [7]:
df = spark.createDataFrame(data, colNames)

In [8]:
df

DataFrame[Nome: string, Idade: string]

In [9]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [10]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


In [11]:
empresas_path = 'C:\\Users\\Selto\\OneDrive\\Documentos\\ProjetosGithub\\Spark\\files\\empresas'
empresas = spark.read.csv(empresas_path, sep=';', inferSchema=True)

In [12]:
estabelecimentos_path = 'C:\\Users\\Selto\\OneDrive\\Documentos\\ProjetosGithub\\Spark\\files\\estabelecimentos'
estabelecimentos = spark.read.csv(estabelecimentos_path, sep=';', inferSchema=True)

In [13]:
socios_path = 'C:\\Users\\Selto\\OneDrive\\Documentos\\ProjetosGithub\\Spark\\files\\socios'
socios = spark.read.csv(socios_path, sep=';', inferSchema=True)

In [15]:
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [16]:
empresas_col_name = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabs_col_name = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
socios_col_name = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [17]:
# Create method to change column names
def change_column_name(cols_names, df):
    for index, col_name in enumerate(cols_names):
        df = df.withColumnRenamed(f"_c{index}", col_name)
    return df

In [18]:
#Change company column Names
estabelecimentos = change_column_name(estabs_col_name,estabelecimentos)
empresas = change_column_name(empresas_col_name,empresas)
socios = change_column_name(socios_col_name,socios)

In [19]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,11748,2,MARIO KATUMI HOSI,***504158**,49,19940530,,***000000**,,0,7
1,11748,2,ROBERTO YUKIO HOSI,***241578**,22,19940530,,***000000**,,0,7
2,13289,2,ANDREIA CRISTINA DELSIN,***787278**,65,20180615,,***000000**,,0,3
3,17389,2,MARCIA DO CANTO ARRUDA DAIER,***920408**,49,19940613,,***000000**,,0,7
4,19204,2,ALMIR CARLOS CAPELLINI,***299028**,49,19980908,,***000000**,,0,7


In [20]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [21]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [22]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [23]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [24]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str