Instalação das bibliotecas básicas

In [None]:
import pandas as pd 
import numpy as np
import random
import os

### Instalação Spark

Para a constituição do ambiente Spark, basta a execução da célula abaixo. (*dura cerca de 40 segundos*)

In [2]:
%pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [4]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("app").getOrCreate()

In [7]:
spark

### Por que usar Spark nesse contexto?

Vamos comparar o tempo de execução de um comando de importação de um dataframe pandas e de um dataframe do spark.

In [149]:
import time

inicio = time.time()

cabecalho=['CNPJ_RAD', \
           'RAZAO_SOCIAL', \
           'NAT_JURIDICA', \
           'QUALIFICA_RESP', \
           'CAPITAL_SOCIAL', \
           'PORTE', \
           'ENTE_FED_RESP']

dataframe = pd.read_csv('RFB_CNPJ_EMPRESA_0.CSV', \
                            header=None, names=cabecalho, sep=';', encoding='windows-1252', low_memory=False)
dataframe.info()

fim = time.time()

print(f'\nTempo de execução com pandas (em segundos): {fim - inicio}')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9387789 entries, 0 to 9387788
Data columns (total 7 columns):
 #   Column          Dtype  
---  ------          -----  
 0   CNPJ_RAD        int64  
 1   RAZAO_SOCIAL    object 
 2   NAT_JURIDICA    int64  
 3   QUALIFICA_RESP  int64  
 4   CAPITAL_SOCIAL  object 
 5   PORTE           float64
 6   ENTE_FED_RESP   object 
dtypes: float64(1), int64(3), object(3)
memory usage: 501.4+ MB

Tempo de execução com pandas (em segundos): 12.83230996131897


In [150]:
inicio = time.time()

csvSchema = StructType(
    [StructField('CNPJ_RAD',StringType(),True),\
     StructField('RAZAO_SOCIAL',StringType(),True),\
     StructField('NAT_JURIDICA',StringType(),True),\
     StructField('QUALIFICA_RESP',StringType(),True),\
     StructField('CAPITAL_SOCIAL',StringType(),True),\
     StructField('PORTE',StringType(),True),\
     StructField('ENTE_FED_RESP',StringType(),True)])

dataframe = spark.read.csv('RFB_CNPJ_EMPRESA_0.CSV', header=None, schema=csvSchema, sep=';', encoding='windows-1252')

dataframe.printSchema()

fim = time.time()

print(f'Tempo de execução com pyspark (em segundos): {fim - inicio}')

root
 |-- CNPJ_RAD: string (nullable = true)
 |-- RAZAO_SOCIAL: string (nullable = true)
 |-- NAT_JURIDICA: string (nullable = true)
 |-- QUALIFICA_RESP: string (nullable = true)
 |-- CAPITAL_SOCIAL: string (nullable = true)
 |-- PORTE: string (nullable = true)
 |-- ENTE_FED_RESP: string (nullable = true)

Tempo de execução com pyspark (em segundos): 0.40335845947265625


Com o pandas o dataframe levou 12,8 segundos para carragar e com o pyspark, esse mesmo dataframe, levou 0,4 segundos para ser carregado, um tempo 32 vezes mais rápido (!!!)<br><br>
Dada o volume de dados a ser trabalhado nesse cenário, a opção natural, é portando, o uso do pyspark para análise exploratória.

### Iniciando a exploração dos dados

Importando bibliotecas específicas do pyspark

In [6]:
from pyspark.sql.types import *
import pyspark.sql.functions as f

## Empresas

In [7]:
csvSchema = StructType(
    [StructField('CNPJ_RAD',StringType(),True),\
     StructField('RAZAO_SOCIAL',StringType(),True),\
     StructField('NAT_JURIDICA',StringType(),True),\
     StructField('QUALIFICA_RESP',StringType(),True),\
     StructField('CAPITAL_SOCIAL',StringType(),True),\
     StructField('PORTE',StringType(),True),\
     StructField('ENTE_FED_RESP',StringType(),True)])

In [8]:
csvFilesEmpresa = ['RFB_CNPJ_EMPRESA_0.CSV',\
                  'RFB_CNPJ_EMPRESA_1.CSV',\
                  'RFB_CNPJ_EMPRESA_2.CSV',\
                  'RFB_CNPJ_EMPRESA_3.CSV',\
                  'RFB_CNPJ_EMPRESA_4.CSV',\
                  'RFB_CNPJ_EMPRESA_5.CSV',\
                  'RFB_CNPJ_EMPRESA_6.CSV',\
                  'RFB_CNPJ_EMPRESA_7.CSV',\
                  'RFB_CNPJ_EMPRESA_8.CSV',\
                  'RFB_CNPJ_EMPRESA_9.CSV']

In [9]:
empresa = spark.read.csv(csvFilesEmpresa, header=None, schema=csvSchema, sep=';', encoding='windows-1252')

In [9]:
display(empresa)

