In [2]:
#create a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

#print spark version
print(spark.version)

3.5.0


In [3]:
#read CNAES data
df_cnaes = spark.read.csv('/home/jovyan/data/cnaes.csv',\
                        header=True, inferSchema=True,\
                            sep=';', encoding='ISO-8859-1')
#print the df_cnaes schema
df_cnaes.printSchema()

root
 |-- CNAE: integer (nullable = true)
 |-- DESCRICAO_CNAE: string (nullable = true)



In [None]:
#count the rows
df_cnaes.count()

In [None]:
# show df_cnaes
df_cnaes.show(5, truncate=False)

In [4]:
#read dataset estabelecimentos
df_estabelecimentos1 = spark.read.csv('/home/jovyan/data/estabelecimentos-1.csv',\
                        header=True, inferSchema=True, sep=';',\
                        encoding = 'ISO-8859-1')

df_estabelecimentos2 = spark.read.csv('/home/jovyan/data/estabelecimentos-2.csv',\
                        header=True, inferSchema=True, sep=';',\
                        encoding = 'ISO-8859-1')

df_estabelecimentos3 = spark.read.csv('/home/jovyan/data/estabelecimentos-3.csv',\
                        header=True, inferSchema=True, sep=';',\
                        encoding = 'ISO-8859-1')

In [5]:
#union csv files estabelecimentos-1, 2 e 3
df_estabelecimentos = df_estabelecimentos1.union(df_estabelecimentos2).union(df_estabelecimentos3)
df_estabelecimentos.coalesce(1).write.format("csv")\
    .option("delimiter", ";")\
    .option("header", "true")
#    .save("/home/jovyan/data/estabelecimentos.csv")

<pyspark.sql.readwriter.DataFrameWriter at 0x7f613c09ef90>

In [None]:
df_estabelecimentos.printSchema()

In [6]:
#rename column 'CNAE_PRINCIPAL' to 'CNAE'
df_estabelecimentos = df_estabelecimentos.withColumnRenamed('CNAE_PRINCIPAL','CNAE')

In [None]:
df_estabelecimentos.printSchema()

In [None]:
df_estabelecimentos.show(5, truncate=False)

In [7]:
#left join estabelecimentos
estabelecimento_cnaes = df_estabelecimentos.join(df_cnaes, on='CNAE', how='left')

In [None]:
estabelecimento_cnaes.show(5, truncate=False)

In [None]:
estabelecimento_cnaes.count()

In [None]:
df_estabelecimentos.count()

#### Pergunta 1: Quantos estabelecimentos existem?

In [None]:
df_estabelecimentos.count()

In [None]:
df_estabelecimentos.distinct().count()

#### Pergunta 2: Na tabela de estabelecimentos, quantas colunas existem e quantas são identificadas pelo spark como números? Use inferScehema ao ler os arquivos

In [None]:
num_columns = len(df_estabelecimentos.columns)
print(num_columns)

In [None]:
df_estabelecimentos.printSchema()

In [None]:
#count columns using list comprehension
num_numeric_columns = len([col for col, dtype in df_estabelecimentos.dtypes if dtype in ('integer')])
print("Number of numeric columns:", num_numeric_columns)


#### Pegunta 3: Usando estabelecimentos_df.write.parquet("estabelecimentos.parquet"), compare o tamanho dos arquivos parquet com os arquivos CSV originais. A economia de espaço foi da ordem de?

In [None]:
import os

csv_size1 = os.path.getsize('/home/jovyan/data/estabelecimentos-1.csv')
csv_size2 = os.path.getsize('/home/jovyan/data/estabelecimentos-2.csv')
csv_size3 = os.path.getsize('/home/jovyan/data/estabelecimentos-3.csv')

csv_size = csv_size1 + csv_size2 + csv_size3
print(f'Tamanho do arquivo CSV: {csv_size} bytes')

In [None]:
df_estabelecimentos_size = df_estabelecimentos.count()
df_estabelecimentos_size

