# Librerias

## Importaciones de librerias

In [None]:
#PySpark & Transformación
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, DecimalType, DoubleType
from pyspark.ml.feature import Imputer, StringIndexer
from pyspark.sql.functions import col, when, avg, stddev, min, max, round, corr
import numpy as np
from pyspark.sql.functions import skewness
from pyspark.sql.functions import col, when, count

#KNN Imputer
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import KNNImputer

# Inicializacion

## Inicializacion de la sesion de Spark

In [None]:
# 1. Crear sesión de Spark
spark = SparkSession.builder \
    .appName("ETL Social Media vs Productivity") \
    .getOrCreate()

## Dar permisos de drive

# Ingesta de dataset

In [None]:
dfProductivity = spark.read.csv("social_media_vs_productivity.csv", header=True, inferSchema=True)

## Mirar el tamaño del dataset

In [None]:
# Obtener el número de filas
numero_filas = dfProductivity.count()

# Obtener el número de columnas
num_columnas = len(dfProductivity.columns)

# Imprimir el número de filas y columnas
print(f"Numero de filas: {numero_filas}")
print(f"Numero de columnas: {num_columnas}")

Numero de filas: 30000
Numero de columnas: 19


## Mostrar el esquema del dataframe

In [None]:
dfProductivity.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- daily_social_media_time: double (nullable = true)
 |-- social_platform_preference: string (nullable = true)
 |-- number_of_notifications: integer (nullable = true)
 |-- work_hours_per_day: double (nullable = true)
 |-- perceived_productivity_score: double (nullable = true)
 |-- actual_productivity_score: double (nullable = true)
 |-- stress_level: double (nullable = true)
 |-- sleep_hours: double (nullable = true)
 |-- screen_time_before_sleep: double (nullable = true)
 |-- breaks_during_work: integer (nullable = true)
 |-- uses_focus_apps: boolean (nullable = true)
 |-- has_digital_wellbeing_enabled: boolean (nullable = true)
 |-- coffee_consumption_per_day: integer (nullable = true)
 |-- days_feeling_burnout_per_month: integer (nullable = true)
 |-- weekly_offline_hours: double (nullable = true)
 |-- job_satisfaction_score: double (nullable = true)



## Mostrar las primeras 5 filas del dataframe

In [None]:
dfProductivity.show(5)

+---+------+----------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------------+------------------------+------------------+---------------+-----------------------------+--------------------------+------------------------------+--------------------+----------------------+
|age|gender|  job_type|daily_social_media_time|social_platform_preference|number_of_notifications|work_hours_per_day|perceived_productivity_score|actual_productivity_score|stress_level|      sleep_hours|screen_time_before_sleep|breaks_during_work|uses_focus_apps|has_digital_wellbeing_enabled|coffee_consumption_per_day|days_feeling_burnout_per_month|weekly_offline_hours|job_satisfaction_score|
+---+------+----------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------------+-----

# Conversion de tipos

## Convertir columnas numéricas a tipo adecuado


In [None]:
numeric_column_to_float = ['daily_social_media_time', 'work_hours_per_day', 'perceived_productivity_score',
                      'actual_productivity_score', 'stress_level', 'sleep_hours','screen_time_before_sleep',
                            'weekly_offline_hours', 'job_satisfaction_score']

numeric_columns_to_int8 = ['age', 'number_of_notifications', 'breaks_during_work', 'coffee_consumption_per_day',
                           'days_feeling_burnout_per_month']

for col_name in numeric_column_to_float:
    dfProductivity = dfProductivity.withColumn(col_name, col(col_name).cast(DecimalType(4, 2)))

for col_name in numeric_columns_to_int8:
    dfProductivity = dfProductivity.withColumn(col_name, col(col_name).cast(IntegerType()))

# Convertir columnas booleanas
bool_columns = ["uses_focus_apps", "has_digital_wellbeing_enabled"]
for col_name in bool_columns:
    dfProductivity = dfProductivity.withColumn(
        col_name,
        when(col(col_name) == "True", 1)
        .when(col(col_name) == "False", 0)
        .otherwise(None)
)

