In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, FloatType
from pyspark.sql.functions import when, col, monotonically_increasing_id, concat, lit, expr

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/warehouse'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
#-----------------------------------------Silver------------------------------------------------------

In [None]:
# Gender Data

In [None]:
hdfs_path = "hdfs://hdfs-nn:9000/Datasets/Bronze/Gender_Stats_Dataset/Gender_Stats Data.csv"

In [None]:
#Renomear as colunas de modo a terem nomes que coincidam com o que se encontra nas respetivas linhas, substituindo os espaços
#por _ . Alteração do formato de certas colunas e ler o csv com o "customSchema" definido
common_fields = [
    "Country_Region_Situation_Name",
    "Country_Region_Situation_Code",
    "Indicator_Name",
    "Indicator_Code"
]

years_range = range(1960, 2023)

fields = [
    StructField(field, StringType() if field in common_fields else FloatType(), True)
    for field in common_fields + [str(year) for year in years_range]
]

customSchema = StructType(fields)

gender_stats_data_df = spark \
            .read\
            .option("delimiter",",")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)

In [None]:
#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (GSD = Gender Stats Data)
gender_stats_data_df = (
    gender_stats_data_df
    .withColumn("Gender_Stats_Data_ID", concat(lit("GSD"), monotonically_increasing_id())
    .cast("string"))
    .select("Gender_Stats_Data_ID", *gender_stats_data_df.columns[0:])
)

In [None]:
#Criação de uma nova coluna referente ao genero quando encontradas certas palavras chave, colocando-a em sexto lugar 
#Cria coerência com os restantes datasets
gender_stats_data_df = gender_stats_data_df.withColumn(
    "Gender",
    when(
        (col("Indicator_Name").contains("female")) | (col("Indicator_Name").contains("woman")),
        "Female"
    ).when(
        (col("Indicator_Name").contains("male")) | (col("Indicator_Name").contains("man")),
        "Male"
    ).otherwise("Both")
    .cast("string")
).select(
    "Gender_Stats_Data_ID",
    "Country_Region_Situation_Name",
    "Country_Region_Situation_Code",
    "Indicator_Name",
    "Indicator_Code",
    "Gender",
    *gender_stats_data_df.columns[5:]
)

In [None]:
#Mapeamento de cada intervalo de idades, atribuindo-lhes uma label referntes ao grupo etário a que pertencem
#Cria coerência com os restantes datasets
age_group_mapping = {
    "00": "Youth 0-24 years",
    "01": "Youth 0-24 years",
    "02": "Youth 0-24 years",
    "03": "Youth 0-24 years",
    "04": "Youth 0-24 years",
    "05": "Youth 0-24 years",
    "0-4": "Youth 0-24 years",
    "15-19": "Youth 0-24 years",
    "5-14": "Youth 0-24 years",
    "7-14": "Youth 0-24 years",
    "15-24": "Youth 0-24 years",
    "12-23": "Youth 0-24 years",
    "-5": "Youth 0-24 years",
    "-15": "Youth 0-24 years",
    "by 15": "Youth 0-24 years",
    "0-14": "Youth 0-24 years",
    "5-9": "Youth 0-24 years",
    "15-19": "Youth 0-24 years",
    "20-24": "Youth 0-24 years",
    "infant": "Youth 0-24 years",
    "before 15": "Youth 0-24 years",
    "before 18": "Youth 0-24 years",
    "before 22": "Youth 0-24 years",
    "25-29": "Adult 25-34 years",
    "30-24": "Adult 25-34 years",
    "35-39": "Adult 35+ years",
    "40-44": "Adult 35+ years",
    "45-49": "Adult 35+ years",
    "50-54": "Adult 35+ years",
    "55-59": "Adult 35+ years",
    "60-64": "Adult 35+ years",
    "65+": "Adult 35+ years",
    "65-69": "Adult 35+ years",
    "70-74": "Adult 35+ years",
    "75-89": "Adult 35+ years",
    "80+": "Adult 35+ years",
    "45-54": "Adult 35+ years",
    "55-64": "Adult 35+ years",
    "35-44": "Adult 35+ years",
    "60+": "Adult 35+ years",
    "18+": "Every category",
    "15+": "Every category",
    "15-59": "Every category",
    "15-49": "Every category",
    "15-64": "Every category",
    "25+": "All adult categories",
    "30-70": "All adult categories",
    "adult": "All adult categories"
}

