In [1]:
from abc import ABC, abstractmethod

# Abstract base class shared by every extractor.
class Extract(ABC):
    """Common interface of all extractors: a single ``extract()`` method.

    Because ``extract`` is declared abstract on an ``ABC``, this class cannot
    be instantiated directly; concrete subclasses must override ``extract``.
    """

    @abstractmethod
    def extract(self):
        """Return the extracted data; must be overridden by subclasses.

        Raises:
            NotImplementedError: when a subclass delegates here via ``super()``.
        """
        message = "Subclass must implement abstract method"
        raise NotImplementedError(message)
        
        
# Extract subclass that reads data from an Oracle database over JDBC.
class ExtractFromOracle(Extract):
    """Read an Oracle table, or a parenthesised sub-query, into a Spark DataFrame.

    Args:
        sqlContext: object exposing ``.read`` (used as a Spark DataFrameReader).
        url: JDBC connection URL.
        query_or_table: value of the JDBC ``dbtable`` option, i.e. a table name
            or a parenthesised sub-query.
        user: database user name.
        password: database password.
    """

    def __init__(self, sqlContext, url, query_or_table, user, password):
        self.sqlContext = sqlContext
        self.url = url
        self.query_or_table = query_or_table
        self.user = user
        self.password = password

    def extract(self):
        """Load the configured table/query through the ``jdbc`` source and return it."""
        reader = self.sqlContext.read.format("jdbc")
        # Options are applied in the same order as the connection settings are listed.
        jdbc_settings = (
            ("url", self.url),
            ("dbtable", self.query_or_table),
            ("user", self.user),
            ("password", self.password),
        )
        for key, value in jdbc_settings:
            reader = reader.option(key, value)
        return reader.load()
    
# Extract subclass that reads a CSV file.
class ExtractFromFile(Extract):
    """Read a comma-separated file with a header row into a Spark DataFrame.

    Args:
        s: location of the CSV file. Although it is stored as ``self.schema``,
            it is passed to ``read.load`` as the path, not as a schema.
        sqlContext: object exposing ``.read``.
    """

    def __init__(self, s, sqlContext):
        self.schema = s
        self.sqlContext = sqlContext

    def extract(self):
        """Load the CSV with a ``,`` separator, a header row and inferred column types."""
        csv_options = dict(format="csv", sep=",", inferSchema="true", header="true")
        return self.sqlContext.read.load(self.schema, **csv_options)
    
# Extract subclass that reads a JSON file.
class ExtractFromJson(Extract):
    """Read a JSON file into a Spark DataFrame.

    Args:
        lien: path ("lien" = link) of the JSON file.
        sqlContext: optional object exposing ``.read`` (e.g. a SQLContext),
            consistent with the other extractors. Without it, ``extract()``
            can only work if the caller assigned ``.spark`` on the instance.
    """

    def __init__(self, lien, sqlContext=None):
        self.lien = lien
        self.sqlContext = sqlContext

    def extract(self):
        """Load ``self.lien`` with the ``json`` source and return the DataFrame.

        Raises:
            AttributeError: if no reader is available, either from the
                ``sqlContext`` argument or from a ``spark`` attribute set by hand.
        """
        # Backward compatibility: earlier callers could only make this work by
        # assigning ``.spark`` on the instance themselves.
        if self.sqlContext is not None:
            reader_owner = self.sqlContext
        else:
            reader_owner = getattr(self, "spark", None)
        if reader_owner is None:
            raise AttributeError(
                "ExtractFromJson has no Spark reader: pass sqlContext=... to the "
                "constructor (or set the .spark attribute) before calling extract()")
        return reader_owner.read.load(self.lien, format="json")