#Tratado de nulos

## Mostrar numero de nulos

In [None]:
null_counts = dfProductivity.select([count(when(col(c).isNull(), c)).alias(c) for c in dfProductivity.columns])
print("Valores nulos por columna:")
null_counts.show(truncate=False)

Valores nulos por columna:
+---+------+--------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+------------------------+------------------+---------------+-----------------------------+--------------------------+------------------------------+--------------------+----------------------+
|age|gender|job_type|daily_social_media_time|social_platform_preference|number_of_notifications|work_hours_per_day|perceived_productivity_score|actual_productivity_score|stress_level|sleep_hours|screen_time_before_sleep|breaks_during_work|uses_focus_apps|has_digital_wellbeing_enabled|coffee_consumption_per_day|days_feeling_burnout_per_month|weekly_offline_hours|job_satisfaction_score|
+---+------+--------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+--

A pesar de que la principal herramienta de transformación de este proyecto sea PySpark, nos pareció una idea muy interesante poder comparar dos datasets distintos:


*   Imputación por KNN (vecinos cercanos)
*   Impuación estadística por media/mediana

De esta forma, podremos comparar más adelante en el modelo predictivo cuál dá un mejor resultado.



##KNN Imputer

In [None]:
# 1. Copia del dataset (convertir a Pandas DataFrame)
dataset_KNN = dfProductivity.toPandas()

# 2. Codificar columnas categóricas
cat_columns = ['gender', 'social_platform_preference', 'uses_focus_apps']
# Ensure 'uses_focus_apps' is treated as categorical in pandas
dataset_KNN['uses_focus_apps'] = dataset_KNN['uses_focus_apps'].astype(str)

encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
encoded_cats = pd.DataFrame(encoder.fit_transform(dataset_KNN[cat_columns]),
                           columns=encoder.get_feature_names_out(cat_columns),
                           index=dataset_KNN.index) # Keep original index for merging

# 3. Combinar columnas numéricas y codificadas
num_columns = ['daily_social_media_time', 'job_satisfaction_score', 'sleep_hours',
               'actual_productivity_score', 'screen_time_before_sleep', 'stress_level', 'perceived_productivity_score']

# Ensure numeric columns are of a suitable type for KNNImputer
for col in num_columns:
    dataset_KNN[col] = pd.to_numeric(dataset_KNN[col], errors='coerce')

data_for_imputation = pd.concat([dataset_KNN[num_columns], encoded_cats], axis=1)

# 4. Aplicar KNNImputer
imputer = KNNImputer(n_neighbors=3, weights='uniform')
imputed_data = imputer.fit_transform(data_for_imputation)

# 5. Reconstruir dataset
# Assign imputed data back to the relevant columns in the pandas DataFrame
dataset_KNN[num_columns] = imputed_data[:, :len(num_columns)]

# For categorical columns, fill remaining NaNs if any (after one-hot encoding, there shouldn't be in the encoded part, but the original categorical columns might have NaNs)
# Use the mode from the *original* PySpark DataFrame to avoid issues with potential changes during the toPandas conversion
for col in cat_columns:
    # Find mode from the original PySpark DataFrame
    mode_value = dfProductivity.groupBy(col).count().orderBy("count", ascending=False).collect()[0][col]
    dataset_KNN[col] = dataset_KNN[col].fillna(mode_value)


# 6. Mostrar tabla de calidad
tabla_calidad = pd.DataFrame({
    'Cantidad Nulos': dataset_KNN.isnull().sum(),
    'Porcentaje Nulos (%)': (dataset_KNN.isnull().sum() / len(dataset_KNN)) * 100
})
print("\nTabla de calidad del dato:")
print(tabla_calidad)


Tabla de calidad del dato:
                                Cantidad Nulos  Porcentaje Nulos (%)
