<a href="https://colab.research.google.com/github/mzsmarcos/projeto_pipelineETL/blob/main/Etapa_8_categoria%C3%A7%C3%A3o.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Categorização

Configuração do PySpark -  instalar o PySpark no Colab

In [None]:
# Instalar o PySpark
!pip install pyspark




Executar a instalação do PySpark e configurar o ambiente Spark.

Isso é feito definindo algumas variáveis de ambiente para garantir que o Spark funcione corretamente no Colab.

In [None]:
# Instalar Java 8
!apt-get install openjdk-8-jdk -y

# Baixar o Apache Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Extrair o Apache Spark
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Definir variáveis de ambiente
import os # Módulo para interagir com o sistema operacional, como manipulação de arquivos e diretórios
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Instalar as bibliotecas PySpark e Findspark
!pip install -q pyspark==3.5.0 findspark



Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-8-jdk-headless openjdk-8-jre
  openjdk-8-jre-headless x11-utils
Suggested packages:
  gvfs libice-doc libsm-doc libxt-doc openjdk-8-demo openjdk-8-source visualvm libnss-mdns
  fonts-nanum fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei
  fonts-indic mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-

In [None]:
# Verificar se o arquivo foi baixado
!ls -lh


total 382M
drwxr-xr-x  1 root root 4.0K Mar 17 13:32 sample_data
drwxr-xr-x 13 1000 1000 4.0K Sep  9  2023 spark-3.5.0-bin-hadoop3
-rw-r--r--  1 root root 382M Sep  9  2023 spark-3.5.0-bin-hadoop3.tgz


Criar a Sessão Spark

In [None]:
# Importar as bibliotecas
import findspark # Permite localizar e configurar o Spark no ambiente, garantindo que o PySpark possa ser usado
findspark.init("/content/spark-3.5.0-bin-hadoop3")

from pyspark.sql import SparkSession  # Importa a classe SparkSession para criar uma sessão do Spark e interagir com DataFrames

# Criar a sessão Spark
spark = SparkSession.builder \
    .appName("PySparkExample") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Verificar se a sessão foi criada corretamente
print("Versão do Spark:", spark.version)


Versão do Spark: 3.5.0


In [None]:
# Os
import os

In [None]:
# Montando o Google Drive no Colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# leitura dos arquivos necessários

df_empresas_brasil = spark.read.parquet("/content/drive/MyDrive/Projeto_ETL_Trust_Works/Dados_abertos/parquet/df_empresas_brasil")
df_cnae = spark.read.parquet("/content/drive/MyDrive/Projeto_ETL_Trust_Works/Dados_abertos/parquet/cnae.parquet")

In [None]:
df_empresas_brasil.show(n=20, truncate=False)

+-----------+----------+-------+----------------+-------------------------+-----------------+------------------+---------------+---------------------+---------------------------------------------------------------+--------------+------------------------------+------+-------------+------------------+--------+---+------------+------------+------------+------------+------------+------------+-----------------------------+-----------------+----------------------+------------------+------------------------------------------------------------+--------------------------+---------------------+----------------------------+--------------------+-----------------------------+------------------------+-----------------------------+----------------------+----------------------------+------------------------------------------+--------------+-----------------+--------------------+-------------+------------------+-----------------+------------+--------------+-------------+--------------+-------------+---

In [None]:
# Nulos cod_cnae

from pyspark.sql import functions as F

df_empresas_brasil.select(F.count(F.when(F.col("cod_cnae").isNull(), True)).alias("Nulos_cod_cnae")).show()

+--------------+
|Nulos_cod_cnae|
+--------------+
|             9|
+--------------+



