# ETL  - Tratamento dos dados

### Importando as Lib Utilizadas

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType,IntegerType,StringType, DataType
import os
from unidecode import unidecode
import pyodbc
import pandas as pd

### Declarando Variaveis de Ambiente para Exporção em CSV no Pyspark

In [2]:
os.environ['SPARK_HOME']=r'C:\Users\leandro.silva\PROJETOS\BPC_By_Spark\spark-3.2.4-bin-hadoop2.7'
os.environ['JAVA_HOME']=r'C:\Program Files\Java\jre1.8.0_172'
os.environ['HADOOP_HOME']=r'C:\Users\leandro.silva\PROJETOS\BPC_By_Spark\spark-3.2.4-bin-hadoop2.7\hadoop'


### Declarando Variaveis para Conexão com Banco de dados 

In [3]:
server = 'UNITNB015\CONNECTION'
database = 'EMPRESA_HOMOLOGACAO'
user = 'sa'
password = '85356325Ll@'

In [4]:
conexao = pyodbc.connect\
            (f'DRIVER=ODBC Driver 17 for SQL Server;SERVER={server};DATABASE={database};UID={user};PWD={password}'
            )

### Criando conexão

In [5]:
cursor = conexao.cursor()
cursor.fast_executemany = True

### Criando Sessão do PySpark para Tratamento dos dados

In [6]:
spark = SparkSession\
            .builder\
            .master('local[*]')\
            .appName('Tratamento Base CNPJ')\
            .config('spark.ui.port','4051')\
            .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
            .getOrCreate()

spark

## Importando Tabelas empresas

In [7]:
arquivos = os.listdir('EMPRESAS/')

In [8]:
empresas = spark.read.csv(f'EMPRESAS/{arquivos[0]}', encoding='ISO-8859-1', sep=';')

In [9]:
for c,arq in enumerate(arquivos):
    file = spark.read.csv(f'EMPRESAS/{arq}', encoding='ISO-8859-1', sep=';')
    empresas = empresas.union(file)
    
    

### Deletando os Registros Duplicados

In [10]:
empresas = empresas.drop_duplicates()

### INSERINDO NOME DAS COLUNAS

| Campo |Descrição|
|-------|-----------|
|CNPJ BÁSICO | NÚMERO BASE DE INSCRIÇÃO NO CNPJ (OITO PRIMEIROS DÍGITOSDO CNPJ)
RAZÃO SOCIAL |NOME EMPRESARIAL NOME EMPRESARIAL DA PESSOA JURÍDICA
NATUREZA JURÍDICA | CÓDIGO DA NATUREZA JURÍDICA
QUALIFICAÇÃO DO RESPONSÁVEL |QUALIFICAÇÃO DA PESSOA FÍSICA RESPONSÁVEL PELA EMPRESA
CAPITAL SOCIAL DA EMPRESA | CAPITAL SOCIAL DA EMPRESA
PORTE DA EMPRESA CÓDIGO DO PORTE DA EMPRESA:|00 – NÃO INFORMADO - 01 - MICRO EMPRESA - 03 - EMPRESA DE PEQUENO PORTE 05 - DEMAIS
ENTE FEDERATIVO|RESPONSÁVEL O ENTE FEDERATIVO RESPONSÁVEL É PREENCHIDO PARA OS CASOS DE ÓRGÃOS E ENTIDADES DO GRUPO DE NATUREZA JURÍDICA 1XXX. PARA AS DEMAIS NATUREZAS, ESTE ATRIBUTO FICA EM BRANCO


In [11]:
empresas_colunas = ['CNPJ_BASICO','RAZAO_SOCIAL','NATUREZA_JURIDICA','QUALIFICACAO_DO_RESPONSAVEL',
                    'CAPITAL_SOCIAL_DA_EMPRESA','PORTE','ENTE_FEDERATIVO']

