### Setting up file paths

In [None]:
# Base directories, given relative to the current working directory.
workdir = './work/'
parquetdir = './parquet/'

# Source CSV name and its full path under "<workdir>extracted/".
source_csv = 'votacao_candidato_munzona_2022_BRASIL.csv'
extracted_file = f'{workdir}extracted/{source_csv}'

### Creating Spark session

In [None]:
from pyspark.sql import SparkSession

# Configure the builder first, then obtain the session: getOrCreate() returns
# an already-running session when there is one and creates it otherwise.
session_builder = SparkSession.builder.appName('spark')
spark_session = session_builder.getOrCreate()

### Reading file

In [None]:
from pyspark.sql.dataframe import DataFrame

# Read the extracted file as CSV: the first row is the header, fields are
# separated by ';' and the text is decoded as ISO-8859-1.
csv_reader = spark_session.read.options(
    header="true",
    delimiter=";",
    encoding="ISO-8859-1",
)
df: DataFrame = csv_reader.csv(extracted_file)


### Selecting relevant columns

In [None]:
# Columns of the source CSV kept for the rest of the notebook.
# The order of this list is the column order of the DataFrame selected from it.
relevant_columns = [
    "NR_TURNO", "DS_ELEICAO", "TP_ABRANGENCIA",
    "SG_UF", "NM_MUNICIPIO", "NR_ZONA",
    "DS_CARGO",
    "NR_CANDIDATO", "NM_CANDIDATO", "NM_URNA_CANDIDATO",
    "DS_SITUACAO_CANDIDATURA",
    "NR_PARTIDO", "SG_PARTIDO", "NM_PARTIDO",
    "NM_COLIGACAO", "DS_COMPOSICAO_COLIGACAO",
    "ST_VOTO_EM_TRANSITO",
    "QT_VOTOS_NOMINAIS",
    "NM_TIPO_DESTINACAO_VOTOS",
    "QT_VOTOS_NOMINAIS_VALIDOS",
    "DS_SIT_TOT_TURNO",
]

# Project the raw DataFrame down to the relevant columns only.
selected_columns_df = df.select(*relevant_columns)

### Printing first lines

In [None]:
# Print a preview of the selection (20 rows, long values truncated: the
# library defaults, spelled out here).
selected_columns_df.show(n=20, truncate=True)

### Defining transformation functions

In [None]:
import pyspark.sql.functions as F
from typing import Dict, List
from pyspark.sql.dataframe import DataFrame, Column

def get_columns_list_from_dimension(dimension: Dict[str, List[str]]) -> List[str]:
    """Flatten the per-dimension column lists into one list of column names.

    Args:
        dimension: Either a mapping of dimension table name to its list of
            column names, or any iterable of such column lists (for example
            the ``.values()`` view of that mapping).

    Returns:
        Every column name in encounter order; duplicates are kept.
    """
    # Iterating a dict yields its keys, which would flatten the table *names*
    # character by character, so take the column lists explicitly.
    column_groups = dimension.values() if isinstance(dimension, dict) else dimension
    return [column for columns in column_groups for column in columns]

def get_table_name_and_records(dataframe: DataFrame, dimension_table_name_and_columns: Dict[str, List[str]]) -> List[tuple[str, DataFrame]]:
    """Build one dimension DataFrame per dimension table.

    For every dimension, the distinct combinations of its columns are taken
    from ``dataframe`` and a surrogate-key column is appended. The key column
    is named ``sk_<table name with 'dim_' removed>``.

    Args:
        dataframe: DataFrame containing every column referenced by the
            dimensions.
        dimension_table_name_and_columns: Mapping of dimension table name to
            its column names, or an iterable of ``(name, columns)`` pairs
            such as the mapping's ``.items()`` view.

    Returns:
        A list of ``(dimension_table_name, dimension_dataframe)`` tuples, in
        the iteration order of the input.
    """
    # Iterating a dict directly yields only its keys, so the
    # (name, columns) unpacking below needs the items view.
    if isinstance(dimension_table_name_and_columns, dict):
        name_and_columns_pairs = dimension_table_name_and_columns.items()
    else:
        name_and_columns_pairs = dimension_table_name_and_columns

    dimensions = []

    for dimension_table_name, dimension_columns in name_and_columns_pairs:
        surrogate_key_column_name = f"sk_{dimension_table_name.replace('dim_', '')}"

        # The ids are unique and increasing, but not consecutive.
        # NOTE(review): monotonically_increasing_id() is documented as
        # non-deterministic (it depends on partition ids) and this DataFrame
        # is lazy; if it is evaluated more than once the same row may receive
        # a different id. Consider persisting it - confirm.
        dimension_records = (
            dataframe.select(*dimension_columns)
            .distinct()
            .withColumn(surrogate_key_column_name, F.monotonically_increasing_id())
        )

        dimensions.append((dimension_table_name, dimension_records))

    return dimensions


