In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
print("====================== Read Data =====================")
df_estab = spark.read.format('parquet').load('gs://igti-bootcamp-otacilio/refined/estabelecimento')



In [8]:
df_estab.printSchema()

root
 |-- situacao_cadastral: integer (nullable = true)
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- identificador: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_cidade_exterior: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundario: string (nullable = true)
 |-- tipo_logadouro: string (nullable = true)
 |-- logadouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: integer (nullable = true)
 |-- tel_1: inte

In [37]:
# Qual o código do CNAE mais presente nas empresas ativas? Quantas empresas utilizam esse CNAE?

(
    df_estab
    .where(col('situacao_cadastral_desc')=='ATIVA')
    .groupBy('cnae_fiscal_principal').agg(count('cnae_fiscal_principal').alias('qtd'))
    .orderBy(col('qtd').desc())
    .limit(5)
    .toPandas()
)

# 4781400, 991.316

                                                                                

Unnamed: 0,cnae_fiscal_principal,qtd
0,4781400,991316
1,9602501,754449
2,4399103,458648
3,7319002,453914
4,4712100,444980


In [35]:
# Qual o código do CNAE mais presente nas empresas ativas? Quantas empresas utilizam esse CNAE?
# Precisa somar secundario e primario!

(
    df_estab
    .where(col('situacao_cadastral_desc')=='ATIVA')
    .withColumn('cnae_fiscal_secundario', split(col('cnae_fiscal_secundario'),','))
    .select('*', explode(col('cnae_fiscal_secundario')).alias('unique_sec_cnae'))
    .groupBy('unique_sec_cnae').agg(count('cnae_fiscal_principal').alias('qtd'))
    .where(col('unique_sec_cnae')=="4781400")
    .limit(5)
    .toPandas()
)

# 4781400, 991.316
#991316 + 790242 = 1.781.558

                                                                                

Unnamed: 0,unique_sec_cnae,qtd
0,4781400,790242


In [36]:
991316 + 790242

1781558

In [11]:
# Quantos CNPJs não ativos existem no estado de São Paulo?

(
    df_estab
    .where(col('uf')=='SP')
    .where(col('situacao_cadastral_desc')!='ATIVA')
    .count()
)

#7.966.472

                                                                                

7966472

In [25]:
# Quantas empresas de “Consultoria em tecnologia da informação” existem em Belo Horizonte?

(
    df_estab
    .withColumn('cnae_fiscal_secundario', split(col('cnae_fiscal_secundario'),','))
    .where((col('cnae_desc')=="Consultoria em tecnologia da informação") | (array_contains(col('cnae_fiscal_secundario'), "6204000")))
    .where(col('municipio_desc')=='BELO HORIZONTE')
    .count()
)

#6.930

                                                                                

6930

In [30]:
# Qual o CNAE primário do IGTI?

(
    df_estab
    .where(col('nome_fantasia').like('IGTI'))
    .where(col('identificador_desc')=='MATRIZ')
    .select('cnae_fiscal_principal','cnae_desc')
    .toPandas()
)

# 8532500, Educação superior - graduação e pós-graduação

                                                                                

Unnamed: 0,cnae_fiscal_principal,cnae_desc
0,8532500,Educação superior - graduação e pós-graduação


In [31]:
# Quantas empresas foram abertas desde 2020?

(
    df_estab
    .where(year(col('data_inicio_atividade'))>=2020)
    .count()
)

# 6.314.456

                                                                                

6314456