# Codigo MVP---Engenharia-de-Dados-Brasil-em-Movimento-Futebol-PIB-e-as-Rotas-da-Migra-o-Interna

### 1.Importação de Bibliotecas

In [0]:
import pandas as pd
import requests
from pyspark.sql.functions import col, when, trim, regexp_replace


%pip install plotly

import plotly.express as px



Python interpreter will be restarted.
Python interpreter will be restarted.


### 1.1 Importação de Dados
---

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

urls = {
    "producao_industria": "https://apisidra.ibge.gov.br/values/t/8159/n3/all/v/11599/p/all/c544/129314/d/v11599%205",
    "pib": "https://apisidra.ibge.gov.br/values/t/5938/n1/all/n3/all/v/37/p/all/d/v37%202",
    "populacao": "https://apisidra.ibge.gov.br/values/t/6579/n1/all/n3/all/v/9324/p/all/d/v9324%202",
    "obitos": "https://apisidra.ibge.gov.br/values/t/2683/n1103/all/n3/all/v/allxp/p/last%2011/c9832/0,78090,78092,78093,78094,99197,99217/c1836/0/c2/0,4,5/c260/0/c257/0",
    "nascimentos": "https://apisidra.ibge.gov.br/values/t/2680/n3/all/v/allxp/p/last%2011/c235/0/c2/0,4,5/c237/0,5349,5350,5351/c238/0/c240/0",
    "desemprego": "https://apisidra.ibge.gov.br/values/t/4093/n3/all/v/1641/p/all/c2/all"
}


for nome, url in urls.items():
    print(f"🔄 Baixando dados de: {nome}")
    try:
        response = requests.get(url)
        response.raise_for_status()
        dados = response.json()


        if not dados or len(dados) < 2:
            raise ValueError("Resposta inválida ou vazia da API.")

        colunas = list(dados[0].values())

        linhas = [dict(zip(colunas, item.values())) for item in dados[1:]]

        df_pandas = pd.DataFrame(linhas)

        df_spark = spark.createDataFrame(df_pandas)

        globals()[nome] = df_spark

        df_spark.createOrReplaceTempView(f"temp_{nome}")

        print(f"✅ DataFrame Spark `{nome}` criado com {df_spark.count()} linhas.")

    except Exception as e:
        print(f"❌ Erro ao criar `{nome}`: {type(e).__name__}: {e}")




🔄 Baixando dados de: producao_industria
✅ DataFrame Spark `producao_industria` criado com 3528 linhas.
🔄 Baixando dados de: pib
✅ DataFrame Spark `pib` criado com 560 linhas.
🔄 Baixando dados de: populacao
✅ DataFrame Spark `populacao` criado com 560 linhas.
🔄 Baixando dados de: obitos
✅ DataFrame Spark `obitos` criado com 6468 linhas.
🔄 Baixando dados de: nascimentos
✅ DataFrame Spark `nascimentos` criado com 3564 linhas.
🔄 Baixando dados de: desemprego
✅ DataFrame Spark `desemprego` criado com 4212 linhas.


In [0]:
file_location = "/FileStore/tables/Campeonato_Brasilerio_Serie_A_e_B.csv"
file_type = "csv"

infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"

brasileiro = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

### 2. Pré-tratamento para inserção dos dados no banco de dados Bronze

In [0]:
def limpar_colunas(df):
    df.columns = (
        df.columns
        .str.strip()
        .str.upper()
        .str.normalize("NFKD") 
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
        .str.replace(r"[^\w]", "_", regex=True) 
        .str.replace(r"_+", "_", regex=True) 
        .str.strip("_")
    )
    return df
    limpar_colunas(brasileiro)
    limpar_colunas(pib)
    limpar_colunas(producao_agricola)
    limpar_colunas(populacao)
    limpar_colunas(obitos)
    limpar_colunas(nascimentos)
    limpar_colunas(desemprego)



In [0]:
from pyspark.sql.functions import col, regexp_replace, trim

brasileiro = brasileiro.withColumnRenamed("Posição", "Posicao")
brasileiro = brasileiro.withColumn("Posicao_limp", trim(regexp_replace(col("Posicao"), "[^0-9]", "")))
brasileiro = brasileiro.withColumn("Posicao", col("Posicao_limp").cast("int")).drop("Posicao_limp")


### 3. Pre-Visualização dos Dados

In [0]:
pib.printSchema()

root
 |-- Nível Territorial (Código): string (nullable = true)
 |-- Nível Territorial: string (nullable = true)
 |-- Unidade de Medida (Código): string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Valor: string (nullable = true)
 |-- Brasil e Unidade da Federação (Código): string (nullable = true)
 |-- Brasil e Unidade da Federação: string (nullable = true)
 |-- Variável (Código): string (nullable = true)
 |-- Variável: string (nullable = true)
 |-- Ano (Código): string (nullable = true)
 |-- Ano: string (nullable = true)



### 4.1 Criação do Banco de de Dados Bronze

Foi definida uma camada Bronze para a ingestão inicial dos dados, onde todas as fontes (SIDRA, Excel de clubes, logs de migração etc.) são carregadas na forma bruta, sem transformações ou agregações. Nessa camada, cada tabela reflete exatamente o formato original:
- Manter os dados brutos nessa camada garante traçabilidade (linhagem) e serve como ponto de partida para todas as transformações subsequentes nas camadas Silver e Gold.