age                                          0                   0.0
gender                                       0                   0.0
job_type                                     0                   0.0
daily_social_media_time                      0                   0.0
social_platform_preference                   0                   0.0
number_of_notifications                      0                   0.0
work_hours_per_day                           0                   0.0
perceived_productivity_score                 0                   0.0
actual_productivity_score                    0                   0.0
stress_level                                 0                   0.0
sleep_hours                                  0                   0.0
screen_time_before_sleep                     0                   0.0
breaks_during_work                           0                   0.0
uses_f

## Mirar distribucion de datos de las columnas para imputar

In [None]:
numeric_columns = [
        "age", "daily_social_media_time", "work_hours_per_day", "perceived_productivity_score",
        "actual_productivity_score", "stress_level", "sleep_hours", "screen_time_before_sleep",
        "breaks_during_work", "coffee_consumption_per_day", "days_feeling_burnout_per_month",
        "weekly_offline_hours", "job_satisfaction_score"
    ]

#EL skewness: indica si los datos están distribuidos simétricamente
#(como una distribución normal) o si tienen un sesgo (skewed) hacia un lado
print("Asimetría (skewness) de las columnas numéricas:")
for col_name in numeric_columns:
    skew_value = dfProductivity.select(skewness(col_name).alias("skewness")).collect()[0]["skewness"]
    print(f"{col_name}: {skew_value:.3f}")

Asimetría (skewness) de las columnas numéricas:
age: 0.015
daily_social_media_time: 1.191
work_hours_per_day: -0.038
perceived_productivity_score: -0.011
actual_productivity_score: -0.007
stress_level: 0.001
sleep_hours: 0.004
screen_time_before_sleep: 0.304
breaks_during_work: 0.007
coffee_consumption_per_day: 0.675
days_feeling_burnout_per_month: -0.008
weekly_offline_hours: 0.423
job_satisfaction_score: 0.009


## Imputación media


Trato de los nulos en columnas numericas

In [None]:
#En este caso, el skewness ayuda a decidir si imputar valores faltantes con la media
#(para distribuciones simétricas, skewness ≈ 0) o la mediana
#(para distribuciones sesgadas, |skewness| > 0.3 o 1)


# Columnas para imputar con mediana (skewness > 0.3)
median_columns = [
    "daily_social_media_time", "screen_time_before_sleep"
]

# Columnas para imputar con media (skewness ≤ 0.3)
mean_columns = [
    "perceived_productivity_score","actual_productivity_score", "stress_level",
    "sleep_hours","job_satisfaction_score"
]

# Imputar con mediana
for col_name in median_columns:
    # El approxQuantile calcula cuantiles aproximados de una columna de forma distribuida
    # El 0.5 indica que queremos el percentil 50, que corresponde a la mediana
    # El 0.05 es el parámetro de error relativo (relative error), que controla la precisión de la aproximación.
    median_value = dfProductivity.approxQuantile(col_name, [0.5], 0.05)[0]
    if median_value is not None:
        dfProductivity = dfProductivity.na.fill({col_name: median_value})
        print(f"Imputado {col_name} con mediana: {median_value:.2f}")

# Imputar con media
for col_name in mean_columns:
    # Aplica la función de agregación avg, que calcula la media, sobre la columna.
    # Hace una coleccion de esos valores y mira el valor columna a columna
    mean_value = dfProductivity.select(col_name).agg({col_name: "avg"}).collect()[0][f"avg({col_name})"]
    if mean_value is not None:
        dfProductivity = dfProductivity.na.fill({col_name: float(mean_value)})
        print(f"Imputado {col_name} con media: {mean_value:.2f}")

Imputado daily_social_media_time con mediana: 3.00
Imputado screen_time_before_sleep con mediana: 1.00
Imputado perceived_productivity_score con media: 5.51
Imputado actual_productivity_score con media: 4.95
Imputado stress_level con media: 5.51
Imputado sleep_hours con media: 6.50
Imputado job_satisfaction_score con media: 4.96


