In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = (
    SparkSession
    .builder
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("fs.gs.auth.service.account.enable", "true")
    .config("fs.gs.auth.service.account.json.keyfile", ".secrets/river-key-314614-9317f78dbb6d.json")
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .getOrCreate()
)

In [3]:
spark

## Lendo dados em trusted

#### CNAE

In [4]:
cnae_path = 'gs://desafio-final/F.K03200$Z.D10710.CNAE.csv'
cnae_schema = 'cod_cnae INT, cnae_descricao STRING'

In [5]:
df_cnae = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option('escape', '\"')
    .schema(cnae_schema)
    .load(cnae_path)
)

In [6]:
df_cnae.printSchema()

root
 |-- cod_cnae: integer (nullable = true)
 |-- cnae_descricao: string (nullable = true)



In [7]:
df_cnae.limit(5).toPandas()

Unnamed: 0,cod_cnae,cnae_descricao
0,111301,Cultivo de arroz
1,111302,Cultivo de milho
2,111303,Cultivo de trigo
3,111399,Cultivo de outros cereais não especificados an...
4,112101,Cultivo de algodão herbáceo


In [8]:
df_cnae.count()

1359

In [9]:
(
    df_cnae.write
    .format('parquet')
    .mode('overwrite')
    .option('compression', 'snappy')
    .save('data-cnpj/trusted/cnae/')
)

#### MUNICIPIOS

In [10]:
munic_path = 'gs://desafio-final/F.K03200$Z.D10710.MUNIC.csv'
munic_schema = 'cod_municipio INT, municipio STRING'

In [11]:
df_munic = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option('escape', '\"')
    .schema(munic_schema)
    .load(munic_path)
)

In [12]:
df_munic.printSchema()

root
 |-- cod_municipio: integer (nullable = true)
 |-- municipio: string (nullable = true)



In [13]:
df_munic.limit(5).toPandas()

Unnamed: 0,cod_municipio,municipio
0,1,GUAJARA-MIRIM
1,2,ALTO ALEGRE DOS PARECIS
2,3,PORTO VELHO
3,4,BURITIS
4,5,JI-PARANA


In [14]:
df_munic.count()

5571

In [15]:
(
    df_munic.write
    .format('parquet')
    .mode('overwrite')
    .option('compression', 'snappy')
    .save('data-cnpj/trusted/munic/')
)

#### ESTABELECIMENTOS

In [34]:
estab_path = 'gs://desafio-final/estabelecimentos'
estab_schema = '''
                cnpj_basico STRING,
                cnpj_ordem STRING,
                cnpj_dv STRING,
                cod_ident_matriz STRING,
                nome_fantasia STRING,
                cod_situacao_cad INT,
                data_situacao_cad STRING,
                cod_motivo_situacao STRING,
                nome_cidade_exterior STRING,
                cod_pais STRING,
                data_inicio STRING,
                cod_cnae STRING,
                cod_cnae_secundaria STRING,
                tipo_logradouro STRING,
                logradouro STRING,
                numero STRING,
                complemento STRING,
                bairro STRING,
                cep STRING,
                uf STRING,
                cod_municipio STRING,
                ddd_1 INT,
                telefone_1 STRING,
                ddd2 INT,
                telefone_2 STRING,
                ddd_fax INT,
                fax STRING,
                email STRING,
                situacao_especial STRING,
                data_situacao_especial STRING
                '''

In [35]:
df_estab = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option('escape', '\"')
    .schema(estab_schema)
    .load(estab_path)
)

In [36]:
df_estab = df_estab.withColumn('data_inicio', F.to_date(F.col('data_inicio'), 'yyyyMMdd'))
df_estab = df_estab.withColumn('data_situacao_cad', F.to_date(F.col('data_situacao_cad'), 'yyyyMMdd'))
df_estab = df_estab.withColumn('data_situacao_especial', F.to_date(F.col('data_situacao_especial'), 'yyyyMMdd'))

In [37]:
df_estab.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- cod_ident_matriz: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- cod_situacao_cad: integer (nullable = true)
 |-- data_situacao_cad: date (nullable = true)
 |-- cod_motivo_situacao: string (nullable = true)
 |-- nome_cidade_exterior: string (nullable = true)
 |-- cod_pais: string (nullable = true)
 |-- data_inicio: date (nullable = true)
 |-- cod_cnae: string (nullable = true)
 |-- cod_cnae_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- cod_municipio: string (nullable = true)
 |-- ddd_1: integer (nullable = true)
 |-- telefone_1: string (nullable = true)
 |-- 

In [38]:
df_estab.count()

49198427

In [39]:
(
    df_estab.write
    .format('parquet')
    .mode('overwrite')
    .option('compression', 'snappy')
    .save('data-cnpj/trusted/estab/')
)

## Carregando dataframes em trusted

In [4]:
df_munic_trusted = (
    spark.read
    .format('parquet')
    .load('data-cnpj/trusted/munic/')
)

In [5]:
df_munic_trusted.count()

5571

In [6]:
df_munic_trusted.printSchema()

root
 |-- cod_municipio: integer (nullable = true)
 |-- municipio: string (nullable = true)



In [5]:
df_cnae_trusted = (
    spark.read
    .format('parquet')
    .load('data-cnpj/trusted/cnae/')
)

In [8]:
df_cnae_trusted.count()

1359

In [9]:
df_cnae_trusted.printSchema()

root
 |-- cod_cnae: integer (nullable = true)
 |-- cnae_descricao: string (nullable = true)