In [0]:
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/obitos", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/nascimentos", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/desemprego", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/brasileiro", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/pib", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/populacao", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/bronze.db/industria", recurse=True)


Out[9]: True

In [0]:
brasileiro.createOrReplaceTempView("temp_brasileiro")
pib.createOrReplaceTempView("temp_pib")
populacao.createOrReplaceTempView("temp_populacao")
obitos.createOrReplaceTempView("temp_obitos")
nascimentos.createOrReplaceTempView("temp_nascimentos")
desemprego.createOrReplaceTempView("temp_desemprego")
producao_industria.createOrReplaceTempView("temp_producao_industria")

In [0]:
%sql

CREATE DATABASE IF NOT EXISTS BRONZE;

DROP TABLE IF EXISTS BRONZE.brasileiro;
DROP TABLE IF EXISTS BRONZE.PIB;
DROP TABLE IF EXISTS BRONZE.AGRICULTURA;
DROP TABLE IF EXISTS BRONZE.POPULACAO;
DROP TABLE IF EXISTS BRONZE.NASCIMENTOS; 
DROP TABLE IF EXISTS BRONZE.OBITOS;
DROP TABLE IF EXISTS BRONZE.DESEMPREGO;
DROP TABLE IF EXISTS BRONZE.INDUSTRIA;

USE BRONZE;

CREATE TABLE BRONZE.BRASILEIRO (
  ANO INT,
  POSICAO STRING,
  CLUBE STRING,
  ESTADO STRING,
  SERIE STRING
);

CREATE TABLE BRONZE.PIB (
  ANO INT,
  ESTADO STRING,
  VALOR_PIB DOUBLE
 );

  CREATE TABLE BRONZE.POPULACAO (
  ANO INT,
  ESTADO STRING,
  VALOR DOUBLE
 );

  CREATE TABLE BRONZE.OBITOS (
  ANO INT,
  ESTADO STRING,
  VALOR INT,
  SEXO STRING,
  ESTADO_CIVIL STRING
 );

  CREATE TABLE BRONZE.NASCIMENTOS (
  ANO INT,
  ESTADO STRING,
  VALOR INT,
  SEXO STRING,
  LOCAL_NASC STRING
 );

 CREATE TABLE BRONZE.DESEMPREGO (
  TRIMESTRE INT,
  ESTADO STRING,
  VALOR INT,
  SEXO STRING
 );
 
 CREATE TABLE BRONZE.INDUSTRIA (
  MES_ANO INT,
  ESTADO STRING,
  VALOR FLOAT
 );

### 4.2 Inserção dos Dados no Banco de dados BRONZE

In [0]:
%sql
INSERT INTO BRONZE.INDUSTRIA
SELECT 
  try_cast(`Mês (Código)` AS INT) AS MES_ANO,
  `Unidade da Federação` AS ESTADO,
  try_cast(`Valor` AS FLOAT) AS VALOR
FROM temp_producao_industria;

INSERT INTO BRONZE.DESEMPREGO
SELECT 
  try_cast(`Trimestre (Código)` AS INT) AS TRIMESTRE,
  `Unidade da Federação` AS ESTADO,
  try_cast(`Valor` AS INT) AS VALOR,
  `Sexo` AS SEXO
FROM temp_desemprego;

INSERT INTO BRONZE.NASCIMENTOS
SELECT 
  try_cast(`Ano` AS INT) AS ANO,
  `Unidade da Federação` AS ESTADO,
  try_cast(`Valor` AS INT) AS VALOR,
  `Sexo` AS SEXO,
  `Local do nascimento` AS LOCAL_NASC
FROM temp_nascimentos;

INSERT INTO BRONZE.PIB
SELECT 
  `Ano` AS ANO,
  `Brasil e Unidade da Federação` AS ESTADO,
  `Valor` AS VALOR_PIB
FROM temp_pib;


INSERT INTO BRONZE.POPULACAO
SELECT 
  `Ano` AS ANO,
  `Brasil e Unidade da Federação` AS ESTADO,
  `Valor` AS VALOR
FROM temp_populacao;

INSERT INTO BRONZE.OBITOS
SELECT 
  try_cast(`Ano` AS INT) AS ANO,
  `Total e Unidade da Federação` AS ESTADO,
  try_cast(`Valor` AS DOUBLE) AS VALOR,
  `Sexo` AS SEXO,
  `Estado civil` AS ESTADO_CIVIL
FROM temp_obitos;

INSERT INTO BRONZE.BRASILEIRO
SELECT Ano, Posicao, Clube, Estado, Serie
FROM temp_brasileiro;


num_affected_rows,num_inserted_rows
520,520


### 5.1 Criação do Banco de de Dados SILVER

- Criação da tabela **SIGLAS_ESTADO**, que mapeia cada estado à sua sigla e região, servindo como chave de referência para unificar e relacionar as demais tabelas do projeto.


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS SILVER;