In [None]:
# Formatação dos Cnaes do df_empresas_brasil e df_cnae

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Função para formatar o CNAE
def formatar_cnae(cnae_str):
    if cnae_str:
        cnae_str = str(cnae_str) # garante que seja string
        tamanho = len(cnae_str)
        if tamanho == 7:
            return f"{cnae_str[:4]}-{cnae_str[4:5]}/{cnae_str[5:]}"
        elif tamanho == 8:
            return f"{cnae_str[:4]}-{cnae_str[4:5]}/{cnae_str[5:]}"
        else:
            return None # Retorna None para códigos com tamanhos inválidos
    return None

formatar_cnae_udf = F.udf(formatar_cnae, StringType())

# Aplica a função ao df_empresas brasil
df_empresas_brasil = df_empresas_brasil.withColumn(
    "cod_cnae_formatado", formatar_cnae_udf(F.col("cod_cnae"))
)

# Aplica a função ao df_cnae
df_cnae = df_cnae.withColumn(
    "cod_cnae_formatado", formatar_cnae_udf(F.col("cod_cnae"))
)

df_empresas_brasil.select("cod_cnae", "cod_cnae_formatado").show(10) #Mostra os resultados
df_cnae.select("cod_cnae", "cod_cnae_formatado").show(10) #Mostra os resultados

+--------+------------------+
|cod_cnae|cod_cnae_formatado|
+--------+------------------+
| 9430800|         9430-8/00|
| 5611205|         5611-2/05|
| 9512600|         9512-6/00|
| 8112500|         8112-5/00|
| 4789099|         4789-0/99|
| 4731800|         4731-8/00|
| 4642701|         4642-7/01|
| 8599699|         8599-6/99|
| 5611201|         5611-2/01|
| 4772500|         4772-5/00|
+--------+------------------+
only showing top 10 rows

+--------+------------------+
|cod_cnae|cod_cnae_formatado|
+--------+------------------+
| 0111301|         0111-3/01|
| 0111302|         0111-3/02|
| 0111303|         0111-3/03|
| 0111399|         0111-3/99|
| 0112101|         0112-1/01|
| 0112102|         0112-1/02|
| 0112199|         0112-1/99|
| 0113000|         0113-0/00|
| 0114800|         0114-8/00|
| 0115600|         0115-6/00|
+--------+------------------+
only showing top 10 rows



In [None]:
# Criação do dicionário de categorias


# Código cria um dicionário das categorias com chave e valor.
codigos_cnae_unicos = [
    row.cod_cnae_formatado
    for row in df_cnae.select("cod_cnae_formatado").collect()
]

categorias_cnae = {}

# 62 = Tecnologia e Inovação (TI e Software)

tecnologia_e_inovacao = [
    "6201-5/00",  # Desenvolvimento de programas de computador sob encomenda
    "6202-3/00",  # Desenvolvimento e licenciamento de programas de computador customizáveis
    "6203-1/00",  # Desenvolvimento e licenciamento de programas de computador não customizáveis
    "6204-0/00",  # Consultoria em tecnologia da informação
    "6209-1/00",  # Suporte técnico, manutenção e outros serviços em tecnologia da informação
    "6311-7/00",  # Tratamento de dados, provedores de serviços de aplicação e hospedagem na internet
    "6312-5/00",  # Portais, provedores de conteúdo e outros serviços de informação na internet
]

# Mapeamento das categorias
for cnae in tecnologia_e_inovacao:
    categorias_cnae[cnae] = "Tecnologia e Inovação"





# 73 = Agências de Marketing e Publicidade

agencias_de_marketing_e_publicidade = [
    "7311-4/00",  # Agências de publicidade
    "7312-2/00",  # Criação publicitária
    "7319-0/01",  # Consultoria em publicidade
    "7319-0/02",  # Promoção de vendas
    "7319-0/03",  # Marketing direto
    "7319-0/04",  # Publicidade na internet
    "7311-4/00",  # Agências de publicidade
    "7319-0/02",  # Promoção de vendas
    "7319-0/03",  # Marketing direto
    "7319-0/99"   # Outras atividades de publicidade não especificadas anteriormente
]

