# Notebook TFM: ETL - Transformación de datos RAW a CURATED
<div style="background-color:#F2EDED;">
<br/>
<div>
<img src="https://uploads-ssl.webflow.com/614b1fe22fa8b90ef41aeffe/6265cb48f9496b1cefc9ab75_logotipo-mbit-39.png" width="200px" align="left" CLASS="TextWrap" style="background-color:#2a3f3f; margin-left: 10px;">
<img src="https://branding-guidelines.msf.es/esp/imgs/logo/Logo-01.jpg" width="100px" align="right" CLASS="TextWrap" style="background-color:#2a3f3f;">
</div>
<br/>
<br/>
<br/>
<div>
<h1><font color="#2a3f3f" size=4 style="margin-left: 10px;">MODELO DE PROBABILIDAD A TESTAR EN MÉDICOS SIN FRONTERAS</font></h1>
</div>
<br/>
<div style="text-align: right; margin-right: 10px; margin-bottom: 10px;">
<font color="#2a3f3f" size=3>Elio López Salamanca </font><br>
<font color="#2a3f3f" size=3>Sergio Israel Calleja Chimeno</font><br>
</div>
</div>

In [2]:
# Make text inside <pre> elements (cell output such as printSchema()) keep long
# lines instead of wrapping them. Import `display` explicitly from the public
# IPython.display module instead of relying on the builtin IPython injects.
from IPython.display import HTML, display
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [23]:
# Display the SparkSession object as a quick check that a session is available.
# NOTE(review): `spark` is not defined anywhere in this notebook; presumably the
# notebook runtime injects it — confirm.
spark

Calculation started (calculation_id=5ec5a984-045a-3c0d-1863-c40e3f628662) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
<pyspark.sql.session.SparkSession object at 0x7f90a02bb0>



In [1]:
import logging
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import sha2, concat_ws, current_timestamp
from pyspark.sql.functions import col, regexp_replace, translate, StringType

Calculation started (calculation_id=5ec5a984-0594-4fa9-ed54-db076cd9a470) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
Execution Interrupted. Attempting to stop calculation execution (calculation_id=5ec5a984-0594-4fa9-ed54-db076cd9a470)
Calculation 5ec5a984-0594-4fa9-ed54-db076cd9a470 is in state: COMPLETED
Calculation completed.


### 1. Extracción de los datos utilizando el Metastore de Glue

In [25]:
# Extract raw data from the Glue catalog
def extract_data(table_name: str) -> DataFrame:
    """Return all rows of ``msfdb_raw.<table_name>`` as a DataFrame.

    Side effect: the session's current database is switched to ``msfdb_raw``.

    Args:
        table_name: Table name inside the ``msfdb_raw`` database.

    Returns:
        DataFrame produced by ``SELECT * FROM msfdb_raw.<table_name>``.

    Raises:
        Whatever Spark raises; the error is logged first and then re-raised.
    """
    query = f"SELECT * FROM msfdb_raw.{table_name}"
    try:
        spark.sql("USE msfdb_raw")
        raw_df = spark.sql(query)
        logger.info(f"Data extracted from {table_name}")
    except Exception as exc:
        logger.error(f"Error extracting data from {table_name}: {str(exc)}")
        raise
    return raw_df

Calculation started (calculation_id=3cc5a984-0757-acb1-f8a9-0e71e82d7cbd) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


### 2. Aquí empiezan las definiciones de las funciones de transformación y limpieza de los datos