#Loop "for" que a cada a cada iteração sobre os itens do mapeamento anterior cria uma condição usando a função "when"
condition = None
for age_group, label in age_group_mapping.items():
    if condition is None:
        condition = when(col("Indicator_Name").contains(age_group), label)
    else:
        condition = condition.when(col("Indicator_Name").contains(age_group), label)

#Criação de uma nova coluna baseada na condição criada para a coluna "Indicator_Name", colocando-a em sétimo lugar
gender_stats_data_df = gender_stats_data_df.withColumn("Uniform_Age_Group", condition).select(
    "Gender_Stats_Data_ID",
    "Country_Region_Situation_Name",
    "Country_Region_Situation_Code",
    "Indicator_Name",
    "Indicator_Code",
    "Gender",
    "Uniform_Age_Group",
    *gender_stats_data_df.columns[6:]
)

In [None]:
gender_stats_data_df.write \
                    .mode("overwrite") \
                    .partitionBy("Country_Region_Situation_Name") \
                    .format("delta") \
                    .option("overwriteSchema", "true") \
                    .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/Gender_Stats_Data_table")

In [None]:
# Gender Series

In [2]:
hdfs_path = "hdfs://hdfs-nn:9000/Datasets/Bronze/Gender_Stats_Dataset/Gender_Stats Series.csv"

In [3]:
gender_stats_series_df = spark.read.option("delimiter",",").option("header","true").csv(hdfs_path)

In [4]:
#Substitui os espaços nos nomes das colunas por _
new_columns = [col(c).alias(c.replace(" ", "_")) for c in gender_stats_series_df.columns]
gender_stats_series_df = gender_stats_series_df.select(*new_columns)

In [5]:
#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (GSS = Gender Stats Series)
gender_stats_series_df = (
    gender_stats_series_df
    .withColumn("Gender_Stats_Series_ID", concat(lit("GSS"), monotonically_increasing_id())
    .cast("string"))
    .select("Gender_Stats_Series_ID", *gender_stats_series_df.columns[0:])
)

In [6]:
#Remoção da última coluna que não continha dados, apenas foi lida pela função "read" porque os dados no CSV terminam com o delimitador ","
gender_stats_series_df = gender_stats_series_df.drop("_c20")

In [7]:
#Substituição dos dados em branco por uma frase que indica a inexistência de informação sobre esses respetivos dados
gender_stats_series_df = gender_stats_series_df.na.fill('Criteria Not Available')

In [8]:
gender_stats_series_df.write \
                    .mode("overwrite") \
                    .format("delta") \
                    .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/Gender_Stats_Series_table")

In [None]:
#-----------------------------------------Gold------------------------------------------------------

In [26]:
# Ler os dados das tabelas de silver
gender_stats_data_df = spark.table("Projeto.Gender_Stats_Data_table")
gender_stats_series_df = spark.table("Projeto.Gender_Stats_Series_table")

#Filtrar os dados 
gender_stats_data_df = gender_stats_data_df \
                       .filter(col("Indicator_Name").contains("Mortality rate,") |
                               col("Indicator_Name").contains("Number of infant deaths") |
                               col("Indicator_Name").contains("Population ages") |
                               col("Indicator_Name").contains("Intentional homicides"))

gender_stats_series_df = gender_stats_series_df \
                         .filter(col("Indicator_Name").contains("Mortality rate,") |
                                 col("Indicator_Name").contains("Number of infant deaths") |
                                 col("Indicator_Name").contains("Population ages") |
                                 col("Indicator_Name").contains("Intentional homicides"))

In [27]:
# Criar a dimensão Country
dim_country_df = gender_stats_data_df \
                 .select("Country_Region_Situation_Name", 
                         "Country_Region_Situation_Code") \
                 .distinct() \
                 .orderBy("Country_Region_Situation_Name")

