In [None]:
%config Completer.use_jedi = False

In [None]:
import os.path
from pymongo import MongoClient
from pymongo.cursor import Cursor

import pandas as pd
import numpy as np
from datetime import datetime
import pdb

# Set logging level to INFO
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

from predictsignauxfaibles.utils import MongoDBQuery, MongoParams
import predictsignauxfaibles.config as config
import pytz

# Part 1 - Métriques à mars 2020 (modèle)

In [None]:
# Switches meant to choose between the JSON caches below and MongoDB.
# NOTE(review): neither flag is referenced in the visible cells; the load_features /
# load_scores calls pass literal `from_file` values instead. Confirm which is intended.
load_features_from_file = True
load_scores_from_file = True
# Local JSON caches for the Features and Scores extracts.
# NOTE(review): absolute, user-specific paths; consider a configurable data directory.
features_path = "/home/simon.lebastard/predictsignauxfaibles/data/features_2103.json"
scores_path = "/home/simon.lebastard/predictsignauxfaibles/data/scores_2103.json"

# Dotted MongoDB field paths kept by the `$project` stage of the Features query.
# Each path appears exactly once ("value.montant_part_ouvriere" used to be listed twice).
FEATURES_LIST = [
    "_id.siret",
    "_id.periode",
    "value.code_commune",
    "value.departement",
    "value.region",
    "value.age",
    "value.effectif",
    "value.effectif_entreprise",
    "value.time_til_failure",
    "value.outcome",
    "value.time_til_outcome",
    # insee
    "value.code_naf",
    "value.libelle_naf",
    "value.code_ape_niveau2",
    "value.libelle_ape2",
    "value.code_ape_niveau3",
    "value.libelle_ape3",
    "value.code_ape",
    # urssaf
    "value.montant_part_ouvriere",
    "value.ratio_dette",
    "value.cotisation",
    "value.cotisation_moy12m",
    "value.montant_part_patronale",
    "value.apart_heures_autorisees",
    "value.apart_heures_consommees",
    "value.apart_heures_consommees_cumulees",
    "value.apart_entreprise",
    "value.paydex_nb_jours",  # paydex
    "value.dette_fiscale",  # bdf
    "value.endettement",  # diane
    "value.taux_endettement",  # diane
]

# Lookup from single-letter NAF codes ("A" to "U") to their French labels.
# aggregate_stats uses it to turn the codes "B".."E" into the `libelle_naf`
# values it filters on.
LIBELLE_NAF = {
    "A": "Agriculture, sylviculture et pêche",
    "B": "Industries extractives",
    "C": "Industrie manufacturière",
    "D": "Production et distribution d'électricité, de gaz, de vapeur et d'air conditionné",
    "E": "Production et distribution d'eau ; assainissement, gestion des déchets et dépollution",
    "F": "Construction",
    "G": "Commerce ; réparation d'automobiles et de motocycles",
    "H": "Transports et entreposage",
    "I": "Hébergement et restauration",
    "J": "Information et communication",
    "K": "Activités financières et d'assurance",
    "L": "Activités immobilières",
    "M": "Activités spécialisées, scientifiques et techniques",
    "N": "Activités de services administratifs et de soutien",
    "O": "Administration publique",
    "P": "Enseignement",
    "Q": "Santé humaine et action sociale",
    "R": "Arts, spectacles et activités récréatives",
    "S": "Autres activités de services",
    "T": "Activités des ménages en tant qu'employeurs ; activités indifférenciées des ménages en tant que producteurs de biens et services pour usage propre",
    "U": "Activités extra-territoriales",
}

In [None]:
def str_to_datetime(date_str: str):
    """
    Parse a YYYY-MM-DD string and return the matching datetime,
    localized to UTC.
    """
    naive_day = datetime.strptime(date_str, "%Y-%m-%d")
    return pytz.utc.localize(naive_day)


def datetime_to_str(dt: datetime):
    """
    Render a datetime.datetime object as a YYYY-MM-DD string
    (the time-of-day part is dropped).
    """
    day_format = '%Y-%m-%d'
    return dt.strftime(day_format)