In [2]:
# Base class holding a few data-cleaning helpers.
class Transform:
    """Base transformer: keeps the Spark contexts and offers cleaning helpers.

    Args:
        sqlContext: SQL context, exposed to subclasses (e.g. to run queries).
        sparkContext: Spark context, exposed to subclasses.
    """

    def __init__(self, sqlContext, sparkContext):
        self.sparkContext = sparkContext
        self.sqlContext = sqlContext

    # Cast the selected columns to new types.
    def CastColumns(self, dataFrame, listColumns):
        """Return ``dataFrame`` with each listed column cast to its target type.

        Args:
            dataFrame: the DataFrame to transform.
            listColumns: iterable of ``(column_name, target_type)`` entries;
                only the first two items of each entry are read.

        Returns:
            The transformed DataFrame (the input unchanged if the list is empty).
        """
        result = dataFrame
        for spec in listColumns:
            column_name, target_type = spec[0], spec[1]
            result = result.withColumn(column_name, result[column_name].cast(target_type))
        return result

    # Replace null values with a single constant.
    def RemplacerAllNan(self, df, d=0):
        """Return ``df.fillna(d)``; ``d`` defaults to 0."""
        return df.fillna(d)

    # Replace null values column by column.
    def RemplacerAllNanDict(self, df, dictionnaire):
        """Return ``df.fillna(dictionnaire)``.

        Args:
            dictionnaire: mapping ``{column_name: replacement_value}``.
        """
        return df.fillna(dictionnaire)

# Transformer producing the CDR table.
class TransformCDR(Transform):
    """Build the CDR table for one date, keeping only reciprocal subscriber pairs."""

    def setTableCDR(self, dataFrame, date):
        """Return the rows of ``dataFrame`` for ``date`` whose (FROM, TO) pair
        also appears reversed (TO, FROM) on that same date.

        Args:
            dataFrame: call records with at least CALL_DATE, FROM_SUBSCRIBER_ID,
                TO_SUBSCRIBER_ID, CALLS, SMS, DURATION and CALLING_DAYS.
            date: value compared with ``CALL_DATE`` for equality.

        Returns:
            DataFrame with columns CALL_DATE, FROM_SUBSCRIBER_ID,
            TO_SUBSCRIBER_ID, CALLS, SMS, DURATION, CALLING_DAYS.

        Side effects:
            Registers (or replaces) the temporary views ``cdr`` and ``cdrSelected``.
        """
        # Filter on the date once and reuse the result for every step.
        day = dataFrame.where(dataFrame.CALL_DATE == date)
        # intersect() compares rows by column position (SQL INTERSECT semantics),
        # so this keeps the distinct (a, b) pairs for which (b, a) also exists.
        forward = day.select(day.FROM_SUBSCRIBER_ID, day.TO_SUBSCRIBER_ID)
        backward = day.select(day.TO_SUBSCRIBER_ID, day.FROM_SUBSCRIBER_ID)
        reciprocal = forward.intersect(backward)
        # createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
        reciprocal.createOrReplaceTempView("cdrSelected")
        day.createOrReplaceTempView("cdr")
        df_cdr = self.sqlContext.sql(
            "select CALL_DATE,cdr.FROM_SUBSCRIBER_ID,cdr.TO_SUBSCRIBER_ID,"
            "CALLS,SMS,DURATION,CALLING_DAYS "
            "from cdr,cdrSelected "
            "where cdr.FROM_SUBSCRIBER_ID=cdrSelected.FROM_SUBSCRIBER_ID "
            "and cdr.TO_SUBSCRIBER_ID=cdrSelected.TO_SUBSCRIBER_ID")
        return df_cdr