CREATE OR REPLACE TABLE SILVER.SIGLAS_ESTADO AS
SELECT * FROM VALUES
  ('Acre', 'AC', 'Norte'),
  ('Amapá', 'AP', 'Norte'),
  ('Amazonas', 'AM', 'Norte'),
  ('Pará', 'PA', 'Norte'),
  ('Rondônia', 'RO', 'Norte'),
  ('Roraima', 'RR', 'Norte'),
  ('Tocantins', 'TO', 'Norte'),
  ('Alagoas', 'AL', 'Nordeste'),
  ('Bahia', 'BA', 'Nordeste'),
  ('Ceará', 'CE', 'Nordeste'),
  ('Maranhão', 'MA', 'Nordeste'),
  ('Paraíba', 'PB', 'Nordeste'),
  ('Pernambuco', 'PE', 'Nordeste'),
  ('Piauí', 'PI', 'Nordeste'),
  ('Rio Grande do Norte', 'RN', 'Nordeste'),
  ('Sergipe', 'SE', 'Nordeste'),
  ('Distrito Federal', 'DF', 'Centro-Oeste'),
  ('Goiás', 'GO', 'Centro-Oeste'),
  ('Mato Grosso', 'MT', 'Centro-Oeste'),
  ('Mato Grosso do Sul', 'MS', 'Centro-Oeste'),
  ('Espírito Santo', 'ES', 'Sudeste'),
  ('Minas Gerais', 'MG', 'Sudeste'),
  ('Rio de Janeiro', 'RJ', 'Sudeste'),
  ('São Paulo', 'SP', 'Sudeste'),
  ('Paraná', 'PR', 'Sul'),
  ('Rio Grande do Sul', 'RS', 'Sul'),
  ('Santa Catarina', 'SC', 'Sul'
) AS tbl(ESTADO, SIGLA, REGIAO);



num_affected_rows,num_inserted_rows


- Importação das tabelas brutas da camada Bronze e aplicação de limpeza, padronização e agregações para consolidação na camada Silver.


In [0]:
%sql

CREATE OR REPLACE TABLE SILVER.DESEMPREGO AS
SELECT 
  CAST(TRIMESTRE / 100 AS INT) AS ANO,
  ESTADO,
  SEXO,
  SUM(VALOR) AS VALOR_TOTAL
FROM BRONZE.DESEMPREGO
WHERE VALOR IS NOT NULL
GROUP BY 
  CAST(TRIMESTRE / 100 AS INT),
  ESTADO,
  SEXO;

CREATE OR REPLACE TABLE SILVER.BRASILEIRO AS
SELECT 
  CASE 
    WHEN UPPER(TRIM(CLUBE)) IN ('ATLÉTICO GO', 'ATLETICO-GO', 'ATLETICO GO') THEN 'ATLÉTICO-GO'
    ELSE UPPER(TRIM(CLUBE))
  END AS CLUBE,
  ANO,
  POSICAO,
  UPPER(TRIM(ESTADO)) AS ESTADO,
  UPPER(TRIM(SERIE)) AS SERIE
FROM BRONZE.BRASILEIRO
WHERE CLUBE IS NOT NULL;

CREATE OR REPLACE TABLE SILVER.INDUSTRIA AS
SELECT 
  CAST(MES_ANO / 100 AS INT) AS ANO,
  CAST(MES_ANO % 100 AS INT) AS MES,
  ESTADO,
  SUM(VALOR) AS VALOR
FROM BRONZE.INDUSTRIA
GROUP BY 
  CAST(MES_ANO / 100 AS INT),
  CAST(MES_ANO % 100 AS INT),
  ESTADO;

CREATE OR REPLACE TABLE SILVER.NASCIMENTOS AS
SELECT * FROM BRONZE.NASCIMENTOS;

CREATE OR REPLACE TABLE SILVER.POPULACAO AS
SELECT * FROM BRONZE.POPULACAO;

CREATE OR REPLACE TABLE SILVER.PIB AS
SELECT * FROM BRONZE.PIB;

CREATE OR REPLACE TABLE SILVER.OBITOS AS
SELECT * FROM BRONZE.OBITOS;



num_affected_rows,num_inserted_rows


- Criação da tabela **T_ESTADO_ANO**, responsável por consolidar todas as combinações de ano e estado, estruturando o modelo em formato estrela e servindo como dimensão central para as análises.


In [0]:
%sql
CREATE OR REPLACE TABLE SILVER.T_ESTADO_ANO AS
SELECT 
  a.ANO,
  s.ESTADO AS ESTADO,
  s.SIGLA
FROM (

  SELECT DISTINCT ANO FROM SILVER.DESEMPREGO
  UNION
  SELECT DISTINCT ANO FROM SILVER.BRASILEIRO
  UNION
  SELECT DISTINCT ANO FROM SILVER.INDUSTRIA
  UNION
  SELECT DISTINCT ANO FROM SILVER.NASCIMENTOS
  UNION
  SELECT DISTINCT ANO FROM SILVER.POPULACAO
  UNION
  SELECT DISTINCT ANO FROM SILVER.PIB
  UNION
  SELECT DISTINCT ANO FROM SILVER.OBITOS
) a
CROSS JOIN SILVER.SIGLAS_ESTADO s;

num_affected_rows,num_inserted_rows


> Criação de uma tabela com dados e calculo de Migração por Estado