def unravel_features(df: pd.DataFrame):
    """
    Flatten the nested "_id" and "value" columns of a Features frame.

    Each of the two columns is expanded into one column per sub-field, and the
    two expansions are concatenated side by side ("_id" fields first).
    """
    expanded_parts = [df[nested_col].apply(pd.Series) for nested_col in ("_id", "value")]
    return pd.concat(expanded_parts, axis=1)


def load_features_from_mongo(date_min: str, date_max: str, save_to_path: str = None):
    """
    Fetch the Features documents of batch "2103_0_urssaf" from MongoDB and flatten them.

    Args:
        date_min: inclusive lower bound on `_id.periode`, in YYYY-MM-DD format.
        date_max: exclusive upper bound on `_id.periode`, in YYYY-MM-DD format.
        save_to_path: when not None, the flattened frame is also written to this
            path as JSON records, with `periode` serialised as YYYY-MM-DD strings.

    Returns:
        A DataFrame with one column per projected `_id` / `value` sub-field;
        `siret` is cast to int.

    Raises:
        Exception: if the JSON export fails (the underlying error is chained).
    """
    match_stage = {
        "$match": {
            "$and": [
                {
                    "_id.periode": {
                        "$gte": str_to_datetime(date_min),
                        "$lt": str_to_datetime(date_max),
                    },
                    "_id.batch": "2103_0_urssaf",
                },
            ]
        }
    }
    project_stage = {
        "$project": {field: 1 for field in FEATURES_LIST}
    }
    pipeline = [match_stage, project_stage]

    # Use the client as a context manager so its connections are closed once the
    # cursor has been fully read into the DataFrame.
    with MongoClient(host="mongodb://labbdd") as client:
        features_mongo = client.get_database("prod").get_collection("Features")
        logging.info("Loading features from MongoDB...")
        features_raveled = pd.DataFrame(features_mongo.aggregate(pipeline))

    logging.info("... Success! Unravelling dataframe...")
    features = unravel_features(features_raveled)
    features["siret"] = features.siret.astype(int)
    logging.info("...done!")

    if save_to_path is not None:
        logging.info("Saving fetched Features data to disk")
        # `assign` returns a new frame, so the returned `features` keeps its
        # datetime `periode` column while the export gets plain date strings.
        features_tosave = features.assign(
            periode=features["periode"].apply(lambda ts: ts.strftime('%Y-%m-%d'))
        )
        try:
            features_tosave.to_json(save_to_path, orient="records", default_handler=str)
        except Exception as err:
            # Keep the original message but chain the cause instead of hiding it;
            # a bare `except:` would also have trapped KeyboardInterrupt.
            raise Exception("Features could not be saved to json") from err
        logging.info("Success")

    return features


def load_features(date_min: str, date_max: str, from_file: bool = True, filepath: str = None):
    """
    Return the Features frame, from a JSON cache when requested and available.

    With `from_file` set, `filepath` is read as JSON records and its `periode`
    column is parsed back with `str_to_datetime`. If the file is missing, or if
    `from_file` is False, the data is fetched with `load_features_from_mongo`,
    which receives `filepath` as its save path.

    Raises:
        Exception: if `from_file` is set but no `filepath` was given.
    """
    if from_file and filepath is None:
        raise Exception("Requesting to load features from file, but no filepath was provided")

    if from_file:
        try:
            cached = pd.read_json(filepath, orient="records")
            logging.info(f"Succesfully loaded Features data from {filepath}")
            cached.periode = cached.periode.apply(str_to_datetime)
        except FileNotFoundError:
            logging.warning(f"Filepath {filepath} was not found on disk. Fetching for MongoDB")
        else:
            return cached

    return load_features_from_mongo(date_min, date_max, filepath)
        

