In [0]:
# Remove every existing notebook widget so the widget definitions further down start from a clean slate.
# Kept in its own cell, separate from the cell that re-creates the widgets.
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
# Notebook parameters as (widget name, default value) pairs:
# the catalog to work in, the schema to read from and the schema to write to.
for widget_name, default_value in (
    ("catalogo", "catalog_Proy_fin_dev"),
    ("esquema_source", "bronze"),
    ("esquema_sink", "silver"),
):
    dbutils.widgets.text(widget_name, default_value)

In [0]:
# Read the current widget values, in the same order the widgets were declared.
catalogo, esquema_source, esquema_sink = (
    dbutils.widgets.get(name)
    for name in ("catalogo", "esquema_source", "esquema_sink")
)

In [0]:
%skip
%sql
-- Exploratory preview: first 50 rows of the bronze employees table.
-- NOTE(review): the catalog and schema are hard-coded here instead of coming from the notebook widgets.
SELECT *
FROM catalog_proy_fin_dev.bronze.employees
LIMIT 50

In [0]:
%skip
%sql
-- Exploratory check: number of bronze payroll rows whose pay period starts in 2024.
-- NOTE(review): the catalog and schema are hard-coded here instead of coming from the notebook widgets.
SELECT COUNT(*)
FROM catalog_proy_fin_dev.bronze.payroll
WHERE YEAR(pay_period_start) = 2024
-- Scratch variants left over from exploration (not executed):
--   order by pay_period_start
--   where employee_benefits < 40
--   limit 10

In [0]:
def beneficio_categoria(employee_benefits):
    """Map an employee-benefits amount to its benefits category label.

    Args:
        employee_benefits: Numeric benefits value, or None when the value is missing.

    Returns:
        None when the input is None;
        "No accede a beneficios" when the value is exactly 0;
        "vendedor promedio" when the value is strictly between 0 and 40;
        "Vendedor destacado" for every other value.
    """
    # A missing value has no category. Without this guard the chained
    # comparison below raises TypeError when it compares None with an int.
    if employee_benefits is None:
        return None
    if employee_benefits == 0:
        return "No accede a beneficios"
    if 0 < employee_benefits < 40:
        return "vendedor promedio"
    # NOTE(review): negative values also end up here, not only values >= 40 —
    # confirm whether negative benefits can occur and how they should be labelled.
    return "Vendedor destacado"

In [0]:
# Wrap the plain-Python categoriser as a Spark UDF that returns a string column.
beneficio_udf = F.udf(f=beneficio_categoria, returnType=StringType())

In [0]:
# Load the two source tables from the catalog/schema selected through the widgets.
df_employees = spark.read.table(f"{catalogo}.{esquema_source}.employees")
df_payroll = spark.read.table(f"{catalogo}.{esquema_source}.payroll")

In [0]:
# Employees without an employee_id cannot be joined to payroll, so drop them.
# A row that is entirely null also has a null employee_id, so this single
# check removes fully empty rows as well.
df_employees = df_employees.dropna(subset=["employee_id"])

# Payroll rows must carry BOTH keys: employee_id is the join key and
# pay_period_start is the source of pay_year. how="any" drops the row when
# either one is null.
df_payroll = df_payroll.dropna(how="any", subset=["pay_period_start", "employee_id"])

In [0]:
# Label every payroll row with the benefits category derived from employee_benefits.
df_payroll = df_payroll.withColumn(
    "benefits_category",
    beneficio_udf(F.col("employee_benefits")),
)

In [0]:
# Extract the calendar year of the pay period start into its own column.
df_payroll = df_payroll.withColumn("pay_year", F.year("pay_period_start"))

# Quick look at a handful of rows to check the new column.
display(df_payroll.limit(5))

In [0]:
# Inner join of payroll (alias "x") with employees (alias "y") on employee_id.
# The aliases let later cells tell the two employee_id columns apart.
df_joined = df_payroll.alias("x").join(
    other=df_employees.alias("y"),
    on=F.col("x.employee_id") == F.col("y.employee_id"),
    how="inner",
)

Creamos una columna de año y tomamos los periodos de pago posteriores a la pandemia (años posteriores a 2022).

In [0]:
# Keep only pay periods from years after 2022 and order the rows by the payroll-side employee id.
df_filtered_sorted = (
    df_joined
    .where(F.col("x.pay_year") > 2022)
    .sort("x.employee_id")
)

In [0]:
# Render the filtered, sorted join result in the notebook.
display(df_filtered_sorted)

In [0]:
%skip
df_filtered_sorted = df_filtered_sorted.withColumn(
    "years_diferences", 
    F.year(F.current_date()) - F.col("pay_year")
)

In [0]:
# Count payroll rows per (pay_year, location_desc) pair, ordered by year.
df_aggregated = (
    df_filtered_sorted
    .groupBy("pay_year", "location_desc")
    .agg(F.count("y.employee_id").alias("num_pays_per_year_per_location"))
    .sort("pay_year")
)

In [0]:
# Render the per-year, per-location counts in the notebook.
display(df_aggregated)

In [0]:
# days_pay_off = number of days from hire_date up to pay_period_start.
df_with_days_payed_diff = df_filtered_sorted.withColumn(
    "days_pay_off",
    F.datediff(end="pay_period_start", start="hire_date"),
)

# Quick look at a handful of rows to check the new column.
display(df_with_days_payed_diff.limit(5))

In [0]:
# Add a "Location" column ("Store" for the listed location codes, "Warehouse"
# for everything else), then drop the employees-side duplicates of
# employee_id and ingestion_date so each name appears only once.
# NOTE(review): "YYC-WH" is labelled "Store" here — confirm that code is not a warehouse.
df_updated = (
    df_with_days_payed_diff
    .select(
        "*",
        F.when(
            F.col("location_desc").isin("YYC-NW", "YYC-WH", "YYC-SE"),
            F.lit("Store"),
        )
        .otherwise("Warehouse")
        .alias("Location"),
    )
    .drop(F.col("y.employee_id"), F.col("y.ingestion_date"))
)

In [0]:
# Persist the result in the sink schema, replacing any existing contents of the table.
df_updated.write.saveAsTable(
    f"{catalogo}.{esquema_sink}.payroll_transformed",
    mode="overwrite",
)