# Transformer producing a table that characterises each customer's behaviour.
class TransformComportement(Transform):
    """Join five per-contract sources into a single behaviour table keyed by CODE_CONTRAT."""

    def setTableComportement(self, df1, df2, df3, df4, df5):
        """Select the relevant columns of each source and inner-join them on CODE_CONTRAT.

        Args:
            df1: call usage whose metrics are renamed with an ``_out`` suffix
                (presumably outgoing traffic); must also contain MONTH_DT.
            df2: call usage whose metrics are renamed with an ``_in`` suffix
                (presumably incoming traffic).
            df3: offer attributes (ID_OFFRE, FLAG_3G, FLAG_4G, ...).
            df4: data usage (NB_JR_ACTIVITE_DATA, VOLUME_SESSION).
            df5: last-event dates.

        Returns:
            The joined DataFrame.

        NOTE(review): the joins use CODE_CONTRAT only; if df1/df2 hold several
        MONTH_DT rows per contract, rows multiply. Confirm one month per input.
        """
        metrics = ['NB_APPEL', 'DUREE_APPEL', 'NB_APPEL_TT_GSM', 'DUREE_APPEL_TT_GSM',
                   'DUREE_APPEL_TT_FIXE', 'NB_APPEL_TT_FIXE']

        def suffixed(frame, suffix):
            # Keep CODE_CONTRAT and rename every metric to "<metric>_<suffix>".
            renames = ["CODE_CONTRAT as CODE_CONTRAT"]
            renames += ["{0} as {0}_{1}".format(name, suffix) for name in metrics]
            return frame.selectExpr(*renames)

        # MONTH_DT is selected (so df1 must have it) but dropped by the rename step.
        outgoing = suffixed(df1.select('CODE_CONTRAT', 'MONTH_DT', *metrics), "out")
        incoming = suffixed(df2.select('CODE_CONTRAT', *metrics), "in")
        offers = df3.select('CODE_CONTRAT', 'ID_OFFRE', 'FLAG_3G', 'FLAG_4G',
                            'NB_CHANGEMENT_OFFRE', 'LAST_DATE_CHANGEMENT_OFFRE')
        data_usage = df4.select('CODE_CONTRAT', 'NB_JR_ACTIVITE_DATA', 'VOLUME_SESSION')
        last_events = df5.select('CODE_CONTRAT', 'LAST_EVENT_DATE', 'DERNIERE_DATE_VOIX_SORTANT',
                                 'DERNIERE_DATE_SMS_SORTANT', 'DERNIERE_DATE_DATA')

        result = outgoing
        for other in (incoming, offers, data_usage, last_events):
            result = result.join(other, on="CODE_CONTRAT")
        return result
    
            
        
        
            
        
            
            
        
    

In [7]:
# Class that writes DataFrames into the Oracle database.
class Load():
    """Write Spark DataFrames to an Oracle table over JDBC."""

    def loadDataFrame(self, dataFrame, tableName, user, password, mode="Append",
                      url="jdbc:oracle:thin:@localhost:1522:xe"):
        """Save ``dataFrame`` into ``tableName`` through the ``jdbc`` data source.

        Args:
            dataFrame: DataFrame to write.
            tableName: target table (JDBC ``dbtable`` option).
            user: database user name.
            password: database password.
            mode: Spark save mode (default ``"Append"``).
            url: JDBC URL. Defaults to the previously hard-coded local Oracle XE
                instance, so existing callers are unaffected.
        """
        writer = dataFrame.write.mode(mode).format("jdbc")
        writer = (writer
                  .option("url", url)
                  .option("dbtable", tableName)
                  .option("user", user)
                  .option("password", password)
                  # Spark only honours "truncate" with overwrite mode (the table is
                  # truncated instead of dropped and recreated); Append ignores it.
                  .option("truncate", "true"))
        writer.save()