def load_scores_from_mongo(batch_name: str, algo_name: str, save_to_path: str = None):
    """
    Fetch the Scores documents of one batch / algorithm from MongoDB.

    Args:
        batch_name: value matched against the `batch` field.
        algo_name: value matched against the `algo` field.
        save_to_path: when not None, the frame is also written to this path as
            JSON records, with `periode` serialised as YYYY-MM-DD strings (the
            format `load_scores` parses back with `str_to_datetime`). The default
            used to be `True`, which was passed to `to_json` as a path and failed.

    Returns:
        A DataFrame with the projected score fields, without `_id`, whose
        `periode` column has its timezone information removed.

    Raises:
        Exception: if the JSON export fails (the underlying error is chained).
    """
    match_stage = {
        "$match": {
            "$and": [
                {
                    "batch": batch_name,
                    "algo": algo_name,
                },
            ]
        }
    }
    projected_fields = ["siret", "batch", "algo", "periode", "alert", "score", "small_vs_final"]
    project_stage = {
        "$project": {field: 1 for field in projected_fields}
    }
    pipeline = [match_stage, project_stage]

    # Use the client as a context manager so its connections are closed once the
    # cursor has been fully read into the DataFrame.
    with MongoClient(host="mongodb://labbdd") as client:
        scores_mongo = client.get_database("prod").get_collection("Scores")
        logging.info("Loading collection Scores from MongoDB...")
        scores = pd.DataFrame(scores_mongo.aggregate(pipeline))

    scores["periode"] = scores.periode.dt.tz_localize(None)
    scores = scores.drop(columns=["_id"])

    if save_to_path is not None:
        logging.info("Saving fetched Scores data to disk")
        # Export dates as plain YYYY-MM-DD strings, as the Features export does;
        # by default to_json would write datetimes as epoch values.
        scores_tosave = scores.assign(periode=scores["periode"].dt.strftime("%Y-%m-%d"))
        try:
            scores_tosave.to_json(save_to_path, orient="records", default_handler=str)
        except Exception as err:
            # Keep the original message but chain the cause instead of hiding it.
            raise Exception("Fetch of Scores could not be saved to json") from err
        logging.info("Success")

    return scores


def load_scores(batch_name: str, algo_name: str, from_file: bool = True, filepath: str = None):
    """
    Return the Scores frame, from a JSON cache when requested and available.

    With `from_file` set, `filepath` is read as JSON records and its `periode`
    column is parsed with `str_to_datetime`. If the file is missing, or if
    `from_file` is False, the data is fetched with `load_scores_from_mongo`,
    which receives `filepath` as its save path.

    Raises:
        Exception: if `from_file` is set but no `filepath` was given.
    """
    if from_file and filepath is None:
        raise Exception("Requesting to load scores from file, but no filepath was provided")

    if from_file:
        try:
            cached = pd.read_json(filepath, orient="records")
            logging.info(f"Succesfully loaded Scores data from {filepath}")
            cached.periode = cached.periode.apply(str_to_datetime)
        except FileNotFoundError:
            logging.warning(f"Filepath {filepath} was not found on disk. Fetching for MongoDB")
        else:
            return cached

    return load_scores_from_mongo(batch_name, algo_name, filepath)

## Fetching features and scores data

In [None]:
# Fetch Features with 2020-02-01 <= periode < 2020-02-28 straight from MongoDB
# (from_file=False); because a filepath is given, the result is also written to features_path.
# NOTE(review): the `load_features_from_file` flag defined earlier is not used here. Confirm intent.
features = load_features(date_min="2020-02-01", date_max="2020-02-28", from_file=False, filepath=features_path)
logging.info(f"Loaded {features.shape[0]} rows and {features.shape[1]} columns")

In [None]:
# Read Scores from the JSON cache at scores_path; load_scores falls back to MongoDB
# when the file is missing.
# NOTE(review): scores are requested for batch "2102_altares" while the Features query
# is pinned to batch "2103_0_urssaf". Confirm this pairing is intended.
scores = load_scores(batch_name="2102_altares", algo_name="mars2021_v0", from_file=True, filepath=scores_path)
logging.info(f"Loaded {scores.shape[0]} rows and {scores.shape[1]} columns")

In [None]:
# Turn `periode` into YYYY-MM-DD strings so it can serve as a merge key with `scores`.
# NOTE(review): this overwrites the column in place; it assumes `periode` still holds datetimes.
features["periode"] = features.periode.apply(datetime_to_str)