In [12]:
for x, y  in enumerate(empresas_colunas):
    empresas = empresas.withColumnRenamed(f'_c{x}',y)

## Alterar o Porte para descrição

In [13]:
portes = {'00': 'NAO INFORMADO','01' : 'MICRO EMPRESA', '03':'EMPRESA DE PEQUENO PORTE', '05':'DEMAIS'}

In [14]:
empresas = empresas.withColumn('PORTE',f.when(f.isnull(empresas['PORTE']),portes['00'])\
                    .when(empresas['PORTE']=='01', portes['01'])\
                    .when(empresas['PORTE']=='03', portes['03'])\
                    .when(empresas['PORTE']=='05',portes['05'])
                   )

In [15]:
empresas = empresas.withColumn\
            ('CAPITAL_SOCIAL_DA_EMPRESA',
                     f.regexp_replace('CAPITAL_SOCIAL_DA_EMPRESA',',','.')
            )

In [16]:
empresas = empresas.withColumn\
        ('CAPITAL_SOCIAL_DA_EMPRESA',
                     empresas['CAPITAL_SOCIAL_DA_EMPRESA'].cast(DoubleType())

            
        )

## Analisando Estabelecimento

|Campo | Descricao|
|-------|----------|
|CNPJ BASICO |NÚMERO BASE DE INSCRIÇÃO NO CNPJ (OITO PRIMEIROS DÍGITOS DO CNPJ)|
|CNPJ | ORDEM NÚMERO DO ESTABELECIMENTO DE INSCRIÇÃO NO CNPJ (DO NONO ATÉ O DÉCIMO SEGUNDO DÍGITO DO CNPJ)|
|CNPJ DV | DÍGITO VERIFICADOR DO NÚMERO DE INSCRIÇÃO NO CNPJ (DOIS ÚLTIMOS DÍGITOS DO CNPJ).
|IDENTIFICADOR MATRIZ/FILIAL |CÓDIGO DO IDENTIFICADOR MATRIZ/FILIAL: 1 – MATRIZ 2 – FILIAL|
|NOME FANTASIA |CORRESPONDE AO NOME FANTASIA
|SITUAÇÃO CADASTRAL| CÓDIGO DA SITUAÇÃO CADASTRAL: 01 – NULA 2 – ATIVA 3 – SUSPENSA 4 – INAPTA 08 – BAIXADA|
|DATA SITUAÇÃO CADASTRAL |DATA DO EVENTO DA SITUAÇÃO CADASTRAL|
|MOTIVO SITUAÇÃO CADASTRAL | CÓDIGO DO MOTIVO DA SITUAÇÃO CADASTRAL|
|NOME DA CIDADE NO EXTERIOR |NOME DA CIDADE NO EXTERIOR|
|PAIS | CÓDIGO DO PAIS |
|DATA DE INÍCIO | ATIVIDADE DATA DE INÍCIO DA ATIVIDADE|
|CNAE FISCAL | PRINCIPAL CÓDIGO DA ATIVIDADE ECONÔMICA PRINCIPAL DO ESTABELECIMENTO|
|CNAE FISCAL SECUNDÁRIA | CÓDIGO DA(S) ATIVIDADE(S) ECONÔMICA(S) SECUNDÁRIA(S) DO ESTABELECIMENTO|
|TIPO DE LOGRADOURO | DESCRIÇÃO DO TIPO DE LOGRADOURO|
|LOGRADOURO| NOME DO LOGRADOURO ONDE SE LOCALIZA O ESTABELECIMENTO.|
|NÚMERO| NÚMERO ONDE SE LOCALIZA O ESTABELECIMENTO. QUANDO NÃO HOUVER PREENCHIMENTO DO NÚMERO HAVERÁ ‘S/N’.
|COMPLEMENTO| COMPLEMENTO PARA O ENDEREÇO DE LOCALIZAÇÃO DO ESTABELECIMENTO|
|BAIRRO |BAIRRO ONDE SE LOCALIZA O ESTABELECIMENTO.|
|CEP | CÓDIGO DE ENDEREÇAMENTO POSTAL REFERENTE AO LOGRADOURO NO QUAL O ESTABELECIMENTO ESTA LOCALIZADO|
|UF| SIGLA DA UNIDADE DA FEDERAÇÃO EM QUE SE ENCONTRA O ESTABELECIMENTO|
|MUNICÍPIO | CÓDIGO DO MUNICÍPIO DE JURISDIÇÃO ONDE SE ENCONTRA O ESTABELECIMENTO|
|DDD 1| CONTÉM O DDD 1|
|TELEFONE 1 |CONTÉM O NÚMERO DO TELEFONE 1|
|DDD 2 |CONTÉM O DDD 2|
|TELEFONE 2 | CONTÉM O NÚMERO DO TELEFONE 2|
|DDD DO FAX | CONTÉM O DDD DO FAX|
|FAX| CONTÉM O NÚMERO DO FAX|
|CORREIO ELETRÔNICO |CONTÉM O E-MAIL DO CONTRIBUINTE|
|SITUAÇÃO ESPECIAL |SITUAÇÃO ESPECIAL DA EMPRESA|
|DATA DA SITUAÇÃO ESPECIAL| DATA EM QUE A EMPRESA ENTROU EM SITUAÇÃO ESPECIAL|