In [None]:
from pyspark.sql.functions import col, when, count

print("\nValores nulos después de imputación:")
null_counts_after_imputation = dfProductivity.select([count(when(col(c).isNull(), c)).alias(c) for c in dfProductivity.columns])
null_counts_after_imputation.show(truncate=False)


Valores nulos después de imputación:
+---+------+--------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+------------------------+------------------+---------------+-----------------------------+--------------------------+------------------------------+--------------------+----------------------+
|age|gender|job_type|daily_social_media_time|social_platform_preference|number_of_notifications|work_hours_per_day|perceived_productivity_score|actual_productivity_score|stress_level|sleep_hours|screen_time_before_sleep|breaks_during_work|uses_focus_apps|has_digital_wellbeing_enabled|coffee_consumption_per_day|days_feeling_burnout_per_month|weekly_offline_hours|job_satisfaction_score|
+---+------+--------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+---

## Validez Categorica

Mirar los valores unicos en las columnas categoricas

In [None]:
# Lista de columnas categóricas
categorical_columns = ["gender", "job_type", "social_platform_preference"]

# 1. Inspeccionar valores únicos
print("### Inspección de valores únicos ###")
for col_name in categorical_columns:
    print(f"\nValores únicos en {col_name}:")
    unique_values = dfProductivity.select(col_name).distinct().na.drop().collect()
    unique_values = [row[col_name] for row in unique_values]
    print(unique_values)

### Inspección de valores únicos ###

Valores únicos en gender:
['Female', 'Other', 'Male']

Valores únicos en job_type:
['Education', 'Student', 'Finance', 'Health', 'IT', 'Unemployed']

Valores únicos en social_platform_preference:
['TikTok', 'Instagram', 'Twitter', 'Telegram', 'Facebook']


In [None]:
# Lista de valores esperados
valid_gender = {'Female', 'Other', 'Male'}
valid_job_type = {'Education', 'Student', 'Finance', 'Health', 'IT', 'Unemployed'}
valid_social_platform_preference = {'TikTok', 'Instagram', 'Twitter', 'Telegram', 'Facebook'}

# Re-import col to ensure it's the function
from pyspark.sql.functions import col

# Validar que todos los valores estén dentro de los valores esperados
# Guarda en df_invalid si hay algún género que no este en valid_gender...
dfProductivity_invalid = dfProductivity.filter(
    (~col("gender").isin(*valid_gender)) |
    (~col("job_type").isin(*valid_job_type)) |
    (~col("social_platform_preference").isin(*valid_social_platform_preference))
)

# Mostrar registros inválidos (si hay)
if dfProductivity_invalid.count() > 0:
    dfProductivity_invalid.show()
    print("Se encontraron valores no válidos en las columnas.")
else:
    print("Todos los valores son válidos.")

Todos los valores son válidos.


# Pruebas

## Validez Numérica

In [None]:
errores = []

# Validez Numérica (scores 0-10 y stress 1-10)
print("\nValidando rangos de métricas...")

score_columns = ['perceived_productivity_score', 'actual_productivity_score', 'job_satisfaction_score']
for col_name in score_columns:
    fuera_rango = dfProductivity.filter((col(col_name) < 0) | (col(col_name) > 10)).count()
    if fuera_rango > 0:
        errores.append(f" FALLO - {col_name}: {fuera_rango} valores fuera del rango 0-10")
        print(f"{col_name}: {fuera_rango} valores fuera del rango 0-10")
    else:
        print(f"{col_name}: todos los valores en rango 0-10")

# Estrés: 1-10
fuera_rango = dfProductivity.filter((col('stress_level') < 1) | (col('stress_level') > 10)).count()
if fuera_rango > 0:
    errores.append(f" FALLO - stress_level: {fuera_rango} valores fuera del rango 1-10")
    print(f"stress_level: {fuera_rango} valores fuera del rango 1-10")
else:
    print(f"stress_level: todos los valores en rango 1-10")


