## Crea la sesión de Pyspark e importa todas las librerías

In [None]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from sklearn import preprocessing, svm
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
import pyarrow.parquet as pq
import os
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum, desc
import plotly.express as px

In [2]:
import findspark
findspark.init()

In [3]:
spark = (
    SparkSession.builder
    .appName("pyspark-abs")
    .config("spark.driver.memory", "512m")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

## Lee el archivo

In [4]:
file_path = "/dataset/absenteeism/Absenteeism_at_work.csv"
if os.path.isfile(file_path):
    print("File exists.")
else:
    print("File does not exist.")

File exists.


In [5]:
df = spark.read.csv("/dataset/absenteeism/Absenteeism_at_work.csv", header=True, sep=";")

In [6]:
df.show(10)

+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| ID|Reason for absence|Month of absence|Day of the week|Seasons|Transportation expense|Distance from Residence to Work|Service time|Age|Work load Average/day |Hit target|Disciplinary failure|Education|Son|Social drinker|Social smoker|Pet|Weight|Height|Body mass index|Absenteeism time in hours|
+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| 11|                26|               7|              3|      1|                   289|                        

## Renombra las columnas

In [7]:
new_column_names = {
    "ID": "EmployeeID",
    "Reason for absence": "AbsenceReason",
    "Month of absence": "AbsenceMonth",
    "Day of the week": "AbsenceDayOfWeek",
    "Seasons": "AbsenceSeason",
    "Transportation expense": "TransportExpense",
    "Distance from Residence to Work": "DistanceToWork",
    "Service time": "ServiceTime",
    "Age": "EmployeeAge",
    "Work load Average/day ": "WorkloadAverage",
    "Hit target": "HitTarget",
    "Disciplinary failure": "DisciplinaryFailure",
    "Education": "EmployeeEducation",
    "Son": "NumberOfSons",
    "Social drinker": "Drinker",
    "Social smoker": "Smoker",
    "Pet": "NumberOfPets",
    "Weight": "EmployeeWeight",
    "Height": "EmployeeHeight",
    "Body mass index": "BMI",
    "Absenteeism time in hours": "AbsenteeismHours"
}

for old_col, new_col in new_column_names.items():
    df = df.withColumnRenamed(old_col, new_col)

df.show()

+----------+-------------+------------+----------------+-------------+----------------+--------------+-----------+-----------+---------------+---------+-------------------+-----------------+------------+-------+------+------------+--------------+--------------+---+----------------+
|EmployeeID|AbsenceReason|AbsenceMonth|AbsenceDayOfWeek|AbsenceSeason|TransportExpense|DistanceToWork|ServiceTime|EmployeeAge|WorkloadAverage|HitTarget|DisciplinaryFailure|EmployeeEducation|NumberOfSons|Drinker|Smoker|NumberOfPets|EmployeeWeight|EmployeeHeight|BMI|AbsenteeismHours|
+----------+-------------+------------+----------------+-------------+----------------+--------------+-----------+-----------+---------------+---------+-------------------+-----------------+------------+-------+------+------------+--------------+--------------+---+----------------+
|        11|           26|           7|               3|            1|             289|            36|         13|         33|        239.554|       97

## Define el schema

In [8]:
df.printSchema()

root
 |-- EmployeeID: string (nullable = true)
 |-- AbsenceReason: string (nullable = true)
 |-- AbsenceMonth: string (nullable = true)
 |-- AbsenceDayOfWeek: string (nullable = true)
 |-- AbsenceSeason: string (nullable = true)
 |-- TransportExpense: string (nullable = true)
 |-- DistanceToWork: string (nullable = true)
 |-- ServiceTime: string (nullable = true)
 |-- EmployeeAge: string (nullable = true)
 |-- WorkloadAverage: string (nullable = true)
 |-- HitTarget: string (nullable = true)
 |-- DisciplinaryFailure: string (nullable = true)
 |-- EmployeeEducation: string (nullable = true)
 |-- NumberOfSons: string (nullable = true)
 |-- Drinker: string (nullable = true)
 |-- Smoker: string (nullable = true)
 |-- NumberOfPets: string (nullable = true)
 |-- EmployeeWeight: string (nullable = true)
 |-- EmployeeHeight: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- AbsenteeismHours: string (nullable = true)



In [9]:
def cast_cols(df, cols, new_type):
  for col in cols: 
     df = df.withColumn(col, df[col].cast(new_type()))
  return df

In [10]:
integer_cols = ['EmployeeID',
 'AbsenceReason',
 'AbsenceMonth',
 'AbsenceDayOfWeek',
 'AbsenceSeason',
 'TransportExpense',
 'DistanceToWork',
 'ServiceTime',
 'EmployeeAge',
 'WorkloadAverage',
 'HitTarget',
 'DisciplinaryFailure',
 'EmployeeEducation',
 'NumberOfSons',
 'Drinker',
 'Smoker',
 'NumberOfPets',
 'EmployeeWeight',
 'EmployeeHeight',
 'BMI',
 'AbsenteeismHours']

In [11]:
df = cast_cols(df, integer_cols, IntegerType)
df.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- AbsenceReason: integer (nullable = true)
 |-- AbsenceMonth: integer (nullable = true)
 |-- AbsenceDayOfWeek: integer (nullable = true)
 |-- AbsenceSeason: integer (nullable = true)
 |-- TransportExpense: integer (nullable = true)
 |-- DistanceToWork: integer (nullable = true)
 |-- ServiceTime: integer (nullable = true)
 |-- EmployeeAge: integer (nullable = true)
 |-- WorkloadAverage: integer (nullable = true)
 |-- HitTarget: integer (nullable = true)
 |-- DisciplinaryFailure: integer (nullable = true)
 |-- EmployeeEducation: integer (nullable = true)
 |-- NumberOfSons: integer (nullable = true)
 |-- Drinker: integer (nullable = true)
 |-- Smoker: integer (nullable = true)
 |-- NumberOfPets: integer (nullable = true)
 |-- EmployeeWeight: integer (nullable = true)
 |-- EmployeeHeight: integer (nullable = true)
 |-- BMI: integer (nullable = true)
 |-- AbsenteeismHours: integer (nullable = true)



## Exploratory Data Analysis

In [13]:
pandas_df = df.toPandas()
pandas_df.head(10)

Unnamed: 0,EmployeeID,AbsenceReason,AbsenceMonth,AbsenceDayOfWeek,AbsenceSeason,TransportExpense,DistanceToWork,ServiceTime,EmployeeAge,WorkloadAverage,...,DisciplinaryFailure,EmployeeEducation,NumberOfSons,Drinker,Smoker,NumberOfPets,EmployeeWeight,EmployeeHeight,BMI,AbsenteeismHours
0,11,26,7,3,1,289,36,13,33,239,...,0,1,2,1,0,1,90,172,30,4
1,36,0,7,3,1,118,13,18,50,239,...,1,1,1,1,0,0,98,178,31,0
2,3,23,7,4,1,179,51,18,38,239,...,0,1,0,1,0,0,89,170,31,2
3,7,7,7,5,1,279,5,14,39,239,...,0,1,2,1,1,0,68,168,24,4
4,11,23,7,5,1,289,36,13,33,239,...,0,1,2,1,0,1,90,172,30,2
5,3,23,7,6,1,179,51,18,38,239,...,0,1,0,1,0,0,89,170,31,2
6,10,22,7,6,1,361,52,3,28,239,...,0,1,1,1,0,4,80,172,27,8
7,20,23,7,6,1,260,50,11,36,239,...,0,1,4,1,0,0,65,168,23,4
8,14,19,7,2,1,155,12,14,34,239,...,0,1,2,1,0,0,95,196,25,40
9,1,22,7,2,1,235,11,14,37,239,...,0,3,1,0,0,1,88,172,29,8


In [23]:
fig = px.ecdf(df, x="AbsenteeismHours"
             ,title="ECDF de las horas de ausentismo")
fig.show()

In [25]:
fig = px.bar(df,x="AbsenceReason", y="AbsenteeismHours", title="Horas de ausentismo según Absence Reason")
fig.show()

In [29]:
reason_hours_df = df.groupBy("AbsenceReason").agg(sum("AbsenteeismHours").alias("TotalHours"))
sorted_reasons_df = reason_hours_df.orderBy(desc("TotalHours"))

top_reason = sorted_reasons_df.first()

print(f"The AbsenceReason with the highest total hours is: {top_reason['AbsenceReason']}")
print(f"Total Hours: {top_reason['TotalHours']}")

The AbsenceReason with the highest total hours is: 13
Total Hours: 842


In [26]:
fig = px.scatter(df, x="EmployeeAge", y="AbsenteeismHours", title="")
fig.show()

In [30]:
fig = px.bar(df, x="AbsenceMonth", y="AbsenteeismHours", color='AbsenceSeason', title="Horas de ausentismo según mes")
fig.show()

In [31]:
fig= px.box(df, x="Drinker", y="AbsenteeismHours", title="Boxplot de Horas de ausentismo vs Drinker")
fig.show()

## Modelo 1: Regresión linear

In [55]:
X= pandas_df.drop('AbsenteeismHours',axis= 1)
y= pandas_df['AbsenteeismHours']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.25, random_state=123)