In [0]:
%sql
CREATE OR REPLACE TABLE SILVER.MIGRACAO_ESTADUAL AS

WITH 
BASE_COMPLETA AS (
  SELECT 
    ea.ANO,
    ea.ESTADO,
    ea.SIGLA,
    LAG(ea.ANO) OVER (PARTITION BY ea.ESTADO ORDER BY ea.ANO) AS ANO_ANTERIOR
  FROM SILVER.T_ESTADO_ANO ea
),

POPULACAO_ANALISADA AS (
  SELECT
    bc.ANO,
    bc.ESTADO,
    bc.SIGLA,
    p.VALOR AS POPULACAO_ATUAL,
    LAG(p.VALOR) OVER (PARTITION BY bc.ESTADO ORDER BY bc.ANO) AS POPULACAO_ANTERIOR,
    COALESCE(n.VALOR, 0) AS NASCIMENTOS,
    COALESCE(o.VALOR, 0) AS OBITOS,
    (COALESCE(n.VALOR, 0) - COALESCE(o.VALOR, 0)) AS CRESCIMENTO_NATURAL
  FROM BASE_COMPLETA bc
  LEFT JOIN SILVER.POPULACAO p ON bc.ANO = p.ANO AND bc.ESTADO = p.ESTADO
  LEFT JOIN (
    SELECT ANO, ESTADO, SUM(VALOR) AS VALOR 
    FROM SILVER.NASCIMENTOS 
    GROUP BY ANO, ESTADO
  ) n ON bc.ANO = n.ANO AND bc.ESTADO = n.ESTADO
  LEFT JOIN (
    SELECT ANO, ESTADO, SUM(VALOR) AS VALOR 
    FROM SILVER.OBITOS 
    GROUP BY ANO, ESTADO
  ) o ON bc.ANO = o.ANO AND bc.ESTADO = o.ESTADO
  WHERE bc.ANO_ANTERIOR IS NOT NULL 
)

SELECT
  pa.ANO,
  pa.ESTADO,
  pa.SIGLA,
  pa.POPULACAO_ATUAL,
  pa.POPULACAO_ANTERIOR,
  pa.NASCIMENTOS,
  pa.OBITOS,
  pa.CRESCIMENTO_NATURAL,
   (pa.POPULACAO_ATUAL - (pa.POPULACAO_ANTERIOR + pa.CRESCIMENTO_NATURAL)) AS MIGRACAO_LIQUIDA,
  
  CASE
    WHEN (pa.POPULACAO_ATUAL - (pa.POPULACAO_ANTERIOR + pa.CRESCIMENTO_NATURAL)) > 0 THEN 'Entrada líquida'
    WHEN (pa.POPULACAO_ATUAL - (pa.POPULACAO_ANTERIOR + pa.CRESCIMENTO_NATURAL)) < 0 THEN 'Saída líquida'
    ELSE 'Equilíbrio'
  END AS TIPO_FLUXO,
  
  ROUND(
    ABS(pa.POPULACAO_ATUAL - (pa.POPULACAO_ANTERIOR + pa.CRESCIMENTO_NATURAL)) / 
    NULLIF(pa.POPULACAO_ANTERIOR, 0) * 100, 
    2
  ) AS PERCENTUAL_MIGRACAO
  
FROM POPULACAO_ANALISADA pa
WHERE pa.POPULACAO_ATUAL IS NOT NULL
  AND pa.NASCIMENTOS > 0
  AND pa.OBITOS > 0
  AND pa.POPULACAO_ANTERIOR IS NOT NULL;

num_affected_rows,num_inserted_rows


### 6.1 Criação do Banco de de Dados GOLD

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS GOLD;

-- Enviando as tabelas para GOLD
CREATE OR REPLACE TABLE GOLD.DESEMPREGO AS SELECT * FROM SILVER.DESEMPREGO;
CREATE OR REPLACE TABLE GOLD.BRASILEIRO AS SELECT * FROM SILVER.BRASILEIRO;
CREATE OR REPLACE TABLE GOLD.INDUSTRIA AS SELECT * FROM SILVER.INDUSTRIA;
CREATE OR REPLACE TABLE GOLD.NASCIMENTOS AS SELECT * FROM SILVER.NASCIMENTOS;
CREATE OR REPLACE TABLE GOLD.POPULACAO AS SELECT * FROM SILVER.POPULACAO;
CREATE OR REPLACE TABLE GOLD.PIB AS SELECT * FROM SILVER.PIB;
CREATE OR REPLACE TABLE GOLD.OBITOS AS SELECT * FROM SILVER.OBITOS;
CREATE OR REPLACE TABLE GOLD.T_ESTADO_ANO AS SELECT * FROM SILVER.T_ESTADO_ANO;
CREATE OR REPLACE TABLE GOLD.SIGLAS_ESTADO AS SELECT * FROM SILVER.SIGLAS_ESTADO;
CREATE OR REPLACE TABLE GOLD.MIGRACAO_ESTADUAL as SELECT * FROM SILVER.MIGRACAO_ESTADUAL


num_affected_rows,num_inserted_rows


In [0]:
%sql
select* from gold.MIGRACAO_ESTADUAL
limit 5