In [26]:
def rename_columns(df: DataFrame) -> (DataFrame):
    """Normalize Salesforce-style column names to CamelCase.

    For every column name:
      1. remove every occurrence of the package prefixes ``npsp__``,
         ``npsp4hub``, ``npo02__``, ``npe01__``..``npe09__`` and ``msf_``
         (the patterns are not anchored, so they are removed anywhere);
      2. remove every occurrence of the ``__c`` custom-field suffix;
      3. drop an underscore that precedes a digit (``source_1`` -> ``source1``);
      4. CamelCase the remaining ``_``-separated words
         (e.g. ``npe03__installment_period__c`` -> ``InstallmentPeriod``).

    The rename is positional (``DataFrame.toDF``), so names are never parsed
    as SQL expressions: names containing spaces, dots, hyphens or that look
    like literals are renamed safely. A warning is logged when several source
    columns end up with the same name.

    Args:
        df: Input DataFrame.

    Returns:
        DataFrame with the same columns, in the same order, renamed.
    """
    from collections import Counter

    prefix_patterns = ["npsp__", "npsp4hub", "npo02__", "npe0[1-9]__", "msf_"]
    suffix_patterns = ["__c"]

    prefix_regex = re.compile("|".join(prefix_patterns))
    suffix_regex = re.compile("|".join(suffix_patterns))

    def new_column_name(old_name):
        new_name = prefix_regex.sub("", old_name)
        new_name = suffix_regex.sub("", new_name)
        new_name = re.sub(r"_(\d)", r"\1", new_name)
        return ''.join(word.capitalize() for word in new_name.split('_'))

    # One new name per existing column (kept as a list so that duplicated
    # source names, which a dict would collapse, are still renamed 1:1).
    new_names = [new_column_name(old_name) for old_name in df.columns]

    name_mapping = dict(zip(df.columns, new_names))
    logger.info(f"Columns mapping after rename: {name_mapping}")

    # Spark resolves column names case-insensitively by default, so compare
    # lowercased names to detect clashes that would make later selects ambiguous.
    counts = Counter(name.lower() for name in new_names)
    clashes = sorted(name for name, n in counts.items() if n > 1)
    if clashes:
        logger.warning(f"rename_columns: several source columns map to the same name: {clashes}")

    df = df.toDF(*new_names)

    print("Schema en rename_columns:")
    df.printSchema()

    logger.info(f"rename_columns executed. Columns reanamed")
    return df

Calculation started (calculation_id=e2c5a984-086b-3a9c-8dcc-1e7db5adb6a5) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [27]:
# Drop rows that contain null values
def drop_null_rows(df: DataFrame, how: str = "any", subset=None) -> DataFrame:
    """Return *df* without the rows that contain null values.

    ``DataFrame.na.drop`` returns a new DataFrame rather than modifying *df*,
    so its result has to be kept; discarding it made this step a no-op.

    Args:
        df: Input DataFrame.
        how: ``"any"`` (default, same as ``na.drop()``) drops a row when any of
            the considered columns is null; ``"all"`` only when all of them are.
        subset: Optional list of column names to consider; ``None`` considers
            every column. NOTE: with the defaults a single null in any column
            removes the row, which can discard many rows of a wide table;
            pass ``subset`` to restrict the check.

    Returns:
        The DataFrame with the null-containing rows removed.
    """
    df = df.na.drop(how=how, subset=subset)
    logger.info(f"drop_null_rows executed")
    return df

def drop_duplicate_rows(df: DataFrame, subset=None) -> DataFrame:
    """Return *df* without duplicated rows.

    ``DataFrame.dropDuplicates`` returns a new DataFrame rather than modifying
    *df*, so its result has to be kept; discarding it made this step a no-op.

    Args:
        df: Input DataFrame.
        subset: Optional list of column names that define a duplicate;
            ``None`` (default) compares whole rows.

    Returns:
        The de-duplicated DataFrame.
    """
    df = df.dropDuplicates(subset)
    logger.info(f"drop_duplicate_rows executed")
    return df

Calculation started (calculation_id=08c5a984-0998-b505-9dd5-a0986138cd61) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [28]:
# Keep only the columns listed in curated_col_names
def select_columns(df: DataFrame, curated_col_names: list) -> DataFrame:
    """Project *df* onto the columns that also appear in *curated_col_names*.

    Selected columns keep the order they have in *df*. Requested names that
    are not present in *df* are logged as a warning.

    Args:
        df: Input DataFrame.
        curated_col_names: Column names to keep (exact, case-sensitive match).

    Returns:
        DataFrame containing only the matching columns.

    Raises:
        ValueError: if none of *curated_col_names* exists in *df*.
    """
    try:
        wanted = set(curated_col_names)
        available = set(df.columns)

        selected_columns = [col_name for col_name in df.columns if col_name in wanted]

        if not selected_columns:
            raise ValueError("No matching columns found!")

        missing = [col_name for col_name in curated_col_names if col_name not in available]
        if missing:
            logger.warning(f"select_columns: requested columns not found: {missing}")

        df_filtered = df.select(*selected_columns)

        # Show the schema of the result, not of the input.
        print("Schema en select_columns:")
        df_filtered.printSchema()

        logger.info(f"select_columns executed")
        return df_filtered

    except Exception as e:
        logger.error(f"Error in select_columns: {str(e)}")
        raise

