
+ Hadoop est un framework pour le stockage distribué ( HDFS ) et le traitement distribué ( YARN ).

+ **Apache Hive** is un data warehouse et outil  ETL  au dessus de la Apache Hadoop,  nous y avons crée nos bases de dedons. Donc nous avons besoin d'un context hive

+ Spark est un moteur de calcul distribué en mémoire.

+ Spark peut fonctionner avec ou sans composants Hadoop (HDFS/YARN)

+ Spark n'est pas lié au paradigme MapReduce en deux étapes et promet des performances jusqu'à 100 fois plus rapides que Hadoop MapReduce pour certaines applications

+ Le SparkContext peut se connecter à plusieurs types de cluster manager (local, Mesos ou YARN)

In [1]:
from pyspark import SparkContext, HiveContext, SparkConf 

# Configuration

Ceci est  pour exécuter le travail en mode local.

Il est utilisé pour tester le code dans une petite quantité de données dans un environnement local

Il ne fournit pas assez d' avantages de l'environnement distribué surtout dans la gestions des ressources

*  "*" est le nombre de cores à disponible à alloueer

In [2]:
conf = SparkConf().setMaster('local[*]')                # setMaster('yarn-client')
conf = conf.setAppName('my_second_app')
conf =  conf.set('spark.ui.port', '5050')               # definition du uri spark
conf= conf.set('spark.sql.shuffle.partitions','4')
sc = SparkContext(conf=conf)                            # Instanciation du context spark
hctx = HiveContext(sc)                                  # Instanciation du context hive

In [3]:
# sc.stop()                                             # arret du context spark

# Lecture de la table de donnée depuis Hive

In [4]:
df = hctx.sql("SELECT  idnt_comp_serv, flg_rsil, anc_actv, \
                      val_debit_down, libl_res, libl_tech, nb_iad, iad_anc_info, iad_userkey2, fdate_iad_anc, \
                      iad_model, iad_vendor,     iad_softwarever, rk_asc_iad_conf, nb_stv, stb_anc_info, \
                      fdate_stb_anc, stb_model, stb_vendor, stb_softwarever, rk_asc_stb_conf, noinadinfo, nostbinfo \
              FROM upec_2022.iad_stb_model A  WHERE rdn_samp=1")

In [5]:
df.show(2)

+--------------+--------+--------+--------------+--------+---------+------+------------+------------+-------------+---------+----------+--------------------+---------------+------+------------+-------------+--------------+----------+---------------+---------------+----------+---------+
|idnt_comp_serv|flg_rsil|anc_actv|val_debit_down|libl_res|libl_tech|nb_iad|iad_anc_info|iad_userkey2|fdate_iad_anc|iad_model|iad_vendor|     iad_softwarever|rk_asc_iad_conf|nb_stv|stb_anc_info|fdate_stb_anc|     stb_model|stb_vendor|stb_softwarever|rk_asc_stb_conf|noinadinfo|nostbinfo|
+--------------+--------+--------+--------------+--------+---------+------+------------+------------+-------------+---------+----------+--------------------+---------------+------+------------+-------------+--------------+----------+---------------+---------------+----------+---------+
|  610004644264|       0|       1|       20000.0|      BT|     xDSL|     1|           0|          3P|            0| TVW620.I|      UBEE|B40

Les classses sont très déséquilibres, le taux d'erreur n'est plus pertinent pour mésurer la qualité du modèle.

En effet il y a 98% et 2%. Une des solutions serait de rééchantillonner les données. 

C'est pourquoi df revient à selectionner les rdn_samp = 1

In [6]:
hctx.sql("SELECT FLG_RSIL, COUNT(*) FROM upec_2022.iad_stb_model GROUP BY FLG_RSIL").show() 

+--------+--------+
|FLG_RSIL|count(1)|
+--------+--------+
|       0|  601449|
|       1|    8495|
+--------+--------+



La table qui nous interesse est alors un tirage aléatoire de 100 % parmi les postitfs et 10 % parmi les negatifs

