In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OneHotEncoder

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pathlib import Path 

In [None]:
# Obtain the Spark session: getOrCreate() reuses an active session when one
# exists (usually pre-configured in Databricks) and otherwise creates one
# under the "ETLExample" application name.
session_builder = SparkSession.builder.appName("ETLExample")
spark = session_builder.getOrCreate()

In [None]:
# Partition key: yesterday's date (per the local clock) rendered as YYYY-MM-DD.
yesterday = datetime.today().date() - timedelta(days=1)
partition = yesterday.strftime("%Y-%m-%d")

In [None]:
# Transformation function
def calculate_working_minutes(row):
    """Return the minutes elapsed between a row's login and logout timestamps.

    Parameters
    ----------
    row : mapping or pandas.Series
        Must provide 'LoginTime' and 'LogoutTime' entries. Non-missing values
        are parsed with the format '%Y-%m-%dT%H:%M:%S.%fZ'
        (e.g. '2024-01-01T08:00:00.000Z').

    Returns
    -------
    float
        Logout time minus login time, in minutes (negative when the logout
        precedes the login). NaN when either timestamp is missing (None/NaN),
        so a single incomplete record does not abort a row-wise ``apply``.

    Raises
    ------
    ValueError
        If a non-missing timestamp does not match the expected format.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'

    logout_raw = row['LogoutTime']
    login_raw = row['LoginTime']

    # strptime raises TypeError on None/NaN; report "unknown" instead.
    if pd.isna(logout_raw) or pd.isna(login_raw):
        return float('nan')

    logout_time = datetime.strptime(logout_raw, timestamp_format)
    login_time = datetime.strptime(login_raw, timestamp_format)

    duration = logout_time - login_time
    return duration.total_seconds() / 60

In [None]:
# Read the login/logout CSV files (with header row) for the selected partition
# through Spark and collect them into a pandas DataFrame.
login_logout_path = f"/Workspace/Users/vivi.belogianni@soda.io/employee_attrition/soda_settings/login_logout/PartitionDate={partition}"
login_logout_reader = spark.read.option("header", True)

# Stamp every row with the partition value and make EmployeeID an integer
# column (presumably so it matches the key type of the other tables — confirm).
working_mins = (
    login_logout_reader.csv(login_logout_path)
    .toPandas()
    .assign(
        PartitionDate=partition,
        EmployeeID=lambda frame: frame["EmployeeID"].astype("int"),
    )
)

In [None]:
# Pull the three Unity Catalog source tables into pandas, one query per table.
# All three queries share the same text apart from the table name.
select_all_sql = "SELECT * \n        FROM unity_catalog.employees.{table}"

employees = spark.sql(select_all_sql.format(table="employee_info")).toPandas()

employee_survey = spark.sql(select_all_sql.format(table="employee_survey")).toPandas()

manager_survey = spark.sql(select_all_sql.format(table="manager_survey")).toPandas()


In [None]:
# Transformations

# Derive a WorkingMinutes column by applying calculate_working_minutes to
# every row of the login/logout data.
working_mins = working_mins.assign(
    WorkingMinutes=lambda frame: frame.apply(calculate_working_minutes, axis=1)
)

# Clean and standardise the numeric employee features.
numeric_cols = ['Age', 'TotalWorkingYears', 'MonthlyIncome']

# 'NA' placeholders become NaN so the columns can be cast to float.
employees[numeric_cols] = employees[numeric_cols].replace('NA', np.nan).astype(float)

# Standardise the columns with a StandardScaler fitted on this same data.
scaler = StandardScaler()
employees[numeric_cols] = scaler.fit_transform(employees[numeric_cols])

# One-hot encode the categorical columns and swap them for indicator columns.

# Dense output so the result can be wrapped directly in a DataFrame.
encoder = OneHotEncoder(sparse_output=False)

to_encode = ['Department', 'Gender', 'JobRole']

encoded_data = encoder.fit_transform(employees[to_encode])

# Build the indicator frame on employees' own index: pd.concat(axis=1) aligns
# rows by index label, so a fresh 0..n-1 index would pair indicators with the
# wrong rows (and add NaN rows) whenever employees is not indexed 0..n-1.
one_hot_df = pd.DataFrame(
    encoded_data,
    columns=encoder.get_feature_names_out(to_encode),
    index=employees.index,
)

# Append the indicator columns, then remove the original categorical columns.
employees = pd.concat([employees, one_hot_df], axis=1).drop(to_encode, axis=1)


# Correlation analysis: drop the columns excluded from the model input.
# NOTE(review): the analysis that selected these columns is not visible in
# this cell — presumably it was carried out elsewhere; confirm.
dropped_columns = [
    'Education',
    'EmployeeCount',
    'BusinessTravel',
    'Over18',
    'StandardHours',
    'TrainingTimesLastYear',
    'EducationField',
    'DistanceFromHome',
    'YearsWithCurrManager',
]
employees = employees.drop(columns=dropped_columns)


In [None]:
# Combine data for exporting, joining everything on EmployeeID.

# Left joins keep every employee row even when a survey row is missing.
with_manager_survey = employees.merge(
    manager_survey, on='EmployeeID', how='left', suffixes=('_employees', '_manager')
)
with_both_surveys = with_manager_survey.merge(
    employee_survey, on='EmployeeID', how='left', suffixes=('_employees', '_employee')
)

# No `how` is given for the last join, so pandas' default (inner) applies:
# employees without a login/logout row are left out of the result.
working_mins_columns = ['WorkingMinutes', 'EmployeeID', 'PartitionDate']
input_data = with_both_surveys.merge(working_mins[working_mins_columns], on='EmployeeID')

In [None]:
# Replace spaces in column names with underscores.
# NOTE(review): presumably needed because the target table does not accept
# spaces in column names — confirm.
sanitized_columns = input_data.columns.str.replace(' ', '_')
input_data.columns = sanitized_columns


In [None]:
# Convert the pandas frame to Spark and cast PartitionDate to a DATE column.
spark_df = spark.createDataFrame(input_data).withColumn(
    'PartitionDate', col('PartitionDate').cast(DateType())
)

# Save as a table partitioned by PartitionDate, in overwrite mode.
# NOTE(review): depending on the table format and the session's
# partition-overwrite settings, "overwrite" may replace the whole table rather
# than only the partition being written — confirm this is intended.
(
    spark_df.write
    .mode("overwrite")
    .partitionBy("PartitionDate")
    .saveAsTable("unity_catalog.employees.input_data_attrition_model")
)