Calculation started (calculation_id=54c5a984-0ad0-b49e-1157-9bab8d45eaf6) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [29]:
def filter_data(df: DataFrame, filters: list) -> DataFrame:
    """Apply every condition in *filters* to *df*, one after another (AND).

    Args:
        df: Input DataFrame.
        filters: Conditions passed one by one to ``DataFrame.filter`` (the
            table configuration in this notebook uses SQL expression strings).
            An empty list or ``None`` means no filtering.

    Returns:
        The filtered DataFrame, or *df* itself when there are no filters.
    """
    try:
        if filters:
            filtered = df
            for predicate in filters:
                filtered = filtered.filter(predicate)

            print("Schema en filter_data:")
            filtered.printSchema()

            logger.info("Data filtered")
            return filtered

        logger.info("No filters applied")
        return df
    except Exception as e:
        logger.error(f"Error filtering data: {str(e)}")
        raise

Calculation started (calculation_id=aec5a984-0c4f-fc9b-2931-88d203de075f) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [30]:
def clean_data(df: DataFrame,
               chars: str = "áéíóúüñÁÉÍÓÚÜÑ",
               replace: str = "aeiouunAEIOUUN") -> DataFrame:
    """Remove Spanish accents/diacritics from every string column of *df*.

    Each character of *chars* is replaced by the character at the same
    position in *replace* (Spark ``translate``). The defaults cover lowercase
    and uppercase accented vowels, ü/Ü and ñ/Ñ. Non-string columns are passed
    through unchanged and the column order is preserved.

    Args:
        df: Input DataFrame.
        chars: Characters to replace.
        replace: Replacement characters, aligned position by position with *chars*.

    Returns:
        DataFrame with the translated string columns.
    """
    # Import from pyspark.sql.types, the public home of StringType, rather than
    # relying on it being re-exported by pyspark.sql.functions.
    from pyspark.sql.types import StringType

    try:
        string_cols = {f.name for f in df.schema.fields if isinstance(f.dataType, StringType)}

        all_cols = [
            translate(col(col_name), chars, replace).alias(col_name)
            if col_name in string_cols else col(col_name)
            for col_name in df.columns
        ]

        df = df.select(*all_cols)

        print("Schema en clean_data:")
        df.printSchema()

        logger.info("Data cleaned")
        return df
    except Exception as e:
        logger.error(f"Error cleaning data: {str(e)}")
        raise

Calculation started (calculation_id=a0c5a984-0da8-adf0-9090-71f4f82fda67) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [31]:
def add_metadata_columns(df: DataFrame) -> DataFrame:
    """Append ``hash_key`` and ``processing_timestamp`` columns to *df*.

    ``hash_key`` is the hex SHA-256 of all existing columns, each cast to
    string and joined with ``"|"``. ``concat_ws`` silently skips nulls, so rows
    such as ("a", null, "b") and ("a", "b", null) would otherwise hash to the
    same value. Nulls are therefore replaced by a NUL-character sentinel
    before hashing. Rows without nulls hash exactly as a plain ``concat_ws``
    would. Values that themselves contain ``"|"`` can still collide.

    ``processing_timestamp`` is ``current_timestamp()``.

    Args:
        df: Input DataFrame.

    Returns:
        *df* with the two metadata columns added.
    """
    from pyspark.sql.functions import coalesce, lit

    try:
        null_marker = lit("\u0000")  # stands in for null; unlikely in real data
        columns_to_hash = [coalesce(df[c].cast("string"), null_marker) for c in df.columns]
        df = df.withColumn("hash_key", sha2(concat_ws("|", *columns_to_hash), 256))

        df = df.withColumn("processing_timestamp", current_timestamp())

        logger.info("Metadata columns added")
        return df
    except Exception as e:
        logger.error(f"Error adding metadata columns: {str(e)}")
        raise

Calculation started (calculation_id=d4c5a984-0ed8-9fff-329d-e33770773ec2) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


### 3. Carga de datos en 'Curated' en formato 'Delta'