In [7]:
hctx.sql("SELECT FLG_RSIL, COUNT(*)    \
          FROM upec_2022.iad_stb_model \
          WHERE rdn_samp =1            \
         GROUP BY FLG_RSIL").show() 

+--------+--------+
|FLG_RSIL|count(1)|
+--------+--------+
|       0|   60230|
|       1|    8495|
+--------+--------+



show() est une méthode qui s'applique au df, le hive context converti toute les tables en dataframe

In [8]:
type(hctx.sql("SELECT FLG_RSIL, COUNT(*)    \
          FROM upec_2022.iad_stb_model \
          WHERE rdn_samp =1            \
         GROUP BY FLG_RSIL"))

pyspark.sql.dataframe.DataFrame

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

# Controle du nombre de partitions

RDD (Resilient Distributed Dataset) est un objet natif de spark.

C' est une structure de données en mémoire utilisée par Spark. très rapide car plus besoin d'aller lire les données sur hdfs 

Une fois que le context spark  s'arrette ', il n'y a plus de RDD

 On peut changer le partitonning du dataset pour s'adapter à la puissance qu'on a, je n'ai que quatre cpus logiques

In [10]:
df.rdd.getNumPartitions()

1

In [11]:
rdd = df.rdd.repartition(4)
type(rdd)

pyspark.rdd.RDD

