In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f

In [2]:
import pandas as pd
pd.set_option('display.max_columns', None)

In [3]:
spark = (
    SparkSession.builder
    .config('spark.serializer', "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [None]:
df_cnae = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option("escape", "\"")
    .schema('cod_cnae STRING, nome_cnae STRING')
    .load('gs://desafio-final/F.K03200$Z.D10710.CNAE.csv')
)

In [None]:
df_mun = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option("escape", "\"")
    .schema('cod_mun STRING, nome_mun STRING')
    .load('gs://desafio-final/F.K03200$Z.D10710.MUNIC.csv')
)

### Questão 9

In [None]:
cols = ['cnpj', 'cnpj_ordem', 'cnpj_dv', 'id_matriz', 'nome_fantasia', 
        'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral',
        'nome_cidade_ext', 'cod_pais', 'data_inicio_atividades', 
        'cnae_primario', 'cnae_secundario', 
        'tipo_logradouro', 'logradouro', 'numero', 'complemento', 
        'bairro', 'cep', 'uf', 'cod_mun', 
        'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_fax', 'fax', 
        'correio_eletronico', 'situacao_especial', 'data_situacao_especial']
schema = t.StructType([t.StructField(c, t.StringType()) for c in cols])

In [None]:
df = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .option("escape", "\"")
    .schema(schema)
    .load('gs://desafio-final/estabelecimentos/*')
)

#### Leitura sem escape

In [None]:
df_teste = (
    spark.read
    .format('csv')
    .option('encoding', 'ISO-8859-1')
    .option('sep', ';')
    .schema(schema)
    .load('gs://desafio-final/estabelecimentos/*')
)

In [None]:
df_teste.filter('length(cnae_primario) != 7').limit(5).toPandas()

### Questão 10

In [None]:
df_primario = (
    df.filter('situacao_cadastral = "02"')
    .groupby('cnae_primario')
    .count()
    .withColumnRenamed('cnae_primario', 'cod_cnae')
    .orderBy(f.desc('count'))
)

In [None]:
df_secundario = (
    df.filter('situacao_cadastral = "02"')
    .withColumn('cod_cnae', f.explode(f.split('cnae_secundario', ',')))
    .groupby('cod_cnae')
    .count()
    .orderBy(f.desc('count'))
)

In [None]:
df_primario.unionByName(df_secundario).groupby('cod_cnae').agg(f.sum('count').alias('count')).orderBy(f.desc('count')).limit(5).toPandas()

### Questão 11

In [None]:
df.withColumn('data_inicio_atividades', f.to_date(f.col('data_inicio_atividades'), 'yyyyMMdd')).limit(5).toPandas()

### Questão 12

In [None]:
(
    df
    .filter('uf = "SP" and situacao_cadastral != "02"')
    .count()
)

### Questão 13

In [None]:
df.join(df_mun, 'cod_mun').filter('initcap(nome_mun) = "Belo Horizonte"')\
.filter('(cnae_primario = "6204000" or cnae_secundario like "%6204000%")')\
.count()

### Questão 14

In [None]:
df.join(df_mun, 'cod_mun').filter('nome_fantasia like "%IGTI%" and cod_mun = 4123')\
.toPandas()

## Questão 15

In [None]:
df.withColumn('data_inicio_atividades', f.to_date(f.col('data_inicio_atividades'), 'yyyyMMdd')).filter('data_inicio_atividades >= "2020-01-01"').count()