## Importar tabela Estabelecimento

In [17]:
arquivos = os.listdir('ESTABELECIMENTO/')

In [18]:
estabelecimento = spark.read.csv(f'ESTABELECIMENTO/{arquivos[0]}', sep=';', encoding='ISO-8859-1')

In [19]:
for c,arq in enumerate(arquivos):
    file = spark.read.csv(f'ESTABELECIMENTO/{arq}', encoding='ISO-8859-1', sep=';')
    estabelecimento = estabelecimento.union(file)
    

In [20]:
estabelecimento = estabelecimento.drop_duplicates()

In [21]:
estabele_coluna = ['CNPJ_BASICO','CNPJ','CNPJ_DV','IDENTIFICADOR_MATRIZ/FILIAL','NOME_FANTASIA','SITUACAO_CADASTRAL',
'DATA_SITUACAO_CADASTRAL','MOTIVO_SITUACAO_CADASTRAL','NOME_DA_CIDADE_NO_EXTERIOR','PAIS','DT_ABERTURA',
    'CNAE_FISCAL','CNAE_FISCAL_SECUNDARIA','TIPO_DE_LOGRADOURO','LOGRADOURO','NUMERO','COMPLEMENTO','BAIRRO',
'CEP','UF','MUNICIPIO','DDD_1','TELEFONE_1','DDD_2','TELEFONE_2','DDD DO_FAX','FAX','CORREIO_ELETRONICO',
'SITUACAO_ESPECIAL','DATA_DA_SITUACAO_ESPECIAL']

## Renomeando colunas

In [22]:
for x,y in enumerate(estabele_coluna):
    estabelecimento = estabelecimento\
            .withColumnRenamed(f'_c{x}',y)

## Criado coluna unificada do CNPJ

In [23]:
estabelecimento = estabelecimento.select('CNPJ_BASICO',f.concat('CNPJ_BASICO','CNPJ','CNPJ_DV').alias('CNPJ'),
                      'IDENTIFICADOR_MATRIZ/FILIAL',
                     'NOME_FANTASIA',
                     'SITUACAO_CADASTRAL',
                     'DATA_SITUACAO_CADASTRAL',
                     'MOTIVO_SITUACAO_CADASTRAL',
                     'NOME_DA_CIDADE_NO_EXTERIOR',
                     'PAIS',
                     'DT_ABERTURA',
                     'CNAE_FISCAL',
                     'CNAE_FISCAL_SECUNDARIA',
                     'TIPO_DE_LOGRADOURO',
                     'LOGRADOURO',
                     'NUMERO',
                     'COMPLEMENTO',
                     'BAIRRO',
                     'CEP',
                     'UF',
                     'MUNICIPIO',
                     'DDD_1',
                     'TELEFONE_1',
                     'DDD_2',
                     'TELEFONE_2',
                     'DDD DO_FAX',
                     'FAX',
                     'CORREIO_ELETRONICO',
                     'SITUACAO_ESPECIAL',
                     'DATA_DA_SITUACAO_ESPECIAL')