In [None]:
#create parquet file
df_estabelecimentos.write.mode("overwrite").parquet("/home/jovyan/data/estabelecimentos.parquet")

In [None]:
parquet_size = os.path.getsize("/home/jovyan/data/estabelecimentos.parquet")
parquet_size

In [None]:
size_difference = df_estabelecimentos_size - parquet_size
percentage_difference = size_difference / df_estabelecimentos_size * 100
print("Size difference:", size_difference, "bytes")

print("Parquet file is", percentage_difference, "% smaller than CSV file")

In [None]:
estabelecimento_cnaes.show(5,truncate=False)

#### Pergunta 4: Vamos usar Spark SQL para obter algumas contagens. Primeiro, vamos ver quantos estabelecimentos não tem logradouro cadastrado.

In [None]:
df_estabelecimentos.createOrReplaceTempView("estabelecimentos")

In [None]:
spark.sql("SELECT * FROM estabelecimentos").show(5)

In [None]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
            WHERE CNPJ_BASICO IS NULL").show()

In [None]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
            WHERE LOGRADOURO IS NULL").show()

#### Pergunta 5: Contar quantos estabelecimentos ficam localizados em uma avenida

In [8]:
df_estabelecimentos.show(5,truncate=False)

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+---------------------+-------+---------------+---------------+-----------------+------+---------------------------------+---------+--------+---+---------+-----+--------+-----+--------+-------+--------+---------------------------+-----------------+----------------------+
|CNPJ_BASICO|CNPJ_ORDEM|CNPJ_DV|IDENTIFICADOR_MATRIZ_FILIAL|NOME_FANTASIA|SITUACAO_CADASTRAL|DATA_SITUACAO_CADASTRAL|MOTIVO_SITUACAO_CADASTRAL|NOME_DA_CIDADE_NO_EXTERIOR|PAIS|DATA_INICIO_ATIVIDADE|CNAE   |CNAE_SECUNDARIA|TIPO_LOGRADOURO|LOGRADOURO       |NUMERO|COMPLEMENTO                      |BAIRRO   |CEP     |UF |MUNICIPIO|DDD_1|TEL_1   |DDD_2|TEL_2   |DDD_FAX|FAX     |CORREIO_ELETRONICO         |SITUACAO_ESPECIAL|DATA_SITUACAO_ESPECIAL|
+-----------+----------+-------+---------------------------+-------------+------------------+---------------

In [9]:
df_estabelecimentos.createOrReplaceTempView("estabelecimentos")

In [10]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
            WHERE LOGRADOURO IS NOT NULL\
            AND UPPER(LOGRADOURO) LIKE 'AVENIDA%'").show()

+--------+
|count(1)|
+--------+
|   52587|
+--------+



In [13]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
    WHERE UPPER(TIPO_LOGRADOURO) = 'RUA'").show()

+--------+
|count(1)|
+--------+
|13755467|
+--------+



In [12]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
    WHERE UPPER(TIPO_LOGRADOURO) = 'AVENIDA'").show()

+--------+
|count(1)|
+--------+
| 3972511|
+--------+



#### Pergunta 6: Quantos CEPs distintos existem entre os estabelecimentos?

In [16]:
spark.sql("SELECT * FROM estabelecimentos LIMIT 5").show()

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+---------------------+-------+---------------+---------------+-----------------+------+--------------------+---------+--------+---+---------+-----+--------+-----+--------+-------+--------+--------------------+-----------------+----------------------+
|CNPJ_BASICO|CNPJ_ORDEM|CNPJ_DV|IDENTIFICADOR_MATRIZ_FILIAL|NOME_FANTASIA|SITUACAO_CADASTRAL|DATA_SITUACAO_CADASTRAL|MOTIVO_SITUACAO_CADASTRAL|NOME_DA_CIDADE_NO_EXTERIOR|PAIS|DATA_INICIO_ATIVIDADE|   CNAE|CNAE_SECUNDARIA|TIPO_LOGRADOURO|       LOGRADOURO|NUMERO|         COMPLEMENTO|   BAIRRO|     CEP| UF|MUNICIPIO|DDD_1|   TEL_1|DDD_2|   TEL_2|DDD_FAX|     FAX|  CORREIO_ELETRONICO|SITUACAO_ESPECIAL|DATA_SITUACAO_ESPECIAL|
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+-----