CNPJ_RAD,RAZAO_SOCIAL,NAT_JURIDICA,QUALIFICA_RESP,CAPITAL_SOCIAL,PORTE,ENTE_FED_RESP
41273602,FABIO SOUZA DO RO...,2135,50,1500000,1,
41273603,GRAFLINE ACESSORI...,2062,49,1000000,1,
41273604,RUMO - ESTUDIO DE...,2062,49,1000000,1,
41273605,WALLACE DE OLIVEI...,2135,50,100000,1,
41273606,MARCOS CESAR DE M...,2135,50,7200000,1,
41273607,LAYANE SCARLETT D...,2135,50,100,1,
41273608,FRANCISCA SAMPAIO...,2135,50,0,1,
41273609,INGRID DIAS ALVES...,2135,50,1000000,1,
41273610,DAIANE DE SOUZA 0...,2135,50,250000,1,
41273611,RENATO MOREIRA DA...,2135,50,200000,1,


#### Quantidade de registros na base de Empresas

In [10]:
empresa.count()

49841529

#### Porte das empresas

In [111]:
df_porte = empresa\
            .select('PORTE')\
            .groupby('PORTE')\
            .count()\
            .sort('count',ascending=False)

df_porte = df_porte.withColumn('DESC_PORTE', 
                     f.when(f.col('PORTE') == '01','MICRO EMPRESA')
                      .when(f.col('PORTE') == '03','EMPRESA DE PEQUENO PORTE')
                      .when(f.col('PORTE') == '05','DEMAIS')
                     .otherwise(f.lit('Não informado'))
                    )
df_porte.drop('PORTE')

df_porte.select('DESC_PORTE','count').show(truncate=False)

+------------------------+--------+
|DESC_PORTE              |count   |
+------------------------+--------+
|MICRO EMPRESA           |34995753|
|DEMAIS                  |13489254|
|EMPRESA DE PEQUENO PORTE|1292685 |
|Não informado           |63837   |
+------------------------+--------+



#### Capital Social

In [129]:
from pyspark.sql.types import IntegerType

empresa = empresa.withColumn("CAPITAL_SOCIAL_N", f.regexp_replace("CAPITAL_SOCIAL",',','.').cast(IntegerType()))

In [139]:
df_capital = empresa\
            .select('CNPJ_RAD','RAZAO_SOCIAL','CAPITAL_SOCIAL_N')\
            .groupby('CNPJ_RAD','RAZAO_SOCIAL')\
            .sum()\
            .sort('sum(CAPITAL_SOCIAL_N)',ascending=False)\
            .withColumnRenamed('sum(CAPITAL_SOCIAL_N)','CAPITAL_SOCIAL_TOTAL')

In [140]:
df_capital.select('RAZAO_SOCIAL','CAPITAL_SOCIAL_TOTAL').show(truncate=False)

+-------------------------------------------------------------------+--------------------+
|RAZAO_SOCIAL                                                       |CAPITAL_SOCIAL_TOTAL|
+-------------------------------------------------------------------+--------------------+
|CNH INDUSTRIAL LATIN AMERICA LTDA.                                 |2146078578          |
|COMPANHIA DE ALUMINA DO PARA                                       |2143204237          |
|SUMMIT EMPREENDIMENTOS MINERAIS LTDA.                              |2140703000          |
|SOMPO INTERNATIONAL HOLDINGS BRASIL LTDA                           |2136264866          |
|JNS 41 PARTICIPACOES LTDA.                                         |2135897701          |
|COOPERATIVA AGROINDUSTRIAL DOS PRODUTORES RURAIS DO SUDOESTE GOIANO|2134567988          |
|EDP RENOVAVEIS BRASIL S/A                                          |2129472300          |
|ALTO SERTAO PARTICIPACOES S.A. EM RECUPERACAO JUDICIAL             |2128113290          |

In [144]:
df_capital_faixa = df_capital.withColumn('FAIXA_CAPITAL_SOCIAL',
                     f.when(f.col('CAPITAL_SOCIAL_TOTAL') > f.lit(1000000),'5. (1000-)')
                      .when(f.col('CAPITAL_SOCIAL_TOTAL') > f.lit(100000),'4. (100-1000]') 
                      .when(f.col('CAPITAL_SOCIAL_TOTAL') > f.lit(10000),'3. (10-100]')
                      .when(f.col('CAPITAL_SOCIAL_TOTAL') > f.lit(1000),'2. (1-10]')
                      .when(f.col('CAPITAL_SOCIAL_TOTAL') >= f.lit(0),'1. [0-1]')
                      .otherwise(f.lit('9. Não possui capital social'))
                    )

df_capital_faixa = df_capital_faixa\
            .select('FAIXA_CAPITAL_SOCIAL')\
            .groupby('FAIXA_CAPITAL_SOCIAL')\
            .count()\
            .sort('FAIXA_CAPITAL_SOCIAL',ascending=True)\