def transform_spark_dataframe_into_star_schema(
    original_dataframe: DataFrame,
    fact_columns: List[str]  = ["col1", "col2"],
    fact_table_name = "tabela_fato",
    mapping_dimension_columns: Dict[str, List[str]] = {'dim1':["col3", "col4"], "dim2":["col5", "col6"]},
):
    """Split a flat DataFrame into dimension tables and one fact table.

    Each dimension holds the distinct combinations of its columns plus a
    surrogate key. In the fact table those columns are replaced by the
    matching surrogate key.

    Args:
        original_dataframe: Flat DataFrame containing all fact and dimension
            columns.
        fact_columns: Columns kept as-is in the fact table.
        fact_table_name: Name returned alongside the fact DataFrame.
        mapping_dimension_columns: Dimension table name -> columns that make
            up that dimension.

    Returns:
        A list of ``(table_name, DataFrame)`` tuples: every dimension first,
        in mapping order, followed by the fact table.

    Note:
        The default list/dict arguments are placeholders; they are only read,
        never mutated, by this function.
    """
    dimension_columns = get_columns_list_from_dimension(mapping_dimension_columns.values())

    # Keep only the columns that take part in the star schema.
    fact_dataframe = original_dataframe.select(*fact_columns, *dimension_columns)

    dimensions = get_table_name_and_records(fact_dataframe, mapping_dimension_columns.items())

    # Replace each dimension's columns with its surrogate key in the fact table.
    for table_name, records in dimensions:
        natural_key_columns = list(mapping_dimension_columns[table_name])

        # Joining on column names is an equi-join that keeps a single copy of
        # the join columns. They are dropped right away, for every dimension,
        # so only the surrogate key remains in the fact table.
        # NOTE(review): rows whose key columns are null do not match in an
        # equi-join and end up with a null surrogate key - confirm whether the
        # source data contains nulls.
        fact_dataframe = fact_dataframe.join(
            F.broadcast(records),
            on=natural_key_columns,
            how="left",
        ).drop(*natural_key_columns)

    fact_table = (fact_table_name, fact_dataframe)

    return dimensions + [fact_table]

### Executing transformation

In [None]:
# Dimension table name -> source columns that make up that dimension.
# NOTE(review): 'dim_tp_agrangencia' and 'dim_situacaof_turno' look like
# typos, but they are the table names written to the DW, so they are kept
# exactly as they are - confirm before renaming.
dimension_columns_by_table = {
    'dim_municipio': ["SG_UF", "NM_MUNICIPIO"],
    'dim_cargo': ["DS_CARGO"],
    'dim_ds_eleicao': ["DS_ELEICAO"],
    'dim_partido': ["SG_PARTIDO", "NM_PARTIDO", "NR_PARTIDO"],
    'dim_candidato': ["NM_CANDIDATO", "NR_CANDIDATO", "NM_URNA_CANDIDATO"],
    'dim_turno': ["NR_TURNO"],
    'dim_tp_agrangencia': ["TP_ABRANGENCIA"],
    'dim_zona': ["NR_ZONA"],
    'dim_situacao_candidatura': ["DS_SITUACAO_CANDIDATURA"],
    'dim_coligacao': ["NM_COLIGACAO", "DS_COMPOSICAO_COLIGACAO"],
    "dim_voto_transito": ["ST_VOTO_EM_TRANSITO"],
    'dim_situacaof_turno': ["DS_SIT_TOT_TURNO"],
    'dim_destinacao_voto': ["NM_TIPO_DESTINACAO_VOTOS"],
}

# The two vote-count columns stay in the fact table; everything listed in the
# mapping above is moved into dimension tables.
vote_count_columns = ["QT_VOTOS_NOMINAIS_VALIDOS", "QT_VOTOS_NOMINAIS"]

star_schema = transform_spark_dataframe_into_star_schema(
    selected_columns_df,
    fact_columns=vote_count_columns,
    fact_table_name="tabela_fato",
    mapping_dimension_columns=dimension_columns_by_table,
)

### Setting up connection parameters

In [None]:
# NOTE(review): the connection settings, including the password, are
# hard-coded in the notebook - consider loading them from configuration.
hostname_or_ip = "dw"
port = "5432"
db = "star"
user = "star"
password = "star"

# JDBC URL of the PostgreSQL database: jdbc:postgresql://<host>:<port>/<db>
db_url = f"jdbc:postgresql://{hostname_or_ip}:{port}/{db}"

# Connection properties passed to the JDBC writer.
properties = dict(
    user=user,
    password=password,
    driver="org.postgresql.Driver",
)


### Writing to DW

In [None]:
# Write every table of the star schema to the DW over JDBC, replacing any
# existing table with the same name.
for table_name, original_dataframe in star_schema:
    print(f"Writing {table_name} to DW")
    jdbc_writer = original_dataframe.write.mode("overwrite")
    jdbc_writer.jdbc(url=db_url, table=table_name, properties=properties)

### Cleaning up

In [None]:
# Stopping spark session
# spark_session.stop()

# Cleaning up files 
# Delete the directory and all its contents
# import shutil

# shutil.rmtree(workdir+'extracted/')