In [0]:
# Install required libraries
!pip install --upgrade pip
!pip install wordcloud
# Install the latest version of the library
!pip install --trusted-host nexus.aws-data-prd.dc-ifood.com --extra-index-url http://nexus.aws-data-prd.dc-ifood.com/repository/ifood-data-private-pypi/simple genplat-databricks-lib

# Import necessary libraries
from genplat_databricks_lib.setup import setup_databricks
text_generation_udf = setup_databricks()
from genplat_databricks_lib import text_generation
from genplat_databricks_lib import apply_text_generation_to_dataframe
import re
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, concat, udf, explode, split, rank,count, first
from pyspark.sql.types import MapType, StringType
from pyspark.sql.window import Window
from wordcloud import WordCloud
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from ifood_databricks.gcp import gsheet
from ifood_databricks.gcp.gsheet import gsheets_data_dump
from pyspark.sql import functions as F


In [0]:
# Read every row of the individual-responses view named in the query.
query = """
SELECT * FROM people.qualtrics.qualtrics_individual_responses_hcm_avd_hash_view
order by datereport desc
"""

# Keep surveys received from 2024-01-01 onwards whose report date is before 2025-11-01.
received_since_2024 = col('survey_received_at') >= '2024-01-01'
reported_before_nov_2025 = col('datereport') < '2025-11-01'

fala_ai = (
    spark.sql(query)
    .filter(received_since_2024 & reported_before_nov_2025)
    .orderBy('pseudo_person_id')
)
fala_ai.display()


In [0]:
# Rank each person's answers per question from the most recent report date backwards,
# then add a 1/0 label for voluntary terminations dated at least one month after the report.
from pyspark.sql.window import Window
from pyspark.sql.functions import when, add_months

window_spec = Window.partitionBy("pseudo_person_id", 'question').orderBy(col("datereport").desc())

# rn == 1 marks the rows at the latest datereport of each (person, question) pair.
ranked = fala_ai.withColumn("rn", rank().over(window_spec))
ranked = ranked.orderBy(col("pseudo_person_id"), col("rn"), col("datereport").desc())

# NOTE(review): the flag is 1 when terminationdate is ON OR AFTER datereport + 1 month.
# If the intent is "left within one month of the report", the comparison would need to be
# reversed -- confirm with the analysis owner.
left_voluntarily = col("terminationaction") == "Voluntário"
terminated_a_month_or_more_later = add_months(col("datereport"), 1) <= col("terminationdate")

exploded = ranked.withColumn(
    "voluntary_turnover_one_month_flag",
    when(left_voluntarily & terminated_a_month_or_more_later, lit(1)).otherwise(lit(0)),
)

# Spot-check a single person.
exploded.filter(col('pseudo_person_id') == '1000000031717ZA').display()


In [0]:
# Show the response table with the rank column and the turnover flag added.
exploded.display()


In [0]:
from pyspark.sql.functions import first, col

# Columns kept as row identifiers/attributes when the questions are pivoted to columns.
columns_to_group = [
    "pseudo_person_id",
    "datereport",
    'n4_owner_area',
    'n3_owner_area',
    'n2_owner_area',
    "voluntary_turnover_one_month_flag",
    'terminationdate',
    "score_before",
    "score_before_2",
    "grade",
    "race",
    "identidade_genero",
    "salary_basis",
    'hcm_salary_amount_diff_abs',
    'tempo_de_casa_em_dias',
    'age',
]

# One column per question_id, holding the first score found in each group.
scores_by_question = exploded.groupBy(columns_to_group).pivot("question_id").agg(first("score"))
pivoted_exploded = scores_by_question.orderBy("datereport")

# Spot-check a single person.
person_rows = pivoted_exploded.filter(col('pseudo_person_id') == '1000000031717ZA')
display(person_rows)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Wide table: one row per combination of the grouping columns, one column per question_id.
pivoted_exploded = (
    exploded.groupBy(columns_to_group)
    .pivot("question_id")
    .agg(F.first("score"))
).orderBy("datereport")