df_capital_faixa.select('FAIXA_CAPITAL_SOCIAL','count').show(truncate=False)

+----------------------------+--------+
|FAIXA_CAPITAL_SOCIAL        |count   |
+----------------------------+--------+
|1. [0-1]                    |27544092|
|2. (1-10]                   |13208526|
|3. (10-100]                 |7498530 |
|4. (100-1000]               |1337897 |
|5. (1000-)                  |251373  |
|9. Não possui capital social|1111    |
+----------------------------+--------+



## Estabelecimentos

In [9]:
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import col, max, min

In [10]:
csvSchema = StructType(
    [StructField('CNPJ_RAD',StringType(),True),\
     StructField('CNPJ_ORD',StringType(),True),\
     StructField('CNPJ_DV',StringType(),True),\
     StructField('ID_MATRIZ',StringType(),True),\
     StructField('NM_FANTASIA',StringType(),True),\
     StructField('SITUACAO_CAD',StringType(),True),\
     StructField('DT_SITUACAO_CAD',StringType(),True),\
     StructField('MOT_SITUACAO_CAD',StringType(),True),\
     StructField('NM_CIDADE_EXTERIOR',StringType(),True),\
     StructField('PAIS',StringType(),True),\
     StructField('DT_INICIO_ATV',StringType(),True),\
     StructField('CNAE_PRINC',StringType(),True),\
     StructField('CNAE_SEC',StringType(),True),\
     StructField('TP_LOGRADOURO',StringType(),True),\
     StructField('LOGADOURO',StringType(),True),\
     StructField('NUMERO',StringType(),True),\
     StructField('COMPLEMENTO',StringType(),True),\
     StructField('BAIRRO',StringType(),True),\
     StructField('CEP',StringType(),True),\
     StructField('UF',StringType(),True),\
     StructField('CD_MUNICIPIO',StringType(),True),\
     StructField('DDD1',StringType(),True),\
     StructField('FONE1',StringType(),True),\
     StructField('DDD2',StringType(),True),\
     StructField('FONE2',StringType(),True),\
     StructField('DDD_FAX',StringType(),True),\
     StructField('FAX',StringType(),True),\
     StructField('EMAIL',StringType(),True),\
     StructField('SITUACAO_ESPECIAL',StringType(),True),\
     StructField('DT_SITUACAO_ESPECIAL',StringType(),True)])

In [7]:
path='C:/Users/thiago.silva/TCC/RFB_CNPJ/'

In [8]:
csvFilesEstabelecimento = ['ESTAB_0.CSV',\
                           'ESTAB_1.CSV',\
                           'ESTAB_2.CSV',\
                           'ESTAB_3.CSV',\
                           'ESTAB_4.CSV',\
                           'ESTAB_5.CSV',\
                           'ESTAB_6.CSV',\
                           'ESTAB_7.CSV',\
                           'ESTAB_8.CSV',\
                           'ESTAB_9.CSV']

In [None]:
estabelecimento = spark.read.csv(path+csvFilesEstabelecimento, header=None, schema=csvSchema, sep=';', encoding='windows-1252')

In [12]:
display(estabelecimento)

DataFrame[CNPJ_RAD: string, CNPJ_ORD: string, CNPJ_DV: string, ID_MATRIZ: string, NM_FANTASIA: string, SITUACAO_CAD: string, DT_SITUACAO_CAD: string, MOT_SITUACAO_CAD: string, NM_CIDADE_EXTERIOR: string, PAIS: string, DT_INICIO_ATV: string, CNAE_PRINC: string, CNAE_SEC: string, TP_LOGRADOURO: string, LOGADOURO: string, NUMERO: string, COMPLEMENTO: string, BAIRRO: string, CEP: string, UF: string, CD_MUNICIPIO: string, DDD1: string, FONE1: string, DDD2: string, FONE2: string, DDD_FAX: string, FAX: string, EMAIL: string, SITUACAO_ESPECIAL: string, DT_SITUACAO_ESPECIAL: string]

#### Quantidade de registros na base de Estabelecimentos

In [54]:
estabelecimento.count()

56520622

#### Matriz/Filial

In [24]:
df_matriz = estabelecimento\
            .select('ID_MATRIZ')\
            .groupby('ID_MATRIZ')\
            .count()\
            .sort('count',ascending=False)

df_matriz = df_matriz.withColumn('MATRIZ_FILIAL', 
                     f.when(f.col('ID_MATRIZ') == '1','MATRIZ')
                      .when(f.col('ID_MATRIZ') == '2','FILIAL')
                     .otherwise(f.lit('Não informado'))
                    )
df_matriz.drop('ID_MATRIZ')

df_matriz.select('MATRIZ_FILIAL','count').show()

+-------------+--------+
|MATRIZ_FILIAL|   count|
+-------------+--------+
|       MATRIZ|53671274|
|       FILIAL| 2849348|
+-------------+--------+



#### Ano de abertura das empresas

