In [1]:
"""
Author - Yogesh Agrawal
Email -  Yugagrawal@gmail.com
"""

In [2]:
# Install the MLflow package from PyPI through the Databricks `dbutils` library utility.
# NOTE(review): no version is pinned, so re-runs may pick up a different MLflow release.
# NOTE(review): dbutils.library.installPyPI is deprecated in newer Databricks runtimes;
# consider `%pip install mlflow==<version>` instead — confirm against the target runtime.
# mlflow is imported below but only referenced in commented-out code in this notebook.
dbutils.library.installPyPI("mlflow")

In [3]:
"""
Load the PySpark, Spark ML and MLflow packages used in this notebook.
"""
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql.functions import *
from pyspark.ml.pipeline import Transformer,Estimator
from pyspark.ml.feature import StringIndexer,VectorAssembler,QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from  pyspark.ml.param.shared import *
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import mlflow
from mlflow import spark
from pyspark import keyword_only
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
# Spark session creator: build (or get the existing) session named 'Titanic' and bind it to `st`.
st = (
    SparkSession.builder
    .appName('Titanic')
    .getOrCreate()
)

In [5]:
def load_data(path, header_value):
  """
  Read a CSV file into a Spark DataFrame using the notebook session `st`.

  @param -
          path - path of the CSV file
          header_value - when True, the first row is used as the header

  @return - DataFrame of the file's contents, with column types inferred (inferSchema=True).
  """
  reader = st.read
  return reader.csv(path, header=header_value, inferSchema=True)

In [6]:
df = load_data(path='/FileStore/tables/titanic_train.csv', header_value=True)
df_test = load_data(path='/FileStore/tables/titanic_test.csv', header_value=True)

In [7]:
df_test.show(n=5)

In [8]:
def check_column_null(df):
  """
  Print a one-row table with the number of null values in each column.

  @param -
            df - a Spark DataFrame
  """
  null_counts = []
  for name in df.columns:
    # when() yields a non-null literal only for null rows; count() then counts exactly those rows.
    null_counts.append(count(when(col(name).isNull(), name)).alias(name))
  df.select(null_counts).show()

In [9]:
for _frame in (df, df_test):
  check_column_null(_frame)

In [10]:
'''
Class A for sharing a variable between the Estimator and Transformer classes.
(Currently commented out; it is not used by the pipeline below.)

@param - 
       Params - An object that needs to be shared between the two classes (primarily the Estimator logic object).

@return -
       Param - An object passed to the Transformer class.

'''

# class A(Params):  
  
#   center_param = Param(Params._dummy(),"center_param","center_param")
  
# #   def __init__(self):
# #     super(A,self).__init__()
     
#   def setCenterObject(self,value):
#     return self._set(center_param = value)
  
#   def getCenterObject(self):
#     return self.getOrDefault(self.center_param)

In [11]:
'''
Custom Estimator class for the preprocessing logic.
(Currently commented out; it is not used by the pipeline below.)

@param - 
       Estimator - Estimator base class reference
       df - dataframe on which the operation is carried out (passed through the fit function)

@return -
       Model - a Transformer model that applies the estimator's preprocessing.

'''

# class My_preprocessing_Estimator(Estimator,DefaultParamsReadable, DefaultParamsWritable):
      
#     def _fit(self,df):
#       print("********************************  in fit method ...************************************")
      
#       self.df = df
#       self.df = self.df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
#       self.df = self.df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
#                      ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])

#       self.Age_mean = self.df.groupBy("Initial").avg('Age')
#       self.Age_mean = self.Age_mean.withColumnRenamed('avg(Age)','mean_age')
#       self.Initials_list = self.Age_mean.select("Initial").rdd.flatMap(lambda x: x).collect()
#       self.Mean_list = self.Age_mean.select("mean_age").rdd.flatMap(lambda x: x).collect()
#       return preprocess_transform().setCenterObject(self)

