<img src="media/logo_psa.jpg" width="300">

<h1><center>Constructing MADAX U SAMARA</center></h1>

## Imports

In [1]:
%load_ext autoreload
%autoreload 2
import os
import datetime
import numpy as np
import pandas as pd
from functools import reduce
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 100)

from pyspark.sql import functions as F
import pyspark.sql.types as StringType

from distribution_cost.configuration import spark_config
from distribution_cost.configuration.app import AppConfig
from distribution_cost.configuration.data import DataConfig
from distribution_cost.infra import oracle
from distribution_cost.domain import kpis

/gpfs/user/e587246/dco00/conf/application.yml
/gpfs/user/e587246/dco00


## Connection to Exadata (Optional)

In [2]:
# Application settings: the two database URIs exposed by AppConfig.
# `db_uri` is the one handed to Spark's JDBC reader/writer further down.
app_config = AppConfig()
db_uri, db_uri_cx_oracle = app_config.db_uri_jdbc, app_config.db_uri_cx_oracle

# Data settings: pull the vehicle perimeter once and unpack the entries used
# by this notebook.
data_config = DataConfig()
vhls_perimeter = data_config.vhls_perimeter
sites, start_date, end_date, genr_door = (
    vhls_perimeter[key] for key in ("sites", "start_date", "end_date", "genr_door")
)

## Creating Spark Session

In [3]:
# Parameters forwarded to the project's `get_spark` helper, kept in one place
# so the sizing of the job is easy to spot and tune.
spark_settings = dict(
    app_name="app-distribution-cost",
    executors=3,
    executor_cores=4,
    executor_mem='8g',
    dynamic_allocation=True,
    max_executors=8,
)
spark_context, spark_session = spark_config.get_spark(**spark_settings)

# Optional session settings, currently disabled:
# spark_session.conf.set("spark.sql.crossJoin.enabled", "true")
# spark_session.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

## Loading the tables from HDFS

In [None]:
# First day of each of the 31 consecutive months to process, starting with
# 1 August 2017 (the start string is day-first: day/month/year).
range_start = pd.to_datetime('1/08/2017', dayfirst=True)
rangeMonths = pd.date_range(start=range_start, periods=31, freq='MS')

# for m in rangeMonths:
#    print(m)

In [4]:
# Build the MADAX U SAMARA table (enriched with OPV and tax-rate columns) one
# month at a time and load the result into the Oracle table SMKT013_MXUSM.

# The tax-rate reference table does not depend on the month being processed,
# so it is read from Oracle once instead of on every iteration.
df_taxRate = spark_session.read.option("fetchsize", 10000).jdbc(db_uri, table='SMKT003_REF_TAX')

# Location of one raw monthly partition on HDFS. Year and month are integers,
# so they are formatted into the path (they cannot be concatenated to a str
# with `+`).
# NOTE(review): the month is rendered without zero padding (e.g. month=8);
# confirm this matches the directory names on HDFS.
raw_path = "/user/e587247/data/raw/{source}/COUNTRY=France/year={year}/month={month}"

# Columns parsed as dates, and columns parsed as numeric amounts.
list_dates = ['DATMEC','DATE_SALE','DATPROD','DATMRES','DATMAD','DATEXPC','DATARCR','DATE_ORDER',"DATDEM","DATIMMAT","DT_FACT"]
list_amounts = ['TOTAL_REMISE','NCL_VO_VALON','NCL_VO_IMPAYUDAREC','TOTAL_REMISE_PRE']

# Columns removed from the final table once the derived variables exist.
cols_to_drop = ['LIB_ZDS','CRE_MARCA', 'DATE_COMANDE', 'NCL_PD_SUBTOTAL2', 'FINITION', 'ACC_PRE', 'NCL_VO_IMPAYUDAREC', 'FRAIS_ANEXXES', 'TOTAL_REMISE_PRE', 'TOTAL_REMISE', 'PRIX_FINAL', 'tarif+options_PRE', 'tarif+OPTTIONS', 'NCL_VO_PRIMACONVERSION', 'ACC', 'CONTRAT_SERVICE', 'BONUS_MALUS', 'NCL_VO_VALON', 'TRANSFORMATIONS', 'TAXE_PARAFISCALE', 'day','TAX_RATE']