In [13]:
estabelecimento = estabelecimento.withColumn('ANO_ABERTURA', f.substring(f.col('DT_INICIO_ATV'), 1, 4))
estabelecimento = estabelecimento.withColumn('ANO_ABERTURA_n', col('ANO_ABERTURA').cast("integer"))

In [39]:
df_ano_abertura = estabelecimento\
                  .select('ANO_ABERTURA')\
                  .groupby('ANO_ABERTURA')\
                  .count()\
                  .sort('count',ascending=False)

In [12]:
df_ano_abertura.show()

+------------+-------+
|ANO_ABERTURA|  count|
+------------+-------+
|        2021|4136068|
|        2020|4011782|
|        2022|3970323|
|        2019|3271266|
|        2018|2762068|
|        2016|2654049|
|        2017|2371952|
|        2012|2332555|
|        2015|2127507|
|        2014|2022592|
|        2013|1978709|
|        2011|1726484|
|        2010|1544043|
|        2023|1507961|
|        2008|1216212|
|        2004|1036644|
|        2009| 872811|
|        2006| 807948|
|        2007| 750726|
|        1997| 677493|
+------------+-------+
only showing top 20 rows



In [45]:
df_ano_abertura.sort('ANO_ABERTURA',ascending=True).show(50)

+------------+-----+
|ANO_ABERTURA|count|
+------------+-----+
|        null|    3|
|        1109|    1|
|        1194|    1|
|        1199|    1|
|        1601|    1|
|        1696|    1|
|        1891|    1|
|        1893|    1|
|        1899|    1|
|        1901|  161|
|        1902|    3|
|        1903|    8|
|        1904|    8|
|        1905|    4|
|        1906|    6|
|        1907|    4|
|        1908|    6|
|        1909|    4|
|        1910|    3|
|        1911|    4|
|        1912|    4|
|        1913|    3|
|        1914|    4|
|        1915|    7|
|        1916|    2|
|        1917|    5|
|        1918|    7|
|        1919|    4|
|        1920|    8|
|        1921|    6|
|        1922|   12|
|        1923|    2|
|        1924|    6|
|        1925|   10|
|        1926|    6|
|        1927|   10|
|        1928|    3|
|        1929|   18|
|        1930|    9|
|        1931|   15|
|        1932|   15|
|        1933|   19|
|        1934|   40|
|        1935|   10|
|        1936

In [26]:
# Supondo que o seu DataFrame seja chamado de 'df' e a coluna de string seja chamada 'coluna_string'
df_ano_abertura2 = df_ano_abertura.withColumn('ANO_ABERTURA_N', col('ANO_ABERTURA').cast("integer"))

In [27]:
# Supondo que o seu DataFrame seja chamado de 'df' e a coluna convertida seja 'coluna_inteiro'
valor_maximo = df_ano_abertura2.select(max('ANO_ABERTURA_N')).first()[0]
valor_minimo = df_ano_abertura2.select(min('ANO_ABERTURA_N')).first()[0]

print("Valor máximo:", valor_maximo)
print("Valor mínimo:", valor_minimo)

Valor máximo: 9420
Valor mínimo: 1109


In [29]:
estatisticas_descritivas = df_ano_abertura2.describe('ANO_ABERTURA_N')
estatisticas_descritivas.show()

+-------+------------------+
|summary|    ANO_ABERTURA_N|
+-------+------------------+
|  count|               139|
|   mean|2137.4964028776976|
| stddev| 902.7985344624495|
|    min|              1109|
|    max|              9420|
+-------+------------------+



**Situação cadastral**

In [14]:
estabelecimento = estabelecimento.withColumn('ANO_SIT_CADASTRAL', f.substring(f.col('DT_SITUACAO_CAD'), 1, 4))
estabelecimento = estabelecimento.withColumn('ANO_SIT_CADASTRAL_N', col('ANO_SIT_CADASTRAL').cast("integer"))

In [30]:
# Analisando a data da situação cadastral
estatisticas_descritivas2 = estabelecimento.describe('ANO_SIT_CADASTRAL_N')
estatisticas_descritivas2.show()

+-------+-------------------+
|summary|ANO_SIT_CADASTRAL_N|
+-------+-------------------+
|  count|           56520622|
|   mean|  2010.071795388239|
| stddev| 102.79639970566382|
|    min|                  0|
|    max|               2023|
+-------+-------------------+



**Período de funcionamento do estabelecimento**

In [15]:
estabelecimento = estabelecimento.withColumn('TEMPO_FUNCIONAMENTO', f.col('ANO_SIT_CADASTRAL')-f.col('ANO_ABERTURA_N'))

In [19]:
# Analisando o tempo de funcionamento
estatisticas_descritivas3 = estabelecimento.describe('TEMPO_FUNCIONAMENTO')
estatisticas_descritivas3.show()

