In [None]:
# !pip install -r requirements.txt

In [None]:
from pyspark.sql import SparkSession
import zipfile
import wget
import os
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
spark = SparkSession.builder\
    .master('local[*]')\
    .appName('Baby steps in Spark')\
    .getOrCreate()

## Fonte dos dados
https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj

In [None]:
wget.download('https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip','.')
wget.download('https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip','.')
wget.download('https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip','.')

In [None]:
zipfile.ZipFile('empresas.zip').extractall('.')
zipfile.ZipFile('estabelecimentos.zip').extractall('.')
zipfile.ZipFile('socios.zip').extractall('.')

In [None]:
os.remove('./empresas.zip')
os.remove('./estabelecimentos.zip')
os.remove('./socios.zip')

EMPRESAS
|Campo|  Descrição |
|---|---|
|  cnpj  | NÚMERO BASE DE INSCRIÇÃO NO CNPJ (OITO PRIMEIROS DÍGITOS DO CNPJ). |
| raz_soc  | NOME EMPRESARIAL DA PESSOA JURÍDICA |
|  natur_jur | CÓDIGO DA NATUREZA JURÍDICA  |
|  qualif_resp | QUALIFICAÇÃO DA PESSOA FÍSICA RESPONSÁVEL PELA EMPRESA |
|  capital | CAPITAL SOCIAL DA EMPRESA |
| porte   | CÓDIGO DO PORTE DA EMPRESA: 00 – NÃO INFORMADO 01 - MICRO EMPRESA 03 - EMPRESA DE PEQUENO PORTE 05 - DEMAIS |
| ent_fed   |  O ENTE FEDERATIVO RESPONSÁVEL É PREENCHIDO PARA OS CASOS DE ÓRGÃOS E ENTIDADES DO GRUPO DE NATUREZA JURÍDICA 1XXX. PARA AS DEMAIS NATUREZAS, ESTE ATRIBUTO FICA EM BRANCO. |

In [None]:
empresas = spark.read.csv('empresas',sep=';',inferSchema=True)

In [None]:
empres_col = ['cnpj','raz_soc','natur_jur','qualif_resp','capital','porte','ent_fed']

In [None]:
for i,coluna in enumerate(empres_col):
    empresas = empresas.withColumnRenamed(f'_c{i}',coluna)

In [None]:
empresas.limit(5).toPandas()

In [None]:
empresas.count()

In [None]:
empresas.printSchema()

In [None]:
empresas = empresas.withColumn('capital',f.regexp_replace('capital',',','.'))

In [None]:
empresas = empresas.withColumn('capital',empresas['capital'].cast(DoubleType()))

In [None]:
empresas.printSchema()

In [None]:
empresas.select('*').show(5,False)

In [None]:
empresas\
    .where('capital<=50')\
    .filter(f.col('porte')==1)\
    .show(6,False)

In [None]:
empresas\
    .where(f.col('raz_soc').like('%RESTAURANTES%'))\
    .show(5,False)

In [None]:
empresas\
    .select('cnpj','porte','capital')\
    .groupBy('porte')\
    .agg(
        f.avg('capital').alias('avg_capital'),
        f.count('cnpj').alias('count')
    )\
    .orderBy('porte',ascending=True)\
    .show()

In [None]:
empresas\
    .select('capital')\
    .summary()\
    .show()
    

In [None]:
avg = empresas.select(f.avg('qualif_resp').cast('float')).collect()[0][0]
empresas\
    .withColumn(
        'Estudo',
        f.when(
            f.col('qualif_resp')< avg
            ,'Reprovado').otherwise('Aprovado')\
        )\
    .show(5,False)



ESTABELECIMENTOS