In [None]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos\
            WHERE CEP IS NULL").show()

In [14]:
spark.sql("SELECT COUNT(DISTINCT CEP) AS CEPs_distintos\
            FROM estabelecimentos\
            WHERE CEP IS NOT NULL").show()

+--------------+
|CEPs_distintos|
+--------------+
|        958396|
+--------------+



In [17]:
df_estabelecimentos.select('CEP').distinct().count()

958397

#### Pergunta 7: Quantos CNAEs existem na tabela de CNAES?

In [None]:
df_cnaes.createOrReplaceTempView("cnaes")

In [None]:
spark.sql("SELECT * FROM cnaes").show(5)

In [None]:
spark.sql("SELECT COUNT(CNAE) FROM cnaes").show()

#### Pergunta 8: Quantos estabelecimentos possuem um CNAE relacionado a cultivo?

In [None]:
estabelecimento_cnaes.createOrReplaceTempView("estabelecimento_cnaes")

In [None]:
spark.sql("SELECT COUNT(*) FROM estabelecimento_cnaes\
            WHERE UPPER(DESCRICAO_CNAE) LIKE 'CULTIVO%'").show()

#### Pergunta 11: Quantos estabelecimentos são filiais?

In [21]:
df_estabelecimentos.printSchema()

root
 |-- CNPJ_BASICO: integer (nullable = true)
 |-- CNPJ_ORDEM: integer (nullable = true)
 |-- CNPJ_DV: integer (nullable = true)
 |-- IDENTIFICADOR_MATRIZ_FILIAL: integer (nullable = true)
 |-- NOME_FANTASIA: string (nullable = true)
 |-- SITUACAO_CADASTRAL: integer (nullable = true)
 |-- DATA_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- MOTIVO_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- NOME_DA_CIDADE_NO_EXTERIOR: string (nullable = true)
 |-- PAIS: integer (nullable = true)
 |-- DATA_INICIO_ATIVIDADE: integer (nullable = true)
 |-- CNAE: integer (nullable = true)
 |-- CNAE_SECUNDARIA: string (nullable = true)
 |-- TIPO_LOGRADOURO: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO: string (nullable = true)
 |-- COMPLEMENTO: string (nullable = true)
 |-- BAIRRO: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- DDD_1: string (nullable = true)
 |-- TEL_

In [18]:
df_estabelecimentos.show(5,truncate=False)

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+---------------------+-------+---------------+---------------+-----------------+------+---------------------------------+---------+--------+---+---------+-----+--------+-----+--------+-------+--------+---------------------------+-----------------+----------------------+
|CNPJ_BASICO|CNPJ_ORDEM|CNPJ_DV|IDENTIFICADOR_MATRIZ_FILIAL|NOME_FANTASIA|SITUACAO_CADASTRAL|DATA_SITUACAO_CADASTRAL|MOTIVO_SITUACAO_CADASTRAL|NOME_DA_CIDADE_NO_EXTERIOR|PAIS|DATA_INICIO_ATIVIDADE|CNAE   |CNAE_SECUNDARIA|TIPO_LOGRADOURO|LOGRADOURO       |NUMERO|COMPLEMENTO                      |BAIRRO   |CEP     |UF |MUNICIPIO|DDD_1|TEL_1   |DDD_2|TEL_2   |DDD_FAX|FAX     |CORREIO_ELETRONICO         |SITUACAO_ESPECIAL|DATA_SITUACAO_ESPECIAL|
+-----------+----------+-------+---------------------------+-------------+------------------+---------------

In [25]:
from pyspark.sql.functions import col

df_estabelecimentos.select('IDENTIFICADOR_MATRIZ_FILIAL')\
    .where(col('IDENTIFICADOR_MATRIZ_FILIAL') == 2).count()

1093082