for cnae in agencias_de_marketing_e_publicidade:
    categorias_cnae[cnae] = "Agências de Marketing e Publicidade"



# Plataformas de emprego e recrutamento
plataformas_de_emprego_e_recrutamento = [
    "7810-8/00",  # Seleção e agenciamento de mão-de-obra
    "6319-4/00"   # Portais, provedores de conteúdo e outros serviços de informação na internet
]

for cnae in plataformas_de_emprego_e_recrutamento:
    categorias_cnae[cnae] = "Plataformas de Emprego e Recrutamento"

# 92 = Apostas
apostas = [
    "9200-3/01",  # Loterias
    "9200-3/02",  # Bingos
    "9200-3/03",  # Casas de apostas
    "9200-3/99"   # Exploração de jogos de azar e apostas não especificados anteriormente
]

for cnae in apostas:
    categorias_cnae[cnae] = "Apostas"

# 96 = Beleza e Estética
beleza_e_estetica = [
    "9602-5/01",  # Cabeleireiros
    "9602-5/02",  # Institutos de beleza
    "9602-5/03"   # Serviços de estética
]

for cnae in beleza_e_estetica:
    categorias_cnae[cnae] = "Beleza e Estética"

# Bancos
bancos = [
    "6422-3/00",  # Bancos comerciais
    "6423-1/00",  # Bancos múltiplos, com carteira comercial
    "6424-0/00"   # Caixas econômicas
]

for cnae in bancos:
    categorias_cnae[cnae] = "Bancos"

# Financeiras
financeiras = [
    "6431-0/00",  # Sociedades de crédito, financiamento e investimento - Financeiras
    "6432-8/00",  # Sociedades de crédito imobiliário
    "6433-6/00",  # Sociedades de crédito ao microempreendedor
    "6440-9/00"   # Sociedades de capitalização
]

for cnae in financeiras:
    categorias_cnae[cnae] = "Financeiras"

# Educação
educacao = [
    "8511-2/00",  # Educação infantil - creche
    "8512-1/00",  # Educação infantil - pré-escola
    "8520-1/00",  # Ensino fundamental
    "8531-7/00",  # Ensino médio
    "8532-5/00",  # Educação profissional de nível técnico
    "8541-4/00",  # Educação superior - graduação
    "8542-2/00",  # Educação superior - graduação e pós-graduação
    "8591-1/00",  # Ensino de esportes
    "8592-9/99",  # Ensino de arte e cultura não especificado anteriormente
    "8599-6/99"   # Outras atividades de ensino não especificadas anteriormente
]

for cnae in educacao:
    categorias_cnae[cnae] = "Educação"

# Conteúdo sensível (sex shop e acompanhantes)
conteudo_sensivel = [
    "4789-0/05",  # Comércio varejista de artigos eróticos
    "9609-2/99"   # Outras atividades de serviços pessoais não especificadas anteriormente
]

for cnae in conteudo_sensivel:
    categorias_cnae[cnae] = "Conteúdo Sensível"


# Seguradoras
seguradoras = [
    "6511-1/00",  # Seguros de vida
    "6512-0/00",  # Seguros não-vida
    "6520-1/00",  # Seguros saúde
    "6530-8/00"   # Previdência complementar fechada
]

for cnae in seguradoras:
    categorias_cnae[cnae] = "Seguradoras"

# Moda
moda = [
    "4771-7/01",  # Comércio varejista de artigos do vestuário e acessórios
    "1411-8/01",  # Confecção de roupas íntimas
    "1412-6/01",  # Confecção de peças do vestuário, exceto roupas íntimas e as confeccionadas sob medida
    "1412-6/02",  # Confecção, sob medida, de peças do vestuário, exceto roupas íntimas
    "1413-4/00",  # Facção de roupas
    "7410-2/00"   # Atividades de design
]

