# ETL

## Definição da Constelação de Fatos

**Tabelas de dimensão**

- dim_data (data_pk, data_completa, data_dia, data_mes, data_semestre, data_ano)
- dim_localidade (localidade_pk, latitude, longitude, cidade, estado, regiao, pais)
- dim_tipo_cancer (tipo_cancer_pk, tipo_cancer, mortalidade, taxa_incidencia_total)
- dim_faixa_etaria (faixa_pk, faixa_idade, idade_min, idade_max, id_idade)
- dim_metrica (metrica_pk, tipo_metrica)
- dim_sexo (sexo_pk, sexo)

**Tabelas de fatos**

- fato_cancer (ano_pk, estado_pk, tipo_cancer_pk, sexo_pk, faixa_pk, metrica_pk, obitos_cancer, incidencia_cancer, prevalencia_cancer)
- fato_clima (data_pk, localidade_pk, temperatura_media, temperatura_max, temperatura_min, radiacao_uv, radiacao_uva, radiacao_uvb, precipitacao)

**Views**

- vw_cidade (localidade_pk, latitude, longitude, clima_cidade, clima_estado, clima_regiao, clima_pais)
- vw_estado (estado_pk, cancer_regiao, cancer_pais)
- vw_dia (data_pk, clima_data_completa, clima_dia, clima_mes, clima_semestre, clima_ano)
- vw_ano (ano_pk, cancer_ano, cancer_decada)

In [1]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from dotenv import load_dotenv
import psycopg2
import os

# Python 3.11
# Java 11
# PySpark == 3.4

In [2]:
spark = SparkSession.builder \
    .appName("OLAP - P2") \
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.11.1026") \
    .config("spark.jars", "/home/rodrigo/jars/postgresql-42.7.3.jar") \
    .config("spark.jars.ivyLogLevel", "ERROR") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

print(spark.version)