In [None]:
# Turn `periode` into YYYY-MM-DD strings so it can serve as a merge key with `features`.
# NOTE(review): this overwrites the column in place; it assumes `periode` still holds datetimes.
scores["periode"] = scores.periode.apply(datetime_to_str)

In [None]:
# Inner join: keep only (siret, periode) pairs present in both scores and features.
# NOTE(review): load_features_from_mongo casts `siret` to int, load_scores_from_mongo does not.
# Confirm both frames carry the same `siret` dtype before relying on this join.
df = pd.merge(scores, features, on=['siret', 'periode'], how='inner')

In [None]:
# Inspect the columns available after the join.
df.columns

In [None]:
# Cache the joined frame as JSON records, but only when no cache file exists yet:
# an existing file is never overwritten by this cell.
# NOTE(review): absolute, user-specific path; consider a configurable data directory.
postproc_path = "/home/simon.lebastard/predictsignauxfaibles/data/postproc_2103.json"
if not os.path.isfile(postproc_path):
    logging.info("Saving joined post-processed data to disk...")
    df.to_json(postproc_path, orient="records", default_handler=str)
    logging.info(f"Saved to {postproc_path}")

Alternatively, load the joined data directly from the file stored on disk.

In [None]:
# Reload the joined frame from the JSON cache when it exists; otherwise `df`
# keeps the value computed by the merge above.
if os.path.isfile(postproc_path):
    # The data is read *from* disk here; logged like the other notebook messages.
    logging.info("Loading post-processed data from disk")
    df = pd.read_json(postproc_path, orient="records")

## Aggregation of region-wide features

Niveaux de granularité considérés:
- région
- département

Pour chaque niveau de granularité:
- compter le nombre d'entreprises flaguées rouge par région
- compter le nombre d'entreprises flaguées orange par région
- calculer les ratios correspondants
- [BONUS: A NE PAS COMMUNIQUER] produire un score de risque moyen par région
- compter le nombre de défaillances effectives sur une période donnée, et calculer les ratios correspondants

Pour toutes les grandeurs calculées précédemment, calculer des équivalents en nombre d'employés concernés

En bonus : calculer des indicateurs de contribution de chacune des 4 macrovariables du modèle

### Preprocessing steps

In [None]:
# Column name -> value used to fill missing entries; consumed by replace_nans.
# Every listed column is filled with 0.0.
dct_nan_rpl = {
    "montant_part_ouvriere": 0.0,
    "montant_part_patronale": 0.0,
    "apart_heures_autorisees": 0.0,
    "apart_heures_consommees": 0.0,
    "apart_heures_consommees_cumulees": 0.0,
    "dette_fiscale": 0.0,
    "endettement": 0.0,
    "taux_endettement": 0.0
}

def preprocess(df):
    """
    Add the derived outcome, alert and debt-ratio columns to `df` and return it.

    The frame is modified in place; the returned object is the same `df`.
    Columns added: `failure`, `alert_flag`, `alert_bin`,
    `ratio_dette_ouvriere`, `ratio_dette_patronale`.
    """
    # Failure flag based only on failures since the beginning of the COVID crisis:
    # 1 when time_til_failure lies in [0, 12), else 0.
    # TODO: automate the number of months to look ahead (between March 2020 and <THIS_MONTH>).
    months_to_failure = df["time_til_failure"]
    fails_in_window = months_to_failure.ge(0) & months_to_failure.lt(12)
    df["failure"] = fails_in_window.astype(int)

    # Encode the alert label as an integer level, plus a boolean "any alert" flag.
    alert_levels = {"Pas d'alerte": 0, "Alerte seuil F1": 1, "Alerte seuil F2": 2}
    df["alert_flag"] = df["alert"].replace(alert_levels)
    df["alert_bin"] = df["alert_flag"].gt(0)

    # Debt-to-contribution ratios for the employee ("ouvriere") and employer
    # ("patronale") parts of the social contributions.
    contributions = df["cotisation"]
    df["ratio_dette_ouvriere"] = df["montant_part_ouvriere"].div(contributions)
    df["ratio_dette_patronale"] = df["montant_part_patronale"].div(contributions)
    return df

