In [None]:
# Importações necessárias
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import explode, col
import requests

# Configurar SparkSession
spark = (
    SparkSession.builder
    .appName("API to PySpark")
    .master("local")
    .getOrCreate()
)

# Função para consumir dados da API

def fetch_data_from_api(api_url):
    try:
        response = requests.get(api_url)
        response.raise_for_status()  # Verifica erros HTTP
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Erro ao acessar API: {e}")
        return []

# URL da API (exemplo fictício)
api_url = "https://servicodados.ibge.gov.br/api/v3/agregados/1839/periodos/2022/variaveis/630?localidades=N1[all]"

# Obter dados da API
raw_data = fetch_data_from_api(api_url)

print(raw_data)


[{'id': '630', 'variavel': 'Número de empresas', 'unidade': 'Unidades', 'resultados': [{'classificacoes': [{'id': '319', 'nome': 'Faixas de pessoal ocupado', 'categoria': {'104029': 'Total'}}, {'id': '9117', 'nome': 'Tipo de indústria', 'categoria': {'99735': 'Total'}}], 'series': [{'localidade': {'id': '1', 'nivel': {'id': 'N1', 'nome': 'Brasil'}, 'nome': 'Brasil'}, 'serie': {'2022': '346105'}}]}]}]


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Iniciar sessão Spark
spark = SparkSession.builder.appName("DadosAninhados").getOrCreate()

# Criar DataFrame
df = spark.read.json(spark.sparkContext.parallelize(raw_data))

In [None]:
df.printSchema()  # Exibe o esquema do DataFrame
df.show(truncate=False)  # Mostra os dados completos sem truncar