Validando rangos de métricas...
perceived_productivity_score: todos los valores en rango 0-10
actual_productivity_score: todos los valores en rango 0-10
job_satisfaction_score: todos los valores en rango 0-10
stress_level: todos los valores en rango 1-10


## Coherencia Temporal

In [None]:
print("\nValidando coherencia temporal...")

# Social media time ≤ 24h
fuera_rango = dfProductivity.filter(col('daily_social_media_time') > 1440).count()
if fuera_rango > 0:
    errores.append(f" FALLO - daily_social_media_time: {fuera_rango} valores > 24h")
    print(f"❌ daily_social_media_time: {fuera_rango} valores > 24h")
else:
    print(f"✅ daily_social_media_time: todos los valores ≤ 24h")

# Horas de trabajo ≤ 24h
fuera_rango = dfProductivity.filter(col('work_hours_per_day') > 24).count()
if fuera_rango > 0:
    errores.append(f"FALLO - work_hours_per_day: {fuera_rango} valores > 24h")
    print(f"❌ work_hours_per_day: {fuera_rango} valores > 24h")
else:
    print(f"✅ work_hours_per_day: todos los valores ≤ 24h")

# Horas de sueño ≤ 24h
fuera_rango = dfProductivity.filter(col('sleep_hours') > 24).count()
if fuera_rango > 0:
    errores.append(f"FALLO - sleep_hours: {fuera_rango} valores > 24h")
    print(f"❌ sleep_hours: {fuera_rango} valores > 24h")
else:
    print(f"✅ sleep_hours: todos los valores ≤ 24h")


Validando coherencia temporal...
✅ daily_social_media_time: todos los valores ≤ 24h
✅ work_hours_per_day: todos los valores ≤ 24h
✅ sleep_hours: todos los valores ≤ 24h


## Rangos Contextuales

In [None]:
print("\nValidando rangos contextuales (edad 16-80) con tolerancia del 2% a outliers...")

total_filas = dfProductivity.count()

fuera_rango_count = dfProductivity.filter((col('age') < 16) | (col('age') > 80)).count()

# Calculo del porcenaje de outliers
porcentaje_fuera_rango = (fuera_rango_count / total_filas) * 100 if total_filas > 0 else 0

# Porcentaje de outliers aceptado
outlier_threshold_percent = 2.0

if porcentaje_fuera_rango > outlier_threshold_percent:
    errores.append(f" FALLO - age: {fuera_rango_count} valores ({porcentaje_fuera_rango:.2f}%) fuera del rango 16-80 (excede el {outlier_threshold_percent}%)")
    print(f"   ❌ FALLO - age: {fuera_rango_count} valores ({porcentaje_fuera_rango:.2f}%) fuera del rango 16-80 (excede el {outlier_threshold_percent}%)")
else:
    print(f"   ✅ age: {fuera_rango_count} valores ({porcentaje_fuera_rango:.2f}%) fuera del rango 16-80 (dentro del umbral del {outlier_threshold_percent}%)")


Validando rangos contextuales (edad 16-80) con tolerancia del 2% a outliers...
   ✅ age: 0 valores (0.00%) fuera del rango 16-80 (dentro del umbral del 2.0%)


## Comprobar la lógica work_hours_per_day + sleep_hours ≤ 24

In [None]:
df = dfProductivity.withColumn(
    "total_hours",
    col("work_hours_per_day") + col("sleep_hours")
)

# Filtrar las filas donde total_hours > 24
df_filtered = df.filter(col("total_hours") > 24)

df_filtered.count()

0

## Validar horas de sueño fuera de rango

In [None]:
invalid_sleep = dfProductivity.filter((col("sleep_hours") <= 0) | (col("sleep_hours") > 16))
print("Horas de sueño fuera de rango:", invalid_sleep.count())

Horas de sueño fuera de rango: 0


## Validar coherencia temporal