Les méthodes take() , collect() , first renvoient des objets python (list, row

In [12]:
type(rdd.take(2)), type(rdd.collect())

(list, list)

Le **lazy execution** dans Spark signifie que l'exécution ne démarrera pas tant qu'une action ne sera pas déclenchée.  Par exemple, df.rdd va d'abord revenir en arrière sur le code qui permet d'avoir df, une fois exécutée, alors on le transform en rdd. Donc cela est très couteux en temps de calcul.

**Cache()**  et **persist()** sont des techniques d'optimisation pour les applications Spark  afin d'améliorer les performances des jos  des applications.Ils sont sont utilisés pour enregistrer les Spark RDD, Dataframe et Dataset.

Mais, la différence est que la méthode RDD cache() l'enregistre par défaut dans la mémoire (MEMORY_ONLY) tandis que la méthode persist() est utilisée pour le stocker au niveau de stockage défini par l'utilisateur.

Lorsque l'objet à persister est un rdd alors cache () et persist() deviennent pareils. Naturellement rdd.unpersist() existe

In [13]:
rdd = rdd.persist()

In [14]:
rdd.first()

Row(idnt_comp_serv=610001492507, flg_rsil=0, anc_actv=40, val_debit_down=2432.0, libl_res=u'FT', libl_tech=u'xDSL', nb_iad=1, iad_anc_info=3, iad_userkey2=u'2P', fdate_iad_anc=6, iad_model=u'TG787', iad_vendor=u'Thomson', iad_softwarever=u'B11001_MAIN_8.6.62', rk_asc_iad_conf=5, nb_stv=None, stb_anc_info=None, fdate_stb_anc=None, stb_model=u'?', stb_vendor=u'?', stb_softwarever=u'?', rk_asc_stb_conf=None, noinadinfo=0, nostbinfo=0)

# Gestion de doublons

In [15]:
#CONTROLE ET GESTION DES DOUBLONS
NonUnique = rdd.map(lambda x:  (int(x.idnt_comp_serv),1))\
                .reduceByKey(lambda x,y : x+y)\
                .filter(lambda x : x[1]>1)\
                .keys().collect()

In [16]:
print(NonUnique)

[610001457412, 610000865672, 610004533736, 610002128460, 610001686448, 610000610732, 610002220436, 610001089492, 610001540072, 610002334912, 610001724620, 610001335300, 610002282224, 610004500700, 610003446461, 610003257545, 610001229529, 610001620153, 610002024609, 610003910817, 610004246593, 610001962533, 610000251325, 610003061373, 610002387461, 610000844977, 610001937373, 610002897873, 610004193941, 610001126017, 610002031806, 610000765018, 610001727882, 610001640446, 610000969474, 610002995654, 610003238642, 610002436134, 610003952894, 610002659554, 610004589183, 610003261295, 610002738567, 610002755339, 610000345051, 610003373007, 610000481591, 610001098163, 610004285055, 610003338135, 610002428611, 610001391075, 610004165555, 610002019347, 610004360383]


In [17]:
rdd = rdd.filter(lambda x : int (x.idnt_comp_serv)  not in NonUnique)
rdd.persist()

PythonRDD[42] at RDD at PythonRDD.scala:48

# Data processing

Un modèle comme le Random Forest (basés sur les arbres) est robuste aux valeurs manquantes !
Imputer ici risque de créer des données artificielles et eleminer certaines rélations 
Une solution est de considerer la valeur manquante comme une modalité.

## Cas des variables quantitatives et valeurs manquantes

Ici assongons -1 lorsqu'il s'agit de NaN

In [18]:
# Fonctions
def MissingRecode(arr) :
    return [-1 if v is None else v for v in arr]

In [19]:
# Récuperations des Champs utiles
train = rdd.map(lambda x: (x.idnt_comp_serv, (x.idnt_comp_serv, x.flg_rsil, x.anc_actv, x.val_debit_down, x.iad_anc_info, 
                                              x.nb_iad, x.fdate_iad_anc, x.rk_asc_iad_conf,x.stb_anc_info, x.nb_stv,
                                              x.fdate_stb_anc,  x.rk_asc_stb_conf, x.noinadinfo ,x.nostbinfo)
                          ))

In [20]:
train = train.map(lambda x : (x[0],MissingRecode(x[1])))
train =train.persist()

In [21]:
# nombre totales de features
NQuantvar = train.map(lambda x : len(x[1])).first()
print NQuantvar-2

12


## Traitement des variables catégorielles

In [22]:
def JoinListAppender(aList,aValue):
    aList.append(aValue)
    return tuple(aList)

'''
A.join(B) => (KEY, (VA,VB))
C.join(D) => (KEY, ((VA,VB),VC))
D.join(E) => (KEY, (((VA,VB),VC),VD))
'''

'\nA.join(B) => (KEY, (VA,VB))\nC.join(D) => (KEY, ((VA,VB),VC))\nD.join(E) => (KEY, (((VA,VB),VC),VD))\n'

On rappelle que df est un dataframe

In [23]:
df.dtypes

[('idnt_comp_serv', 'bigint'),
 ('flg_rsil', 'int'),
 ('anc_actv', 'int'),
 ('val_debit_down', 'float'),
 ('libl_res', 'string'),
 ('libl_tech', 'string'),
 ('nb_iad', 'int'),
 ('iad_anc_info', 'int'),
 ('iad_userkey2', 'string'),
 ('fdate_iad_anc', 'int'),
 ('iad_model', 'string'),
 ('iad_vendor', 'string'),
 ('iad_softwarever', 'string'),
 ('rk_asc_iad_conf', 'int'),
 ('nb_stv', 'int'),
 ('stb_anc_info', 'int'),
 ('fdate_stb_anc', 'int'),
 ('stb_model', 'string'),
 ('stb_vendor', 'string'),
 ('stb_softwarever', 'string'),
 ('rk_asc_stb_conf', 'int'),
 ('noinadinfo', 'int'),
 ('nostbinfo', 'int')]

In [24]:
df.columns.index('noinadinfo')

21

In [25]:
#Encodage des variables catégorielles (de texte)  en entier (integer)
for i in df.dtypes:
    if i[1]=='string':
        # Recuperer l'index de la colonne
        j = int(df.columns.index(i[0]))
        #  Recuperer la list des valeurs uniques de chaque variable categorielle
        valList = rdd.map(lambda x : (x[j],1)).groupByKey().sortByKey().map(lambda x :x[0]).collect()
        # Retourner la clée et la colonne (nouvelle )
        recode = rdd.map(lambda x : (x.idnt_comp_serv,valList.index(x[j])))
        #  jointure à la table rdd de la colum
        train = train.join(recode).map(lambda x: (x[0],JoinListAppender(list((x[1][0])),x[1][1])))


In [26]:
rdd.unpersist()
train=train.repartition(4).persist()
train.getNumPartitions()

4

# Analyse prédictive avec RF

In [27]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

## Préparation de la table pour MLlib


### LabeledPoint

On parle de vecteur parse lorsque vous avez beaucoup de valeurs dans le vecteur comme zéro Alors qu'on parle de vcteur dense lorsque la plupart des valeurs du vecteur sont non nulles 

**vecteur dense**

vd= [0,1,0,3,4,0,5,0,0,0,0,0,0,0,0,0,0,0,0,12]


**vecteur sparse** --> Optimisation de memoire'''

vs = [{1:1},{3:3},{6:5},{19:12}]            # couple indice valeurs, les indices non présentes ont par défintions des  valeurs
nulles

In [28]:
Nvar=train.map(lambda x: len(x[1])).first()
print Nvar

23


Pour entrainer  un nombre , nous devrions instancier une Classe labeledpoint qui représente pour chaque individu le labels (indice =0) et  features indice 1 , dont le contenu est  une liste).

Afin d'identifier les individus la notion de clé valeurs (ici le labeledpoint) reste d'actualité

In [29]:
lp =train.map(lambda x : (x[0],LabeledPoint(float(x[1][1]),DenseVector([x[1][i] for i in range(2,Nvar)])))) 
lp=lp.persist()
lp.take(3)

[(610004152320,
  LabeledPoint(1.0, [5.0,6144.0,4.0,1.0,4.0,1.0,-1.0,-1.0,-1.0,-1.0,0.0,0.0,5.0,3.0,1.0,2.0,2.0,80.0,0.0,0.0,47.0])),
 (610000699400,
  LabeledPoint(0.0, [53.0,20000.0,5.0,1.0,5.0,4.0,0.0,1.0,0.0,3.0,0.0,0.0,2.0,3.0,2.0,9.0,4.0,42.0,8.0,3.0,39.0])),
 (610001369800,
  LabeledPoint(0.0, [42.0,20000.0,8.0,2.0,8.0,4.0,1.0,2.0,1.0,7.0,0.0,0.0,5.0,3.0,2.0,2.0,2.0,80.0,7.0,3.0,74.0]))]

In [30]:
lp.map(lambda x : (x[1].features)).first()

DenseVector([5.0, 6144.0, 4.0, 1.0, 4.0, 1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 5.0, 3.0, 1.0, 2.0, 2.0, 80.0, 0.0, 0.0, 47.0])

In [31]:
lp.map(lambda x : (x[1].label)).first()

1.0

### Déclarations de features catégorielles

Nous dévrions informer à Spark les features qui sont catégorielles afin qu'il puisse les traiter differemment.

Pour cela , il faut créer un dictionnaire python qui retournera les features et leurs nombres de modalités

In [32]:
catFeatInfo = {} # on a créer un dictionnaire pour que spark comprenne quelle sont les var text et numérique
for i in range(NQuantvar-1,Nvar-2):
    catBins =lp.map(lambda x: x[1].features[i]).distinct().count()
    catFeatInfo.update(dict({i:catBins}))
print catFeatInfo

{16: 6, 17: 95, 18: 9, 19: 5, 20: 77, 13: 4, 14: 4, 15: 12}


### Creation d'un échantillon train et test


In [33]:
(lpTrain,lpTest)=lp.randomSplit([0.8,0.2])
print("N apprentissage %d N -Test %d" %(lpTrain.count(),lpTest.count()))

N apprentissage 55026 N -Test 13589


## Training

In [34]:
from pyspark.mllib.tree import RandomForest

In [35]:
lpTrain.persist()
RNFModel = RandomForest.trainRegressor(lpTrain.map(lambda x: x[1]),
                                      categoricalFeaturesInfo=catFeatInfo,
                                      numTrees=10, featureSubsetStrategy="sqrt",
                                      impurity='variance',maxDepth=4,maxBins=95,
                                      seed=2022)

## Sauvegarde du modèle sur hdfs et importations

In [38]:
#exportation
RNFModel.save(sc,"/user/hadoop/MODELS/RNF_models_2022")

In [39]:
#Importation
from pyspark.mllib.tree import *
Model = RandomForestModel.load(sc, "/user/hadoop/MODELS/RNF_models_2022")
print Model

TreeEnsembleModel regressor with 10 trees



In [40]:
df.groupBy("flg_rsil").count().show()

+--------+-----+
|flg_rsil|count|
+--------+-----+
|       0|60230|
|       1| 8495|
+--------+-----+



In [41]:
#Apply model to test
Preds = RNFModel.predict(lpTest.map(lambda x: x[1].features))
Preds.stats()
#Taux d'évènement à 12% donc c'est bien ça rpz notre % de churner dans notre table

(count: 13589, mean: 0.125067091281, stdev: 0.063807783476, max: 0.547158612926, min: 0.0792222281688)

In [42]:
Preds.stats()

(count: 13589, mean: 0.125067091281, stdev: 0.063807783476, max: 0.547158612926, min: 0.0792222281688)

In [43]:
Preds.take(2)

[0.15557054325603212, 0.10340656299462155]

## Courbe Lift

Le lift a un rôle de ciblage : solliciter les clients les plus réceptifs

• optimiser un budget limité

• ne pas agacer les clients « hostiles »

Les étapes :


1. Appliquer la fonction score sur le reste de la base
2. Trier la base selon le score
3. Cibler en priorité les clients à fort score(les plus appétents en premier. Les moins appétents en derniers)
4. Prévoir les performances à partir de la courbe LIFT

Rattachons les individus à leur prédictions

- x[0][0] = id_client
- x[0][1] =  label
- x[1] = prédictions

In [44]:
kPreds=lpTest.map(lambda x : (x[0], x[1].label)).zip(Preds).zip(Preds).map(lambda x : (x[0][0],x[0][1], x[1]))
kPreds.take(3)

[((610003865040, 0.0), 0.15557054325603212, 0.15557054325603212),
 ((610004355800, 0.0), 0.10340656299462155, 0.10340656299462155),
 ((610000699680, 0.0), 0.11092487213073658, 0.11092487213073658)]

In [45]:
lpTest.map(lambda x : (x[0], x[1].label)).zip(Preds).first()

((610003865040, 0.0), 0.15557054325603212)

In [46]:
Nrows = kPreds.count()
Tres = kPreds.map(lambda x : x[1]).reduce(lambda x,y :x+y)
print(Nrows, Tres)

(13589, 1699.5367034235016)


In [47]:
# ordonnées les probabilités : ((0.54436346, 0.54436346054), 0)
# Fréquence d'individus, (1, probabilité)
# Par fréquence de 1%: compter le nombre d'individus, somme des probabilité
# Par fréquence de 1%: compter le nombre d'individus, somme des probabilité , le ratio somme des proba/ somme totale des proba

In [48]:
res = kPreds.map(lambda x: (x[2], x[1])).sortByKey(ascending=False).zipWithIndex()\
               .map(lambda x : (x[1]*100/Nrows, (1, x[0][1]))).reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1]))\
               .map(lambda x : (x[0], x[1][0], x[1][1], x[1][1]*1.00/Tres))

In [50]:
res.takeOrdered(100, lambda x : x[0])

[(0, 136, 66.94210795021455, 0.0393884449893716),
 (1, 136, 63.25660181938903, 0.037219909221122796),
 (2, 136, 42.98631397385356, 0.02529296006803682),
 (3, 136, 32.575858563422855, 0.01916749341029406),
 (4, 136, 30.8326218431235, 0.01814178051054448),
 (5, 136, 29.854704098859372, 0.017566377965666083),
 (6, 136, 28.179434267943204, 0.016580656487841246),
 (7, 136, 26.365524067084095, 0.015513359619697582),
 (8, 136, 25.929821847968384, 0.01525699433012305),
 (9, 135, 25.625829302709167, 0.015078126439452102),
 (10, 136, 24.945528141423615, 0.014677840196786574),
 (11, 136, 24.736034691118437, 0.01455457516233149),
 (12, 136, 23.80569775570839, 0.014007168958313653),
 (13, 136, 23.238738748857973, 0.013673572746058661),
 (14, 136, 22.96619280690055, 0.013513207899916526),
 (15, 136, 21.38477451501969, 0.012582708259223098),
 (16, 136, 20.9859138711442, 0.012348020392187312),
 (17, 136, 20.378066340385047, 0.011990365550409127),
 (18, 135, 19.30877672252256, 0.011361200192751043),
 (

In [51]:
cutOff = 10
Lift = res.filter(lambda x :x[0]<cutOff).map(lambda x: x[3]).sum()/(cutOff/100.00)
print Lift

2.19206103042


# OPTIMISATION DES PARAMETRE DU MODELE

In [52]:
# Optimisation des paramètres du modele
import time




#lpTrain.persist()
cutOff = 10

for i in (10,20,50):
    for j in (6, 8, 10, 16):
        start_time = time.time()
        RNFModel = RandomForest.trainRegressor(lpTrain.map(lambda x: x[1]),
                                               categoricalFeaturesInfo= catFeatInfo,
                                               numTrees=i,
                                               featureSubsetStrategy="sqrt",
                                               impurity='variance',
                                               maxDepth=j,
                                               maxBins=95,
                                               seed=12345)
        Preds = RNFModel.predict(lpTest.map(lambda x: x[1].features))
        kPreds = lpTest.map(lambda x: (x[0], x[1].label)).zip(Preds).map(lambda x : (x[0][0], x[0][1], x[1]))
        res = kPreds.map(lambda x: (x[2], x[1])).sortByKey(ascending=False).zipWithIndex()\
               .map(lambda x : (x[1]*100/Nrows,(1, x[0][1]))).reduceByKey(lambda x,y : (x[0]+ y[0], x[1]+y[1]))\
               .map(lambda x : (x[0], x[1][0], x[1][1], x[1][1]*1.00/Tres))
        Lift = res.filter(lambda x :x[0] < cutOff).map(lambda x: x[3]).sum()/(cutOff/100.00)
        elapsed = time.time() - start_time
        print ("numTrees: %d - Depth : %d - Lift : %f - (%f s.)" %(i,j,Lift,elapsed))

numTrees: 10 - Depth : 6 - Lift : 2.659548 - (3.850312 s.)
numTrees: 10 - Depth : 8 - Lift : 2.759576 - (4.911552 s.)
numTrees: 10 - Depth : 10 - Lift : 2.777227 - (6.189529 s.)
numTrees: 10 - Depth : 16 - Lift : 2.788995 - (11.455336 s.)
numTrees: 20 - Depth : 6 - Lift : 2.594825 - (5.191044 s.)
numTrees: 20 - Depth : 8 - Lift : 2.741924 - (6.299267 s.)
numTrees: 20 - Depth : 10 - Lift : 2.853719 - (11.574427 s.)
numTrees: 20 - Depth : 16 - Lift : 2.806647 - (29.025064 s.)
numTrees: 50 - Depth : 6 - Lift : 2.665432 - (10.191149 s.)
numTrees: 50 - Depth : 8 - Lift : 2.724272 - (15.906474 s.)
numTrees: 50 - Depth : 10 - Lift : 2.894907 - (24.902319 s.)
numTrees: 50 - Depth : 16 - Lift : 2.877255 - (70.924941 s.)