for cnae in moda:
    categorias_cnae[cnae] = "Moda"


# Comércio e varejo (todas as subclasses iniciadas com '47' correspondem ao comércio varejista)
comercio_e_varejo = [
    "4711-3/01", "4711-3/02", "4712-1/00", "4721-1/02", "4721-1/03",

]

for cnae in comercio_e_varejo:
    categorias_cnae[cnae] = "Comércio e Varejo"

# Pet
pet = [
    "4776-6/01",  # Comércio varejista de animais vivos e de artigos e alimentos para animais de estimação
    "9609-2/05",  # Serviços de estética para animais domésticos
    "7500-1/00"   # Atividades veterinárias
]

for cnae in pet:
    categorias_cnae[cnae] = "Pet"

# Saúde
saude = [
    "8610-1/01",  # Atividades de hospitais gerais
    "8621-6/01",  # Atividades de atendimento hospitalar, exceto pronto-socorro e unidades para atendimento a urgências
    "8622-4/00",  # Atividades de atendimento em pronto-socorro e unidades hospitalares para atendimento a urgências
    "8630-5/00"   # Atividades de atenção ambulatorial executadas por médicos e odontólogos
]

for cnae in saude:
    categorias_cnae[cnae] = "Saúde"

# Turismo e Lazer
turismo_e_lazer = [
    "7911-2/00",  # Agências de viagens
    "7912-1/00",  # Operadores turísticos
    "7990-2/00",  # Serviços de reservas e outros serviços de turismo não especificados anteriormente
    "9321-8/00",  # Parques de diversão e parques temáticos
    "9329-8/99"   # Outras atividades de recreação e lazer não especificadas anteriormente
]

for cnae in turismo_e_lazer:
    categorias_cnae[cnae] = "Turismo e Lazer"

# Transporte
transporte = [
    "4911-6/00", "4912-4/01", "4912-4/02",  # Transporte ferroviário
    "4921-0/01", "4921-0/02", "4922-8/00",  # Transporte rodoviário de carga
    "4930-2/01", "4930-2/02", "4930-2/03",  # Transporte rodoviário coletivo de passageiros
    "5011-4/01", "5011-4/02",                # Transporte marítimo de cabotagem
    "5021-1/01", "5021-1/02",                # Transporte por navegação interior
    "5111-1/00", "5112-0/00"                 # Transporte aéreo de passageiros
]

for cnae in transporte:
    categorias_cnae[cnae] = "Transporte"

# Setor imobiliário
setor_imobiliario = [
    "6810-2/01",  # Compra e venda de imóveis próprios
    "6810-2/02",  # Aluguel de imóveis próprios
    "6821-8/01",  # Corretagem na compra e venda e avaliação de imóveis
    "6822-6/00"   # Gestão e administração da propriedade imobiliária
]

for cnae in setor_imobiliario:
    categorias_cnae[cnae] = "Setor Imobiliário"

# 51 = Empresas aéreas
empresas_aereas = [
    "5111-1/00",  # Transporte aéreo de passageiros regular
    "5112-0/00",  # Transporte aéreo de passageiros não regular
    "5120-0/00"   # Transporte aéreo de carga
]

for cnae in empresas_aereas:
    categorias_cnae[cnae] = "Empresas Aéreas"

# Veículos
veiculos = [
    "2910-7/01",  # Fabricação de automóveis, camionetas e utilitários
    "2910-7/02",  # Fabricação de chassis com motor para automóveis, camionetas e utilitários
    "2920-4/01",  # Fabricação de caminhões e ônibus
    "2920-4/02",  # Fabricação de chassis com motor para caminhões e ônibus
    "4511-1/01",  # Comércio a varejo de automóveis, camionetas e utilitários novos
    "4511-1/02",  # Comércio a varejo de automóveis, camionetas e utilitários usados
    "4511-1/03",  # Comércio por atacado de automóveis, camionetas e utilitários novos e usados
    "4520-0/01"   # Serviços de manutenção e reparação mecânica de veículos automotores
]