In [56]:
regr = LinearRegression()
regr.fit(X_train, y_train)
print(regr.score(X_test, y_test))

0.04493476332707469


In [58]:
predictions = regr.predict(X_test)

In [73]:
predictions_df = pd.DataFrame(predictions, columns=["Predictions"])

X_test_reset = X_test.reset_index(drop=True)
predictions_df_reset = predictions_df.reset_index(drop=True)

X_test_with_predictions = pd.concat([X_test_reset['EmployeeID'], predictions_df_reset], axis=1)

### Guarda las predicciones de la Regresión en un parquet

In [93]:
X_test_with_predictions.to_parquet("X_test_with_predictions.parquet", index=False, engine="pyarrow")

In [98]:
df_predict = pq.read_table("X_test_with_predictions.parquet").to_pandas()
df_predict = df.astype(str)
df_predict.head(10)

## Modelo 2: Support Vector Machine

In [101]:
clf = make_pipeline(StandardScaler(), SVC(gamma='auto'))

In [102]:
clf.fit(X_test, y_test)
print(clf.score(X_test, y_test))

0.6324324324324324


In [103]:
predictions_svm = clf.predict(X_test)

predictions_df_svm = pd.DataFrame(predictions_svm, columns=["Predictions"])
predictions_df_reset_svm = predictions_df_svm.reset_index(drop=True)

X_test_with_predictions_svm = pd.concat([X_test_reset['EmployeeID'], predictions_df_reset_svm], axis=1)

### Guarda las predicciones del SVM en un parquet

In [104]:
X_test_with_predictions_svm.to_parquet("X_test_with_predictions_svm.parquet", index=False, engine="pyarrow")

In [105]:
df_predict_svm = pq.read_table("X_test_with_predictions_svm.parquet").to_pandas()
df_predict_svm = df.astype(str)
df_predict_svm.head(10)

Unnamed: 0,EmployeeID,Predictions
0,11,11.484386567219737
1,11,12.2876859508823
2,28,0.935248445771542
3,29,2.379050193870569
4,27,9.347522928202244
5,5,-1.0256438137066013
6,17,2.1242407287445038
7,5,-4.2730098622404284
8,18,5.597063199692464
9,27,0.4450694654038756