In [6]:
# Class that runs the whole ETL (extract, transform, load).
class ETL():
    """Extract call records from Oracle, build the CDR table and write it back.

    Args:
        sqlContext: SQL context used by the extractor and the transformer.
        sparkContext: Spark context handed to the transformer.
        date: processing date. It is inserted verbatim (``str(date)``) into the
            Oracle query and compared with ``CALL_DATE`` in Spark.
        user: Oracle user for both reading and writing.
        password: Oracle password for both reading and writing.
            The user/password defaults keep the previously hard-coded values.
    """

    # JDBC URL of the source database.
    ORACLE_URL = "jdbc:oracle:thin:@localhost:1522:xe"

    def __init__(self, sqlContext, sparkContext, date, user="telecom", password="97908631"):
        # FIXME(security): credentials should not live in source code; pass them
        # in explicitly (e.g. from os.environ or getpass) instead of using defaults.
        self.sqlContext = sqlContext
        self.sparkContext = sparkContext
        self.date = date
        self.user = user
        self.password = password
        self.DataFrames = []
        # NOTE(review): ``date`` is concatenated unquoted and unescaped into the
        # SQL text, so it must already be a valid Oracle expression from a
        # trusted source.
        query = ('(select "CALL_DATE","FROM_SUBSCRIBER_ID","TO_SUBSCRIBER_ID",'
                 '"CALLS","SMS","DURATION","CALLING_DAYS" '
                 'from dw_cla_monthly_trafic_msc '
                 'where "DURATION" >5 and "CALL_DATE"=' + str(self.date) + ')')
        # The extractor list must exist before use (it was appended to without
        # ever being created, which raised AttributeError).
        self.Extracters = [
            ExtractFromOracle(self.sqlContext, self.ORACLE_URL, query, self.user, self.password),
        ]
        # TODO: the behaviour table (TransformComportement) is not wired in yet; it
        # needs extractors for USAGE_MONTHLY_SORTANT_B, USAGE_MONTHLY_ENTRANT_B,
        # CONTRACT_D, USAGE_MOUNTHLY_DATA_B and PARC_RGS_D.
        self.TransformCDR = TransformCDR(self.sqlContext, self.sparkContext)
        self.loder = Load()

    # ETL job, meant to be run every month.
    def job(self):
        """Run extract, transform and load once, writing the result to table ``cdr``.

        ``self.DataFrames`` is rebuilt on every call, so repeated runs re-extract
        instead of piling up frames and re-processing the first run's data.
        """
        # Each extracted frame is spread over 10 partitions and cached.
        self.DataFrames = [e.extract().repartition(10).cache() for e in self.Extracters]

        listColumns = [('FROM_SUBSCRIBER_ID', 'int'), ('TO_SUBSCRIBER_ID', 'int'), ('SMS', 'int')]
        self.DataFrames[0] = self.TransformCDR.CastColumns(self.DataFrames[0], listColumns)
        df1 = self.TransformCDR.setTableCDR(self.DataFrames[0], self.date)
        # Print the row count of the CDR table (this runs a Spark action).
        print(df1.count())

        self.loder.loadDataFrame(df1, "cdr", self.user, self.password)
     

In [5]:
def main(date=None):
    """Create a local Spark context, run the ETL for ``date``, then stop Spark.

    Args:
        date: processing date forwarded to ``ETL`` (required by its constructor).

    Raises:
        TypeError: if ``date`` is not given. This is checked before Spark starts,
            so no context is left running.
    """
    if date is None:
        raise TypeError("main() needs the processing date: call main(date) "
                        "with the value to compare against CALL_DATE")

    from pyspark import SparkContext, SparkConf
    from pyspark import SQLContext

    spark_config = (SparkConf().setMaster("local").setAppName('etl')
                    .set("spark.ui.port", "4050")
                    .set("spark.sql.crossJoin.enabled", "true")
                    .set("spark.sql.shuffle.partitions", "1"))
    sc = SparkContext(conf=spark_config)
    try:
        sqlContext = SQLContext(sc)
        etl = ETL(sqlContext, sc, date)
        etl.job()
        print("end")
    finally:
        # Stop the context so main() can be re-run in the same kernel: only one
        # SparkContext may be active per process.
        sc.stop()
    
    

In [6]:
# Entry point: runs the ETL job. NOTE(review): ETL(...) requires a processing
# date, and this call does not supply one — confirm how the date should be passed.
main()

+-----------------+------------------+----------------+-----+---+--------+------------+
|        CALL_DATE|FROM_SUBSCRIBER_ID|TO_SUBSCRIBER_ID|CALLS|SMS|DURATION|CALLING_DAYS|
+-----------------+------------------+----------------+-----+---+--------+------------+
|30Jun2019 0:00:00|                 1|               3|    1|  2|      22|           1|
|30Jun2019 0:00:00|                 2|               7|   10|  4|     915|           3|
|30Jun2019 0:00:00|                 3|               1|    1|  0|      72|           1|
|30Jun2019 0:00:00|                 4|               6|    1|  0|     169|           1|
|30Jun2019 0:00:00|                 6|               4|    1|  0|     136|           1|
|30Jun2019 0:00:00|                 7|               2|    7|  2|     260|           2|
+-----------------+------------------+----------------+-----+---+--------+------------+

None
+------------+------------+---------------+-------------------+----------------------+-----------------------+----