In [None]:
spark

In [None]:
sc

Importamos librerías necesarias de spark

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, countDistinct, regexp_replace, split, trim, count, when
from pyspark.sql.types import DoubleType, DateType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

import re

Iniciamos sesión de spark

In [49]:
spark = SparkSession.builder.appName("PMOProjectDataPrep").getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Obtenemos los datos desde la capa raw del lake en S3

In [50]:
df = spark.read.csv("s3://raw-proyecto-integrador/pmo_data/PMO_data.csv", header=True, inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+----------+----------+--------------------+----------+--------+------+----------------+----------------+-------------------+--------+--------------------+-------------------+---------------+--------------------+-------------+----------------+---------------+--------------------+-----------------------+------------------------+-------------+----------+--------------+--------------------+-----------------+----------+--------------------+--------------------+---------------+---------+------------+-------------+-------------+-------------+-----------+----------------+-----------+--------------+--------------------+---------------+---------------------+----------------+--------------+---------+-----------------+----------+---------------+-----------+------------+---------+
|_c0|          Project ID|      Reporting Name|        Project Name| createdon|   Updated|               Phase|     State|Duration|Effort|Effort Complete

Ajustamos los tipos de datos de las columnas (conocemos a priori los tipos de datos, en el caso de los datos numéricos, hacemos algunos cambios a los separadores de decimales para que todos tengan el mismo formato)

In [52]:
date_columns = ['createdon', 'Updated', 'Task Start Date','Training start','Contract date','1st Go Live']
cols_str_float = ['Num_Risks','Avg_Risk_Exposure','Num_Issues','Avg_Issue_Score','Num_Changes',
                  'Change_Score','Num_Tasks','Total Go Lives','Total Headcount','VOC Score','VOC Score IT',
                  'proj_prioritizationscore','Progress','Duration', 'Effort', 'Effort Completed', 'Effort Remaining']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
for col_name in cols_str_float:
    df = df.withColumn(col_name, regexp_replace(col(col_name), ",", ".").cast(DoubleType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
for col_name in date_columns:
    df = df.withColumn(col_name, col(col_name).cast(DateType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Filtramos el dataset para obtener solo los proyectos cerrados

In [55]:
df = df.filter(col("State") == "(3) Closed")
df = df.drop("State")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Verificamos los tipos de datos de las columnas

In [56]:
def get_column_types(df):
    schema = df.dtypes
    dict_types = {'string': [], 'date': [], 'numeric': []}
    for col_name, dtype in schema:
        if dtype in ['string']:
            dict_types['string'].append(col_name)
        elif dtype in ['date']:
            dict_types['date'].append(col_name)
        else:
            dict_types['numeric'].append(col_name)
    return dict_types

column_types = get_column_types(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
column_types

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'string': ['Project ID', 'Reporting Name', 'Project Name', 'Phase', 'Project Manager ID', 'Project Manager', 'Business unit', 'Project Priority', 'Portfolio', 'proj_issuehealthname', 'proj_schedulehealthname', 'ProjectTrend', 'Category', 'Competency', 'Client', 'Work Type', 'EHO Completed', 'Contract type', 'IMP Project Type', 'Country', 'Site', 'IT Project Type', 'Annual Contract Value', 'Line of Business', 'Overall Health'], 'date': ['createdon', 'Updated', 'Task Start Date', 'Training start', 'Contract date', '1st Go Live'], 'numeric': ['_c0', 'Duration', 'Effort', 'Effort Completed', 'Effort Remaining', 'Finish', 'Progress', 'Start', 'proj_prioritizationscore', 'Commercial', 'Training Billable', 'Total Headcount', 'VOC Score', 'VOC Score IT', 'Total Go Lives', 'Num_Risks', 'Avg_Risk_Exposure', 'Num_Issues', 'Avg_Issue_Score', 'Num_Changes', 'Change_Score', 'Num_Tasks']}

Hacemos un resúmen de los datos

In [58]:
df.summary().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------------+------------------+-----------------+-------------------+--------------------+----------------+-------------+----------------+-----------------+--------------------+-----------------------+------------------------+-------------+--------+----------+--------------------+-------------------+------------------+------------------+------------------+-------------+-------------+--------------------+-------+------------------+-----+----------------+---------------------+------------------+--------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-----------------+
|summary|               _c0|          Project ID|      Reporting Name|        Project Name|               Phase|         Duration|            Effort|  Effort Completed| Effort Remaining|           Progress|  Projec

Observamos que hay un registro donde la característica Project Priority tiene un nulo, eliminamos esta fila  puesto que según el conocedor del negocio es un error

In [59]:
df.groupBy("Project Priority").count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-----+
|Project Priority|count|
+----------------+-----+
|            NULL|    1|
|    (1) Critical|   16|
|    (3) Moderate| 1989|
|         (4) Low|   24|
|        (2) High|   88|
+----------------+-----+

In [60]:
df = df.filter(col("Project Priority").isNotNull())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Mapeamos la columna Project Priority a una columna numérica (conservando la relación ordinal)




*   (1) Critica - 1
*   (2) High - 2
*   (3) Moderate - 3
*   (4) Low - 4


In [61]:
df = df.withColumn("Project Priority", split(regexp_replace(col("Project Priority"), "[()]", ""), " ").getItem(0).cast("int"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Verificamos que el mapeo sea exitoso y revisamos qué cantidad de proyectos hay por prioridad

In [62]:
df.groupBy("Project Priority").count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-----+
|Project Priority|count|
+----------------+-----+
|               1|   16|
|               2|   88|
|               3| 1989|
|               4|   24|
+----------------+-----+

Revisamos el porcentaje de nulos de cada columna

In [63]:
total_rows = df.count()
null_percentages = df.select([
    (count(when(col(c).isNull(), c)) / total_rows).alias(c)
    for c in df.columns
])

nulls_df = null_percentages.selectExpr(
    f"stack({len(df.columns)}, " +
    ", ".join([f"'{c}', `{c}`" for c in df.columns]) +
    ") as (column_name, null_percentage)"
)
nulls_df.sort(col("null_percentage").desc()).show(n=100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+
|         column_name|     null_percentage|
+--------------------+--------------------+
|        VOC Score IT|   0.971185640056684|
|           VOC Score|   0.958904109589041|
|Annual Contract V...|   0.870099196976854|
|     IT Project Type|  0.6457250826641474|
|       Contract date|  0.6287198866320265|
|          Competency|   0.627302786962683|
|           Work Type|  0.6268304204062353|
|            Category|  0.6254133207368918|
|       Contract type|  0.6244685876239963|
|         1st Go Live|  0.6169107227208314|
|      Total Go Lives|  0.6159659896079358|
|      Training start|  0.6136041568256967|
|    IMP Project Type|  0.5989607935758149|
|    Line of Business|  0.4718941898913557|
|              Client| 0.37600377893245157|
|      Reporting Name| 0.13320736891828058|
|        ProjectTrend| 0.03542749173358526|
|                Site| 0.01983939537080775|
|             Country|0.010392064241851677|
|              Finish|0.00614076

Prescindimos de las siguientes columnas:


*   VOC Score IT
*   VOC Score
*   Annual Contract V
*   IT Project Type
*   Contract date
*   Competency
*   Work Type
*   Category
*   Contract type
*   1st Go Live
*   Total Go Lives
*   Training start
*   IMP Project Type

Las razones para no tenerlas en cuenta es que según el conocedor del negocio, no suelen llenarse correctamente y están compuestas por nulos en más del 50% de los casos.


In [64]:
nulls_filtered = nulls_df.where(nulls_df["Null_Percentage"] > 0.5)
nulls_filtered.sort(col("Null_Percentage").desc()).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+
|         column_name|   null_percentage|
+--------------------+------------------+
|        VOC Score IT| 0.971185640056684|
|           VOC Score| 0.958904109589041|
|Annual Contract V...| 0.870099196976854|
|     IT Project Type|0.6457250826641474|
|       Contract date|0.6287198866320265|
|          Competency| 0.627302786962683|
|           Work Type|0.6268304204062353|
|            Category|0.6254133207368918|
|       Contract type|0.6244685876239963|
|         1st Go Live|0.6169107227208314|
|      Total Go Lives|0.6159659896079358|
|      Training start|0.6136041568256967|
|    IMP Project Type|0.5989607935758149|
+--------------------+------------------+

In [65]:
columns_to_drop = [row['column_name'] for row in nulls_filtered.select('column_name').collect()]
columns_to_drop

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Training start', 'Category', 'Competency', 'Work Type', 'VOC Score', 'VOC Score IT', 'Contract date', 'Contract type', '1st Go Live', 'IMP Project Type', 'Total Go Lives', 'IT Project Type', 'Annual Contract Value']

In [66]:
df_cleaned = df.drop(*columns_to_drop)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Renombramos las columnas (en formato snake case) para que no contengan espacios ni combinación de mayúsculas y minúsculas

In [67]:
def to_snake_case(name):
    name = re.sub(r'[^0-9a-zA-Z]+', '_', name)
    name = re.sub(r'([a-z])([A-Z])', r'\1_\2', name)
    return name.lower().strip('_')
new_column_names = {col_name: to_snake_case(col_name) for col_name in df.columns}

for old_name, new_name in new_column_names.items():
    df_cleaned = df_cleaned.withColumnRenamed(old_name, new_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [68]:
df_cleaned.describe().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------------+------------------+-----------------+-------------------+--------------------+----------------+-------------+-------------------+-----------------+--------------------+-----------------------+------------------------+-------------+--------------------+------------------+-------------+-------+-----+------------------+--------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-----------------+
|summary|               c0|          project_id|      reporting_name|        project_name|               phase|         duration|            effort|  effort_completed| effort_remaining|           progress|  project_manager_id| project_manager|business_unit|   project_priority|        portfolio|proj_issuehealthname|proj_schedulehealthname|proj_prioritizationscore|project_trend|    

Revisamos emdidas de tendencia para las variables numéricas, no inputamos datos en este paso puesto que más adelante en el pipeline partiremos los datos en train y test y no queremos contaminar los datos de train inputando con medidas calculadas con todo el dataset

In [69]:
numeric_cols = [f.name for f in df_cleaned.schema.fields if str(f.dataType) in ['DoubleType()', 'IntegerType()', 'FloatType()', 'LongType()', 'ShortType()']]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
numeric_cols

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['c0', 'duration', 'effort', 'effort_completed', 'effort_remaining', 'progress', 'project_priority', 'proj_prioritizationscore', 'total_headcount', 'num_risks', 'avg_risk_exposure', 'num_issues', 'avg_issue_score', 'num_changes', 'change_score', 'num_tasks']

In [71]:
df_cleaned.select(*numeric_cols).describe().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------------+-----------------+------------------+------------------+-----------------+-------------------+-------------------+------------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-----------------+
|summary|               c0|         duration|            effort|  effort_completed| effort_remaining|           progress|   project_priority|proj_prioritizationscore|   total_headcount|         num_risks| avg_risk_exposure|        num_issues|    avg_issue_score|        num_changes|       change_score|        num_tasks|
+-------+-----------------+-----------------+------------------+------------------+-----------------+-------------------+-------------------+------------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-----------------+
|  count|             2117|          

In [72]:
s3_output_path = "s3://trusted-proyecto-integrador/pmo_data"
df_cleaned.write.mode("overwrite").parquet(s3_output_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…