#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (CD = Country Dimension)
dim_country_df = dim_country_df \
                .withColumn("Country_Dimension_ID", concat(lit("CD"), monotonically_increasing_id())
                .cast("string")) \
                .select("Country_Dimension_ID", *dim_country_df.columns[0:])

# Escrever para a tabela delta
dim_country_df.write \
              .format("delta") \
              .mode("overwrite") \
              .option("overwriteSchema", "true") \
              .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/dim_country")

In [28]:
# Criar a dimensão Temporal Measurement
dim_temporal_measurement_df = gender_stats_series_df \
                             .select("Periodicity",
                                     "Base_Period") \
                             .distinct()

#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (TMD = Temporal Measurement Dimension)
dim_temporal_measurement_df = dim_temporal_measurement_df \
                              .withColumn("Temporal_Measurement_ID", concat(lit("TMD"), monotonically_increasing_id())
                              .cast("string")) \
                              .select("Temporal_Measurement_ID", *dim_temporal_measurement_df.columns[0:])

# Escrever para a tabela delta
dim_temporal_measurement_df.write \
                           .format("delta") \
                           .mode("overwrite") \
                           .option("overwriteSchema", "true") \
                           .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/dim_temporal_measurement")

In [29]:
# Criar a dimensão Sources
dim_sources_df = gender_stats_series_df \
                .select("Notes_from_original_source",
                        "Source",
                        "Related_source_links",
                        "Other_web_links") \
                .distinct()

#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (SD = Source Dimension)
dim_sources_df = dim_sources_df \
                .withColumn("Sources_Dimension_ID", concat(lit("SD"), monotonically_increasing_id())
                .cast("string")) \
                .select("Sources_Dimension_ID", *dim_sources_df.columns[0:])

# Escrever para a tabela delta
dim_sources_df.write \
              .format("delta") \
              .mode("overwrite") \
              .option("overwriteSchema", "true") \
              .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/dim_sources")

In [30]:
# Criar a dimensão Measurement Methods
dim_measurement_methods_df = gender_stats_series_df \
                            .select("Unit_of_measure",
                                    "Aggregation_method") \
                            .distinct()

#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (MMD = Measurement Methods)
dim_measurement_methods_df = dim_measurement_methods_df \
                            .withColumn("Measurement_Methods_ID", concat(lit("MMD"), monotonically_increasing_id())
                            .cast("string")) \
                            .select("Measurement_Methods_ID", *dim_measurement_methods_df.columns[0:])

# Escrever para a tabela delta
dim_measurement_methods_df.write \
                          .format("delta") \
                          .mode("overwrite") \
                          .option("overwriteSchema", "true") \
                          .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/dim_measurement_methods")

In [31]:
# Criar a dimensão Gender Series