In [32]:
from pyspark.sql import DataFrame
def load_data_curated(df: DataFrame, load_path: str, partition_field: str = "noPartition", primary_key: str = None) -> None:
    """Overwrite the Delta table stored at *load_path* with *df*.

    Args:
        df: Data to write.
        load_path: Target location of the Delta table (e.g. an ``s3://`` URI).
        partition_field: Column to partition by. ``"noPartition"`` (default),
            ``None`` or ``""`` write an unpartitioned table.
        primary_key: Informational only. The Delta writer has no primary-key
            option, so the key is logged but not enforced.

    Raises:
        Any write error, after logging it.
    """
    try:
        # overwriteSchema lets an overwrite replace the table's existing schema
        # and partitioning; without it a changed schema makes the write fail.
        write_options = {
            "overwriteSchema": "true",
        }

        writer = (
            df.coalesce(5)  # at most 5 partitions -> fewer, larger output files
            .write
            .format("delta")
            .mode("overwrite")
            .options(**write_options)
        )
        if partition_field and partition_field != "noPartition":
            writer = writer.partitionBy(partition_field)

        if primary_key is not None:
            logger.info(f"Primary key for {load_path}: {primary_key} (informational, not enforced)")

        writer.save(load_path)

        logger.info(f"Data loaded to {load_path}")

    except Exception as e:
        logger.error(f"Error loading data to {load_path}: {str(e)}")
        raise

Calculation started (calculation_id=6ec5a984-1012-1640-fad5-ac74d6df337f) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


### 4. Fichero/Objeto JSON de configuración de mapeo de entidades

In [58]:
# Entity configuration. For every raw Glue table (key), this gives:
#   curated_name    -> name of the curated Delta table / S3 folder
#   curated_columns -> columns to keep, using the names produced by rename_columns
#   partition_key   -> column used to partition the curated table
#   primary_key     -> identifier column of the entity
#   filters         -> SQL conditions applied after the column selection

table_config = {
    "raw_msf_campaign": dict(
        curated_name="Campaign",
        curated_columns=[
            "Id",
            "Campaigndonationreporting",
            "Campaignentryreporting",
            "Isemergency",
            "Isonline",
            "Objective",
            "Objectivepublic",
            "Segment",
            "Status",
        ],
        partition_key="Segment",
        primary_key="Id",
        filters=[
            "Segment <> ''",
        ],
    ),
    "raw_msf_contact": dict(
        curated_name="Contact",
        curated_columns=[
            "Id",
            "Ltvscore",
            "Program",
            "Programaherencias",
            "Programais",
            "Legacyconfidentiality",
            "Membertype",
        ],
        partition_key="Membertype",
        primary_key="Id",
        filters=[
            "Programaherencias IS NOT NULL and Program <> ''",
        ],
    ),
    "raw_msf_opportunity": dict(
        curated_name="Opportunity",
        curated_columns=[
            "Id",
            "Rating",
            "Campaignid",
            "PrimaryContact",
            "RecurringDonation",
            "Stagename",
            "Program",
            "Type",
            "Typefundraisingcontribution",
        ],
        partition_key="Type",
        primary_key="Id",
        filters=[
            "Type IN ('Herencia', 'Pago de Herencia', 'Recurrent Donation', 'Membership')",
        ],
    ),
    "raw_msf_quotamodification": dict(
        curated_name="QuotaModification",
        curated_columns=[
            "Id",
            "Isdeleted",
            "Recurringdonation",
            "Campaigninfluence",
            "Changeamount",
            "Changeannualizedquota",
            "Changetype",
            "Newamount",
            "Newannualizedquota",
            "Newrecurringperiod",
            "Contactid",
            "Changedate",
        ],
        partition_key="Changetype",
        primary_key="Id",
        filters=[
            "Isdeleted = 'false' and Changetype IS NOT NULL and Changetype <> ''",
        ],
    ),
    "raw_msf_recurringdonation": dict(
        curated_name="RecurringDonation",
        curated_columns=[
            "Id",
            "Isdeleted",
            "Annualizedquota",
            "Cancelationdate",
            "Currentcampaign",
            "Amount",
            "Contact",
            "InstallmentPeriod",
            "PaidAmount",
            "TotalPaidInstallments",
        ],
        partition_key="InstallmentPeriod",
        primary_key="Id",
        filters=[
            "Isdeleted = 'false' and InstallmentPeriod IS NOT NULL and InstallmentPeriod <> ''",
        ],
    ),
    "raw_msf_task": dict(
        curated_name="Task",
        curated_columns=[
            "Id",
            "Activitydate",
            "Campaign",
            "Closetype",
            "Inboundoutbound",
            "Objective",
            "Productprogram",
            "Startdate",
            "Thematic",
            "Typetemplate",
            "Ownerid",
            "Status",
            "Subject",
            "Whoid",
            "Whatid",
        ],
        partition_key="Thematic",
        primary_key="Id",
        filters=[
            "Thematic IS NOT NULL and Campaign is NOT NULL and YEAR(startdate)=2023",
        ],
    ),
}