def replace_nans(df, replace_dct):
    """
    Fill missing values of `df` in place, column by column.

    Args:
        df: frame to modify; it is updated in place and nothing is returned.
        replace_dct: mapping of column name -> replacement value for NaNs.

    Raises:
        KeyError: if a key of `replace_dct` is not a column of `df`.
    """
    for field, rpl in replace_dct.items():
        # Assign the filled column back instead of calling fillna(inplace=True) on
        # `df[field]`: the chained in-place form acts on a temporary object and does
        # not update `df` when pandas copy-on-write is enabled.
        df[field] = df[field].fillna(value=rpl)

In [None]:
# Fill missing values first, then derive the failure / alert / ratio columns.
# Both steps modify `df` itself: replace_nans fills in place and preprocess adds
# columns to the frame it receives before returning it.
replace_nans(df, dct_nan_rpl)
df = preprocess(df)

In [None]:
# Sanity check: number of rows per value of the `failure` flag.
df.groupby(by=["failure"]).siret.count()

In [None]:
# Sanity check: number of rows per alert level (0, 1 or 2).
df.groupby(by=["alert_flag"]).siret.count()

In [None]:
# Sanity check: number of rows with / without any alert.
df.groupby(by=["alert_bin"]).siret.count()

### Building aggregation dataframe

In [None]:
# Display the list of projected feature paths for reference.
FEATURES_LIST