:: loading settings :: url = jar:file:/home/rodrigo/.local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rodrigo/.ivy2/cache
The jars for the packages stored in: /home/rodrigo/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e1864709-473f-4f88-98d3-21988a2dccaf;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 310ms :: artifacts dl 18ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 by [com.amazonaws#aws-java-sdk-bundle;1.12.262] in [default]
	------------------------------------------------------------

3.4.0


## Extract

In [3]:
schema_cancer = StructType([
    StructField("measure_id", IntegerType(), True),
    StructField("measure_name", StringType(), True),
    StructField("location_id", IntegerType(), True),
    StructField("location_name", StringType(), True),
    StructField("sex_id", IntegerType(), True),
    StructField("sex_name", StringType(), True),
    StructField("age_id", IntegerType(), True),
    StructField("age_name", StringType(), True),
    StructField("cause_id", IntegerType(), True),
    StructField("cause_name", StringType(), True),
    StructField("metric_id", IntegerType(), True),
    StructField("metric_name", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("val", DoubleType(), True),
    StructField("upper", DoubleType(), True),
    StructField("lower", DoubleType(), True),
])

df_cancer1 = spark.read.csv("s3a://datalake/cancer-1.csv", schema=schema_cancer, header=True, inferSchema=True)
df_cancer2 = spark.read.csv("s3a://datalake/cancer-2.csv", schema=schema_cancer, header=True, inferSchema=True)
df_cancer = df_cancer1.union(df_cancer2)

print("Numero de Tuplas: ", df_cancer.count())
df_cancer.show()

25/06/25 16:19:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Numero de Tuplas:  520506
+----------+------------+-----------+------------------+------+---------+------+-----------+--------+--------------------+---------+-----------+----+--------------------+--------------------+--------------------+
|measure_id|measure_name|location_id|     location_name|sex_id| sex_name|age_id|   age_name|cause_id|          cause_name|metric_id|metric_name|year|                 val|               upper|               lower|
+----------+------------+-----------+------------------+------+---------+------+-----------+--------+--------------------+---------+-----------+----+--------------------+--------------------+--------------------+
|         1|      Óbitos|       4761|Mato Grosso do Sul|     1|Masculino|     8|15 -19 anos|     459|Melanoma maligno ...|        1|     Número|2001| 0.03482425671072571| 0.04761777852821798|0.025552037408807626|
|         1|      Óbitos|       4761|Mato Grosso do Sul|     2| Feminino|     8|15 -19 anos|     459|Melanoma maligno ...|

In [4]:
schema_clima = StructType([
    StructField("cidade", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("YEAR", IntegerType(), True),
    StructField("DOY", IntegerType(), True),
    StructField("T2M", DoubleType(), True),
    StructField("T2M_MAX", DoubleType(), True),
    StructField("T2M_MIN", DoubleType(), True),
    StructField("ALLSKY_SFC_UV_INDEX", DoubleType(), True),
    StructField("ALLSKY_SFC_UVA", DoubleType(), True),
    StructField("ALLSKY_SFC_UVB", DoubleType(), True),
    StructField("PRECTOTCORR", DoubleType(), True),
    StructField("codigo_ibge", IntegerType(), True),
    StructField("capital", StringType(), True),
    StructField("estado", StringType(), True),
])

df_clima = spark.read.csv("s3a://datalake/climate.csv", schema=schema_clima, header=True, inferSchema=True)

print("Numero de Tuplas: ", df_clima.count())
df_clima.show()



Numero de Tuplas:  42721900


                                                                                

+---------------+--------+---------+----+---+-----+-------+-------+-------------------+--------------+--------------+-----------+-----------+-------+------+
|         cidade|latitude|longitude|YEAR|DOY|  T2M|T2M_MAX|T2M_MIN|ALLSKY_SFC_UV_INDEX|ALLSKY_SFC_UVA|ALLSKY_SFC_UVB|PRECTOTCORR|codigo_ibge|capital|estado|
+---------------+--------+---------+----+---+-----+-------+-------+-------------------+--------------+--------------+-----------+-----------+-------+------+
|Abadia de Goiás|-16.7573| -49.4412|2001|  1|23.12|  26.25|  20.85|               2.29|           1.2|          0.04|      10.58|    5200050|  False| Goiás|
|Abadia de Goiás|-16.7573| -49.4412|2001|  2|21.86|  24.46|  19.76|               1.95|          1.05|          0.03|        4.7|    5200050|  False| Goiás|
|Abadia de Goiás|-16.7573| -49.4412|2001|  3|21.72|  25.39|  18.91|               2.27|           1.2|          0.04|        4.3|    5200050|  False| Goiás|
|Abadia de Goiás|-16.7573| -49.4412|2001|  4|23.14|  27.84

## Transform

In [5]:
df_data = df_clima.withColumn(
    "data_completa",
    F.expr("date_add(to_date(concat(YEAR, '-01-01')), DOY - 1)")
).select(
    "data_completa",
    F.dayofmonth("data_completa").alias("data_dia"),
    F.month("data_completa").alias("data_mes"),
    ((F.month("data_completa")-1)/6).cast("int").alias("data_semestre"),
    F.year("data_completa").alias("data_ano")
).distinct()

windowSpec = Window.orderBy("data_completa")

df_data = df_data.withColumn(
    "data_pk",
    F.row_number().over(windowSpec)
)

df_data = df_data.withColumn(
    "data_decada",
    (F.floor(F.col("data_ano") / 10) * 10).cast("int")
)

df_data = df_data.select("data_pk", "data_completa", "data_dia", "data_mes", "data_semestre", "data_ano", "data_decada")

print("Numero de Tuplas: ", df_data.count())
df_data.show()

25/06/25 16:20:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:20:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:20:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Numero de Tuplas:  7670


25/06/25 16:21:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:21:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:21:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+-------------+--------+--------+-------------+--------+-----------+
|data_pk|data_completa|data_dia|data_mes|data_semestre|data_ano|data_decada|
+-------+-------------+--------+--------+-------------+--------+-----------+
|      1|   2001-01-01|       1|       1|            0|    2001|       2000|
|      2|   2001-01-02|       2|       1|            0|    2001|       2000|
|      3|   2001-01-03|       3|       1|            0|    2001|       2000|
|      4|   2001-01-04|       4|       1|            0|    2001|       2000|
|      5|   2001-01-05|       5|       1|            0|    2001|       2000|
|      6|   2001-01-06|       6|       1|            0|    2001|       2000|
|      7|   2001-01-07|       7|       1|            0|    2001|       2000|
|      8|   2001-01-08|       8|       1|            0|    2001|       2000|
|      9|   2001-01-09|       9|       1|            0|    2001|       2000|
|     10|   2001-01-10|      10|       1|            0|    2001|       2000|

                                                                                

In [6]:
estado_para_regiao = {
    "Acre": "Norte",
    "Alagoas": "Nordeste",
    "Amapá": "Norte",
    "Amazonas": "Norte",
    "Bahia": "Nordeste",
    "Ceará": "Nordeste",
    "Distrito Federal": "Centro-Oeste",
    "Espírito Santo": "Sudeste",
    "Goiás": "Centro-Oeste",
    "Maranhão": "Nordeste",
    "Mato Grosso": "Centro-Oeste",
    "Mato Grosso do Sul": "Centro-Oeste",
    "Minas Gerais": "Sudeste",
    "Pará": "Norte",
    "Paraíba": "Nordeste",
    "Paraná": "Sul",
    "Pernambuco": "Nordeste",
    "Piauí": "Nordeste",
    "Rio de Janeiro": "Sudeste",
    "Rio Grande do Norte": "Nordeste",
    "Rio Grande do Sul": "Sul",
    "Rondônia": "Norte",
    "Roraima": "Norte",
    "Santa Catarina": "Sul",
    "São Paulo": "Sudeste",
    "Sergipe": "Nordeste",
    "Tocantins": "Norte"
}

df_localidade = (
    df_clima.select("cidade", "estado", "latitude", "longitude")
            .distinct()
            .withColumn("pais", F.lit("Brasil"))
            .withColumn("regiao", F.lit(None))
)

for estado, regiao in estado_para_regiao.items():
    df_localidade = df_localidade.withColumn(
        "regiao",
        F.when(F.col("estado") == estado, regiao).otherwise(F.col("regiao"))
    )


windowSpec = Window.orderBy("cidade", "estado", "latitude", "longitude")
df_localidade = (
    df_localidade.withColumn("localidade_pk", F.row_number().over(windowSpec))
                .select("localidade_pk", "latitude", "longitude", "cidade", "estado", "regiao", "pais")
)

print("Numero de Tuplas: ", df_localidade.count())
df_localidade.show(truncate=False)




25/06/25 16:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Numero de Tuplas:  5570


25/06/25 16:22:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:22:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:22:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------+--------+---------+-------------------+-------------------+------------+------+
|localidade_pk|latitude|longitude|cidade             |estado             |regiao      |pais  |
+-------------+--------+---------+-------------------+-------------------+------------+------+
|1            |-16.7573|-49.4412 |Abadia de Goiás    |Goiás              |Centro-Oeste|Brasil|
|2            |-18.4831|-47.3916 |Abadia dos Dourados|Minas Gerais       |Sudeste     |Brasil|
|3            |-16.197 |-48.7057 |Abadiânia          |Goiás              |Centro-Oeste|Brasil|
|4            |-1.7218 |-48.8788 |Abaetetuba         |Pará               |Norte       |Brasil|
|5            |-19.1551|-45.4444 |Abaeté             |Minas Gerais       |Sudeste     |Brasil|
|6            |-7.3459 |-39.0416 |Abaiara            |Ceará              |Nordeste    |Brasil|
|7            |-8.7207 |-39.1162 |Abaré              |Bahia              |Nordeste    |Brasil|
|8            |-23.3049|-50.3133 |Abatiá          

                                                                                

In [7]:
df_sexo = df_cancer.select(
    F.col("sex_name").alias("sexo")
).distinct()

df_sexo = df_sexo.withColumn("sexo_pk", F.monotonically_increasing_id())

print("Numero de Tuplas: ", df_sexo.count())
df_sexo.show()

                                                                                

Numero de Tuplas:  2
+---------+-------+
|     sexo|sexo_pk|
+---------+-------+
| Feminino|      0|
|Masculino|      1|
+---------+-------+



                                                                                

In [8]:
total_casos = df_cancer.filter(F.col("metric_name") == "Número") \
    .agg(F.sum("val").alias("total_casos_geral")) \
    .collect()[0]["total_casos_geral"]

df_tipo_cancer = (
    df_cancer.filter(F.col("metric_name") == "Número") 
            .groupBy("cause_name")
            .agg(
                F.sum(F.when(F.col("measure_name") == "Óbitos", F.col("val")).otherwise(0)).alias("total_obitos"),
                F.sum(F.when(F.col("measure_name") != "Óbitos", F.col("val")).otherwise(0)).alias("total_casos")
            ) 
            .withColumn(
                "mortalidade",
                F.round(F.when(F.col("total_casos") > 0, (F.col("total_obitos") / F.col("total_casos")) * 100).otherwise(0),2)
            ) 
            .withColumn(
                "taxa_incidencia_total",
                F.round((F.col("total_casos") / F.lit(total_casos)) * 100, 2)
            ) 
            .withColumn(
                "tipo_cancer_pk",
                F.monotonically_increasing_id()
            ) 
            .select(
                "tipo_cancer_pk",
                F.col("cause_name").alias("tipo_cancer"),
                "mortalidade",
                "taxa_incidencia_total"
            )
)

print("Numero de Tuplas: ", df_tipo_cancer.count())
df_tipo_cancer.show(truncate=False)

                                                                                

Numero de Tuplas:  3




+--------------+------------------------------------------------------------+-----------+---------------------+
|tipo_cancer_pk|tipo_cancer                                                 |mortalidade|taxa_incidencia_total|
+--------------+------------------------------------------------------------+-----------+---------------------+
|0             |Câncer de pele não melanoma (carcinoma basocelular)         |0.0        |80.57                |
|1             |Melanoma maligno da pele                                    |10.89      |9.49                 |
|2             |Câncer de pele não melanoma (carcinoma de células escamosas)|14.89      |7.76                 |
+--------------+------------------------------------------------------------+-----------+---------------------+



                                                                                

In [9]:
df_faixa_etaria = (
    df_cancer.select(
        F.col("age_id").alias("id_idade"),
        F.col("age_name").alias("faixa_etaria")
    )
    .distinct()
    .withColumn(
        "faixa_etaria",
        F.regexp_replace(F.regexp_replace(F.col("faixa_etaria"), "anos", ""), " ", "")
    )
    .withColumn(
        "idade_min",
        F.when(
            F.col("faixa_etaria").contains("+"),
            F.regexp_replace(F.col("faixa_etaria"), "[^0-9]", "").cast("int")
        ).otherwise(
            F.split(F.col("faixa_etaria"), "-")[0].cast("int")
        )
    )
    .withColumn(
        "idade_max",
        F.when(
            F.col("faixa_etaria").contains("+"),
            F.lit(130)
        ).otherwise(
            F.split(F.col("faixa_etaria"), "-")[1].cast("int")
        )
    )
    .withColumn("faixa_pk", F.monotonically_increasing_id())
    .select("faixa_pk", "faixa_etaria", "idade_min", "idade_max", "id_idade")
)

print("Numero de Tuplas: ", df_faixa_etaria.count())
df_faixa_etaria.show()

                                                                                

Numero de Tuplas:  20
+--------+------------+---------+---------+--------+
|faixa_pk|faixa_etaria|idade_min|idade_max|id_idade|
+--------+------------+---------+---------+--------+
|       0|       35-39|       35|       39|      12|
|       1|       40-44|       40|       44|      13|
|       2|       25-29|       25|       29|      10|
|       3|       70-74|       70|       74|      19|
|       4|       20-24|       20|       24|       9|
|       5|       65-69|       65|       69|      18|
|       6|       10-14|       10|       14|       7|
|       7|       50-54|       50|       54|      15|
|       8|       80-84|       80|       84|      30|
|       9|       90-94|       90|       94|      32|
|      10|       15-19|       15|       19|       8|
|      11|         95+|       95|      130|     235|
|      12|       75-79|       75|       79|      20|
|      13|       30-34|       30|       34|      11|
|      14|       60-64|       60|       64|      17|
|      15|         5-9| 

                                                                                

In [10]:
df_metrica = (
    df_cancer
    .select(F.col("metric_name").alias("tipo_metrica"))
    .distinct()
    
)

windowSpec = Window.orderBy("tipo_metrica")

df_metrica = (
    df_metrica
    .withColumn("metrica_pk", F.row_number().over(windowSpec))
    .select("metrica_pk", "tipo_metrica")
)

df_metrica.show()

25/06/25 16:23:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:23:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:23:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:23:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:23:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:23:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

+----------+------------+
|metrica_pk|tipo_metrica|
+----------+------------+
|         1|      Número|
|         2|  Percentual|
|         3|        Taxa|
+----------+------------+



In [11]:
df_fato_clima = (
    df_clima.withColumn(
            "data_completa",
            F.expr("date_add(to_date(concat(YEAR, '-01-01')), DOY - 1)")
        ) 
        .join(df_data, ["data_completa"], "left") 
        .join(df_localidade, ["cidade", "latitude", "longitude"], "left") 
        .select(
            F.col("data_pk"),
            F.col("localidade_pk"),
            F.format_string("%.2f",F.col("T2M")).alias("temperatura_media"),
            F.format_string("%.2f",F.col("T2M_MAX")).alias("temperatura_max"),
            F.format_string("%.2f",F.col("T2M_MIN")).alias("temperatura_min"),
            F.format_string("%.2f",F.col("ALLSKY_SFC_UV_INDEX")).alias("radiacao_uv"),
            F.format_string("%.2f",F.col("ALLSKY_SFC_UVA")).alias("radiacao_uva"),
            F.format_string("%.2f",F.col("ALLSKY_SFC_UVB")).alias("radiacao_uvb"),
            F.format_string("%.2f",F.col("PRECTOTCORR")).alias("precipitacao")
        )
)
print("Numero de Tuplas: ", df_fato_clima.count())
df_fato_clima.show()

25/06/25 16:25:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Numero de Tuplas:  42721900


25/06/25 16:25:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:25:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:26:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:26:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 1

+-------+-------------+-----------------+---------------+---------------+-----------+------------+------------+------------+
|data_pk|localidade_pk|temperatura_media|temperatura_max|temperatura_min|radiacao_uv|radiacao_uva|radiacao_uvb|precipitacao|
+-------+-------------+-----------------+---------------+---------------+-----------+------------+------------+------------+
|      5|            1|            23.18|          29.38|          17.65|       3.03|        1.65|        0.05|        1.83|
|     11|            1|            21.46|          25.64|          17.72|       1.93|        1.07|        0.03|        4.05|
|     12|            1|            22.58|          27.68|          17.29|       2.12|        1.09|        0.04|        6.40|
|      4|            1|            23.14|          27.84|          18.30|       1.81|        0.95|        0.03|        1.72|
|     20|            1|            24.27|          28.82|          19.70|       3.11|        1.65|        0.05|        2.01|


In [12]:
df_estado = (
    df_localidade
    .filter(F.col("estado").isNotNull())
    .select(F.col("estado"))
    .distinct()
)

window_estado = Window.orderBy("estado")
df_estado = (
    df_estado
    .withColumn("estado_pk", F.row_number().over(window_estado))
    

    .select("estado_pk", "estado")
)

df_ano = (
    df_data
    .select(F.col("data_ano").alias("ano"))
    .distinct()
)

window_ano = Window.orderBy("ano")
df_ano = (
    df_ano
    .withColumn("ano_pk", F.row_number().over(window_ano))
    .select("ano_pk", "ano")
)

df_estado.show()
df_ano.show()

25/06/25 16:26:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:26:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:26:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:27:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:27:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:27:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 1

+---------+-------------------+
|estado_pk|             estado|
+---------+-------------------+
|        1|               Acre|
|        2|            Alagoas|
|        3|              Amapá|
|        4|           Amazonas|
|        5|              Bahia|
|        6|              Ceará|
|        7|   Distrito Federal|
|        8|     Espírito Santo|
|        9|              Goiás|
|       10|           Maranhão|
|       11|        Mato Grosso|
|       12| Mato Grosso do Sul|
|       13|       Minas Gerais|
|       14|             Paraná|
|       15|            Paraíba|
|       16|               Pará|
|       17|         Pernambuco|
|       18|              Piauí|
|       19|Rio Grande do Norte|
|       20|  Rio Grande do Sul|
+---------+-------------------+
only showing top 20 rows





+------+----+
|ano_pk| ano|
+------+----+
|     1|2001|
|     2|2002|
|     3|2003|
|     4|2004|
|     5|2005|
|     6|2006|
|     7|2007|
|     8|2008|
|     9|2009|
|    10|2010|
|    11|2011|
|    12|2012|
|    13|2013|
|    14|2014|
|    15|2015|
|    16|2016|
|    17|2017|
|    18|2018|
|    19|2019|
|    20|2020|
+------+----+
only showing top 20 rows



25/06/25 16:28:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [13]:

df_fato_cancer = (
    df_cancer
    .join(df_estado, df_cancer.location_name == df_estado.estado, "left")
    .join(df_sexo, df_cancer.sex_name == df_sexo.sexo, "left")
    .join(df_faixa_etaria, df_cancer.age_id == df_faixa_etaria.id_idade, "left") # olhar
    .join(df_tipo_cancer, df_cancer.cause_name == df_tipo_cancer.tipo_cancer, "left")
    .join(df_ano, df_cancer.year == df_ano.ano, "left")
    .join(df_metrica, df_cancer.metric_name == df_metrica.tipo_metrica, "left")
    .groupBy("ano_pk", "estado_pk", "sexo_pk", "faixa_pk", "tipo_cancer_pk", "metrica_pk")
    .pivot("measure_name")
    .agg(F.sum("val"))
)

group_cols = ["ano_pk", "estado_pk", "sexo_pk", "faixa_pk", "tipo_cancer_pk", "metrica_pk"]
pivot_cols = [c for c in df_fato_cancer.columns if c not in group_cols]

for col_name in pivot_cols:
    df_fato_cancer = df_fato_cancer.withColumn(
        col_name,
        F.format_string("%.3f", F.coalesce(F.col(col_name), F.lit(0.0)))
    )

df_fato_cancer = df_fato_cancer \
    .withColumnRenamed("Incidência", "incidencia_cancer") \
    .withColumnRenamed("Prevalência", "prevalencia_cancer") \
    .withColumnRenamed("Óbitos", "obitos_cancer")

print("Numero de Tuplas: ", df_fato_cancer.count())
df_fato_cancer.show()

25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:28:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 1

Numero de Tuplas:  204120


25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:29:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 1

+------+---------+-------+--------+--------------+----------+-----------------+------------------+-------------+
|ano_pk|estado_pk|sexo_pk|faixa_pk|tipo_cancer_pk|metrica_pk|incidencia_cancer|prevalencia_cancer|obitos_cancer|
+------+---------+-------+--------+--------------+----------+-----------------+------------------+-------------+
|    18|       14|      1|       5|             1|         3|           11.450|            29.483|        7.128|
|    14|       27|      0|      13|             1|         3|            0.557|             4.397|        0.153|
|     4|        8|      1|       3|             2|         1|            1.607|             2.491|        1.675|
|     9|       11|      0|       8|             1|         2|            0.000|             0.000|        0.001|
|    16|        2|      0|      16|             1|         1|            0.557|             0.327|        0.577|
|    16|       14|      0|       8|             0|         2|            0.001|             0.00

                                                                                

## Load

In [14]:
load_dotenv() 

DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

In [16]:
def salvar(df, tabela):
    url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
    df.write \
      .format("jdbc") \
      .option("url", url) \
      .option("dbtable", tabela) \
      .option("user", DB_USER) \
      .option("password", DB_PASSWORD) \
      .option("driver", "org.postgresql.Driver") \
      .mode('overwrite') \
      .save()

salvar(df_data, "dim_data")
salvar(df_localidade, "dim_localidade")
salvar(df_sexo, "dim_sexo")
salvar(df_faixa_etaria, "dim_faixa_etaria")
salvar(df_tipo_cancer, "dim_tipo_cancer")
salvar(df_metrica, "dim_metrica")
salvar(df_fato_clima, "fato_clima")
salvar(df_fato_cancer, "fato_cancer")

25/06/25 16:34:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:34:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:34:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:34:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:34:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 16:34:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/25 1

In [17]:
def executa_sql_postgres(sql_texto):
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    try:
        with conn:
            with conn.cursor() as cur:
                cur.execute(sql_texto)
    finally:
        conn.close()

sql_dim_ano = """
CREATE OR REPLACE VIEW vw_ano AS
SELECT
    ROW_NUMBER() OVER (ORDER BY data_ano) AS ano_pk,
    data_ano AS cancer_ano,
    data_decada AS cancer_decada
FROM (
    SELECT DISTINCT
        data_ano,
        data_decada
    FROM
        dim_data
) AS sub;
"""

sql_dim_dia = """
CREATE OR REPLACE VIEW vw_dia AS
SELECT
    data_pk,
    data_completa AS clima_data_completa,
    data_dia AS clima_dia,
    data_mes AS clima_mes,
    data_ano AS clima_ano,
    data_decada AS clima_decada
FROM dim_data;
"""

sql_dim_estado = """
CREATE OR REPLACE VIEW vw_estado AS
SELECT
    ROW_NUMBER() OVER (ORDER BY estado) AS estado_pk,
    estado AS cancer_estado,
    regiao AS cancer_regiao,
    pais AS cancer_pais
FROM (
    SELECT DISTINCT
        estado,
        regiao,
        pais
    FROM
        dim_localidade
) AS sub;
"""

sql_dim_cidade = """
CREATE OR REPLACE VIEW vw_cidade AS
SELECT
    localidade_pk,
    latitude,
    longitude,
    cidade AS clima_cidade,
    estado AS clima_estado,
    regiao AS clima_regiao,
    pais AS clima_pais
FROM dim_localidade;
"""

executa_sql_postgres(sql_dim_ano)
executa_sql_postgres(sql_dim_dia)
executa_sql_postgres(sql_dim_cidade)
executa_sql_postgres(sql_dim_estado)