# NOTE(review): the original loop bound was `len(rangeMonths-1)`, which (where
# it evaluates at all) is as long as rangeMonths itself, so every month is
# processed here. If the last month was meant to be excluded, iterate over
# rangeMonths[:-1] instead.
for i, month_start in enumerate(rangeMonths):
    year, month = month_start.year, month_start.month

    # Read MADAX, SAMARA and OPV for the current month
    df_madax = spark_session.read.load(raw_path.format(source="madaxv30c", year=year, month=month))
    df_samara = spark_session.read.load(raw_path.format(source="samarav30c", year=year, month=month))
    df_opv = spark_session.read.load(raw_path.format(source="opvv30c", year=year, month=month))

    # Give the VIN column the same name in the three sources, add the COUNTRY
    # join key as a literal and drop the `day` column.
    df_samara = df_samara.withColumnRenamed('CODE_VIN', 'VIN').withColumn('COUNTRY', F.lit('France')).drop('day')
    df_madax = df_madax.withColumnRenamed('NOVIN', 'VIN').withColumn('COUNTRY', F.lit('France')).drop('day')
    df_opv = df_opv.withColumnRenamed('CDC_NUMVIN', 'VIN').withColumn('COUNTRY', F.lit('France')).drop('day')

    # Outer join between MADAX and SAMARA on Country and VIN
    df_mx_outer_sm = df_madax.join(df_samara, on=['COUNTRY', 'VIN'], how='outer')

    # Left join of OPV onto MADAX U SAMARA on Country and VIN
    df_mx_outer_sm_opv = df_mx_outer_sm.join(df_opv, on=['COUNTRY', 'VIN'], how='left_outer')

    # Left join of the tax rates, on Country only
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.join(df_taxRate, on=['COUNTRY'], how='left_outer')

    # Merge the columns MADAX and SAMARA have in common: take the first
    # column of each pair and fall back to the second one when it is null.
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('BRAND', F.coalesce(F.col('MARQUE'), F.col('SINQTMRQ_2__CODE')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('DATE_SALE', F.coalesce(F.col('DATVENT'), F.col('DT_VD')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('DATE_ORDER', F.coalesce(F.col('DATCCLT'), F.col('DATE_COMANDE')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('LIB_TYPUTIL', F.coalesce(F.col('TYPUTIL'), F.col('TYP_UTIL_VD')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.drop('MARQUE','DATVENT','DATCCLT','TYPUTIL','TYP_UTIL_VD','DATE_COMANDE','DT_VD','SINQTMRQ_2__CODE')

    # Parse the date columns (input pattern: yyyy-MM-dd)
    for d in list_dates:
        df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn(d, F.to_date(F.col(d), 'yyyy-MM-dd'))

    # Convert the amount columns to float: decimal comma -> decimal point,
    # then strip every character that is not a digit, a minus sign or a point.
    # Raw strings keep the regex backslashes without invalid-escape warnings.
    for a in list_amounts:
        df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn(a, F.regexp_replace(F.col(a), r'\,', '.'))
        df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn(a, F.regexp_replace(F.col(a), r'[^0-9\-\.]', '').cast("float"))

    # Variable calculations
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('TOT_ADV_AFT_TAX', F.abs(F.col('TOTAL_REMISE')) + F.col('NCL_VO_IMPAYUDAREC'))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('TOT_ADV_PRE_TAX', F.col('TOT_ADV_AFT_TAX')/(1+F.col('TAX_RATE').cast('float')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('TRADE_IN_AID_PRE', F.col('NCL_VO_IMPAYUDAREC')/(1+F.col('TAX_RATE').cast('float')))

    # DISC_AFT is 1 when there is a non-zero pre-tax discount and both
    # NCL_VO_IMPAYUDAREC and NCL_VO_VALON are null, or both are 0; else 0.
    has_discount = F.abs(F.col('TOTAL_REMISE_PRE')) > 0
    both_null = F.col('NCL_VO_IMPAYUDAREC').isNull() & F.col('NCL_VO_VALON').isNull()
    both_zero = (F.col('NCL_VO_IMPAYUDAREC') == 0) & (F.col('NCL_VO_VALON') == 0)
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn(
        'DISC_AFT',
        F.when(has_discount & both_null, 1).when(has_discount & both_zero, 1).otherwise(0),
    )
    # DISC_PRE is 0 as soon as one of the three amounts is <= 0; else 1.
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('DISC_PRE', F.when(((F.abs(F.col('TOTAL_REMISE_PRE')) <= 0) | (F.col('NCL_VO_IMPAYUDAREC')<= 0) | (F.col('NCL_VO_VALON')<= 0)),0).otherwise(1))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('VIN_DM', F.col('DISC_AFT')+F.col('DISC_PRE'))

    # VEH_AGE: days from production to order; STOCK_AGE keeps only positive ages.
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('VEH_AGE', F.datediff(F.col('DATE_ORDER'), F.col('DATPROD')))
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('STOCK_AGE', F.when(F.col('VEH_AGE')> 0, F.col('VEH_AGE')).otherwise(F.lit(None)))
    # DELIVERY_TIME: days from order to sale (sign flipped because datediff
    # is computed as order minus sale).
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn('DELIVERY_TIME', F.datediff(F.col('DATE_ORDER'), F.col('DATE_SALE'))*(-1))

    # Remove the columns that are not part of the final table
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.drop(*cols_to_drop)

    # Cast every column to string
    # NOTE(review): the original comment suggests this is for Power BI - confirm.
    for c in df_mx_outer_sm_opv.columns:
        df_mx_outer_sm_opv = df_mx_outer_sm_opv.withColumn(c, F.col(c).cast('string'))

    # drop_duplicates returns a new DataFrame; the result has to be kept.
    df_mx_outer_sm_opv = df_mx_outer_sm_opv.drop_duplicates()

    # Load the month into Oracle. The first month replaces the table content
    # (truncate keeps the existing table definition); the following months
    # are appended so that they do not wipe out the months already written.
    write_mode = "overwrite" if i == 0 else "append"
    df_mx_outer_sm_opv.write.option("truncate", "true").jdbc(url=db_uri, table="SMKT013_MXUSM", mode=write_mode)

### Stopping the Spark session (the result was written to the Oracle DB (Exadata) in the loop above)

In [14]:
# Stop the Spark session created earlier in the notebook now that processing is done.
spark_session.stop()