root
 |-- id: string (nullable = true)
 |-- resultados: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- classificacoes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- categoria: struct (nullable = true)
 |    |    |    |    |    |-- 104029: string (nullable = true)
 |    |    |    |    |    |-- 99735: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- nome: string (nullable = true)
 |    |    |-- series: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- localidade: struct (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- nivel: struct (nullable = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- nome: string (nullable = true)
 |    |    |    |    |    |-- nome: string (n

In [None]:
#  explode para transformar cada elemento do array em uma linha separada
from pyspark.sql.functions import col, explode

df_resultados = df.withColumn("resultado", explode(col("resultados"))).select(
    "id",
    "unidade",
    "variavel",
    col("resultado.classificacoes").alias("classificacoes"),
    col("resultado.series").alias("series")
)
df_resultados.show(truncate=False)


+---+--------+------------------+-------------------------------------------------------------------------------------------+---------------------------------------+
|id |unidade |variavel          |classificacoes                                                                             |series                                 |
+---+--------+------------------+-------------------------------------------------------------------------------------------+---------------------------------------+
|630|Unidades|Número de empresas|[{{Total, NULL}, 319, Faixas de pessoal ocupado}, {{NULL, Total}, 9117, Tipo de indústria}]|[{{1, {N1, Brasil}, Brasil}, {346105}}]|
+---+--------+------------------+-------------------------------------------------------------------------------------------+---------------------------------------+



In [None]:
#  desaninhamos a coluna classificacoes (que também é um array)
df_classificacoes = df_resultados.withColumn("classificacao", explode(col("classificacoes"))).select(
    "id",
    "unidade",
    "variavel",
    col("classificacao.id").alias("classificacao_id"),
    col("classificacao.nome").alias("classificacao_nome"),
    col("classificacao.categoria").alias("categoria"),
    "series"
)
df_classificacoes.show(truncate=False)


+---+--------+------------------+----------------+-------------------------+-------------+---------------------------------------+
|id |unidade |variavel          |classificacao_id|classificacao_nome       |categoria    |series                                 |
+---+--------+------------------+----------------+-------------------------+-------------+---------------------------------------+
|630|Unidades|Número de empresas|319             |Faixas de pessoal ocupado|{Total, NULL}|[{{1, {N1, Brasil}, Brasil}, {346105}}]|
|630|Unidades|Número de empresas|9117            |Tipo de indústria        |{NULL, Total}|[{{1, {N1, Brasil}, Brasil}, {346105}}]|
+---+--------+------------------+----------------+-------------------------+-------------+---------------------------------------+



In [None]:
# desaninhar a coluna series, que também é um array:

df_series = df_classificacoes.withColumn("serie", explode(col("series"))).select(
    "id",
    "unidade",
    "variavel",
    "classificacao_id",
    "classificacao_nome",
    "categoria",
    col("serie.localidade.id").alias("localidade_id"),
    col("serie.localidade.nome").alias("localidade_nome"),
    col("serie.localidade.nivel.id").alias("nivel_id"),
    col("serie.localidade.nivel.nome").alias("nivel_nome"),
    col("serie.serie").alias("serie_valores")
)
df_series.show(truncate=False)


+---+--------+------------------+----------------+-------------------------+-------------+-------------+---------------+--------+----------+-------------+
|id |unidade |variavel          |classificacao_id|classificacao_nome       |categoria    |localidade_id|localidade_nome|nivel_id|nivel_nome|serie_valores|
+---+--------+------------------+----------------+-------------------------+-------------+-------------+---------------+--------+----------+-------------+
|630|Unidades|Número de empresas|319             |Faixas de pessoal ocupado|{Total, NULL}|1            |Brasil         |N1      |Brasil    |{346105}     |
|630|Unidades|Número de empresas|9117            |Tipo de indústria        |{NULL, Total}|1            |Brasil         |N1      |Brasil    |{346105}     |
+---+--------+------------------+----------------+-------------------------+-------------+-------------+---------------+--------+----------+-------------+



In [None]:
df_series.select("categoria").show(truncate=False)


+-------------+
|categoria    |
+-------------+
|{Total, NULL}|
|{NULL, Total}|
+-------------+



In [None]:
df_categoria = df_series.select(
    "*",
    col("categoria.104029").alias("categoria_104029"),
    col("categoria.99735").alias("categoria_99735")
).drop("categoria")

df_categoria.show(truncate=False)


+---+--------+------------------+----------------+-------------------------+-------------+---------------+--------+----------+-------------+----------------+---------------+
|id |unidade |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|serie_valores|categoria_104029|categoria_99735|
+---+--------+------------------+----------------+-------------------------+-------------+---------------+--------+----------+-------------+----------------+---------------+
|630|Unidades|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |{346105}     |Total           |NULL           |
|630|Unidades|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |{346105}     |NULL            |Total          |
+---+--------+------------------+----------------+-------------------------+-------------+---------------+--------+----------+----

In [None]:
from pyspark.sql.functions import create_map, lit, explode

# Converter para MAP antes de explodir
df_series = df_series.withColumn(
    "categoria_map",
    create_map(lit("104029"), col("categoria.104029"), lit("99735"), col("categoria.99735"))
)

# Explodir o mapa com dois aliases para chave e valor
# Use explode with alias to create two columns: 'categoria_chave', 'categoria_valor'
df_categoria = df_series.select(
    "*",
    explode(col("categoria_map")).alias("categoria_chave", "categoria_valor")
).drop("categoria", "categoria_map", "unidade").na.drop()  # Remover colunas desnecessárias

df_categoria.show(truncate=False)

+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+-------------+---------------+---------------+
|id |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|serie_valores|categoria_chave|categoria_valor|
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+-------------+---------------+---------------+
|630|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |{346105}     |104029         |Total          |
|630|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |{346105}     |99735          |Total          |
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+-------------+---------------+---------------+



In [None]:
from pyspark.sql.functions import create_map, lit, explode

# Convert the 'serie_valores' struct to a map
df_categoria = df_categoria.withColumn(
    "serie_valores_map",
    create_map(lit("2022"), col("serie_valores.2022"))  # Create a map with "2022" as key and the value from 'serie_valores.2022'
)

# Explode the map to create separate rows for each year and value
df_final = df_categoria.select(
    "*",
    explode(col("serie_valores_map")).alias("ano", "valor")  # Explode the map and rename columns to 'ano' and 'valor'
).drop("serie_valores", "serie_valores_map")  # Drop the original 'serie_valores' and 'serie_valores_map' columns

df_final.show(truncate=False)

+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|id |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|categoria_chave|categoria_valor|ano |valor |
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|630|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |104029         |Total          |2022|346105|
|630|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |99735          |Total          |2022|346105|
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+



In [None]:
df_categoria.select("serie_valores").show(truncate=False)

+-------------+
|serie_valores|
+-------------+
|{346105}     |
|{346105}     |
+-------------+



In [None]:
df_final = df_categoria.select(
    "*",
    col("serie_valores.2022").alias("ano_2022")
).drop("serie_valores")

df_final.show(truncate=False)


+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+-----------------+--------+
|id |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|categoria_chave|categoria_valor|serie_valores_map|ano_2022|
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+-----------------+--------+
|630|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |104029         |Total          |{2022 -> 346105} |346105  |
|630|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |99735          |Total          |{2022 -> 346105} |346105  |
+---+------------------+----------------+-------------------------+-------------+---------------+--------+---------

In [None]:
from pyspark.sql.functions import create_map, lit, explode

# Converter para um mapa
df_categoria = df_categoria.withColumn(
    "serie_valores_map",
    create_map(lit("2022"), col("serie_valores.2022"))
)

# Explodir o mapa
# Use explode with two aliases for key and value
df_final = df_categoria.select(
    "*",
    explode(col("serie_valores_map")).alias("ano", "valor")  # Alias for key and value
).drop("serie_valores", "serie_valores_map")

df_final.show(truncate=False)

+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|id |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|categoria_chave|categoria_valor|ano |valor |
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|630|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |104029         |Total          |2022|346105|
|630|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |99735          |Total          |2022|346105|
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+



In [None]:
# Salvar em CSV, sobrescrevendo o arquivo se ele já existir
df_final.write.mode("overwrite").csv("dados_processados.csv", header=True)

# Ver primeiras linhas para análise
df_final.show(50, truncate=False)

+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|id |variavel          |classificacao_id|classificacao_nome       |localidade_id|localidade_nome|nivel_id|nivel_nome|categoria_chave|categoria_valor|ano |valor |
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+
|630|Número de empresas|319             |Faixas de pessoal ocupado|1            |Brasil         |N1      |Brasil    |104029         |Total          |2022|346105|
|630|Número de empresas|9117            |Tipo de indústria        |1            |Brasil         |N1      |Brasil    |99735          |Total          |2022|346105|
+---+------------------+----------------+-------------------------+-------------+---------------+--------+----------+---------------+---------------+----+------+



In [None]:
import pandas as pd
import numpy as np

In [None]:
df = pd.read_csv("/content/20250123051559.csv", sep=";")
df

In [None]:
df.dropna(subset=['Dados gerais das empresas industriais'], inplace=True)
df

In [None]:
df

Unnamed: 0,Unnamed: 1,Unnamed: 2,Unnamed: 3,Dados gerais das empresas industriais
,,Indústrias de transformação,Indústrias extrativas,Total
"Consumo de matérias-primas, materiais auxiliares e componentes",Mil Reais,3118206683,87749538,3205956221
Custos das operações industriais,Mil Reais,3464254326,170906700,3635161026
"Custos e despesas - gastos de pessoal - salários, retiradas e outras remunerações",Mil Reais,385099186,18559811,403658997
Custos e despesas - gastos de pessoal - total,Mil Reais,582407361,30072642,612480003
Custos e despesas - total,Mil Reais,6153024464,373232437,6526256901
Número de empresas,Unidades,339432,6673,346105
Pessoal ocupado em 31/12,Pessoas,8057973,225684,8283657
Receita - total,Mil Reais,6967386866,552794074,7520180940
Receita líquida de vendas - de produtos e serviços industriais,Mil Reais,5613029144,432822690,6045851834


In [None]:
df2 = pd.read_csv("/content/dados_processados.csv/part-00000-a0bcba50-315f-42ef-8ee3-1c94091a770b-c000.csv")
df2

Unnamed: 0,id,variavel,classificacao_id,classificacao_nome,localidade_id,localidade_nome,nivel_id,nivel_nome,categoria_chave,categoria_valor,ano,valor
0,630,Número de empresas,319,Faixas de pessoal ocupado,1,Brasil,N1,Brasil,104029,Total,2022,346105
1,630,Número de empresas,9117,Tipo de indústria,1,Brasil,N1,Brasil,99735,Total,2022,346105


In [None]:
# Rename the column "variavel" to "Numero de empresas"
df2 = df2.rename(columns={"variavel": "Numero de empresas"})

# Replace the values in the "Numero de empresas" column with the values from the "valor" column.
df2["Numero de empresas"] = df2["valor"].drop(columns=["valor"])
# Display the updated DataFrame to confirm the changes.

df2 = df2.drop(columns=["valor", "categoria_valor" ])

df2

Unnamed: 0,id,Numero de empresas,classificacao_id,classificacao_nome,localidade_id,localidade_nome,nivel_id,nivel_nome,categoria_chave,ano
0,630,346105,319,Faixas de pessoal ocupado,1,Brasil,N1,Brasil,104029,2022
1,630,346105,9117,Tipo de indústria,1,Brasil,N1,Brasil,99735,2022