ANO,ESTADO,SIGLA,POPULACAO_ATUAL,POPULACAO_ANTERIOR,NASCIMENTOS,OBITOS,CRESCIMENTO_NATURAL,MIGRACAO_LIQUIDA,TIPO_FLUXO,PERCENTUAL_MIGRACAO
2012,Acre,AC,758786.0,746386.0,60886,12889,47997,-35597.0,Saída líquida,4.77
2013,Acre,AC,776463.0,758786.0,63364,12890,50474,-32797.0,Saída líquida,4.32
2014,Acre,AC,790101.0,776463.0,64180,13666,50514,-36876.0,Saída líquida,4.75
2015,Acre,AC,803513.0,790101.0,63796,13879,49917,-36505.0,Saída líquida,4.62
2016,Acre,AC,816687.0,803513.0,60238,14740,45498,-32324.0,Saída líquida,4.02


### Criação de uma tabela de Rank do Campeonato Brasileiro

In [0]:
%sql

CREATE OR REPLACE TABLE GOLD.PONTUACAO_ESTADOS AS

WITH DADOS_BRASILEIRO AS (
  SELECT
    ANO,
    ESTADO,
    SERIE,
    POSICAO,

    CASE 
      WHEN SERIE = 'A' THEN (20 - (POSICAO - 1)) * 6  
      WHEN SERIE = 'B' THEN (20 - (POSICAO - 1)) * 1 
      ELSE 0
    END AS PONTOS
  FROM SILVER.BRASILEIRO
  WHERE SERIE IN ('A', 'B') 
    AND POSICAO <= 20
    AND POSICAO > 0 
)

SELECT
  ANO,
  ESTADO,
  SUM(PONTOS) AS PONTUACAO_TOTAL,
  COUNT(*) AS QTD_CLUBES,
  SUM(CASE WHEN SERIE = 'A' THEN 1 ELSE 0 END) AS QTD_SERIE_A,
  SUM(CASE WHEN SERIE = 'B' THEN 1 ELSE 0 END) AS QTD_SERIE_B,
  MAX(CASE WHEN SERIE = 'A' THEN (20 - (POSICAO - 1)) ELSE 0 END) AS MELHOR_POSICAO_A,
  MAX(CASE WHEN SERIE = 'B' THEN (20 - (POSICAO - 1)) ELSE 0 END) AS MELHOR_POSICAO_B,
  ROUND(SUM(PONTOS) / COUNT(*), 2) AS MEDIA_PONTOS_POR_CLUBE
FROM DADOS_BRASILEIRO
GROUP BY ANO, ESTADO
ORDER BY ANO DESC, PONTUACAO_TOTAL DESC;

num_affected_rows,num_inserted_rows


> Os 10 Times que mais pontuaram no Rank em todo periodo analisao

In [0]:
%sql
SELECT
  CLUBE,
  ESTADO,
  SUM(
    CASE 
      WHEN SERIE = 'A' THEN (20 - (POSICAO - 1)) * 6  
      WHEN SERIE = 'B' THEN (20 - (POSICAO - 1)) * 1 
      ELSE 0
    END
  ) AS PONTOS_TOTAIS
FROM SILVER.BRASILEIRO
WHERE SERIE IN ('A', 'B')
  AND POSICAO BETWEEN 1 AND 20
GROUP BY CLUBE, ESTADO
ORDER BY PONTOS_TOTAIS DESC
LIMIT 10;


CLUBE,ESTADO,PONTOS_TOTAIS
ATLÉTICO-MG,MG,1188.0
FLAMENGO,RJ,1182.0
PALMEIRAS,SP,1148.0
CORINTHIANS,SP,1092.0
GRÊMIO,RS,1087.0
SÃO PAULO,SP,1062.0
INTERNACIONAL,RS,1003.0
SANTOS,SP,962.0
FLUMINENSE,RJ,900.0
BOTAFOGO,RJ,797.0


> Criação da Tabela Rank por Estado

In [0]:
%sql

CREATE OR REPLACE TABLE GOLD.RANKING_ESTADOS AS

SELECT
  ANO,
  ESTADO,
  SUM(20 - (POSICAO - 1)) AS PONTUACAO_TOTAL, 
  COUNT(*) AS QTD_CLUBES,
  MAX(20 - (POSICAO - 1)) AS MELHOR_POSICAO,
  ROUND(AVG(20 - (POSICAO - 1)), 2) AS MEDIA_PONTOS_POR_CLUBE
FROM SILVER.BRASILEIRO
WHERE POSICAO BETWEEN 1 AND 20 
  AND POSICAO IS NOT NULL
  AND ESTADO IS NOT NULL
  AND ANO IS NOT NULL
GROUP BY ANO, ESTADO
ORDER BY ANO DESC, PONTUACAO_TOTAL DESC;

num_affected_rows,num_inserted_rows


- Somatorio do PIB e e das Pontuações por estado

In [0]:
from pyspark.sql.functions import col

df_com_razao = spark.sql("""
SELECT
  se.REGIAO,
  p.ANO,
  SUM(p.PONTUACAO_TOTAL) AS TOTAL_PONTOS,
  SUM(b.VALOR_PIB) AS TOTAL_PIB
FROM GOLD.RANKING_ESTADOS p
JOIN GOLD.SIGLAS_ESTADO se 
  ON p.ESTADO = se.SIGLA
JOIN GOLD.PIB b 
  ON se.ESTADO = b.ESTADO AND p.ANO = b.ANO
GROUP BY se.REGIAO, p.ANO
""")