# Window: per person, ordered by report date, covering every row up to the current one.
w = (
    Window
    .partitionBy("pseudo_person_id")
    .orderBy("datereport")
    .rowsBetween(Window.unboundedPreceding, 0)
)

# Question columns are the columns the pivot added, i.e. everything that is not a
# grouping column. The membership test must be against the list itself: wrapping it
# in another list made the test always true, so grouping columns were forward-filled too.
group_cols = set(columns_to_group)
question_cols = [c for c in pivoted_exploded.columns if c not in group_cols]

# Forward-fill each question column with the person's last non-null score so far.
# The loop variable is deliberately not called `col`, to avoid shadowing
# pyspark.sql.functions.col for the cells that follow.
filled = pivoted_exploded
for question_col in question_cols:
    filled = filled.withColumn(
        question_col,
        F.last(question_col, ignorenulls=True).over(w)
    )


In [0]:
# Inspect the forward-filled wide table.
filled.display()

In [0]:
from pyspark.sql.functions import lit

# Row count for each distinct score_before value.
score_before_counts = filled.groupBy('score_before').count()
score_before_counts.display()


In [0]:
# Rating labels listed from the highest score to the lowest.
_avd_labels = (
    'Voando Alto',
    'Brilhou na Entrega',
    'Na Rota',
    'Ajustando a Rota',
    'Estacionado',
)

# Label -> numeric score: 5 for the first label, counting down to 1 for the last.
dict_avd = dict(zip(_avd_labels, range(len(_avd_labels), 0, -1)))

In [0]:
# Print the column names and types of the forward-filled table.
filled.printSchema()

In [0]:
from pyspark.sql.functions import date_format



# Raw salary_basis values, grouped under the consolidated label each one collapses into.
_SALARY_BASIS_GROUPS = {
    "EXECUTIVE": (
        "EXECUTIVE",
        "DATA EXECUTIVE",
        "DATA BI EXECUTIVE",
        "PRODUTO & DESIGNER EXECUTIVE",
        "TECH EXECUTIVE",
    ),
    "PRODUTO E DESIGN": ("PRODUTO & DESIGNER",),
    "DATA BI": ("DATA BI",),
    "DATA": ("DATA",),
    "NON-TECH": ("NON-TECH",),
    "TECH": ("TECH",),
    "IFOOD BENEFICIOS": (
        "IFOOD BENEFICIOS - SDR",
        "IFOOD BENEFICIOS - CANAIS",
        "IFOOD BENEFICIOS - ENGAJAMENTO",
        "IFOOD BENEFICIOS - KEY ACCOUNT",
        "IFOOD BENEFICIOS - CLOSER",
    ),
    "COMERCIAL": (
        "ADS - COORD COMERCIAL",
        "COMERCIAL - SUPERVISORES",
        "COMERCIAL - COORDENACAO",
        "IFOOD BENEFICIOS - COORD COMERCIAL",
        "COMERCIAL - COORDENADORES",
        "COMERCIAL - ZOOP",
        "COMERCIAL - VENDAS EXTERNA",
        "COMERCIAL - VENDAS INTERNA",
        "COMERCIAL - KEY ACCOUNT",
        "COMERCIAL - POS VENDAS",
    ),
    "GLOBAL MOBILITY": (
        "DATA_IFOOD FRANÇA",
        "DATA_IFOOD PORTUGAL",
        "EXECUTIVE TECH - IFOOD PORTUGAL",
        "FRANÇA - TECH/DATA/PROD/DESIGN_EXEC",
        "NON TECH - IFOOD PORTUGAL",
        "NON-TECH - IFOOD COLOMBIA",
        "PRODUTO & DESIGNER- IFOOD PORTUGAL",
        "TECH - IFOOD PORTUGAL",
        "TECH_IFOOD_FRANÇA",
    ),
}