Calculation started (calculation_id=dac5a98e-59c0-1fac-2c93-dfa4ea7bd0fc) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


### Definición de la ETL

In [34]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_etl(table_config: dict):
    """Run the RAW -> CURATED pipeline for every table in *table_config*.

    For each ``raw_table_name -> config`` entry the raw table is:
      1. extracted,
      2. column-renamed,
      3. projected onto ``config["curated_columns"]``,
      4. filtered with ``config["filters"]``,
      5. cleaned of null/duplicate rows,
      6. stripped of accents and extended with metadata columns,
      7. written as Delta to ``s3://curated-msf-mbit/<curated_name>``.
    A failure in one table is logged with its traceback and the loop continues
    with the next table.

    Args:
        table_config: Mapping raw table name -> dict with ``curated_name`` and
            ``curated_columns``, and optionally ``filters``, ``partition_key``
            and ``primary_key``.

    Returns:
        List of the raw table names that failed (empty if all succeeded).
    """
    failed_tables = []

    for raw_table_name, config in table_config.items():
        try:
            # Extract
            df = extract_data(raw_table_name)
            print(f'Table name:{raw_table_name}')
            df.printSchema()

            # Transform. Projection and filtering run first so that the
            # row-level cleanup steps only look at the curated columns and
            # process less data.
            df_transformed = (
                df
                .transform(rename_columns)
                .transform(lambda d: select_columns(d, config["curated_columns"]))
                .transform(lambda d: filter_data(d, config.get("filters", [])))
                .transform(drop_null_rows)
                .transform(drop_duplicate_rows)
                .transform(clean_data)
                .transform(add_metadata_columns)
            )
            df_transformed.printSchema()

            # Load
            load_path = f"s3://curated-msf-mbit/{config['curated_name']}"
            partition_key = config.get("partition_key", "noPartition")
            primary_key = config.get("primary_key")

            load_data_curated(df_transformed, load_path, partition_key, primary_key)

        except Exception as e:
            # Best effort per table: record the failure (with traceback) and go on.
            logger.exception(f"Error processing table {raw_table_name}: {str(e)}")
            failed_tables.append(raw_table_name)

    if failed_tables:
        logger.error(f"ETL finished with failed tables: {failed_tables}")
    return failed_tables

Calculation started (calculation_id=32c5a984-12a5-6aef-079e-530a3625405d) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


### Ejecuta ETL

#### Antes de escribir, configuramos el número máximo de registros escritos en un solo archivo.

In [2]:
# Limit every output file written by Spark to at most 1,000,000 records.
# Session-level setting: it only affects writes executed after this cell runs.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)

In [57]:
# Run the RAW -> CURATED pipeline over every entry of table_config.
run_etl(table_config)

Calculation started (calculation_id=38c5a98d-b97e-3d85-e4c9-a2ec6e77597e) in (session=5ac5a970-2cf4-cf60-51d3-9b8a18b370f8). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
Table name:raw_msf_recurringdonation
root
 |-- id: string (nullable = true)
 |-- isdeleted: boolean (nullable = true)
 |-- msf_annualizedquota__c: double (nullable = true)
 |-- msf_cancelationdate__c: date (nullable = true)
 |-- msf_cancelationreason__c: string (nullable = true)
 |-- msf_currentcampaign__c: string (nullable = true)
 |-- msf_currentleadsource1__c: string (nullable = true)
 |-- msf_currentquotamodification__c: string (nullable = true)
 |-- msf_leadsource1__c: string (nullable = true)
 |-- msf_memberid__c: string (nullable = true)
 |-- npe03__amount__c: double (nullable = true)
 |-- npe03__contact__c: string (nullable = true)
 |-- npe03__date_established__c: date (nullable = true)
 |-- npe03__installment_period__c: string (nullable = true)
 |-- npe03__last_payment_date__c: date (nullable = true)
 |-- npe03__next_payment_date__c: date (nullable = true)
 |-- npe03__open_ended_status__c: string (nullable = true)
 |-- npe03__paid_amount__c: double (null

In [3]:
# Use Delta Lake's OPTIMIZE to compact the small files of the generated tables.
# The ETL writes the curated tables by path (s3://curated-msf-mbit/<curated_name>)
# and no table registration is visible in this notebook, so a bare name such as
# "Opportunity" would be resolved against the session's current database instead
# of the curated data. Address the Delta table by its path.
spark.sql("OPTIMIZE delta.`s3://curated-msf-mbit/Opportunity`")