In [12]:
class preprocess_transform(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """
    Custom Transformer that turns the raw Titanic columns into model-ready features.

    Steps performed by ``_transform``:
      1. Feature generation - extract the passenger title (``Initial``) from ``Name``,
         collapse rare titles, and add ``Family_Size`` and ``Alone``.
      2. Imputation - fill null ``Age`` with the mean age of the passenger's title,
         null ``Embarked`` with the most frequent non-null port, and null ``Fare``
         with the mean fare of the passenger's ``Pclass``.
      3. Encoding - add ``Sex_index``, ``Embarked_index`` and ``Initial_index``
         via StringIndexer.
      4. Cleanup - drop the raw / intermediate columns listed in ``_DROPPED_COLUMNS``.

    NOTE(review): every statistic (means, mode) and every StringIndexer mapping is
    computed from the DataFrame passed to ``transform`` itself. Train and test frames
    are therefore imputed and encoded independently, and the index assigned to a
    category may differ between them. Splitting this into an Estimator/Model pair
    that fits on training data only would remove that risk.

    The ``value`` param and ``outputCols`` (from HasOutputCols) are stored but not
    read by the transformation.
    """

    value = Param(Params._dummy(), "value", "value to fill")

    # Rare passenger titles collapsed onto the common ones before indexing.
    _TITLE_MAP = {
        'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
        'Dr': 'Mr', 'Major': 'Mr',
        'Lady': 'Mrs', 'Countess': 'Mrs',
        'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
        'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
    }
    # String columns converted to numeric `<name>_index` columns.
    _CATEGORICAL_COLUMNS = ('Sex', 'Embarked', 'Initial')
    # Raw / intermediate columns removed from the final output.
    _DROPPED_COLUMNS = ('Cabin', 'Name', 'Ticket', 'Family_Size', 'SibSp', 'Parch',
                        'Sex', 'Embarked', 'Initial')

    @keyword_only
    def __init__(self, outputCols=None):
        super(preprocess_transform, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, outputCols=None):
        """
        setParams(self, outputCols=None)
        Sets params for this preprocess_transform.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """
        Sets the value of :py:attr:`value`.
        """
        return self._set(value=value)

    def getValue(self):
        """
        Gets the value of :py:attr:`value` or its default value.
        """
        return self.getOrDefault(self.value)

    def _transform(self, df):
        """
        Apply feature generation, imputation, string indexing and column cleanup.

        @param -
               df - raw DataFrame; reads Name, SibSp, Parch, Age, Embarked, Fare,
                    Pclass and Sex.

        @return -
               df - DataFrame with imputed Age/Embarked/Fare, the ``Alone`` flag and
                    the ``*_index`` columns, minus the columns in ``_DROPPED_COLUMNS``.
        """
        featured = self._generate_features(df)
        imputed = self._impute_age(featured)
        imputed = self._impute_embarked(imputed)
        imputed = self._impute_fare(imputed)
        encoded = self._index_categoricals(imputed, self._CATEGORICAL_COLUMNS)
        return encoded.drop(*self._DROPPED_COLUMNS)

    def _generate_features(self, df):
        """Add the ``Initial`` (passenger title), ``Family_Size`` and ``Alone`` columns."""
        # Title = the first run of letters directly followed by '.', e.g. "..., Mr. ..." -> "Mr".
        df = df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
        # Restrict the replacement to Initial so identical strings in other columns are untouched.
        df = df.replace(list(self._TITLE_MAP.keys()), list(self._TITLE_MAP.values()),
                        subset=["Initial"])
        df = df.withColumn("Family_Size", col("SibSp") + col("Parch"))
        # Alone = 1 when Family_Size is 0, otherwise 0 (including when Family_Size is null).
        df = df.withColumn("Alone", when(col("Family_Size") == 0, 1).otherwise(0))
        return df

    def _impute_age(self, df):
        """Fill null ``Age`` with the mean age of passengers sharing the same ``Initial``."""
        # One collect() of (Initial, mean) rows keeps every title paired with its own mean;
        # zipping two separate collects relies on Spark returning rows in the same order twice.
        for initial, mean_age in df.groupBy("Initial").avg("Age").collect():
            if mean_age is None:
                # Every Age in this group is null, so there is no mean to impute.
                continue
            df = df.withColumn(
                "Age",
                when((col("Initial") == initial) & col("Age").isNull(), mean_age)
                .otherwise(col("Age")),
            )
        return df

    def _impute_embarked(self, df):
        """Fill null ``Embarked`` with the most frequent non-null port."""
        most_common = (
            df.filter(col("Embarked").isNotNull())
            .groupBy("Embarked")
            .count()
            # Secondary key makes the choice deterministic when two ports tie.
            .sort(col("count").desc(), col("Embarked"))
            .first()
        )
        if most_common is None:
            # No non-null Embarked value to take a mode from.
            return df
        return df.fillna({"Embarked": most_common[0]})

    def _impute_fare(self, df):
        """Fill null ``Fare`` with the mean fare of the passenger's ``Pclass``."""
        missing_rows = (
            df.filter(col("Fare").isNull() & col("Pclass").isNotNull())
            .select("Pclass")
            .distinct()
            .collect()
        )
        if not missing_rows:
            return df
        # One aggregation for all classes instead of one full groupBy per null-Fare row.
        mean_fare_by_class = {row[0]: row[1] for row in df.groupBy("Pclass").avg("Fare").collect()}
        for row in missing_rows:
            pclass = row[0]
            mean_fare = mean_fare_by_class.get(pclass)
            if mean_fare is None:
                # Every Fare in this class is null; nothing to impute from.
                continue
            df = df.withColumn(
                "Fare",
                when(col("Fare").isNull() & (col("Pclass") == pclass), mean_fare)
                .otherwise(col("Fare")),
            )
        return df

    def _index_categoricals(self, df, col_list):
        """Add a numeric ``<name>_index`` column for each string column in ``col_list``."""
        # The indexers are fit on ``df`` itself, so category -> index mappings come from this frame.
        indexers = [StringIndexer(inputCol=name, outputCol=name + "_index") for name in col_list]
        return Pipeline(stages=indexers).fit(df).transform(df)

In [13]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

# Training: preprocess the raw training frame, assemble a feature vector, and tune a
# RandomForest's number of trees with 3-fold cross-validation on accuracy.
SEED = 42  # fixed seed so the forest and the CV fold split are reproducible

# Keep the raw `df` intact: preprocess_transform drops Name, Cabin, Ticket, ... which it
# needs as input, so overwriting `df` would make this cell fail on a second run.
my_model = preprocess_transform()
train_prepared = my_model.transform(df)

feature = VectorAssembler(
    inputCols=['Pclass', 'Age', 'Fare', 'Alone', 'Sex_index', 'Embarked_index', 'Initial_index'],
    outputCol="features",
)
# numTrees=10 is only the starting value; the param grid below overrides it.
# Alternatives previously tried here: LogisticRegression, GBTClassifier(maxIter=10).
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10, seed=SEED)