# Flat lookup: raw salary_basis value -> consolidated label.
SALARY_BASIS_DICT = {
    raw_value: consolidated
    for consolidated, raw_values in _SALARY_BASIS_GROUPS.items()
    for raw_value in raw_values
}


# Collapse raw salary_basis values into the consolidated labels of SALARY_BASIS_DICT.
# DataFrame.replace with a dict is evaluated by Spark itself, avoiding the per-row
# Python round trip of a UDF. Values that are not keys of the dict, and nulls, are
# left unchanged -- the same result the lookup-with-default UDF produced.
filled = filled.replace(SALARY_BASIS_DICT, subset=["salary_basis"])
filled

In [0]:
# Flag missing evaluations, convert the rating labels to numeric scores via dict_avd,
# and drop rows whose label is neither missing nor a known rating.
from pyspark.sql.functions import when, col


def _missing_evaluation_flag(column_name):
    """Return a 1/0 column: 1 when the label is null or equals 'Sem Avaliacao'."""
    label = col(column_name)
    return when(label.isNull() | (label == 'Sem Avaliacao'), 1).otherwise(0)


def _avd_score(column_name):
    """Return a column mapping each dict_avd label to its score, null for anything else."""
    label = col(column_name)
    mapping = None
    for rating, score in dict_avd.items():
        condition = label == rating
        mapping = when(condition, score) if mapping is None else mapping.when(condition, score)
    return mapping.otherwise(None)


# Score column -> name of its "no evaluation" flag column.
score_to_flag = {
    "score_before": "sem_avaliacao",
    "score_before_2": "sem_avaliacao_anterior",
}

# 1) Flags first: they read the original text labels, which step 2 overwrites.
for score_col, flag_col in score_to_flag.items():
    filled = filled.withColumn(flag_col, _missing_evaluation_flag(score_col))

# 2) Replace the text labels with numeric scores (unknown labels become null).
for score_col in score_to_flag:
    filled = filled.withColumn(score_col, _avd_score(score_col))

# 3) A null score with flag 0 means the label was present but not in dict_avd: drop the row.
for score_col, flag_col in score_to_flag.items():
    filled = filled.filter(~((col(score_col).isNull()) & (col(flag_col) == 0)))


In [0]:
# Inspect the table after the score mapping and row filtering.
filled.display()

In [0]:
# Categorical attributes to expand into indicator columns.
cat_cols = ["race", "identidade_genero", "salary_basis"]

# Collect the Spark DataFrame into a pandas DataFrame.
wide_df_pd = filled.toPandas()

# One indicator column per category value, plus a NaN indicator per column (dummy_na=True).
wide_df_ohe = pd.get_dummies(data=wide_df_pd, columns=cat_cols, dummy_na=True)

In [0]:
# Show the one-hot encoded pandas frame.
wide_df_ohe

In [0]:
# Write the one-hot encoded table to CSV, without the index column.
output_csv = "juliana_alves_turnover_pred_wide_df.csv"
wide_df_ohe.to_csv(output_csv, index=False)

In [0]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer

# Separate the predictors (X) from the target flag (y).
X = wide_df_ohe.drop(["voluntary_turnover_one_month_flag"], axis=1)
y = wide_df_ohe["voluntary_turnover_one_month_flag"]

# Move identifier, date and area columns into the index: they stay attached to each
# row but are not passed to the model as features.
X_features = X.set_index(['pseudo_person_id', 'datereport', 'terminationdate','n4_owner_area','n3_owner_area','n2_owner_area'])

# Split BEFORE imputing. Fitting the imputer on the full matrix would compute the
# column means from test rows as well, leaking test information into training.
# The row partition depends only on the row count and random_state, so it is the
# same partition as splitting an already-imputed matrix.
# NOTE(review): the same person can appear on several report dates, so one person's
# rows may land in both splits -- consider a group-aware split if that matters.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X_features, y, test_size=0.2, random_state=42
)

