In [2]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from datetime import date

from itertools import chain

In [3]:
# Necesario para añadir a la ruta sys.path las librerías necesarias Spark para trabajar en Python
findspark.init()

spark = SparkSession.builder.appName('dataCleaning').getOrCreate()

# headcount de una plantilla parcial
hc = spark.read.csv('employees.csv', header=True, inferSchema=True)
# BBDD emails corporativos
emails = spark.read.csv('emails.csv', header=True, inferSchema=True)
# BBDD todos los centros y sus responsables
listado = spark.read.csv('listado.csv', header=True, inferSchema=True)

In [4]:
# Renombra columnas
name_columns = {'Cent->Código_del_Centro':'id_place','Cent->Provincia':'provincia','Trab->Código_del_Trabajador':'id_empleado',
                'Trab->Apellidos_y_Nombre_del_Trabajador':'nombre','Trab->Descripción_del_Puesto_de_Trabajo':'puesto','Trab->Nacionalidad':'nacionalidad',
                'Trab->Fecha_Ingreso_(AAAA/MM/DD)':'fecha_in','Trab->Fecha_Baja_(AAAA/MM/DD)':'fecha_out','Trab->Fecha_Fin_Contrato_(AAAA/MM/DD)':'fecha_end',
                'Trab->Porcentaje_Jornada_tiempo_parcial':'jornada','Trab->Motivo_Baja':'motivo_baja','Trab->Sexo':'genero',
                'Trab->Fecha_Nacimiento_(AAAA/MM/DD)':'fecha_nacimiento','Trab->Dirección_E-mail':'email','Trab->Tipo_Contrato_(3_posiciones)':'id_contrato'}

for k, v in name_columns.items(): hc = hc.withColumnRenamed(k, v)

In [5]:
# Arregla fechas
today = date.today().strftime('%d/%m/%Y')

# Fechas 00/00/0000 y 31/12/2099 indican que el empleado está activo
hc = hc.withColumn('fecha_out', to_date(when((col('fecha_out')=='00/00/0000' ) | (col('fecha_out').isNull()), today).otherwise(col('fecha_out')), 'dd/MM/yyyy'))
hc = hc.withColumn('fecha_end', to_date(when((col('fecha_end')=='31/12/2099' ) | (col('fecha_end')=='00/00/0000' ) | (col('fecha_end').isNull()), today).otherwise(col('fecha_end')), 'dd/MM/yyyy'))

hc = hc.withColumn('fecha_in', to_date(hc['fecha_in'], 'dd/MM/yyyy'))
hc = hc.withColumn('fecha_nacimiento', to_date(hc['fecha_nacimiento'], 'dd/MM/yyyy'))

In [6]:
# Ordena por fecha descendente y borra duplicados, quedándose con la última actualización del empleado
hc = hc.orderBy(col('fecha_out').desc())
hc = hc.dropDuplicates(['id_empleado'])

# Fillna por 100, es decir, los nulos son jornada completa
hc = hc.withColumn('jornada', when((col('jornada').isNull()), 100).otherwise(col('jornada')))

# Fillna en el tipo de contrato por 0, que por defecto es CONTRATO INDEFINIDO
hc = hc.withColumn('id_contrato', when((col('id_contrato').isNull()), 0).otherwise(col('id_contrato')).cast('int'))
tipo_contrato = spark.read.csv('contrato.csv', header=True, inferSchema=True)
hc = hc.join(tipo_contrato, on='id_contrato')

