In [1]:
import os
import pyspark.sql.functions as f 
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when,lit

import pandas as pd
pd.reset_option('max_columns')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 25)

In [7]:
from rsc_repository import preprocess
from rsc_repository import scoring_function

### 1. Import raw tables

In [10]:
df_source_avt =     spark.table("X.X").where("dt_partition <> 202012")
df_dossier_sin =    spark.table("X.X").where("dt_partition = 202104 & cdappli = 13 & cdcie = 14")
df_contrat =        spark.table("X.X").where("dt_partition = 202104")
df_prestations =    spark.table("X.X").where("cdappli = 13 & cdcie = 14")
df_socinfo =        spark.table("X.X").where("dt_partition = 202104")
df_adhesionassure = spark.table("X.X")
df_ref_garantie =   spark.table("X.X")

In [11]:
# Objectif 1 : Inclure les sinistres X.X comme point de départ

# 1) Montrer un exemple de dossier qu'on ne retrouve pas dans AVT
# pass

# 2) Regarder la longueur des numéros de dossiers dans X.X
# filtre sur la longueur des numéros de références 6,7,8,9

# 3) compter les différentes longueur des numéros de dossiers sinistre dans la denrière partition

# 4) est-ce qu'il n'existe pas des colonnes permettant l'identification des dossiers pcoll
# cdcie = 14
# cdappli = 13

### 2. Create reference table: df_sinistres

In [12]:
df_sinistres = preprocess.create_df_sinistres_ref(df_source_avt)

# Verifier qu'il y a une ligne par sinistre
assert df_sinistres.count() == df_sinistres.select("numutipre").distinct().count()

### 2. bis Rajouter information sur la nature du sinistre (Inc/Inv/DC) et Arrêt Maladie ou Accident du travail

In [13]:
# Rajout de deux nouvelles colonnes
# 1. nat_risque : 'INC' / 'INV' / 'DC'
# 2. sin_evemt : 'arret_maladie' / 'accident_travail' / 'autre'

df_sinistres_nat_sin = preprocess.ajout_nat_risque(df_sinistres,df_ref_garantie)

# Verifier qu'il y a une ligne par sinistre
assert df_sinistres_nat_sin.count() == df_sinistres_nat_sin.select("numutipre").distinct().count()

### 3. Add prestations

In [14]:
def ajout_sinistres_prestations(df_sinistres, df_dossier_sin, df_contrat, df_prestations, df_socinfo):
    df_contrat = df_contrat.withColumnRenamed("cdmodges", "mode_gestion")\
                        .withColumnRenamed("dteffcon", "date_effet")\
                        .withColumnRenamed("dtsurv", "datsursin")\
                        .withColumnRenamed("nucontra", "numconuti")

    df_contrat = df_contrat.groupBy("numconuti").agg(f.max('date_effet').alias('date_effet'),\
                                                  f.max('dtdermod').alias('dtdermod'))
    
    # TODO @Abdel - Priorité 2: delai_effcon_sinistre et delai_modcon_sinistre colonnes a integrer dans scoring fraude
    # parametrer seuil pour alerte (1 mois)
    
    # UPDATE (20/07/2021) TODO @Abdel : fait dans crit_effcon et crit_modcon
    
    df_sinistres_presta = df_sinistres.join(df_contrat,'numconuti', how="left")
    df_sinistres_presta = df_sinistres_presta.withColumn("delai_effcon_sinistre", f.datediff("date_effet", "datsursin"))
    df_sinistres_presta = df_sinistres_presta.withColumn("delai_modcon_sinistre", f.datediff("dtdermod", "datsursin"))
    
    return df_sinistres_presta

In [15]:
df_sinistres_prestation = preprocess.ajout_sinistres_prestations(df_sinistres_nat_sin, df_dossier_sin, df_contrat, df_prestations, df_socinfo)

# Verifier no duplicates in left join
assert df_sinistres.count() == df_sinistres_prestation.count()

### 4. num adhesions

In [16]:
df_sinistres_presta_adh = preprocess.ajout_nb_adhesions(df_sinistres_prestation, df_adhesionassure)

# Verifier no duplicates in left join
assert df_sinistres.count() == df_sinistres_presta_adh.count()

### 5. Insee

In [17]:
df_portefeuille = spark.table('X.X')
df_insee = spark.table('X.X')

In [18]:
df_sin_portefeuille = df_sinistres_prestation.join(
    df_portefeuille
    .withColumn('siren', col('siret').substr(1,9))
    .withColumnRenamed('num_contrat', 'numconuti')
    .select('numconuti', 'siren', 'cd_postal').distinct(), 
    'numconuti', 
    'left' 
)

#df_sin_portefeuille.count()

In [19]:
df_sin_insee = df_sin_portefeuille.join(
    df_insee.select('siren', 'apet700', 'dcret').distinct(), 'siren', how='left'
) # TODO - get one row per sinistre

### 6. Scoring

- <ins> Periodicite </ins>
    - p_1 : Plusieurs sinistres le même jours
    - p_2 : +/- 3 jours à la date de survenance
    - p_3 : le même jour, même mois mais à une année antérieure / postérieure
    - p_4 : le même mois, à une année antérieure / postérieure  avec +/- 3 jours de décalage
    - p_5 : le nombre de déclaration de sinistre par mois est supérieur à 3 (strictement)
    - -> Colonne : crit_period
- <ins> Contrat_multiple </ins>
    - S'il existe plusieurs contrats pour le même assuré
    - -> Colonne : crit_multiplicite_contrat

In [97]:
df_master = df_sinistres_presta_adh

In [99]:
params = {'delai_eff_con' : 30 ,\
          'delai_mod_con' : 30 ,\
          'nb_assure' : 3,\
          'limit_nb_sin_mois':3,
          'delai_periodicite':30}

params_poids_score = {'crit_periodicite_p_1' : 4,\
                      'crit_periodicite_p_2' : 3,\
                      'crit_periodicite_p_3' : 2,\
                      'crit_periodicite_p_4' : 1,\
                      'crit_periodicite_p_5' : 2,\
                      'crit_multiplicite_contrat' : 2,\
                      'crit_nb_assure' : 1,\
                      'crit_delai_eff_con' : 1,\
                      'crit_delai_modcon' :1}

In [100]:
#df_master = scoring_function.scoring(df_master,params,params_poids_score)
df_master = scoring(df_master,params,params_poids_score)
# Verifier no duplicates in left join
assert df_sinistres.count() == df_master.count()

### Sauvegarde

In [101]:
df_master = df_master.toPandas()

                                                                                ]]

In [102]:
df_master.to_excel("scoring_data.xlsx")