In [None]:
temporal_issues = dfProductivity.filter(
    (col("work_hours_per_day") > 24) | (col("work_hours_per_day") < 0) |
    (col("screen_time_before_sleep") > 24) | (col("screen_time_before_sleep") < 0)
)
print("Coherencia temporal incorrecta:", temporal_issues.count())

Coherencia temporal incorrecta: 0


# Outliers

In [None]:
# Lista de columnas numéricas para analizar (ajusta según tus datos)
numeric_columns_for_outliers = [
    "age", "daily_social_media_time", "work_hours_per_day", "perceived_productivity_score",
    "actual_productivity_score", "stress_level", "sleep_hours", "screen_time_before_sleep",
    "breaks_during_work", "coffee_consumption_per_day", "days_feeling_burnout_per_month",
    "weekly_offline_hours", "job_satisfaction_score"
]

print("### Identificación de Outliers (usando IQR) ###")

for col_name in numeric_columns_for_outliers:
    # Calcula los cuartiles y el IQR
    quantiles = dfProductivity.approxQuantile(col_name, [0.25, 0.75], 0.05)
    q1 = quantiles[0]
    q3 = quantiles[1]
    iqr = q3 - q1

    # Define los límites para los outliers
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # Cuenta los outliers
    outliers_count = dfProductivity.filter(
        (col(col_name) < lower_bound) | (col(col_name) > upper_bound)
    ).count()

    print(f"\nColumna '{col_name}':")
    print(f"  Q1: {q1:.2f}")
    print(f"  Q3: {q3:.2f}")
    print(f"  IQR: {iqr:.2f}")
    print(f"  Límite Inferior: {lower_bound:.2f}")
    print(f"  Límite Superior: {upper_bound:.2f}")
    print(f"  Número de outliers: {outliers_count}")

    # Opcional: Mostrar algunas filas con outliers (muestra las primeras 5 si hay)
    if outliers_count > 0:
        print("  Ejemplos de outliers:")
        dfProductivity.filter(
            (col(col_name) < lower_bound) | (col(col_name) > upper_bound)
        ).limit(5).show()

### Identificación de Outliers (usando IQR) ###

Columna 'age':
  Q1: 30.00
  Q3: 52.00
  IQR: 22.00
  Límite Inferior: -3.00
  Límite Superior: 85.00
  Número de outliers: 0

Columna 'daily_social_media_time':
  Q1: 1.86
  Q3: 3.99
  IQR: 2.13
  Límite Inferior: -1.33
  Límite Superior: 7.19
  Número de outliers: 624
  Ejemplos de outliers:
+---+------+----------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+------------------------+------------------+---------------+-----------------------------+--------------------------+------------------------------+--------------------+----------------------+
|age|gender|  job_type|daily_social_media_time|social_platform_preference|number_of_notifications|work_hours_per_day|perceived_productivity_score|actual_productivity_score|stress_level|sleep_hours|screen_time_before_sleep|breaks_during_work|uses_focus_apps|has_digit

In [None]:
# Guardar csv limpio, coalesce para que guarde el csv en una sola parte
dfProductivity.coalesce(1).write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("/content/dfProductivity_cleaned")

dataset_KNN.to_csv('/content/dfProductivity_KNN_cleaned.csv', index=False)

In [None]:
dfProductivity.show()

+---+------+----------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+------------------------+------------------+---------------+-----------------------------+--------------------------+------------------------------+--------------------+----------------------+
|age|gender|  job_type|daily_social_media_time|social_platform_preference|number_of_notifications|work_hours_per_day|perceived_productivity_score|actual_productivity_score|stress_level|sleep_hours|screen_time_before_sleep|breaks_during_work|uses_focus_apps|has_digital_wellbeing_enabled|coffee_consumption_per_day|days_feeling_burnout_per_month|weekly_offline_hours|job_satisfaction_score|
+---+------+----------+-----------------------+--------------------------+-----------------------+------------------+----------------------------+-------------------------+------------+-----------+-----------------------

In [None]:
spark.stop()