In [None]:
def aggregate_stats(geo_attr, outcome_attr):
    """
    Aggregate the notebook-level frame `df` per (geo_attr, outcome_attr) pair.

    Note: `df` is not a parameter; the function reads the global `df`.

    Args:
        geo_attr: name of the geographic column to group by (called with
            "region" and "departement" in this notebook).
        outcome_attr: outcome column to group by; must be one of
            "alert_flag", "alert_bin", "failure", "outcome".

    Returns:
        A DataFrame indexed by (geo_attr, outcome_attr) holding counts, headcount
        totals, within-geography rates, the top activity labels and the
        deviations from the national figures computed below.

    Raises:
        AssertionError: if `outcome_attr` is not one of the accepted names.
    """
    assert outcome_attr in ["alert_flag", "alert_bin", "failure", "outcome"]

    # Establishment count and total headcount at three levels of activity detail:
    # (geo, outcome, libelle_naf, libelle_ape3), (geo, outcome, libelle_naf) and (geo, outcome).
    risk_ape3_stats = df.groupby(by=[geo_attr,outcome_attr,"libelle_naf","libelle_ape3"]).agg(
        siret_count=('siret', 'count'),
        effectif_tot=('effectif', 'sum'),
    )
    risk_naf_stats = df.groupby(by=[geo_attr,outcome_attr,"libelle_naf"]).agg(
        siret_count=('siret', 'count'),
        effectif_tot=('effectif', 'sum')
    )
    risk_stats = df.groupby(by=[geo_attr,outcome_attr]).agg(
        siret_count=('siret', 'count'),
        effectif_tot=('effectif', 'sum'),
        ratiodette_ouvr_avg=('ratio_dette_ouvriere', 'mean'),
        ratiodette_patr_avg=('ratio_dette_patronale', 'mean'),
        ratiodette_avg=('ratio_dette', 'mean'),
        apart_autr_avg=('apart_heures_autorisees', 'mean'),
        apart_cons_avg=('apart_heures_consommees', 'mean'),
        apart_cumcons_avg=('apart_heures_consommees_cumulees', 'mean'),
        paydex_avg=('paydex_nb_jours', 'mean'),
        taux_endettement_avg=('taux_endettement', 'mean'),
    )
    # Same aggregates over the whole frame, per outcome class only (national reference).
    national_stats = df.groupby(by=[outcome_attr]).agg(
        siret_count=('siret', 'count'),
        effectif_tot=('effectif', 'sum'),
        ratiodette_ouvr_avg=('ratio_dette_ouvriere', 'mean'),
        ratiodette_patr_avg=('ratio_dette_patronale', 'mean'),
        ratiodette_avg=('ratio_dette', 'mean'),
        apart_autr_avg=('apart_heures_autorisees', 'mean'),
        apart_cons_avg=('apart_heures_consommees', 'mean'),
        apart_cumcons_avg=('apart_heures_consommees_cumulees', 'mean'),
        paydex_avg=('paydex_nb_jours', 'mean'),
        taux_endettement_avg=('taux_endettement', 'mean'),
    )

    # Share (in %) of each outcome class within its geographic unit.
    # NOTE(review): divides a (geo, outcome)-indexed series by a geo-indexed one;
    # presumably pandas aligns them on the shared level name. Confirm.
    risk_stats['siret_rate'] = 100*risk_stats.siret_count / risk_stats.groupby(by=geo_attr).siret_count.sum()
    risk_stats['effectif_rate'] = 100*risk_stats.effectif_tot / risk_stats.groupby(by=geo_attr).effectif_tot.sum()

    # Top libelle_ape3 by headcount, restricted to rows whose libelle_naf is one of
    # the labels for codes B, C, D, E. idxmax yields the full index tuple
    # (geo, outcome, libelle_naf, libelle_ape3); element 3 is the ape3 label.
    # Non-tuple values (presumably NaN for groups without such rows) become the string "None".
    risk_stats['ape3_mostatrisk_eff'] = risk_ape3_stats.loc[risk_ape3_stats.index.get_level_values("libelle_naf").isin([LIBELLE_NAF[code] for code in ["B", "C", "D", "E"]])].groupby(by=[geo_attr,outcome_attr]).effectif_tot.idxmax()
    risk_stats['ape3_mostatrisk_eff'] = risk_stats.ape3_mostatrisk_eff.apply(lambda x: x[3] if isinstance(x, tuple) else "None")
    risk_stats['ape3_risk_eff'] = risk_ape3_stats.loc[risk_ape3_stats.index.get_level_values("libelle_naf").isin([LIBELLE_NAF[code] for code in ["B", "C", "D", "E"]])].groupby(by=[geo_attr,outcome_attr]).effectif_tot.max()

    # Top libelle_naf by headcount (all sections); element 2 of the index tuple is the naf label.
    risk_stats['naf_mostatrisk_eff'] = risk_naf_stats.groupby(by=[geo_attr,outcome_attr]).effectif_tot.idxmax()
    risk_stats['naf_mostatrisk_eff'] = risk_stats.naf_mostatrisk_eff.apply(lambda x: x[2] if isinstance(x, tuple) else "None")
    risk_stats['naf_risk_eff'] = risk_naf_stats.groupby(by=[geo_attr,outcome_attr]).effectif_tot.max()

    # Same two rankings, by number of establishments instead of headcount.
    risk_stats['ape3_mostatrisk_etab'] = risk_ape3_stats.loc[risk_ape3_stats.index.get_level_values("libelle_naf").isin([LIBELLE_NAF[code] for code in ["B", "C", "D", "E"]])].groupby(by=[geo_attr,outcome_attr]).siret_count.idxmax()
    risk_stats['ape3_mostatrisk_etab'] = risk_stats.ape3_mostatrisk_etab.apply(lambda x: x[3] if isinstance(x, tuple) else "None")
    risk_stats['ape3_risk_etab'] = risk_ape3_stats.loc[risk_ape3_stats.index.get_level_values("libelle_naf").isin([LIBELLE_NAF[code] for code in ["B", "C", "D", "E"]])].groupby(by=[geo_attr,outcome_attr]).siret_count.max()

    risk_stats['naf_mostatrisk_etab'] = risk_naf_stats.groupby(by=[geo_attr,outcome_attr]).siret_count.idxmax()
    risk_stats['naf_mostatrisk_etab'] = risk_stats.naf_mostatrisk_etab.apply(lambda x: x[2] if isinstance(x, tuple) else "None")
    risk_stats['naf_risk_etab'] = risk_naf_stats.groupby(by=[geo_attr,outcome_attr]).siret_count.max()

    # National share (in %) of each outcome class.
    national_stats['siret_rate'] = 100*national_stats.siret_count / national_stats.siret_count.sum()
    national_stats['effectif_rate'] = 100*national_stats.effectif_tot / national_stats.effectif_tot.sum()


    # Relative deviation (in %) of each local figure from the national figure.
    # NOTE(review): five columns (ratiodette_ouvr_avg, ratiodette_patr_avg, ratiodette_avg,
    # apart_autr_avg, taux_endettement_avg) are overwritten with the deviation, whereas the
    # others get a new "*_to_ntl_avg" column and keep the raw average. Confirm this is intended.
    risk_stats['siret_rate_to_ntl_avg'] = 100*(risk_stats.siret_rate - national_stats.siret_rate) / national_stats.siret_rate
    risk_stats['effectif_rate_to_ntl_avg'] = 100*(risk_stats.effectif_rate - national_stats.effectif_rate) / national_stats.effectif_rate
    risk_stats['ratiodette_ouvr_avg'] = 100*(risk_stats.ratiodette_ouvr_avg - national_stats.ratiodette_ouvr_avg) / national_stats.ratiodette_ouvr_avg
    risk_stats['ratiodette_patr_avg'] = 100*(risk_stats.ratiodette_patr_avg - national_stats.ratiodette_patr_avg) / national_stats.ratiodette_patr_avg
    risk_stats['ratiodette_avg'] = 100*(risk_stats.ratiodette_avg - national_stats.ratiodette_avg) / national_stats.ratiodette_avg
    risk_stats['apart_autr_avg'] = 100*(risk_stats.apart_autr_avg - national_stats.apart_autr_avg) / national_stats.apart_autr_avg
    risk_stats['apart_cons_avg_to_ntl_avg'] = 100*(risk_stats.apart_cons_avg - national_stats.apart_cons_avg) / national_stats.apart_cons_avg
    risk_stats['apart_cumcons_avg_to_ntl_avg'] = 100*(risk_stats.apart_cumcons_avg - national_stats.apart_cumcons_avg) / national_stats.apart_cumcons_avg
    risk_stats['paydex_avg_to_ntl_avg'] = 100*(risk_stats.paydex_avg - national_stats.paydex_avg) / national_stats.paydex_avg
    risk_stats['taux_endettement_avg'] = 100*(risk_stats.taux_endettement_avg - national_stats.taux_endettement_avg) / national_stats.taux_endettement_avg

    return risk_stats