In [40]:
df_estab_trusted = (
    spark.read
    .format('parquet')
    .load('data-cnpj/trusted/estab/')
)

In [41]:
df_estab_trusted.count()

49198427

In [42]:
df_estab_trusted.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- cod_ident_matriz: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- cod_situacao_cad: integer (nullable = true)
 |-- data_situacao_cad: date (nullable = true)
 |-- cod_motivo_situacao: string (nullable = true)
 |-- nome_cidade_exterior: string (nullable = true)
 |-- cod_pais: string (nullable = true)
 |-- data_inicio: date (nullable = true)
 |-- cod_cnae: string (nullable = true)
 |-- cod_cnae_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- cod_municipio: string (nullable = true)
 |-- ddd_1: integer (nullable = true)
 |-- telefone_1: string (nullable = true)
 |-- 

## Join dataframes em refined


In [43]:
df_refined_cnpj = df_estab_trusted.join(df_munic_trusted, 'cod_municipio')

In [44]:
df_refined_cnpj = df_refined_cnpj.join(df_cnae_trusted, 'cod_cnae')

In [45]:
df_refined_cnpj.count()

49198427

In [46]:
df_refined_cnpj.printSchema()

root
 |-- cod_cnae: string (nullable = true)
 |-- cod_municipio: string (nullable = true)
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- cod_ident_matriz: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- cod_situacao_cad: integer (nullable = true)
 |-- data_situacao_cad: date (nullable = true)
 |-- cod_motivo_situacao: string (nullable = true)
 |-- nome_cidade_exterior: string (nullable = true)
 |-- cod_pais: string (nullable = true)
 |-- data_inicio: date (nullable = true)
 |-- cod_cnae_secundaria: string (nullable = true)
 |-- tipo_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- ddd_1: integer (nullable = true)
 |-- telefone_1: string (nullable = true)
 |-- 

In [17]:
(
    df_refined_cnpj.write
    .format('parquet')
    .mode('overwrite')
    .option('compression', 'snappy')
    .save('data-cnpj/refined/cnpj')
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/camera/anaconda3/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

## Carrega dados de refined

In [None]:
df_refined_cnpj = (
    spark.read
    .format('parquet')
    .load('data-cnpj/refined/cnpj/')
)

In [None]:
df_refined_cnpj.printSchema()

In [None]:
df_refined_cnpj.count()

In [None]:
df_refined_cnpj.limit(5).toPandas()

## Questões

#### 1. Qual o código do CNAE mais presente nas empresas ativas? Quantas empresas utilizam esse CNAE?

In [50]:
df_refined_cnpj.filter(F.expr('cod_situacao_cad = 2')).count()

20231934

In [57]:
(
    df_refined_cnpj
    .filter(F.col('cod_situacao_cad') == 2)
    .withColumn('cnae', F.concat_ws(',', F.col('cod_cnae'), F.col('cod_cnae_secundaria')))
    .withColumn('cnae', F.split(F.col('cnae'), ','))
    .withColumn('cnae', F.explode(F.col('cnae')))
    .groupBy('cnae')
    .agg(F.count('cnpj_basico').alias('qtde'))
    .orderBy(F.col('qtde').desc())
    .show(1)
)

+-------+-------+
|   cnae|   qtde|
+-------+-------+
|4781400|1781558|
+-------+-------+
only showing top 1 row



#### 2. Quantos CNPJs não ativos existem no estado de São Paulo? 

In [96]:
( 
    df_refined_cnpj
    .filter(F.col('cod_situacao_cad') != 2)
    .filter(F.col('uf') == "SP")
    .count()
)

7966472

#### 3. Quantas empresas de “Consultoria em tecnologia da informação” existem em Belo Horizonte? 

In [101]:
(
    df_cnae_trusted
    .filter(F.col('cnae_descricao') == 'Consultoria em tecnologia da informação')
    .show()
)

+--------+--------------------+
|cod_cnae|      cnae_descricao|
+--------+--------------------+
| 6204000|Consultoria em te...|
+--------+--------------------+



In [107]:
( 
    df_refined_cnpj
    .withColumn('cnae', F.concat_ws(',', F.col('cod_cnae'), F.col('cod_cnae_secundaria')))
    .withColumn('cnae', F.split(F.col('cnae'), ','))
    .withColumn('cnae', F.explode(F.col('cnae')))
    .filter(F.col('cnae') == '6204000')
    .filter(F.col('municipio') == 'BELO HORIZONTE')
    .count()
)

6930

####  4. Qual o CNAE primário do IGTI?

Dica: O IGTI está localizado em Belo Horizonte.

In [110]:
(
    df_refined_cnpj
    .select(F.col('nome_fantasia'), F.col('cod_cnae'), F.col('cnae_descricao'))
    .filter(F.col('municipio') == 'BELO HORIZONTE')
    .filter(F.expr('nome_fantasia LIKE "IGTI"'))
    .limit(5)
    .show(truncate=False)            
)

+-------------+--------+---------------------------------------------+
|nome_fantasia|cod_cnae|cnae_descricao                               |
+-------------+--------+---------------------------------------------+
|IGTI         |8532500 |Educação superior - graduação e pós-graduação|
+-------------+--------+---------------------------------------------+



#### 5. Quantas empresas foram abertas desde 2020?

In [112]:
(
    df_refined_cnpj
    .filter(F.year(F.col('data_inicio')) >= '2020')
    .count()
)

6314456

In [113]:
sc.stop()