In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType
from  pyspark.sql.functions import to_timestamp

## Lectura de datos

In [None]:
# Input dataset: parquet files of the 2020 "fichero_1_mejtit_rs_madiva_seg" engine extract.
_sandbox_motores = "/data/sandboxes/ranh/data/proyectos/coe_credito/2020/motores"
path = _sandbox_motores + "/fichero_1_mejtit_rs_madiva_seg/*"

In [None]:
# Load the full applications dataset (all rows, formalized or not).
df = spark.read.format("parquet").load(path)

In [None]:
# Shape of the raw dataset as (rows, columns).
n_filas, n_columnas = df.count(), len(df.columns)
(n_filas, n_columnas)

## Nos quedamos con las operaciones formalizadas (`info_form == 1`)

In [None]:
# Keep only formalized operations (info_form == 1).
df_form = df.where(F.col("info_form") == 1)

In [None]:
# Number of formalized rows.
n_formalizadas = df_form.count()
n_formalizadas

## Fechas: inicio de la ventana de 10 meses previa a la formalización

In [None]:
# Start of the look-back window: 10 months before the formalization date.
# It bounds the cure indicator computed on the subcycles below.
MESES_VENTANA_PREVIA = 10
df_form = df_form.withColumn(
    "fec_anterior",
    F.add_months("risk_formalization_date", -MESES_VENTANA_PREVIA),
)

# Subciclos

In [None]:
# Distinct (contract, window start, formalization date) keys to look up in the subcycle table.
_claves_contrato = ["contrato28_in", "fec_anterior", "risk_formalization_date"]
df_contratos = df_form.select(*_claves_contrato).dropDuplicates()

In [None]:
# Location of the contract-subcycle master table.
data_path = "/data/master/risk/ecrk/data/"
file_name = 't_ecrk_risk_subcycle_contract_m'
full_path_subcicle = f"{data_path}{file_name}"

In [None]:
# Load the contract subcycle table.
df_subcicles = spark.read.format("parquet").load(full_path_subcicle)

In [None]:
# Rebuild the contract key from its component ids so it can be joined on contrato28_in.
# F.concat returns null when any component is null; such rows will not match in the join.
_partes_contrato = ['entity_id', 'branch_id', 'counterpart_id', 'oper_page_id']
df_subcicles = df_subcicles.withColumn(
    'contrato28_in',
    F.concat(*[F.col(_parte) for _parte in _partes_contrato]),
)

In [None]:
# Inner join: keeps only formalized contracts that have at least one subcycle record.
df_form_subcicles = df_contratos.join(df_subcicles, on=["contrato28_in"], how="inner")

In [None]:
# Delinquency in the 10 months before formalization - cure indicator.
# "1" when a subcycle was exited inside (fec_anterior, risk_formalization_date].
_fecha_salida = F.col('exit_subcycle_contr_date')
_salida_en_ventana = (F.col('fec_anterior') < _fecha_salida) & (_fecha_salida <= F.col('risk_formalization_date'))
df_form_subcicles = df_form_subcicles.withColumn(
    "IND_CURA",
    F.when(_salida_en_ventana, "1").otherwise("0"),
)

In [None]:
# Delinquency at the observation point: a subcycle is open at the formalization date, i.e.
# entry_subcycle_contr_date <= risk_formalization_date < exit_subcycle_contr_date.
# A bare comparison against a null exit date evaluates to null and used to fall into otherwise("0"),
# silently missing contracts still delinquent at formalization.
# NOTE(review): assumes a null exit_subcycle_contr_date marks a subcycle that is still open. If the table
# uses a far-future sentinel instead, the isNull() branch is never taken and the result is unchanged.
df_form_subcicles = df_form_subcicles.withColumn(
    "IND_MORA_OBS", F.when(((F.col('entry_subcycle_contr_date') <= F.col('risk_formalization_date')) &
                            (F.col('exit_subcycle_contr_date').isNull() |
                             (F.col('risk_formalization_date') < F.col('exit_subcycle_contr_date')))),
                           "1").otherwise("0"))

In [None]:
# Flag as delinquent the contracts with a delinquency subcycle opened after formalization
# (the performance period).
# NOTE(review): no upper bound is applied to entry_subcycle_contr_date, so a subcycle entered at any time
# after formalization counts; if the performance window has a fixed length it is not enforced here.
_entrada_posterior = F.col('risk_formalization_date') < F.col('entry_subcycle_contr_date')
df_form_subcicles = df_form_subcicles.withColumn(
    "IND_MORA",
    F.when(_entrada_posterior, "1").otherwise("0"),
)

In [None]:
# Collapse subcycles to one row per (contract, formalization date). Each indicator is "1" if any
# subcycle sets it; max works on the "0"/"1" strings because "1" > "0".
# FEC_ENTRADSUBC is the first subcycle entry AFTER formalization, i.e. the event behind IND_MORA.
# Entries before formalization (historical or cured subcycles) are excluded so the date agrees with
# the flag. It is null when IND_MORA is "0", because F.min ignores the nulls produced by F.when.
df_form_subcicles_mora = df_form_subcicles.groupby("contrato28_in", 'risk_formalization_date').agg(
    F.max('IND_CURA').alias('IND_CURA'),
    F.max('IND_MORA_OBS').alias('IND_MORA_OBS'),
    F.max('IND_MORA').alias('IND_MORA'),
    F.min(F.when(F.col('IND_MORA') == "1", F.col('entry_subcycle_contr_date'))).alias('FEC_ENTRADSUBC'))