df_com_razao = df_com_razao.withColumn(
    "PONTOS_POR_PIB",
    col("TOTAL_PONTOS") / col("TOTAL_PIB")
)


df_final = df_com_razao.orderBy("ANO", ascending=False).orderBy("TOTAL_PONTOS", ascending=False)

display(df_final)

REGIAO,ANO,TOTAL_PONTOS,TOTAL_PIB,PONTOS_POR_PIB
Sudeste,2012,204.0,2576201246.7,7.918636025089849e-08
Sudeste,2021,197.0,4526645215.43,4.3520088415253975e-08
Sudeste,2014,197.0,3045906883.9,6.46769607571719e-08
Sudeste,2019,194.0,3780138601.74,5.132086953391119e-08
Sudeste,2017,193.0,3368742847.85,5.7291401783064727e-08
Sudeste,2015,189.0,3118372072.09,6.06085469054782e-08
Sudeste,2020,177.0,3814248806.88,4.640494340084317e-08
Sudeste,2016,173.0,3223969056.47,5.3660564654867324e-08
Sudeste,2018,173.0,3584296816.1399994,4.826609203260882e-08
Sudeste,2013,170.0,2831469388.94,6.003949774771955e-08


- Criação de Grafico do PIB/Pontos no Campeonato Brasileiro Serie A e B

In [0]:

df_pandas = df_final.toPandas()
df_pandas  = df_pandas.sort_values('ANO')
import plotly.express as px
fig = px.line(df_pandas, x='ANO', y='PONTOS_POR_PIB', color='REGIAO', 
             title='Razão Pontuação/PIB por Região ao Longo dos Anos')
fig.show()

- Verificação de correlações

In [0]:
%sql
WITH dados_combinados AS (
  SELECT
    p.ANO,
    p.ESTADO,
    p.PONTUACAO_TOTAL AS PONTOS,
    b.VALOR_PIB
  FROM GOLD.RANKING_ESTADOS p
  JOIN GOLD.SIGLAS_ESTADO se ON p.ESTADO = se.SIGLA
  JOIN GOLD.PIB b ON se.ESTADO = b.ESTADO AND p.ANO = b.ANO
)

SELECT
  corr(PONTOS, VALOR_PIB) AS correlacao_pontos_pib,
  corr(PONTOS, ANO) AS correlacao_pontos_ano,
  corr(VALOR_PIB, ANO) AS correlacao_pib_ano
FROM dados_combinados

correlacao_pontos_pib,correlacao_pontos_ano,correlacao_pib_ano
0.8319031171832897,-0.0380947947799624,0.117219258343482


In [0]:
from pyspark.sql import functions as F


df = (spark.table("GOLD.PONTUACAO_ESTADOS")
      .select("ANO", "ESTADO", "PONTUACAO_TOTAL")
      .orderBy("ANO", "ESTADO")
)


df_pandas = df.toPandas()

import plotly.express as px

fig = px.line(
    df_pandas,
    x="ANO",
    y="PONTUACAO_TOTAL",
    color="ESTADO",
    title="Evolução da Pontuação Esportiva por Estado",
    labels={
        "PONTUACAO_TOTAL": "Pontuação Total", 
        "ANO": "Ano",
        "ESTADO": "Estado"
    },
    template="plotly_white",
    height=600
)



fig.show()

In [0]:
from pyspark.sql import functions as F

df_barras = (spark.table("GOLD.PONTUACAO_ESTADOS")
             .groupBy("ESTADO")
             .agg(F.sum("PONTUACAO_TOTAL").alias("PONTUACAO_ACUMULADA"))
             .orderBy(F.desc("PONTUACAO_ACUMULADA"))
)


df_barras_pandas = df_barras.toPandas()


import plotly.express as px

fig = px.bar(
    df_barras_pandas,
    x="ESTADO",
    y="PONTUACAO_ACUMULADA",
    title="Pontuação Total Acumulada por Estado (Todos os Anos)",
    labels={
        "PONTUACAO_ACUMULADA": "Pontuação Total", 
        "ESTADO": "Estado"
    },
    color="ESTADO",
    color_discrete_sequence=px.colors.qualitative.Plotly,
    template="plotly_white"
)

fig.update_layout(
    height=600,
    xaxis_title="",
    yaxis_title="Pontuação Total",
    showlegend=False,
    xaxis={'categoryorder':'total descending'}, 
    hoverlabel=dict(
        bgcolor="white",
        font_size=12,
        font_family="Arial"
    )
)


fig.update_traces(
    marker_line_color='rgb(8,48,107)',
    marker_line_width=1.5,
    opacity=0.8,
    texttemplate='%{y:,.0f} pontos',  
    textposition='outside'
)


fig.show()

>  Dados de Migração

In [0]:
%sql
SELECT 
    ESTADO,
    AVG(MIGRACAO_LIQUIDA) AS SALDO_MIGRATORIO_MEDIO,
    AVG(PERCENTUAL_MIGRACAO) AS PERCENTUAL_MEDIO,
    COUNT(CASE WHEN TIPO_FLUXO = 'Entrada líquida' THEN 1 END) AS ANOS_ATRATOR