for cnae in veiculos:
    categorias_cnae[cnae] = "Veículos"

# 10 e 11 = Alimentos e Bebidas
alimentos_e_bebidas = [
    # Códigos iniciados por '10' - Fabricação de produtos alimentícios
    "1011-2/01", "1011-2/02", "1012-1/01", "1012-1/02", "1012-1/03",
    "1013-0/00", "1019-0/01", "1019-0/99", "1020-1/00", "1031-7/00",
    "1032-5/00", "1033-3/01", "1033-3/02", "1039-2/00", "1041-4/00",
    "1042-2/00", "1051-1/00", "1052-0/00", "1053-8/00", "1054-6/00",
    "1055-4/00", "1059-7/00", "1061-3/00", "1062-1/00", "1063-0/00",
    "1064-8/00", "1065-6/00", "1069-9/00", "1071-1/00", "1072-9/00",
    "1073-7/00", "1074-5/00", "1079-6/00", "1081-8/00", "1082-6/00",
    "1083-4/00", "1084-2/00", "1091-5/00", "1092-3/00", "1093-1/00",
    "1094-0/00", "1095-8/00", "1096-6/00", "1099-1/01", "1099-1/99",
    "4639-7/01"
    # Códigos iniciados por '11' - Fabricação de bebidas
    "1101-7/01", "1101-7/02", "1102-5/01", "1102-5/02", "1102-5/03",
    "1102-5/04", "1103-3/02", "1103-3/03", "1103-3/04", "1104-1/00",
    "1105-0/00", "1106-8/00", "1107-6/00"
]

for cnae in alimentos_e_bebidas:
    categorias_cnae[cnae] = "Alimentos e Bebidas"

# Construção e Infraestrutura
construcao_e_infraestrutura = [
    "4110-7/01",  # Construção de edifícios residenciais
    "4110-7/02",  # Construção de edifícios comerciais
    "4211-2/00",  # Construção de rodovias
    "4299-5/99"   # Outras obras de construção não especificadas anteriormente
]

for cnae in construcao_e_infraestrutura:
    categorias_cnae[cnae] = "Construção e Infraestrutura"

# Agronegócio e Pecuária
agronegocio_e_pecuaria = [
    "0111-3/01",  # Cultivo de cereais
    "0112-1/01",  # Cultivo de soja
    "0141-1/00",  # Criação de bovinos
    "0142-0/00",  # Criação de suínos
    "0143-8/00"   # Criação de aves
]

for cnae in agronegocio_e_pecuaria:
    categorias_cnae[cnae] = "Agronegócio e Pecuária"

# Logística e Armazenagem
logistica_e_armazenagem = [
    "5229-1/00",  # Armazenamento e depósito de mercadorias
    "5320-2/00"   # Serviços de entrega e transporte de encomendas
]

for cnae in logistica_e_armazenagem:
    categorias_cnae[cnae] = "Logística e Armazenagem"

# Serviços Administrativos e de Apoio
servicos_administrativos_e_apoio = [
    "8211-1/00",  # Serviços combinados de escritório e apoio administrativo
    "8219-9/00"   # Outras atividades de apoio administrativo
]

for cnae in servicos_administrativos_e_apoio:
    categorias_cnae[cnae] = "Serviços Administrativos e de Apoio"

# Consultoria e Treinamento Profissional
consultoria_e_treinamento = [
    "7020-4/00",  # Consultoria em gestão empresarial
    "8599-6/99"   # Outras atividades de ensino e treinamentos profissionais não especificadas anteriormente
]

for cnae in consultoria_e_treinamento:
    categorias_cnae[cnae] = "Consultoria e Treinamento Profissional"

print(categorias_cnae)