| Campo                       | Descrição                                                                                          |
|-----------------------------|----------------------------------------------------------------------------------------------------|
| cnpj_bas                 | NÚMERO BASE DE INSCRIÇÃO NO CNPJ (OITO PRIMEIROS DÍGITOS DO CNPJ).                                 |
| cnpj_ord                 | NÚMERO DO ESTABELECIMENTO DE INSCRIÇÃO NO CNPJ (DO NONO ATÉ O DÉCIMO SEGUNDO DÍGITO DO CNPJ).      |
| cnpj_dv                     | DÍGITO VERIFICADOR DO NÚMERO DE INSCRIÇÃO NO CNPJ (DOIS ÚLTIMOS DÍGITOS DO CNPJ).                  |
| id_mat_fil | CÓDIGO DO IDENTIFICADOR MATRIZ/FILIAL: 1 – MATRIZ    2 – FILIAL                                    |
| nome_fant               | CORRESPONDE AO NOME FANTASIA                                                                       |
| sit_cad          | CÓDIGO DA SITUAÇÃO CADASTRAL: 01 – NULA    2 – ATIVA    3 – SUSPENSA    4 – INAPTA    08 – BAIXADA |
| dt_sit_cad     | DATA DO EVENTO DA SITUAÇÃO CADASTRAL                                                               |
| mt_sit_cad   | CÓDIGO DO MOTIVO DA SITUAÇÃO CADASTRAL                                                             |
| nome_cidad  | NOME DA CIDADE NO EXTERIOR                                                                         |
| pais                        | CÓDIGO DO PAIS                                                                                     |
| dt_ini_ativ    | DATA DE INÍCIO DA ATIVIDADE                                                                        |
| cnae_pri       | CÓDIGO DA ATIVIDADE ECONÔMICA PRINCIPAL DO ESTABELECIMENTO                                         |
| cnae_sec      | CÓDIGO DA(S) ATIVIDADE(S) ECONÔMICA(S) SECUNDÁRIA(S) DO ESTABELECIMENTO                            |
| tp_logr         | DESCRIÇÃO DO TIPO DE LOGRADOURO                                                                    |
| lograd                  | NOME DO LOGRADOURO ONDE SE LOCALIZA O ESTABELECIMENTO.                                             |
| numero                      | NÚMERO ONDE SE LOCALIZA O ESTABELECIMENTO. QUANDO NÃO HOUVER PREENCHIMENTO DO NÚMERO HAVERÁ ‘S/N’. |
| compl                 | COMPLEMENTO PARA O ENDEREÇO DE LOCALIZAÇÃO DO ESTABELECIMENTO                                      |
| bairro                      | BAIRRO ONDE SE LOCALIZA O ESTABELECIMENTO.                                                         |
| cep                         | CÓDIGO DE ENDEREÇAMENTO POSTAL REFERENTE AO LOGRADOURO NO QUAL O ESTABELECIMENTO ESTA LOCALIZADO   |
| uf                          | SIGLA DA UNIDADE DA FEDERAÇÃO EM QUE SE ENCONTRA O ESTABELECIMENTO                                 |
| munic                   | CÓDIGO DO MUNICÍPIO DE JURISDIÇÃO ONDE SE ENCONTRA O ESTABELECIMENTO                               |
| ddd_1                      | CONTÉM O DDD 1                                                                                     |
| tel_1                 | CONTÉM O NÚMERO DO TELEFONE 1                                                                      |
| ddd_2                       | CONTÉM O DDD 2                                                                                     |
| tel_2                 | CONTÉM O NÚMERO DO TELEFONE 2                                                                      |
| ddd_fax                 | CONTÉM O DDD DO FAX                                                                                |
| fax                         | CONTÉM O NÚMERO DO FAX                                                                             |
| email          | CONTÉM O E-MAIL DO CONTRIBUINTE                                                                    |
| sit_esp           | SITUAÇÃO ESPECIAL DA EMPRESA                                                                       |
| dt_sit_esp   | DATA EM QUE A EMPRESA ENTROU EM SITUAÇÃO ESPECIAL                                                  |

In [None]:
estabelecimentos = spark.read.csv('estabelecimentos',sep=';',inferSchema=True)

In [None]:
estab_col = [
    'cnpj_bas','cnpj_ord','cnpj_dv','id_mat_fil','nome_fant','sit_cad','dt_sit_cad','mt_sit_cad','nome_cidad','pais','dt_ini_ativ','cnae_pri','cnae_sec','tp_logr'
    ,'lograd','numero','compl','bairro','cep','uf','munic','ddd_1','tel_1','ddd_2','tel_2','ddd_fax','fax','email','sit_esp','dt_sit_esp'
    ]

In [None]:
for i,coluna in enumerate(estab_col):
    estabelecimentos = estabelecimentos.withColumnRenamed(f'_c{i}',coluna)

In [None]:
estabelecimentos.limit(5).toPandas()

In [None]:
estabelecimentos.count()

In [None]:
estabelecimentos.printSchema()

In [None]:
estabelecimentos = estabelecimentos\
.withColumn(
    'dt_sit_cad',
    f.to_date(
    estabelecimentos.dt_sit_cad.cast(StringType()),'yyyyMMdd'
    )
)\
.withColumn(
    'dt_ini_ativ',
    f.to_date(
    estabelecimentos.dt_ini_ativ.cast(StringType()),'yyyyMMdd')
)\
.withColumn(
    'dt_sit_esp',
    f.to_date(
    estabelecimentos.dt_sit_esp.cast(StringType()),'yyyyMMdd')
)

In [None]:
estabelecimentos.printSchema()

In [None]:
estabelecimentos\
    .select('nome_fant','munic',f.year('dt_ini_ativ').alias('ano_ini_ativ'),f.month('dt_ini_ativ').alias('ano_ini_ativ'))\
    .show(5,False)

SÓCIOS