## Juntar base estabecimento com empresas

In [24]:
coluna_empresa_nova = ['CNPJ_BASICO',
'CNPJ',
'RAZAO_SOCIAL',
'NOME_FANTASIA',
'IDENTIFICADOR_MATRIZ/FILIAL',
'SITUACAO_CADASTRAL',
'NATUREZA_JURIDICA',
'QUALIFICACAO_DO_RESPONSAVEL',
'CAPITAL_SOCIAL_DA_EMPRESA',
'PORTE',
'DATA_SITUACAO_CADASTRAL',
'MOTIVO_SITUACAO_CADASTRAL',
'NOME_DA_CIDADE_NO_EXTERIOR',
'PAIS',
'DT_ABERTURA',
'CNAE_FISCAL',
'CNAE_FISCAL_SECUNDARIA',
'SITUACAO_ESPECIAL',
'DATA_DA_SITUACAO_ESPECIAL',
'ENTE_FEDERATIVO']

In [25]:
empresas_nova = empresas.join(estabelecimento,on='CNPJ_BASICO', how='inner')\
                .select([x for x in coluna_empresa_nova])



In [26]:
empresas_nova = empresas_nova.withColumn(
    'DT_ABERTURA',empresas_nova['DT_ABERTURA'].cast(StringType())
)

In [27]:
empresas_nova = empresas_nova.withColumn(
    'DT_ABERTURA', f.to_date(empresas_nova['DT_ABERTURA'],'yyyyMMdd')
)

In [28]:
empresas_nova = empresas_nova.withColumn(
    'DATA_DA_SITUACAO_ESPECIAL',empresas_nova['DATA_DA_SITUACAO_ESPECIAL'].cast(StringType())
)

In [29]:
empresas_nova = empresas_nova.withColumn(
    'DATA_DA_SITUACAO_ESPECIAL', f.to_date(empresas_nova['DATA_DA_SITUACAO_ESPECIAL'],'yyyyMMdd')
)

In [30]:
empresas_nova = empresas_nova.withColumn(
    'DATA_SITUACAO_CADASTRAL',empresas_nova['DATA_SITUACAO_CADASTRAL'].cast(StringType())
)

In [31]:
empresas_nova = empresas_nova.withColumn(
    'DATA_SITUACAO_CADASTRAL', f.to_date(empresas_nova['DATA_SITUACAO_CADASTRAL'],'yyyyMMdd')
)

In [32]:
empresas_nova = empresas_nova.withColumn('IDENTIFICADOR_MATRIZ/FILIAL',f.when(empresas_nova['IDENTIFICADOR_MATRIZ/FILIAL']=='1','M')\
                            .when(empresas_nova['IDENTIFICADOR_MATRIZ/FILIAL']=='2','F')
                        )

In [33]:
empresas_nova = empresas_nova.drop('CNPJ_BASICO')

In [34]:
empresas_nova =  empresas_nova.withColumn('RAZAO_SOCIAL',f.regexp_replace('RAZAO_SOCIAL',';',' '))\
                    .withColumn('NOME_FANTASIA',f.regexp_replace('NOME_FANTASIA',';',' '))

In [35]:
empresas_nova =  empresas_nova.withColumn('RAZAO_SOCIAL',f.regexp_replace('RAZAO_SOCIAL',',',' '))\
                    .withColumn('NOME_FANTASIA',f.regexp_replace('NOME_FANTASIA',',',' '))

