In [19]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.rdd import RDD
import pandas as pd
import pyspark.sql.functions as F

In [20]:
spark = SparkSession.builder.appName('AnaliseCnaes').getOrCreate()

In [3]:
endereco_base = "dados_nao_tratados/"

In [21]:
custom_schema = StructType() \
            .add("cnpj_basico",IntegerType(),True)\
            .add("cnpj_ordem",IntegerType(),True)\
            .add("cnpj_dv",IntegerType(),True)\
            .add("identificador_matriz_filial",StringType(),True) \
            .add("nome_fanatsia",StringType(),True)\
            .add("situacao_cadastral",IntegerType(),True)\
            .add("data_situacao_cadastral",StringType(),True)\
            .add("motivo_situacao_cadastral",StringType(),True)\
            .add("nome_cidade_exterior",StringType(),True)\
            .add("pais",StringType(),True)\
            .add("data_inicio_atividade",StringType(),True)\
            .add("cnae_fiscal_principal",StringType(),True)\
            .add("cnae_fiscal_secundaria",StringType(),True)\
            .add("tipo_logradouro",StringType(),True)\
            .add("logradouro",StringType(),True)\
            .add("numero",StringType(),True)\
            .add("complemento",StringType(),True)\
            .add("bairro",StringType(),True)\
            .add("cep",StringType(),True)\
            .add("uf",StringType(),True)\
            .add("municipio",IntegerType(),True)\
            .add("ddd_1",StringType(),True)\
            .add("telefone_1",StringType(),True)\
            .add("ddd_2",StringType(),True)\
            .add("telefone_2",StringType(),True)\
            .add("ddd_fax",StringType(),True)\
            .add("numero_fax",StringType(),True)\
            .add("correio_eletronico",StringType(),True)\
            .add("situacao_especial",StringType(),True)\
            .add("data_situacao_especial",StringType(),True)
            

In [22]:
dataframe = spark.read.options(header=False,delimiter=";",inferSchema=True)\
    .format("csv").schema(custom_schema).load(endereco_base)

In [6]:
lista_cnaes = ['4110700','4619200','6619302','8211300','7319002']

dataframe = dataframe.filter(dataframe.municipio == 9701)\
    .filter(dataframe.situacao_cadastral == 2)
    
dataframe = dataframe.filter((F.col('cnae_fiscal_principal').isin(lista_cnaes))|\
    (dataframe.cnae_fiscal_secundaria.contains(lista_cnaes[0]))|\
    (dataframe.cnae_fiscal_secundaria.contains(lista_cnaes[1]))|\
    (dataframe.cnae_fiscal_secundaria.contains(lista_cnaes[2]))|\
    (dataframe.cnae_fiscal_secundaria.contains(lista_cnaes[3]))|\
    (dataframe.cnae_fiscal_secundaria.contains(lista_cnaes[4])))


In [23]:
dataframe = dataframe.filter(dataframe.municipio == 9701)\
    .filter(dataframe.situacao_cadastral == 2)
    
dataframe = dataframe.filter((dataframe.cnae_fiscal_principal.contains('474'))|\
    (dataframe.cnae_fiscal_secundaria.contains('474')))


In [24]:
colunas = ["cnpj_basico","cnpj_ordem","cnpj_dv",\
    "identificador_matriz_filial","nome_fanatsia",\
        "situacao_cadastral","cnae_fiscal_principal",\
            "cnae_fiscal_secundaria","tipo_logradouro",\
                "logradouro","numero","complemento","bairro",\
                    "cep","uf","municipio","ddd_1","telefone_1",\
                        "ddd_2","telefone_2","correio_eletronico"]
dataframe = dataframe.select(*colunas)

In [27]:
#dataframe.write.format("com.databricks.spark.csv").option("header", "true").mode('overwrite').save('output.csv')
dataframe.toPandas().to_csv('output.csv')  

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
                                                                                

In [49]:
#dataframe.select('cnae_fiscal_principal').distinct().collect()

In [28]:
df_pandas = pd.read_csv('output.csv')

In [32]:
df_pandas.columns

Index(['Unnamed: 0', 'cnpj_basico', 'cnpj_ordem', 'cnpj_dv',
       'identificador_matriz_filial', 'nome_fanatsia', 'situacao_cadastral',
       'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_logradouro',
       'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf',
       'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2',
       'correio_eletronico'],
      dtype='object')

In [33]:
df_pandas["municipio"] = "Brasilia"

In [30]:
df_pandas.municipio.unique()

array([9701])

In [29]:
df_pandas

Unnamed: 0.1,Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fanatsia,situacao_cadastral,cnae_fiscal_principal,cnae_fiscal_secundaria,tipo_logradouro,...,complemento,bairro,cep,uf,municipio,ddd_1,telefone_1,ddd_2,telefone_2,correio_eletronico
0,0,5565144,1,9,1,IMPACTO LOCACAO & TERRAPLANAGEM,2,7732201,77322024744099,OUTROS,...,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX,TAGUATINGA,72025660,DF,9701,61.0,33564654,61.0,33524244.0,
1,1,2232149,1,21,1,,2,4742300,3313999,QUADRA,...,,SETOR ECONOMICO DE SOBRADINHO (SOBRADINH,73020407,DF,9701,61.0,33878126,,,ELOBOMBAS@HOTMAIL.COM
2,2,623926,1,33,1,MUNDO DAS MOTOS,2,4541206,453070545439004744001474409947521004753900,QUADRA,...,,MORRO AZUL (SAO SEBASTIAO),71691242,DF,9701,61.0,30332020,,,
3,3,3975253,1,79,1,SIMOES FILHO MATERIAIS DE CONSTRUCAO,2,4744005,,AVENIDA,...,,VILA DVO,70310500,DF,9701,61.0,3939131,,,
4,4,2291355,1,2,1,POLI TINTAS,2,4741500,,,...,LOJA 57,ASA NORTE,70750734,DF,9701,61.0,2720066,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
18253,18253,41560888,1,2,1,ELETRICA RG,2,4742300,47440034744099,QUADRA,...,LOJA 01,ARAPOANGA (PLANALTINA),73368432,DF,9701,61.0,99601972,,,SUPERMARIO.GUEDES@YAHOO.COM
18254,18254,32829177,1,91,1,CASA CONSTRUCAO,2,4744099,4742300474400447440034741500,QUADRA,...,LOTE 01;LOJA,SAMAMBAIA SUL (SAMAMBAIA),72309702,DF,9701,61.0,85615893,,,PRIMELEGALIZAR@GMAIL.COM
18255,18255,37453270,1,68,1,W7 INCORP,2,4120400,"4211101,4213800,4313400,4321500,4322301,433040...",QUADRA,...,SALA 205,SAMAMBAIA NORTE (SAMAMBAIA),72319527,DF,9701,61.0,82727166,,,WIN7SERVICOS@GMAIL.COM
18256,18256,41613026,1,92,1,JR NACIONAIS E IMPORTADOS,2,4763603,"4789001,4752100,4781400,4751201,4763604,475989...",RUA,...,APT 104,SETOR HABITACIONAL VICENTE PIRES,72006055,DF,9701,61.0,99319658,,,JHENNEFERRAMOS2@GMAIL.COM