| Campo| Descrição|
|------|----------|
| cnpj_bas| NÚMERO BASE DE INSCRIÇÃO NO CNPJ (CADASTRO NACIONAL DA PESSOA JURÍDICA)|
| id_soc | CÓDIGO DO IDENTIFICADOR DE SÓCIO 1 – PESSOA JURÍDICA    2 – PESSOA FÍSICA    3 – ESTRANGEIRO                                                
| nome_soc | NOME DO SÓCIO PESSOA FÍSICA OU A RAZÃO SOCIAL E/OU NOME EMPRESARIAL DA PESSOA JURÍDICA E/OU NOME DO SÓCIO/RAZÃO SOCIAL DO SÓCIO ESTRANGEIRO |
| cnpj_cpf_soc| CPF OU CNPJ DO SÓCIO (SÓCIO ESTRANGEIRO NÃO TEM ESTA INFORMAÇÃO).|
| qualif_soc| CÓDIGO DA QUALIFICAÇÃO DO SÓCIO|
| dt_entrad | DATA DE ENTRADA NA SOCIEDADE|
| pais| CÓDIGO PAÍS DO SÓCIO ESTRANGEIRO|
| repr_legal| NÚMERO DO CPF DO REPRESENTANTE LEGAL|
| nome_repr| NOME DO REPRESENTANTE LEGAL|
| qualif_repr| CÓDIGO DA QUALIFICAÇÃO DO REPRESENTANTE LEGAL|
| faix_etar | CÓDIGO CORRESPONDENTE À FAIXA ETÁRIA DO SÓCIO|

In [None]:
socios = spark.read.csv('socios',sep=';',inferSchema=True)

In [None]:
soc_col = ['cnpj_bas','id_soc','nome_soc','cnpj_cpf_soc','qualif_soc','dt_entrad','pais','repr_legal','nome_repr','qualif_repr','faix_etar']

In [None]:
for i,coluna in enumerate(soc_col):
    socios = socios.withColumnRenamed(f'_c{i}',coluna)

In [None]:
socios.limit(5).toPandas()

In [None]:
socios.count()

In [None]:
socios.printSchema()

In [None]:
socios = socios\
.withColumn(
    'dt_entrad',
    f.to_date(
    socios.dt_entrad.cast(StringType()),'yyyyMMdd'
    )
)

In [None]:
socios.printSchema()

In [None]:
socios\
    .select('nome_soc','faix_etar',f.year('dt_entrad').alias('ano_entrad'))\
    .show(5,False)

In [None]:
socios\
    .select(
        f.concat_ws(', ',
        f.substring_index(f.col('nome_soc'),' ',-1),
        f.substring_index(f.col('nome_soc'),' ',1),
        )
        .alias('ident ')
    )\
    .show(5,False)

In [None]:
socios\
    .select(
        [
          f.count(
              f.when(
                  f.isnull(campo),1
              )
          ).alias(campo)\
            for campo in socios.columns
        ]
    ).show()

In [None]:
socios = socios.na.fill(0)

In [None]:
socios = socios.na.fill('N/A')

In [None]:
socios.limit(5).toPandas()

Order By

In [None]:
socios\
    .select('nome_soc','faix_etar',f.year('dt_entrad').alias('ano_entrad'))\
    .orderBy('ano_entrad',ascending=False)\
    .show(5,False)

OrderBy two columns

In [None]:
socios\
    .select('nome_soc','faix_etar',f.year('dt_entrad').alias('ano_entrad'))\
    .orderBy(['ano_entrad','faix_etar'],ascending=[False,False])\
    .show(5,False)

Filter

In [None]:
socios\
    .filter((f.col('nome_soc').startswith('RODRIGO')))\
    .filter((f.col('nome_soc').endswith('DIAS')))\
    .select('nome_soc')\
    .limit(10)\
    .toPandas()


Count

In [None]:
socios\
    .select(f.year('dt_entrad').alias('ano_entrad'))\
    .where('ano_entrad >= 2010')\
    .groupBy('ano_entrad')\
    .count()\
    .orderBy('ano_entrad',ascending=True)\
    .show()

Joins

In [None]:
empresas_join = estabelecimentos.join(empresas,estabelecimentos.cnpj_bas == empresas.cnpj,how='inner')

Views

In [None]:
empresas.createOrReplaceTempView('empresasView')

In [None]:
spark.sql("""
SELECT * FROM empresasView
WHERE capital =50
""").show(5)

Write

In [None]:
empresas.write.csv(path='empresas/csv',mode='overwrite',sep=';',header=True)

In [None]:
estabelecimentos.write.parquet(path='estabelecimentos/parquet',mode='overwrite')

In [None]:
socios.write.orc(path='socios/orc',mode='overwrite')

Particionamento

In [None]:
empresas.coalesce(1).write.csv(path='empresas/csv-unico',mode='overwrite',sep=';',header=True)

In [None]:
estabelecimentos.write.parquet(path='estabelecimentos/parquet-with-partition',mode='overwrite',partitionBy='dt_sit_cad')

In [None]:
spark.stop()