In [50]:
empresas_cnae_secundario = empresas_nova.select('CNPJ','CNAE_FISCAL_SECUNDARIA')

In [37]:
empresas_nova = empresas_nova.withColumn(
    'CNAE_FISCAL_SECUNDARIA',f.substring('CNAE_FISCAL_SECUNDARIA',1,7)
)

In [None]:
empresas_nova.coalesce(1).write.csv(mode='overwrite',path='ETL/EMPRESAS_ETL_EMPRESA_ARQ_UNICO/', sep=',',header=True)

In [42]:
empresas_nova.unpersist(blocking=True)

DataFrame[CNPJ: string, RAZAO_SOCIAL: string, NOME_FANTASIA: string, IDENTIFICADOR_MATRIZ/FILIAL: string, SITUACAO_CADASTRAL: string, NATUREZA_JURIDICA: string, QUALIFICACAO_DO_RESPONSAVEL: string, CAPITAL_SOCIAL_DA_EMPRESA: double, PORTE: string, DATA_SITUACAO_CADASTRAL: date, MOTIVO_SITUACAO_CADASTRAL: string, NOME_DA_CIDADE_NO_EXTERIOR: string, PAIS: string, DT_ABERTURA: date, CNAE_FISCAL: string, CNAE_FISCAL_SECUNDARIA: string, SITUACAO_ESPECIAL: string, DATA_DA_SITUACAO_ESPECIAL: date, ENTE_FEDERATIVO: string]

In [None]:
arquivo = [arquivo for arquivo in os.listdir('ETL/EMPRESAS_ETL_EMPRESA_ARQ_UNICO') if '.csv' in arquivo and 'crc' not in arquivo][0]

In [None]:
arquivo

In [None]:
chunk_generator = pd.read_csv(f'ETL/EMPRESAS_ETL_EMPRESA_ARQ_UNICO/{arquivo}',sep=',', chunksize=12000, on_bad_lines='skip' )

In [None]:
query = '''INSERT INTO EMPRESA_HOMOLOGACAO..EMPRESAS(CNPJ
                                                    ,RAZAO_SOCIAL
                                                    ,NOME_FANTASIA
                                                    ,[IDENTIFICADOR_MATRIZ FILIAL]
                                                    ,SITUACAO_CADASTRAL
                                                    ,NATUREZA_JURIDICA
                                                    ,QUALIFICACAO_DO_RESPONSAVEL
                                                    ,CAPITAL_SOCIAL_DA_EMPRESA
                                                    ,PORTE
                                                    ,DATA_SITUACAO_CADASTRAL
                                                    ,MOTIVO_SITUACAO_CADASTRAL
                                                    ,NOME_DA_CIDADE_NO_EXTERIOR
                                                    ,PAIS
                                                    ,DT_ABERTURA
                                                    ,CNAE_FISCAL
                                                    ,CNAE_FISCAL_SECUNDARIA
                                                    ,SITUACAO_ESPECIAL
                                                    ,DATA_DA_SITUACAO_ESPECIAL
                                                    ,ENTE_FEDERATIVO
) 
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
'''

In [None]:
for chunk in chunk_generator:
    df = pd.DataFrame(chunk)
    df = df.fillna('')
    df = df.astype(str)
    df['CNPJ'] = df['CNPJ'].str.zfill(14)
    values  = [tuple(row) for _, row in df.iterrows()]
    cursor.executemany (query,values)
    cursor.commit()
    del df

### Criar tabela Empresa Cnae

In [43]:
empresas_cnae_top10000 = empresas_cnae_secundario.limit(100)

In [44]:
empresas_cnae_top10000.select('CNPJ','CNAE_FISCAL_SECUNDARIA')\
                    .limit(10)\
                    .show()
            