FROM GOLD.MIGRACAO_ESTADUAL
GROUP BY ESTADO
ORDER BY SALDO_MIGRATORIO_MEDIO DESC;

ESTADO,SALDO_MIGRATORIO_MEDIO,PERCENTUAL_MEDIO,ANOS_ATRATOR
Roraima,-13508.7,3.342999999999999,1
Amapá,-26343.9,3.414,0
Acre,-30871.6,3.813999999999999,0
Tocantins,-43735.7,2.8879999999999995,0
Rondônia,-47704.3,3.511,1
Sergipe,-54224.7,2.4210000000000003,0
Espírito Santo,-65035.1,2.3630000000000004,1
Goiás,-66956.3,1.3090000000000002,2
Rio Grande do Norte,-68225.2,2.175,1
Mato Grosso do Sul,-69136.0,2.607,0


In [0]:
import plotly.express as px

fig = px.bar(
    df.sort_values('SALDO_MIGRATORIO_MEDIO', ascending=True),
    x='SALDO_MIGRATORIO_MEDIO',
    y='ESTADO',
    color='SALDO_MIGRATORIO_MEDIO',
    color_continuous_scale='RdYlBu',  
    range_color=[df['SALDO_MIGRATORIO_MEDIO'].min(), df['SALDO_MIGRATORIO_MEDIO'].max()],
    title='Saldo Migratório por Estado',
    labels={'SALDO_MIGRATORIO_MEDIO': 'Saldo (pessoas/ano)'},
    hover_data=['PERCENTUAL_MEDIO', 'ANOS_ATRATOR'],
    orientation='h',
    height=800  
)



fig.show()

In [0]:
%sql
SELECT 
    m.ESTADO,
    e.SIGLA,
    e.REGIAO,
    AVG(m.MIGRACAO_LIQUIDA) AS SALDO_MIGRATORIO_MEDIO,
    AVG(m.PERCENTUAL_MIGRACAO) AS PERCENTUAL_MEDIO,
    COUNT(CASE WHEN m.TIPO_FLUXO = 'Entrada líquida' THEN 1 END) AS ANOS_ATRATOR
FROM GOLD.MIGRACAO_ESTADUAL m
JOIN GOLD.SIGLAS_ESTADO e ON m.ESTADO = e.ESTADO
GROUP BY m.ESTADO, e.SIGLA, e.REGIAO
ORDER BY SALDO_MIGRATORIO_MEDIO DESC

ESTADO,SIGLA,REGIAO,SALDO_MIGRATORIO_MEDIO,PERCENTUAL_MEDIO,ANOS_ATRATOR
Roraima,RR,Norte,-13508.7,3.342999999999999,1
Amapá,AP,Norte,-26343.9,3.414,0
Acre,AC,Norte,-30871.6,3.813999999999999,0
Tocantins,TO,Norte,-43735.7,2.8879999999999995,0
Rondônia,RO,Norte,-47704.3,3.511,1
Sergipe,SE,Nordeste,-54224.7,2.4210000000000003,0
Espírito Santo,ES,Sudeste,-65035.1,2.3630000000000004,1
Goiás,GO,Centro-Oeste,-66956.3,1.3090000000000002,2
Rio Grande do Norte,RN,Nordeste,-68225.2,2.175,1
Mato Grosso do Sul,MS,Centro-Oeste,-69136.0,2.607,0


In [0]:
import plotly.express as px

# Ordenar por saldo migratório
df_sorted = df.sort_values('SALDO_MIGRATORIO_MEDIO')

# Criar gráfico de barras
fig = px.bar(
    df_sorted,
    x='SALDO_MIGRATORIO_MEDIO',
    y='ESTADO',
    color='REGIAO',
    orientation='h',
    height=800,
    title='Saldo Migratório por Estado e Região',
    labels={'SALDO_MIGRATORIO_MEDIO': 'Saldo Migratório Médio'},
    hover_data=['SIGLA', 'PERCENTUAL_MEDIO', 'ANOS_ATRATOR'],
    color_discrete_sequence=px.colors.qualitative.Pastel
)

# Melhorar tooltip
fig.update_traces(
    hovertemplate="<b>%{y}</b><br>Saldo: %{x:,.0f}<br>Região: %{customdata[0]}"
)
fig.show()

In [0]:
df_spark = spark.sql("""
    SELECT des.ANO, des.ESTADO, des.VALOR_TOTAL, pib.VALOR_PIB, 
           (des.VALOR_TOTAL * 100) / pib.VALOR_PIB AS DESEMPREGO_PIB
    FROM gold.desemprego des
    INNER JOIN gold.pib pib 
    ON pib.ESTADO = des.ESTADO 
    AND pib.ANO = des.ANO 
    AND des.SEXO = 'Total'
    ORDER BY des.ANO
""")

df_pandas = df_spark.toPandas()

import plotly.express as px

fig = px.line(df_pandas, x='ANO', y='DESEMPREGO_PIB', color='ESTADO',
              title="Relação entre Desemprego e PIB ao Longo dos Anos", 
              labels={"DESEMPREGO_PIB": "Desemprego / PIB", "ANO": "Ano", "ESTADO": "Estado"})