# Join da informação necessária de ambas as tabelas
# Pivotagem das várias colunas de anos em linhas
gender_stats_df = gender_stats_data_df \
                  .join(gender_stats_series_df, gender_stats_data_df.Indicator_Code == gender_stats_series_df.Series_Code) \
                  .select("Projeto.Gender_Stats_Data_table.Indicator_Name", "Projeto.Gender_Stats_Data_table.Indicator_Code",
                          "Projeto.Gender_Stats_Data_table.Country_Region_Situation_Code", "Projeto.Gender_Stats_Series_table.Periodicity",
                          "Projeto.Gender_Stats_Series_table.Source", "Projeto.Gender_Stats_Series_table.Aggregation_method",
                          "Projeto.Gender_Stats_Data_table.Gender", "Projeto.Gender_Stats_Data_table.Uniform_Age_Group", 
                          "Projeto.Gender_Stats_Series_table.Topic", "Projeto.Gender_Stats_Series_table.Short_definition",
                          "Projeto.Gender_Stats_Series_table.Long_definition", "Projeto.Gender_Stats_Series_table.Other_notes",
                          "Projeto.Gender_Stats_Series_table.Limitations_and_exceptions", "Projeto.Gender_Stats_Series_table.General_comments", 
                          "Projeto.Gender_Stats_Series_table.Statistical_concept_and_methodology", "Projeto.Gender_Stats_Series_table.Development_relevance",
                          "Projeto.Gender_Stats_Series_table.Related_indicators", "Projeto.Gender_Stats_Series_table.License_Type",
                          expr("stack(63, 'Projeto.Gender_Stats_Data_table.1960', `1960`, 'Projeto.Gender_Stats_Data_table.1961', `1961`, 'Projeto.Gender_Stats_Data_table.1962', `1962`, 'Projeto.Gender_Stats_Data_table.1963', `1963`, 'Projeto.Gender_Stats_Data_table.1964', `1964`, 'Projeto.Gender_Stats_Data_table.1965', `1965`, 'Projeto.Gender_Stats_Data_table.1966', `1966`, 'Projeto.Gender_Stats_Data_table.1967', `1967`, 'Projeto.Gender_Stats_Data_table.1968', `1968`, 'Projeto.Gender_Stats_Data_table.1969', `1969`, 'Projeto.Gender_Stats_Data_table.1970', `1970`, 'Projeto.Gender_Stats_Data_table.1971', `1971`, 'Projeto.Gender_Stats_Data_table.1972', `1972`, 'Projeto.Gender_Stats_Data_table.1973', `1973`, 'Projeto.Gender_Stats_Data_table.1974', `1974`, 'Projeto.Gender_Stats_Data_table.1975', `1975`, 'Projeto.Gender_Stats_Data_table.1976', `1976`, 'Projeto.Gender_Stats_Data_table.1977', `1977`, 'Projeto.Gender_Stats_Data_table.1978', `1978`, 'Projeto.Gender_Stats_Data_table.1979', `1979`, 'Projeto.Gender_Stats_Data_table.1980', `1980`, 'Projeto.Gender_Stats_Data_table.1981', `1981`, 'Projeto.Gender_Stats_Data_table.1982', `1982`, 'Projeto.Gender_Stats_Data_table.1983', `1983`, 'Projeto.Gender_Stats_Data_table.1984', `1984`, 'Projeto.Gender_Stats_Data_table.1985', `1985`, 'Projeto.Gender_Stats_Data_table.1986', `1986`, 'Projeto.Gender_Stats_Data_table.1987', `1987`, 'Projeto.Gender_Stats_Data_table.1988', `1988`, 'Projeto.Gender_Stats_Data_table.1989', `1989`, 'Projeto.Gender_Stats_Data_table.1990', `1990`, 'Projeto.Gender_Stats_Data_table.1991', `1991`, 'Projeto.Gender_Stats_Data_table.1992', `1992`, 'Projeto.Gender_Stats_Data_table.1993', `1993`, 'Projeto.Gender_Stats_Data_table.1994', `1994`, 'Projeto.Gender_Stats_Data_table.1995', `1995`, 'Projeto.Gender_Stats_Data_table.1996', `1996`, 'Projeto.Gender_Stats_Data_table.1997', `1997`, 'Projeto.Gender_Stats_Data_table.1998', `1998`, 'Projeto.Gender_Stats_Data_table.1999', `1999`, 'Projeto.Gender_Stats_Data_table.2000', `2000`, 'Projeto.Gender_Stats_Data_table.2001', `2001`, 'Projeto.Gender_Stats_Data_table.2002', `2002`, 'Projeto.Gender_Stats_Data_table.2003', `2003`, 'Projeto.Gender_Stats_Data_table.2004', `2004`, 'Projeto.Gender_Stats_Data_table.2005', `2005`, 'Projeto.Gender_Stats_Data_table.2006', `2006`, 'Projeto.Gender_Stats_Data_table.2007', `2007`, 'Projeto.Gender_Stats_Data_table.2008', `2008`, 'Projeto.Gender_Stats_Data_table.2009', `2009`, 'Projeto.Gender_Stats_Data_table.2010', `2010`, 'Projeto.Gender_Stats_Data_table.2011', `2011`, 'Projeto.Gender_Stats_Data_table.2012', `2012`, 'Projeto.Gender_Stats_Data_table.2013', `2013`, 'Projeto.Gender_Stats_Data_table.2014', `2014`, 'Projeto.Gender_Stats_Data_table.2015', `2015`, 'Projeto.Gender_Stats_Data_table.2016', `2016`, 'Projeto.Gender_Stats_Data_table.2017', `2017`, 'Projeto.Gender_Stats_Data_table.2018', `2018`, 'Projeto.Gender_Stats_Data_table.2019', `2019`, 'Projeto.Gender_Stats_Data_table.2020', `2020`, 'Projeto.Gender_Stats_Data_table.2021', `2021`, 'Projeto.Gender_Stats_Data_table.2022', `2022`) as (Year,Number)"))