Py4JJavaError: An error occurred while calling o443.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 22.0 failed 1 times, most recent failure: Lost task 4.0 in stage 22.0 (TID 26) (UNITNB015.UnitOffice.com.br executor driver): java.io.FileNotFoundException: C:\Users\leandro.silva\AppData\Local\Temp\blockmgr-2a8bdaa9-55cb-45a9-b13b-1c1b4a775d14\0e\temp_local_d875de89-d83b-4125-b484-619c81faf669 (O sistema não pode encontrar o caminho especificado)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:291)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:81)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:227)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:113)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:158)
	at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.agg_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\leandro.silva\AppData\Local\Temp\blockmgr-2a8bdaa9-55cb-45a9-b13b-1c1b4a775d14\0e\temp_local_d875de89-d83b-4125-b484-619c81faf669 (O sistema não pode encontrar o caminho especificado)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:291)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:81)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:227)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:113)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:158)
	at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.agg_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


In [51]:
empresas_cnae_secundario.show(5)

Py4JJavaError: An error occurred while calling o478.showString.
: java.nio.file.NoSuchFileException: C:\Users\leandro.silva\AppData\Local\Temp\blockmgr-2a8bdaa9-55cb-45a9-b13b-1c1b4a775d14\32
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.createDirectory(Unknown Source)
	at java.nio.file.Files.createDirectory(Unknown Source)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:96)
	at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:137)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1964)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1450)
	at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:370)
	at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1385)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:150)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:144)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:144)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:95)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:74)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1525)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:131)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:122)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:177)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:426)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:417)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:504)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:237)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:698)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:698)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:146)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:370)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)


## Criar tabela endereço

In [None]:
coluna_endereco_nova = ['CNPJ',
    'TIPO_DE_LOGRADOURO',
    'LOGRADOURO',
    'NUMERO',
    'COMPLEMENTO',
    'BAIRRO',
    'CEP',
    'UF',
    'MUNICIPIO']

In [None]:
coluna_telefones_nova = [
    'CNPJ',
    'DDD_1',
    'TELEFONE_1',
    'DDD_2',
    'TELEFONE_2',
    'DDD DO_FAX',
    'FAX'
]

In [None]:
coluna_email_nova = [
    'CNPJ',
    'CORREIO_ELETRÔNICO'
    
]

## Tratamento e criacao da base endereço Receita

In [None]:
endereco_nova = estabelecimento.select([x for x in coluna_endereco_nova])

In [None]:
endereco_nova.count()

## Deletar qualquer registros que não tenha CEP de 8 Digito

In [None]:
endereco_nova = endereco_nova.select('*')\
            .where(f.length('CEP')==8)

In [None]:
acentos = {
    'á': 'a', 'à': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a',
    'é': 'e', 'è': 'e', 'ê': 'e', 'ë': 'e',
    'í': 'i', 'ì': 'i', 'î': 'i', 'ï': 'i',
    'ó': 'o', 'ò': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
    'ú': 'u', 'ù': 'u', 'û': 'u', 'ü': 'u',
    'ç': 'c', 'ñ': 'n'
}

In [None]:
import regex as re

In [None]:
acentos_pattern = "|".join(map(re.escape, acentos.keys()))

In [None]:
remover_acento_udf = f.udf(remover_acento,StringType())

In [None]:
endereco_nova.withColumn('LOGRADOURO', f.regexp_replace(f.col('LOGRADOURO'),acentos_pattern,lambda x: acentos[x.group(0)]))

In [None]:
endereco_nova.select('TIPO_DE_LOGRADOURO')\
            .groupBy('TIPO_DE_LOGRADOURO')\
            .agg(f.count('*').alias('qtd'))\
            .orderBy('qtd',ascending=False)\
            .show(100)

In [None]:
endereco_nova.select('CEP')\
            .drop_duplicates()\
            .count()

## Validar todos os CEPs e depois de ter todos os CEPs validados atualizar endereço e deletar  os CEPS Invalidos

In [None]:
ceps_validar = endereco_nova.select('CEP')\
                .drop_duplicates()