# Learn the column means on the training rows only, then apply them everywhere.
imputer = SimpleImputer(strategy='mean')
X_train = imputer.fit_transform(X_train_raw)
X_test = imputer.transform(X_test_raw)
# Full matrix imputed with the training means, kept under its previous name.
X_features_imputed = imputer.transform(X_features)

# Baseline random forest with default hyper-parameters.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

In [0]:
# Table of features ranked by the fitted forest's importance, highest first.
importances = clf.feature_importances_
feature_names = X_features.columns
feature_importance_df = (
    pd.DataFrame({"Feature": feature_names, "Importance": importances})
    .sort_values(by="Importance", ascending=False)
    .reset_index(drop=True)
)
feature_importance_df

In [0]:
# Feature names in descending order of importance.
feature_importance_df["Feature"].tolist()

In [0]:
# Horizontal bar chart of the 10 most important features.
top_10 = feature_importance_df.head(10)
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(top_10["Feature"], top_10["Importance"])
ax.set_xlabel("Importance")
ax.set_ylabel("Feature")
# A descriptive title lets the figure stand alone when the notebook is skimmed.
ax.set_title("Top 10 feature importances (random forest)")
# barh draws the first row at the bottom; invert so the top feature is on top.
ax.invert_yaxis()
fig.tight_layout()
plt.show()



In [0]:
# Accuracy, precision and recall of the current predictions on the held-out split.
from sklearn.metrics import accuracy_score, precision_score, recall_score

accuracy, precision, recall = (
    metric(y_test, y_pred)
    for metric in (accuracy_score, precision_score, recall_score)
)
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")


In [0]:
# Heatmap of the confusion matrix for the current predictions.
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

cm = confusion_matrix(y_test, y_pred)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_title("Confusion Matrix")
ax.set_ylabel("True Label")
ax.set_xlabel("Predicted Label")
plt.show()

In [0]:
# Preview the first rows of the feature matrix.
X_features.head()

In [0]:
# 5-fold cross-validation of the baseline classifier on the training split.
from sklearn.model_selection import cross_val_score


from sklearn.model_selection import cross_validate

# Scorer name -> scikit-learn scoring string.
scoring = {
    "accuracy": "accuracy",
    "precision": "precision",
    "recall": "recall",
    "f1": "f1"
}

results = cross_validate(
    clf,
    X_train,
    y_train,
    cv=5,
    scoring=scoring,
    return_train_score=False
)

# cross_validate returns a dict with one "test_<name>" array (one value per fold)
# for every scorer. The summary reads from `results`; the previously printed
# `scores` was never defined and raised NameError.
print("Cross-validation scores:")
for metric_name in scoring:
    fold_scores = results[f"test_{metric_name}"]
    print(f"  {metric_name}: mean={fold_scores.mean():.3f} std={fold_scores.std():.3f}")

In [0]:
import pandas as pd

# Tabulate the cross-validation output.
cv_results_df = pd.DataFrame(results)
display(cv_results_df)

In [0]:
# Hyper-parameter search for the random forest, followed by evaluation of the best
# model on the held-out split.
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# 2 x 2 x 2 x 2 = 16 candidate settings, each evaluated with 5-fold CV.
param_grid = {
    'n_estimators': [100, 300],
    'max_depth': [5, 15],
    'min_samples_split': [2, 5],
    'max_features': ['sqrt', 'log2']
}

grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    cv=5,
    scoring='f1',
    n_jobs=-1
)
grid_search.fit(X_train, y_train)
best_params = grid_search.best_params_
print(f"Best parameters: {best_params}")

# With the default refit=True, GridSearchCV has already re-fitted the best setting on
# all of X_train. That estimator has the same parameters, seed and data as a manual
# re-fit would, so it is reused instead of training the forest a second time.
best_clf = grid_search.best_estimator_
y_pred = best_clf.predict(X_test)

# Accuracy, precision and recall of the tuned model on the held-out split.
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")

# Confusion matrix of the tuned model.
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.show()