# Colocar o formato dos dados da coluna "Year" como xxxx (ex: 1960)
gender_stats_df = gender_stats_df.withColumn("Year", expr("substring(Year, 33, 4)").cast("int"))

# Filtrar para os anos necessários
gender_stats_df = gender_stats_df.filter((col("Year") >= 2010) & (col("Year") <= 2022))

#Criação uma coluna com chaves artificiais com valores sequenciais, colocando-a em primeiro lugar (GS = Gender Series)
dim_gender_series_df = gender_stats_df \
                       .withColumn("Gender_Series_ID", concat(lit("GS"), monotonically_increasing_id())
                       .cast("string")) \
                       .select("Gender_Series_ID", *gender_stats_df.columns[0:])

In [32]:
# Criação da tabela de factos Gender Data

# Juntar o ID da dimensão Country
gd_gs_df = dim_gender_series_df \
           .join(dim_country_df, dim_gender_series_df.Country_Region_Situation_Code == dim_country_df.Country_Region_Situation_Code) \
           .select("Gender_Series_ID", "Country_Dimension_ID", "Periodicity", "Source", "Aggregation_method", "Indicator_Code", "Number")

In [33]:
# Juntar o ID da dimensão Temporal Measurement
gd_tm_df = gd_gs_df \
           .join(dim_temporal_measurement_df, gd_gs_df.Periodicity == dim_temporal_measurement_df.Periodicity) \
           .select("Gender_Series_ID", "Country_Dimension_ID", "Temporal_Measurement_ID", "Source", "Aggregation_method", "Indicator_Code", "Number")

In [34]:
# Juntar o ID da dimensão Sources
gd_s_df = gd_tm_df \
          .join(dim_sources_df, gd_tm_df.Source == dim_sources_df.Source) \
          .select("Gender_Series_ID", "Country_Dimension_ID", "Temporal_Measurement_ID", "Sources_Dimension_ID", "Aggregation_method", "Indicator_Code", "Number")

In [35]:
# Juntar o Id da dimensão Measurement Methods
gd_mm_df = gd_s_df \
          .join(dim_measurement_methods_df, gd_s_df.Aggregation_method == dim_measurement_methods_df.Aggregation_method) \
          .select("Gender_Series_ID", "Country_Dimension_ID", "Temporal_Measurement_ID", "Sources_Dimension_ID", "Measurement_Methods_ID", "Indicator_Code", 
                  "Number")

In [37]:
# Foi colocada aqui a elimação de colunas da dimensão Gender Series pois era necessário
# anteriormente estas mesmo para juntar as outras dimensões
#Eliminar colunas que não fazem parte da dimensão
dim_gender_series_df = dim_gender_series_df.drop("Number")
dim_gender_series_df = dim_gender_series_df.drop("Country_Region_Situation_Code")
dim_gender_series_df = dim_gender_series_df.drop("Periodicity")
dim_gender_series_df = dim_gender_series_df.drop("Source")
dim_gender_series_df = dim_gender_series_df.drop("Aggregation_method")

In [38]:
# Tabela de Factos
gender_data_df = gd_mm_df \
                 .withColumnRenamed("Number", "Indicator_Number")

In [41]:
# Escrever para a tabela delta a dimensão Gender Series
dim_gender_series_df.write \
                    .format("delta") \
                    .mode("overwrite") \
                    .option("overwriteSchema", "true") \
                    .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/dim_gender_series")

In [42]:
# Escrever para a tabela delta
gender_data_df.write \
              .format("delta") \
              .mode("overwrite") \
              .option("overwriteSchema", "true") \
              .save("hdfs://hdfs-nn:9000/warehouse/Projeto.db/gender_data")

In [43]:
spark.stop()