{'6201-5/00': 'Tecnologia e Inovação', '6202-3/00': 'Tecnologia e Inovação', '6203-1/00': 'Tecnologia e Inovação', '6204-0/00': 'Tecnologia e Inovação', '6209-1/00': 'Tecnologia e Inovação', '6311-7/00': 'Tecnologia e Inovação', '6312-5/00': 'Tecnologia e Inovação', '7311-4/00': 'Agências de Marketing e Publicidade', '7312-2/00': 'Agências de Marketing e Publicidade', '7319-0/01': 'Agências de Marketing e Publicidade', '7319-0/02': 'Agências de Marketing e Publicidade', '7319-0/03': 'Agências de Marketing e Publicidade', '7319-0/04': 'Agências de Marketing e Publicidade', '7319-0/99': 'Agências de Marketing e Publicidade', '7810-8/00': 'Plataformas de Emprego e Recrutamento', '6319-4/00': 'Plataformas de Emprego e Recrutamento', '9200-3/01': 'Apostas', '9200-3/02': 'Apostas', '9200-3/03': 'Apostas', '9200-3/99': 'Apostas', '9602-5/01': 'Beleza e Estética', '9602-5/02': 'Beleza e Estética', '9602-5/03': 'Beleza e Estética', '6422-3/00': 'Bancos', '6423-1/00': 'Bancos', '6424-0/00': 'Ban

In [None]:
# Código inclui a coluna de categorias no df_empresas_brasil utilizando o dicionário criado


def categorizar_empresa(cnae):
    return categorias_cnae.get(cnae, "Outras Categorias")

categorizar_empresa_udf = F.udf(categorizar_empresa, StringType())

df_empresas_brasil = df_empresas_brasil.withColumn(
    "categoria_empresa", categorizar_empresa_udf(F.col("cod_cnae_formatado"))
)

df_empresas_brasil.select("cod_cnae_formatado", "categoria_empresa").show(10)

+------------------+--------------------+
|cod_cnae_formatado|   categoria_empresa|
+------------------+--------------------+
|         9430-8/00|   Outras Categorias|
|         5611-2/05|   Outras Categorias|
|         9512-6/00|   Outras Categorias|
|         8112-5/00|   Outras Categorias|
|         4789-0/99|   Outras Categorias|
|         4731-8/00|   Outras Categorias|
|         4642-7/01|   Outras Categorias|
|         8599-6/99|Consultoria e Tre...|
|         5611-2/01|   Outras Categorias|
|         4772-5/00|   Outras Categorias|
+------------------+--------------------+
only showing top 10 rows



In [None]:
df_empresas_brasil.show(n=20, truncate=False)

+-----------+----------+-------+----------------+-------------------------+-----------------+------------------+---------------+---------------------+---------------------------------------------------------------+--------------+------------------------------+------+-------------+------------------+--------+---+------------+------------+------------+------------+------------+------------+-----------------------------+-----------------+----------------------+------------------+------------------------------------------------------------+--------------------------+---------------------+----------------------------+--------------------+-----------------------------+------------------------+-----------------------------+----------------------+----------------------------+------------------------------------------+--------------+-----------------+--------------------+-------------+------------------+-----------------+------------+--------------+-------------+--------------+-------------+---

In [None]:
# Configurações de memória do Spark
spark = SparkSession.builder \
    .appName("Projeto ETL") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Caminho do diretório no Google Drive
drive_directory = "/content/drive/MyDrive/Projeto_ETL_Trust_Works/Dados_abertos/parquet" # ajustas caminho ao usar código no ambiente


# Configurações otimizadas para PySpark caso preciso
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")
spark.conf.set("spark.sql.shuffle.partitions", "200")


# Reparticionamento e salvamento do arquivo
    df_empresas_brasil.repartition(200) \
        .write \
        .mode("overwrite") \
        .option("compression", "snappy") \
        .parquet(output_path)

    print(f"DataFrame final salvo com sucesso em: {output_path}")