# Pipeline stages: feature assembly followed by the classifier.
pipeline = Pipeline(stages=[feature, rf])

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100, 300]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=SEED)

# Run cross-validation and choose the best set of parameters.
cvModel = crossval.fit(train_prepared)
# NOTE: these are predictions on the training rows themselves, not held-out data.
prediction = cvModel.transform(train_prepared)

In [14]:
# Persist the best pipeline found by cross-validation, replacing any previous copy.
best_pipeline = cvModel.bestModel
best_pipeline.write().overwrite().save('dbfs:/FileStore/tables/Titanic_model')

In [15]:
# Reload the persisted best pipeline and score the test CSV.
am = PipelineModel.load('dbfs:/FileStore/tables/Titanic_model')
# Keep the raw `df_test` intact so this cell can be re-run (the transformer drops its input columns).
# NOTE(review): preprocess_transform computes imputation statistics and category indices
# from df_test itself, not from the training data.
test_prepared = preprocess_transform().transform(df_test)
display(am.transform(test_prepared))

PassengerId,Pclass,Age,Fare,Alone,Sex_index,Embarked_index,Initial_index,features,rawPrediction,probability,prediction
892,3,34.5,7.8292,1,0.0,2.0,0.0,"List(1, 7, List(), List(3.0, 34.5, 7.8292, 1.0, 0.0, 2.0, 0.0))","List(1, 2, List(), List(268.0781763960894, 31.92182360391057))","List(1, 2, List(), List(0.893593921320298, 0.10640607867970191))",0.0
893,3,47.0,7.0,0,1.0,0.0,2.0,"List(1, 7, List(), List(3.0, 47.0, 7.0, 0.0, 1.0, 0.0, 2.0))","List(1, 2, List(), List(179.55647544022466, 120.44352455977516))","List(1, 2, List(), List(0.5985215848007492, 0.40147841519925076))",0.0
894,2,62.0,9.6875,1,0.0,2.0,0.0,"List(1, 7, List(), List(2.0, 62.0, 9.6875, 1.0, 0.0, 2.0, 0.0))","List(1, 2, List(), List(259.2312260766565, 40.76877392334358))","List(1, 2, List(), List(0.8641040869221881, 0.13589591307781188))",0.0
895,3,27.0,8.6625,1,0.0,0.0,0.0,"List(1, 7, List(), List(3.0, 27.0, 8.6625, 1.0, 0.0, 0.0, 0.0))","List(1, 2, List(), List(264.0989534920208, 35.90104650797917))","List(1, 2, List(), List(0.8803298449734027, 0.11967015502659722))",0.0
896,3,22.0,12.2875,0,1.0,0.0,2.0,"List(1, 7, List(), List(3.0, 22.0, 12.2875, 0.0, 1.0, 0.0, 2.0))","List(1, 2, List(), List(134.38900822258123, 165.61099177741883))","List(1, 2, List(), List(0.4479633607419373, 0.5520366392580627))",1.0
897,3,14.0,9.225,1,0.0,0.0,0.0,"List(1, 7, List(), List(3.0, 14.0, 9.225, 1.0, 0.0, 0.0, 0.0))","List(1, 2, List(), List(236.68011469998038, 63.3198853000197))","List(1, 2, List(), List(0.7889337156666011, 0.21106628433339897))",0.0
898,3,30.0,7.6292,1,1.0,2.0,1.0,"List(1, 7, List(), List(3.0, 30.0, 7.6292, 1.0, 1.0, 2.0, 1.0))","List(1, 2, List(), List(119.630211637394, 180.369788362606))","List(1, 2, List(), List(0.39876737212464664, 0.6012326278753533))",1.0
899,2,26.0,29.0,0,0.0,0.0,0.0,"List(0, 7, List(0, 1, 2), List(2.0, 26.0, 29.0))","List(1, 2, List(), List(239.9300088922145, 60.06999110778552))","List(1, 2, List(), List(0.7997666963073816, 0.2002333036926184))",0.0
900,3,18.0,7.2292,1,1.0,1.0,2.0,"List(1, 7, List(), List(3.0, 18.0, 7.2292, 1.0, 1.0, 1.0, 2.0))","List(1, 2, List(), List(83.63418607240489, 216.36581392759504))","List(1, 2, List(), List(0.2787806202413497, 0.7212193797586502))",1.0
901,3,21.0,24.15,0,0.0,0.0,0.0,"List(0, 7, List(0, 1, 2), List(3.0, 21.0, 24.15))","List(1, 2, List(), List(260.8729827852697, 39.12701721473033))","List(1, 2, List(), List(0.8695766092842323, 0.13042339071576778))",0.0


In [16]:
# import mleap.pyspark

# from mleap.pyspark.spark_support import SimpleSparkSerializer

 

# pip_model.serializeToBundle("jar:___file:/20news_pipeline-json.zip", sparkTransformed) 

# dbutils.fs.cp("___file:/20news_pipeline-json.zip", "dbfs:/FileStore/20news_pipeline-json.zip")

# display(dbutils.fs.ls("dbfs:/")) 