+-------+-------------------+
|summary|TEMPO_FUNCIONAMENTO|
+-------+-------------------+
|  count|           56520619|
|   mean| 0.7809523812893839|
| stddev| 102.58931731143124|
|    min|            -9420.0|
|    max|              909.0|
+-------+-------------------+



In [16]:
# Calcule o primeiro e o terceiro quartil para uma coluna específica
coluna = 'TEMPO_FUNCIONAMENTO'
quantiles = estabelecimento.approxQuantile(coluna, [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]

# Calcule o intervalo interquartil (IQR)
iqr = q3 - q1

# Calcule os limites superior e inferior usando o IQR
limite_superior = q3 + (1.5 * iqr)
limite_inferior = q1 - (1.5 * iqr)

# Use os limites para filtrar os outliers
df_outliers = estabelecimento.filter((col(coluna) > limite_superior) | (col(coluna) < limite_inferior))

# Exiba os outliers
df_outliers.show()

+--------+--------+-------+---------+--------------------+------------+---------------+----------------+------------------+----+-------------+----------+--------------------+-------------+--------------------+------+--------------------+--------------------+--------+---+------------+----+--------+----+--------+-------+--------+--------------------+-----------------+--------------------+------------+--------------+-----------------+-------------------+-------------------+
|CNPJ_RAD|CNPJ_ORD|CNPJ_DV|ID_MATRIZ|         NM_FANTASIA|SITUACAO_CAD|DT_SITUACAO_CAD|MOT_SITUACAO_CAD|NM_CIDADE_EXTERIOR|PAIS|DT_INICIO_ATV|CNAE_PRINC|            CNAE_SEC|TP_LOGRADOURO|           LOGADOURO|NUMERO|         COMPLEMENTO|              BAIRRO|     CEP| UF|CD_MUNICIPIO|DDD1|   FONE1|DDD2|   FONE2|DDD_FAX|     FAX|               EMAIL|SITUACAO_ESPECIAL|DT_SITUACAO_ESPECIAL|ANO_ABERTURA|ANO_ABERTURA_n|ANO_SIT_CADASTRAL|ANO_SIT_CADASTRAL_N|TEMPO_FUNCIONAMENTO|
+--------+--------+-------+---------+-----------

In [18]:
df_outliers.count()

4313367

In [20]:
# Analisando o tempo de funcionamento
estatisticas_descritivas4 = df_outliers.describe('TEMPO_FUNCIONAMENTO')
estatisticas_descritivas4.show()

+-------+-------------------+
|summary|TEMPO_FUNCIONAMENTO|
+-------+-------------------+
|  count|            4313367|
|   mean| -39.48778575994113|
| stddev|  368.4341498632619|
|    min|            -9420.0|
|    max|              909.0|
+-------+-------------------+



In [30]:
select_columns = ['DT_INICIO_ATV',
                  'DT_SITUACAO_CAD',
                  'ANO_ABERTURA_n',
                  'ANO_SIT_CADASTRAL_N',
                  'TEMPO_FUNCIONAMENTO']

df_validation = estabelecimento.select(*select_columns)

In [36]:
df_validation.select(*select_columns).sort('TEMPO_FUNCIONAMENTO', asc=False).show()

+-------------+---------------+--------------+-------------------+-------------------+
|DT_INICIO_ATV|DT_SITUACAO_CAD|ANO_ABERTURA_n|ANO_SIT_CADASTRAL_N|TEMPO_FUNCIONAMENTO|
+-------------+---------------+--------------+-------------------+-------------------+
|         null|             08|          null|                  8|               null|
|         null|             08|          null|                  8|               null|
|         null|             08|          null|                  8|               null|
|      9420100|             00|          9420|                  0|            -9420.0|
|      5620104|             63|          5620|                 63|            -5557.0|
|      4930204|             01|          4930|                  1|            -4929.0|
|      4755503|             01|          4755|                  1|            -4754.0|
|      4771701|             21|          4771|                 21|            -4750.0|
|      4744001|             63|          47

In [38]:
df_validation.select(*select_columns).sort('DT_INICIO_ATV', asc=False).show(50)

+-------------+---------------+--------------+-------------------+-------------------+
|DT_INICIO_ATV|DT_SITUACAO_CAD|ANO_ABERTURA_n|ANO_SIT_CADASTRAL_N|TEMPO_FUNCIONAMENTO|
+-------------+---------------+--------------+-------------------+-------------------+
|         null|             08|          null|                  8|               null|
|         null|             08|          null|                  8|               null|
|         null|             08|          null|                  8|               null|
|     11090810|       20181011|          1109|               2018|              909.0|
|     11940815|       20151229|          1194|               2015|              821.0|
|     11990622|       20051103|          1199|               2005|              806.0|
|     16010101|       16010101|          1601|               1601|                0.0|
|     16960719|       20040423|          1696|               2004|              308.0|
|     18911023|       20051103|          18

In [53]:
estabelecimento.filter(f.col('ANO_SIT_CADASTRAL')=='1903').show(50)

+--------+--------+-------+---------+--------------------+------------+---------------+----------------+------------------+----+-------------+----------+--------+-------------+--------------------+------+-----------+-------+--------+---+------------+----+--------+----+--------+-------+--------+--------------------+-----------------+--------------------+------------+--------------+-----------------+-------------------+-------------------+
|CNPJ_RAD|CNPJ_ORD|CNPJ_DV|ID_MATRIZ|         NM_FANTASIA|SITUACAO_CAD|DT_SITUACAO_CAD|MOT_SITUACAO_CAD|NM_CIDADE_EXTERIOR|PAIS|DT_INICIO_ATV|CNAE_PRINC|CNAE_SEC|TP_LOGRADOURO|           LOGADOURO|NUMERO|COMPLEMENTO| BAIRRO|     CEP| UF|CD_MUNICIPIO|DDD1|   FONE1|DDD2|   FONE2|DDD_FAX|     FAX|               EMAIL|SITUACAO_ESPECIAL|DT_SITUACAO_ESPECIAL|ANO_ABERTURA|ANO_ABERTURA_n|ANO_SIT_CADASTRAL|ANO_SIT_CADASTRAL_N|TEMPO_FUNCIONAMENTO|
+--------+--------+-------+---------+--------------------+------------+---------------+----------------+------------

#### Atividade principal

In [16]:
estabelecimento = estabelecimento.withColumn('CD_CNAE20_DIVISAO', f.substring(f.col('CNAE_PRINC'), 1, 2))

In [15]:
df_atividade = estabelecimento\
                  .select('CD_CNAE20_DIVISAO')\
                  .groupby('CD_CNAE20_DIVISAO')\
                  .count()\
                  .sort('count',ascending=False)

In [17]:
desc_div = spark.read.csv('DESC_DIVISAO_CNAE.csv', header=True, sep=';')
desc_div.show()

+---------------+--------------------+-----------------+--------------------+
|CD_SECAO_CNAE20|   DESC_SECAO_CNAE20|CD_CNAE20_DIVISAO| DESC_CNAE20_DIVISAO|
+---------------+--------------------+-----------------+--------------------+
|              A|AGRICULTURA, PECU...|               01|AGRICULTURA, PECU...|
|              A|AGRICULTURA, PECU...|               02|  PRODUÇÃO FLORESTAL|
|              A|AGRICULTURA, PECU...|               03| PESCA E AQÜICULTURA|
|              B|INDÚSTRIAS EXTRAT...|               05|EXTRAÇÃO DE CARVÃ...|
|              B|INDÚSTRIAS EXTRAT...|               06|EXTRAÇÃO DE PETRÓ...|
|              B|INDÚSTRIAS EXTRAT...|               07|EXTRAÇÃO DE MINER...|
|              B|INDÚSTRIAS EXTRAT...|               08|EXTRAÇÃO DE MINER...|
|              B|INDÚSTRIAS EXTRAT...|               09|ATIVIDADES DE APO...|
|              C|INDÚSTRIAS DE TRA...|               10|FABRICAÇÃO DE PRO...|
|              C|INDÚSTRIAS DE TRA...|               11|FABRICAÇ

In [17]:
df_atividade = df_atividade.join(desc_div,['CD_CNAE20_DIVISAO'],'left')

In [None]:
df_atividade = df_atividade.drop('CD_SECAO_CNAE20','DESC_SECAO_CNAE20','CD_CNAE20_DIVISAO')

In [28]:
df_atividade.select('DESC_CNAE20_DIVISAO','count').show()

+--------------------+-------+
| DESC_CNAE20_DIVISAO|  count|
+--------------------+-------+
|EXTRAÇÃO DE MINER...|   9857|
|    TRANSPORTE AÉREO|   5905|
|PREPARAÇÃO DE COU...| 143313|
|FABRICAÇÃO DE BEB...|  22718|
|ATIVIDADES JURÍDI...| 374208|
|FABRICAÇÃO DE VEÍ...|  31494|
|OBRAS DE INFRA-ES...| 107157|
|PUBLICIDADE E PES...|1371254|
|ATIVIDADES DE ATE...| 303301|
|ATIVIDADES DE SER...| 432814|
|FABRICAÇÃO DE OUT...|   7607|
|ATIVIDADES CINEMA...| 180726|
|AGRICULTURA, PECU...|1029183|
|FABRICAÇÃO DE MÁQ...|  51086|
|FABRICAÇÃO DE PRO...|  79861|
|            EDUCAÇÃO|1499642|
|ARMAZENAMENTO E A...| 533360|
|FABRICAÇÃO DE PRO...| 171472|
|ELETRICIDADE, GÁS...|  29437|
|SERVIÇOS DE ARQUI...| 299471|
+--------------------+-------+
only showing top 20 rows



## Sócios

In [13]:
csvSchema = StructType(
    [StructField('CNPJ_RAD',StringType(),True),\
     StructField('ID_SOCIO',StringType(),True),\
     StructField('NM_SOCIO',StringType(),True),\
     StructField('DOC_SOCIO',StringType(),True),\
     StructField('QUALIFIC_SOCIO',StringType(),True),\
     StructField('DT_ENT_SOCIEDADE',StringType(),True),\
     StructField('PAIS_SOCIO_ESTRANGEIRO',StringType(),True),\
     StructField('CPF_REPRESENTANTE',StringType(),True),\
     StructField('NM_REPRESENTANTE',StringType(),True),\
     StructField('QUALIFIC_REPRESENTANTE',StringType(),True),\
     StructField('FAIXA_ETARIA',StringType(),True)])

In [14]:
csvFilesSocio = ['RFB_CNPJ_SOCIO_0.CSV',\
                 'RFB_CNPJ_SOCIO_1.CSV',\
                 'RFB_CNPJ_SOCIO_2.CSV',\
                 'RFB_CNPJ_SOCIO_3.CSV',\
                 'RFB_CNPJ_SOCIO_4.CSV',\
                 'RFB_CNPJ_SOCIO_5.CSV',\
                 'RFB_CNPJ_SOCIO_6.CSV',\
                 'RFB_CNPJ_SOCIO_7.CSV',\
                 'RFB_CNPJ_SOCIO_8.CSV',\
                 'RFB_CNPJ_SOCIO_9.CSV']

In [15]:
socio = spark.read.csv(csvFilesSocio, header=None, schema=csvSchema, sep=';', encoding='windows-1252')

In [20]:
display(socio)

CNPJ_RAD,ID_SOCIO,NM_SOCIO,DOC_SOCIO,QUALIFIC_SOCIO,DT_ENT_SOCIEDADE,PAIS_SOCIO_ESTRANGEIRO,CPF_REPRESENTANTE,NM_REPRESENTANTE,QUALIFIC_REPRESENTANTE,FAIXA_ETARIA
2954739,2,LIDIANE BORGES PI...,***852188**,49,19990129,,***000000**,,0,5
6888391,2,WAGNER APARECIDO ...,***345608**,49,20040630,,***000000**,,0,5
6888430,2,SERGIO RICARDO LI...,***542024**,16,20070903,,***000000**,,0,4
2522368,2,MANOEL ANTONIO SI...,***363378**,22,19980513,,***000000**,,0,8
2522368,2,MIRTES MARIA DASC...,***168448**,49,19980513,,***000000**,,0,8
1335500,2,NILDA RODRIGUES D...,***008016**,16,20050912,,***000000**,,0,7
20660445,2,MARIO GONCALVES C...,***289906**,16,20050912,,***000000**,,0,9
7632414,2,EDIOMARA DE RAMOS...,***093309**,65,20051010,,***000000**,,0,4
6888866,2,ADRIANA DE PAIVA ...,***209916**,22,20040719,,***000000**,,0,6
6888866,2,RAIMUNDO MOIA JUNIOR,***437286**,49,20040719,,***000000**,,0,5


#### Quantidade de registros na base de Sócios

In [21]:
socio.count()

21565831

#### Quantidade de sócios por empresa

In [84]:
df_qtd_socio = socio\
                  .select('CNPJ_RAD')\
                  .groupby('CNPJ_RAD')\
                  .count()\
                  .withColumnRenamed('count','QTDE_SOCIOS')

In [85]:
df_qtd_socio_emp = df_qtd_socio\
                  .select('QTDE_SOCIOS')\
                  .groupby('QTDE_SOCIOS')\
                  .count()\
                  .sort('QTDE_SOCIOS',ascending=True)

In [86]:
display(df_qtd_socio_emp.limit(10))

QTDE_SOCIOS,count
1,4139135
2,6356022
3,709039
4,245421
5,92546
6,47036
7,23131
8,14064
9,8785
10,6107


Empresa com mais sócios

In [87]:
df_qtd_socio_emp.tail(1)

[Row(QTDE_SOCIOS=1149, count=1)]

#### Faixa etária dos sócios

Descrição dos códigos de faixa etária, informada pela Receita Federal

- 1: para os intervalos entre 0 a 12 anos;
- 2: para os intervalos entre 13 a 20 anos;
- 3: para os intervalos entre 21 a 30 anos;
- 4: para os intervalos entre 31 a 40 anos;
- 5: para os intervalos entre 41 a 50 anos;
- 6: para os intervalos entre 51 a 60 anos;
- 7: para os intervalos entre 61 a 70 anos;
- 8: para os intervalos entre 71 a 80 anos;
- 9: para maiores de 80 anos.
- 0: para não se aplica.

In [108]:
df_faixa_etaria = socio\
            .select('FAIXA_ETARIA')\
            .groupby('FAIXA_ETARIA')\
            .count()\
            .sort('count',ascending=False)

df_faixa_etaria = df_faixa_etaria.withColumn('DESC_FAIXA_ETARIA', 
                     f.when(f.col('FAIXA_ETARIA') == '1','0 a 12 anos')
                      .when(f.col('FAIXA_ETARIA') == '2','13 a 20 anos')
                      .when(f.col('FAIXA_ETARIA') == '3','21 a 30 anos')
                      .when(f.col('FAIXA_ETARIA') == '4','31 a 40 anos')
                      .when(f.col('FAIXA_ETARIA') == '5','41 a 50 anos')
                      .when(f.col('FAIXA_ETARIA') == '6','51 a 60 anos')
                      .when(f.col('FAIXA_ETARIA') == '7','61 a 70 anos')
                      .when(f.col('FAIXA_ETARIA') == '8','71 a 80 anos')
                      .when(f.col('FAIXA_ETARIA') == '9','80 anos ou mais')
                      .when(f.col('FAIXA_ETARIA') == '0','Não se aplica')
                     .otherwise(f.lit('Nulo'))
                    )
df_faixa_etaria.drop('FAIXA_ETARIA')

display(df_faixa_etaria.select('DESC_FAIXA_ETARIA','count'))

DESC_FAIXA_ETARIA,count
41 a 50 anos,5277844
51 a 60 anos,4597962
31 a 40 anos,4124948
61 a 70 anos,3276022
71 a 80 anos,1537384
21 a 30 anos,1390167
80 anos ou mais,746162
Não se aplica,471386
13 a 20 anos,116622
0 a 12 anos,27334


#### Nacionalidade dos sócios

Baixando o arquivo com a descrição do país

In [99]:
url = 'http://200.152.38.155/CNPJ/F.K03200$Z.D20514.PAISCSV.zip'

filename = wget.download(url)

with ZipFile(filename, 'r') as zipObj:
    print(f'\nExtraindo {filename} ...')
    zipObj.extractall()

filename_0extension = filename.replace('.zip','')
filename_to_csv = 'DESC_PAIS.CSV'
        
file_oldname = os.path.join("C:/Users/thiago.silva/CNPJ_RFB_PI_IV", filename_0extension)
file_newname_newfile = os.path.join("C:/Users/thiago.silva/CNPJ_RFB_PI_IV", filename_to_csv)
os.rename(file_oldname, file_newname_newfile)

DeleteZipFile = filename

try:
    os.remove(DeleteZipFile)
except OSError as e:
    print(e)
else:
    print(f'Arquivo {DeleteZipFile} não será mais necessário e por isso foi deletado!')
    print(f'Arquivo {filename_to_csv} disponibilizado em C:/Users/thiago.silva/CNPJ_RFB_PI_IV')

  0% [                                                                                ]    0 / 2745100% [................................................................................] 2745 / 2745
Extraindo F.K03200$Z.D20514.PAISCSV.zip ...
Arquivo F.K03200$Z.D20514.PAISCSV.zip não será mais necessário e por isso foi deletado!
Arquivo DESC_PAIS.CSV disponibilizado em C:/Users/thiago.silva/CNPJ_RFB_PI_IV


In [103]:
csvSchema = StructType(
    [StructField('PAIS_SOCIO_ESTRANGEIRO',StringType(),True),\
     StructField('DESC_PAIS_SOCIO_ESTRANGEIRO',StringType(),True)])

tb_pais = spark.read.csv('DESC_PAIS.CSV', header=None, schema=csvSchema, sep=';', encoding='windows-1252')

In [102]:
df_pais_socio = socio\
                  .select('PAIS_SOCIO_ESTRANGEIRO')\
                  .groupby('PAIS_SOCIO_ESTRANGEIRO')\
                  .count()\
                  .sort('count',ascending=False)

In [104]:
df_pais_socio = df_pais_socio.join(tb_pais,['PAIS_SOCIO_ESTRANGEIRO'],'left')

In [106]:
df_pais_socio.select('DESC_PAIS_SOCIO_ESTRANGEIRO','count').show(truncate=False)

+---------------------------+--------+
|DESC_PAIS_SOCIO_ESTRANGEIRO|count   |
+---------------------------+--------+
|null                       |21484051|
|ESTADOS UNIDOS             |15818   |
|ITALIA                     |7303    |
|ESPANHA                    |6166    |
|PORTUGAL                   |5333    |
|URUGUAI                    |4243    |
|ALEMANHA                   |3731    |
|FRANCA                     |3665    |
|Países Baixos (Holanda)    |3392    |
|ARGENTINA                  |3387    |
|SUICA                      |2136    |
|null                       |2118    |
|VIRGENS, ILHAS (BRITANICAS)|2055    |
|REINO UNIDO                |1683    |
|China, República Popular   |1633    |
|PANAMA                     |1490    |
|CANADA                     |1376    |
|LUXEMBURGO                 |1111    |
|JAPAO                      |1024    |
|BELGICA                    |795     |
+---------------------------+--------+
only showing top 20 rows