In [7]:
# Crea un diccionario de llaves múltiples asociadas a un mismo valor y clasifica los motivos de baja
cambios = {

    **dict.fromkeys(['DECISION DE LA EMPRESA','Despido por causas objetivas','Despido de la persona trabaj','Sucesión de empresa'], 'Despido del trabajador'),

    **dict.fromkeys(['Suspensión del contrato o ER','Resolución de la persona tra','Reducción temporal de jornad','Cese por declaración de inva','Baja por excedencia maternal',
                    'Fin de la actividad fija dis','Baja por agotamiento I.T.'], 'Otros'),

    **dict.fromkeys(['DECISION DEL TRABAJADOR','Baja voluntaria del trabajad','Excedencia.','Baja voluntaria del trabajador','Excedencia','Baja por excedencia voluntar',
                        'Baja voluntaria de la person','Cese en período de prueba po','Decision del trabajador'], 'Baja voluntaria del trabajador'),

    **dict.fromkeys(['Fin de contrato temporal a i','Cese por expiración del tiem','Cese en período de prueba a','Cese en periodo de prueba'], 'Fin de contrato temporal')}

mapping_expr = create_map([lit(x) for x in chain(*cambios.items())])

hc = hc.withColumn('motivo_baja', coalesce(mapping_expr[hc['motivo_baja']], lit('Otros')))

In [8]:
# Cruza con BBDD de emails corporativos y establece ese correo si el empleado lo tuviese
hc = hc.withColumn('email', when(~col('email').rlike('[\w\.\-]+@\w+\.\w+'), None).otherwise(col('email')))

hc = hc.join(emails, on='id_empleado', how='left')

hc = hc.withColumn('email', when((col('email_corporativo').rlike('empresa')) | (col('email').isNull()), col('email_corporativo')).otherwise(col('email')))

hc = hc.drop('email_corporativo')

In [9]:
# Utilizamos el informe de oficinas para conseguir datos del lugar de trabajo, del supervisor y del jefe
hc = hc.join(listado, on='id_place', how='left')

In [10]:
# Utilizando las fechas, saca el estado (activo/salida), edad y el tiempo en empresa
hc = hc.withColumn('estado', when(col('fecha_out')>=date.today(), 'Activo').otherwise('Salida'))

hc = hc.withColumn('edad', round(months_between(lit(date.today()), col('fecha_nacimiento'))/lit(12)).cast('int'))

hc = hc.withColumn('dias_empresa', round(months_between(col('fecha_end'), col('fecha_in'))*lit(30)).cast('int'))

hc = hc.withColumn('años_empresa', round(col('dias_empresa')/lit(365)))

In [11]:
# Columna para agrupar por categorías de tramos de empresa
hc = hc.withColumn('tramo_empresa', when(col('años_empresa')<0.25, 'HASTA 3 MESES').when((col('años_empresa')>=0.25) & (col('años_empresa')<0.5), 'DE 3 A 6 MESES')\
        .when((col('años_empresa')>=0.5) & (col('años_empresa')<1), 'DE 6 MESES A 1 AÑO').when((col('años_empresa')>=1) & (col('años_empresa')<2), 'DE 1 A 2 AÑOS')\
        .when((col('años_empresa')>=2) & (col('años_empresa')<3), 'DE 2 A 3 AÑOS').when(col('años_empresa')>=3, 'MÁS DE 3 AÑOS').otherwise('unknown'))

# Columna para agrupar por categorías de edad
hc = hc.withColumn('tramo_edad', when(col('edad')<25, 'MENOR DE 25 AÑOS').when((col('edad')>=25) & (col('edad')<30), 'ENTRE 25 Y 30 AÑOS')\
        .when((col('edad')>=30) & (col('edad')<35), 'ENTRE 30 Y 35 AÑOS').when(col('edad')>=35, 'MAYOR DE 35 AÑOS').otherwise('unknown'))

# Columna para agrupar por categorías de tipo de contrato
hc = hc.withColumn('horas_contrato', when(col('jornada')<=50, 'CONTRATO DE 15 A 20 HORAS').when((col('jornada')>50) & (col('jornada')<75), 'CONTRATO DE 20 A 30 HORAS')\
    .when(col('jornada')>=75, 'CONTRATO DE 30 A 40 HORAS').otherwise('unknown'))

In [12]:
# Accede a la función de transformar el DF a un DF Pandas para exportar a csv
hc.toPandas().to_csv('cleaned_data.csv', index=False)

In [13]:
# Finaliza Spark y borra las actividades
spark.stop()
del spark