fig.show()


In [0]:
%sql
select corr(des.VALOR_TOTAL, pib.VALOR_PIB) from gold.desemprego des
inner join gold.pib pib on pib.ESTADO = des.ESTADO and pib.ANO = des.ANO and des.SEXO = 'Total'

"corr(VALOR_TOTAL, VALOR_PIB)"
0.8846421500107723


In [0]:
df_spark = spark.sql("""
    SELECT des.ANO, des.ESTADO, des.VALOR_TOTAL, pib.VALOR_PIB, 
           (des.VALOR_TOTAL * 100) / pib.VALOR_PIB AS DESEMPREGO_PIB
    FROM gold.desemprego des
    INNER JOIN gold.pib pib 
    ON pib.ESTADO = des.ESTADO 
    AND pib.ANO = des.ANO 
    AND des.SEXO = 'Total'
    ORDER BY des.ANO
""")

df_pandas = df_spark.toPandas()

import plotly.express as px

fig = px.line(df_pandas, x='ANO', y='DESEMPREGO_PIB', color='ESTADO',
              title="Relação entre Desemprego e PIB ao Longo dos Anos", 
              labels={"DESEMPREGO_PIB": "Desemprego / PIB", "ANO": "Ano", "ESTADO": "Estado"})


fig.show()

In [0]:
df_spark = spark.sql("""
    SELECT des.ESTADO, 
           des.SEXO, 
           SUM(des.VALOR_TOTAL) AS TOTAL_DESUMPREGO
    FROM gold.desemprego des
    WHERE des.ANO = 2012 AND des.SEXO IN ('Homens', 'Mulheres')
    GROUP BY des.ESTADO, des.SEXO
""")


df_pandas = df_spark.toPandas()


df_pandas['TOTAL_DESUMPREGO'] = df_pandas.apply(
    lambda row: -row['TOTAL_DESUMPREGO'] if row['SEXO'] == 'Homens' else row['TOTAL_DESUMPREGO'],
    axis=1
)


df_pivot = df_pandas.pivot(index='ESTADO', columns='SEXO', values='TOTAL_DESUMPREGO').fillna(0).reset_index()


df_long = df_pivot.melt(id_vars='ESTADO', value_vars=['Homens', 'Mulheres'],
                        var_name='SEXO', value_name='TOTAL_DESUMPREGO')

import plotly.express as px

fig = px.bar(df_long,
             x='TOTAL_DESUMPREGO',
             y='ESTADO',
             color='SEXO',
             orientation='h',
             title="PIRÂMIDE DE DESEMPREGO POR SEXO - 2012",
             labels={'TOTAL_DESUMPREGO': 'Desemprego Total', 'ESTADO': 'Estado', 'SEXO': 'Sexo'},
             height=800)


fig.update_layout(xaxis_tickformat=',', barmode='relative')

fig.show()


In [0]:
df_spark = spark.sql("""
    SELECT des.ESTADO, 
           des.SEXO, 
           SUM(des.VALOR_TOTAL) AS TOTAL_DESUMPREGO
    FROM gold.desemprego des
    WHERE des.ANO = 2024 AND des.SEXO IN ('Homens', 'Mulheres')
    GROUP BY des.ESTADO, des.SEXO
""")


df_pandas = df_spark.toPandas()


df_pandas['TOTAL_DESUMPREGO'] = df_pandas.apply(
    lambda row: -row['TOTAL_DESUMPREGO'] if row['SEXO'] == 'Homens' else row['TOTAL_DESUMPREGO'],
    axis=1
)


df_pivot = df_pandas.pivot(index='ESTADO', columns='SEXO', values='TOTAL_DESUMPREGO').fillna(0).reset_index()


df_long = df_pivot.melt(id_vars='ESTADO', value_vars=['Homens', 'Mulheres'],
                        var_name='SEXO', value_name='TOTAL_DESUMPREGO')

import plotly.express as px

fig = px.bar(df_long,
             x='TOTAL_DESUMPREGO',
             y='ESTADO',
             color='SEXO',
             orientation='h',
             title="PIRÂMIDE DE DESEMPREGO POR SEXO - 2024",
             labels={'TOTAL_DESUMPREGO': 'Desemprego Total', 'ESTADO': 'Estado', 'SEXO': 'Sexo'},
             height=800)


fig.update_layout(xaxis_tickformat=',', barmode='relative')

fig.show()

In [0]:
%sql
SELECT 
    des.ANO, 
    des.ESTADO, 
    des.VALOR_TOTAL
FROM gold.desemprego des
WHERE des.SEXO = 'Total'
  AND (des.ANO, des.VALOR_TOTAL) IN (
    SELECT A.ANO, MAX(A.VALOR_TOTAL)
    FROM gold.desemprego A
    WHERE A.SEXO = 'Total'
    GROUP BY A.ANO
)
ORDER BY des.ANO


ANO,ESTADO,VALOR_TOTAL
2012,São Paulo,139399
2013,São Paulo,141007
2014,São Paulo,143093
2015,São Paulo,144534
2016,São Paulo,145938
2017,São Paulo,146884
2018,São Paulo,148310
2019,São Paulo,150173
2020,São Paulo,37705
2022,São Paulo,116337