In [None]:
# One aggregate table per (geography, outcome) combination:
# region level first, then departement level, each for the three outcome columns.
reg_risk_stats = aggregate_stats(geo_attr="region", outcome_attr="alert_flag")
reg_riskbin_stats = aggregate_stats(geo_attr="region", outcome_attr="alert_bin")
reg_fail_stats = aggregate_stats(geo_attr="region", outcome_attr="failure")

dpt_risk_stats = aggregate_stats(geo_attr="departement", outcome_attr="alert_flag")
dpt_riskbin_stats = aggregate_stats(geo_attr="departement", outcome_attr="alert_bin")
dpt_fail_stats = aggregate_stats(geo_attr="departement", outcome_attr="failure")

In [None]:
# Export the six aggregate tables as CSV; the file suffix names the outcome column used
# (riskflag = alert_flag, riskbin = alert_bin, failures = failure).
# NOTE(review): absolute, user-specific paths; consider a configurable data directory.
reg_risk_outpath_root = "/home/simon.lebastard/predictsignauxfaibles/data/reg_2103"
dpt_risk_outpath_root = "/home/simon.lebastard/predictsignauxfaibles/data/dpt_2103"

reg_risk_stats.to_csv(f"{reg_risk_outpath_root}_riskflag.csv")
dpt_risk_stats.to_csv(f"{dpt_risk_outpath_root}_riskflag.csv")
reg_riskbin_stats.to_csv(f"{reg_risk_outpath_root}_riskbin.csv")
dpt_riskbin_stats.to_csv(f"{dpt_risk_outpath_root}_riskbin.csv")
reg_fail_stats.to_csv(f"{reg_risk_outpath_root}_failures.csv")
dpt_fail_stats.to_csv(f"{dpt_risk_outpath_root}_failures.csv")