In [None]:
ceps_validar.show(5)

## Tratamento e criacao da base Telefone

In [None]:
telefone_nova = estabelecimento.select([x for x in coluna_telefones_nova])

In [None]:
telefone_nova.columns

In [None]:
telefone_nova.select('*')\
            .limit(10)\
            .show()

## Deletar numeros menor que 8 digitos Tel1

In [None]:
telefone_nova = telefone_nova.select('*')\
                        .where(f.length('TELEFONE_1') >= 8)

## Deletar numeros maior que 9 Digitos Tel1

In [None]:
telefone_nova = telefone_nova.select('*')\
                        .where(f.length('TELEFONE_1') <=9 )

## Deletar numeros menor que 8 digitos Tel2

In [None]:
telefone_nova.select('*')\
        .where(f.length('TELEFONE_2') < 8)\
        .count()

In [None]:
!pip install phonenumbers

In [None]:
import phonenumbers as phone

In [None]:
def valida_telefone(numero_testar):
    numero = phone.parse(f'+55{numero_testar}')
    valido = phone.is_valid_number(numero)
    possivel = phone.is_possible_number(numero)
    return valido

In [None]:
valida_telefone('79529488')==False

In [None]:
telefone_nova.withColumn(
    'TELEFONE_10',f.when(valida_telefone(telefone_nova['']) col=telefone_nova['TELEFONE_1'])
)\
.show()


In [None]:
telefone_nova.select('*')\
        .where(f.length('TELEFONE_2') < 8)\
        .show(10)

In [None]:
telefone_nova = telefone_nova\
                    .withColumn('TELEFONE_2',f.when(f.length('TELEFONE_2')< 8,None))\
                    .withColumn('DDD_2',f.when(f.length('TELEFONE_2')< 8,None))

In [None]:
telefone_nova = telefone_nova\
            .withColumn('TELEFONE_2',f.when(f.length('TELEFONE_2') > 9,None))\
            .withColumn('DDD_2',f.when(f.length('TELEFONE_2')> 9,None))

## Atribuir o numero 9 em celulares com 8 Digitos

In [None]:
telefone_nova\
        .select('*')\
        .where((f.substring('TELEFONE_1',1,1) >= 9) & (f.length('TELEFONE_1')==9))\
        .show(10)

In [None]:
telefone_nova\
        .withColumn\
            ('TELEFONE_1',f.when((f.substring('TELEFONE_1',1,1) >= 6) & (f.length('TELEFONE_1')==8)
                                 ,f.concat(f.lit('9'),'TELEFONE_1')))\
            .show(10)

In [None]:
telefone_nova = telefone_nova\
        .withColumn\
            ('TELEFONE_1',f.when((f.substring('TELEFONE_1',1,1) >= 6) & (f.length('TELEFONE_1')==8)
                                 ,f.concat(f.lit('9'),'TELEFONE_1')))\
        .withColumn\
               (
                   'TELEFONE_2',f.when((f.substring('TELEFONE_2',1,1) >= 6) & (f.length('TELEFONE_2')==8),
                               f.concat(f.lit('9'),'TELEFONE_2'))) 

In [None]:
telefone_nova.select('*')\
        .where(f.length('TELEFONE_2') >= 8)\
        .count()

In [None]:
telefone_nova.show(10)

## Tratamento base Email Receita

## Analistando Tabela de CNAEs

In [None]:
cnaes = spark.read.csv('Cnaes/',encoding='ISO-8859-1',sep=';')

In [None]:
cnaes = cnaes.withColumnRenamed('_c0','Codigo_cnae')\
            .withColumnRenamed('_c1','Descricao')

In [None]:
cnaes.show(n=5, truncate=False)

In [None]:
cnaes.count()

In [None]:
email_novos = estabelecimento\
                    .select([x for x in coluna_email_nova])

In [None]:
email_novos\
        .select('*')\
        .limit(10)\
        .toPandas()