In [None]:
# Number of (contract, formalization date) pairs with at least one subcycle.
n_pares_contrato_fecha = df_form_subcicles_mora.count()
n_pares_contrato_fecha

In [None]:
# Bring the subcycle indicators back onto every formalized row.
# Rows without any subcycle get null indicators.
left_join = df_form.join(
    df_form_subcicles_mora,
    on=["contrato28_in", "risk_formalization_date"],
    how="left_outer",
)

In [None]:
# Mark the joined frame for caching (cache() mutates and returns the same DataFrame); it is reused below.
left_join.cache();

In [None]:
# Row count; also materializes the cache.
n_left_join = left_join.count()
n_left_join

In [None]:
# Normalize the three indicators: keep "0"/"1" as they are, and map anything else (the nulls coming
# from the left join for contracts without subcycles) to "0".
df_form_mora = left_join
for _indicador in ("IND_CURA", "IND_MORA_OBS", "IND_MORA"):
    df_form_mora = df_form_mora.withColumn(
        _indicador,
        F.when(F.col(_indicador).isin(['0', '1']), F.col(_indicador)).otherwise("0"),
    )

# Refinanciaciones (fichero SAS Hreprog, no está en Datio)

In [None]:
# Update to the latest hreprog file (refinancings exported from SAS).
path_sandbox = "/data/sandboxes/ranh/data"
path_file = "/bbdd/externo/hreprog/hreprog20200422.csv"
sandbox_file = f"{path_sandbox}{path_file}"
df_hreprog = spark.read.option("header", True).csv(sandbox_file)

In [None]:
# Contract key = first 28 characters of CONTRATO_RDO.
df_hreprog = df_hreprog.withColumn("contrato28_in", F.col('CONTRATO_RDO').substr(1, 28))

In [None]:
# Parse the refinancing start date from "dd-MM-yyyy" text into a timestamp.
# NOTE(review): strings not matching the pattern yield null, or may raise under Spark 3's strict
# time-parser policy - confirm the file format.
df_hreprog = df_hreprog.withColumn(
    "FEC_INI_REFI2",
    F.to_timestamp('FEC_INI_REFI', "dd-MM-yyyy"),
)

In [None]:
# Attach refinancing start dates. A contract with several hreprog rows yields several rows here.
_refis = df_hreprog.select('contrato28_in', 'FEC_INI_REFI2')
df_mora_refi = df_form_mora.join(_refis, on=["contrato28_in"], how="left_outer")

In [None]:
# Cache the contract + refinancing join and materialize it with a count.
df_mora_refi.cache()
n_filas_mora_refi = df_mora_refi.count()
n_filas_mora_refi

In [None]:
# IND_REFI = "1" when a refinancing starts on or after the formalization date.
# A null FEC_INI_REFI2 makes the comparison null, which falls to "0".
_refi_posterior = F.col('FEC_INI_REFI2') >= F.col('risk_formalization_date')
df_mora_refi = df_mora_refi.withColumn(
    "IND_REFI",
    F.when(_refi_posterior, "1").otherwise("0"),
)

In [None]:
# Final target: "1" when the row is flagged either as delinquent or as refinanced.
_es_moroso = F.col("IND_MORA") == "1"
_es_refinanciado = F.col("IND_REFI") == "1"
df_mora_refi = df_mora_refi.withColumn(
    "IND_MORA_DEF",
    F.when(_es_moroso | _es_refinanciado, "1").otherwise("0"),
)

In [None]:
# One row per contract: IND_MORA_DEF is "1" if any of the contract's rows is "1" (string max, "1" > "0").
# FEC_INI_REFI2 only considers refinancings that set IND_REFI (start on/after formalization), so the date
# agrees with the flag instead of picking up refinancings prior to formalization. It is null when there
# is none, because F.min ignores the nulls produced by F.when.
# NOTE(review): rows are also collapsed across risk_formalization_date; a contract with several
# formalization dates gets a single combined flag - confirm that is intended.
df_mora_refi = df_mora_refi.groupby("contrato28_in").agg(
    F.max('IND_MORA_DEF').alias('IND_MORA_DEF'),
    F.min('FEC_ENTRADSUBC').alias('FEC_ENTRADSUBC'),
    F.min(F.when(F.col('IND_REFI') == "1", F.col('FEC_INI_REFI2'))).alias('FEC_INI_REFI2'))

In [None]:
# Cache the per-contract result and materialize it; the count is the number of contracts.
df_mora_refi.cache()
n_contratos_mora_refi = df_mora_refi.count()
n_contratos_mora_refi

In [None]:
# Attach the per-contract target to every row of the original dataset (formalized or not).
# Rows whose contract is absent from df_mora_refi get null IND_MORA_DEF and dates.
# NOTE(review): a non-formalized row sharing contrato28_in with a formalized one inherits its flag - confirm.
df_mora = df.join(df_mora_refi, on=["contrato28_in"], how="left_outer")

In [None]:
# Target distribution; the null bucket holds rows without a per-contract result.
distribucion_target = df_mora.groupBy('IND_MORA_DEF').count()
distribucion_target.show()