In [1]:
## Reference: Dr. Sridhar Nerur's notes for Big Data
## Reference: Databricks MLlib tutorials: https://docs.databricks.com/spark/latest/mllib/index.html#mllib

def _load_titanic_csv(path):
    """Read a CSV file that has a header row into a Spark DataFrame, inferring column types."""
    return sqlContext.read.load(path,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')


train_df = _load_titanic_csv('/FileStore/tables/train.csv')
test_df = _load_titanic_csv('/FileStore/tables/test.csv')





In [6]:
# Tag every row with its origin ('train' / 'test') in an `ident` column and
# stack both sets into one DataFrame so feature engineering is applied once.
from pyspark.sql.functions import lit, col

train_df = train_df.withColumn('ident', lit('train'))

# test.csv has no Survived column: add a placeholder 0 so both schemas match.
test_df = test_df.withColumn('Survived', lit(0))
test_df = test_df.withColumn('ident', lit('test'))
# unionAll pairs columns by position, so align test_df to train_df's column order.
test_df = test_df.select(train_df.columns)

df = train_df.unionAll(test_df)





In [8]:
#test_df.show(2)

In [9]:
# Feature engineering to create a new column from name containg  the title 
# Feature engineering: derive a Title column (e.g. "Mr", "Mrs") from Name.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def extract_title(name):
    """Return the title embedded in a passenger name.

    Takes the segment after the first comma (up to any second comma) and keeps
    the part before its first period, with surrounding whitespace stripped,
    e.g. "Braund, Mr. Owen Harris" -> "Mr". Without strip() the result would
    carry a leading space (" Mr").

    Returns None for a missing name or a name without a comma, instead of
    raising IndexError/AttributeError inside the Spark job.
    """
    if not name or ',' not in name:
        return None
    return name.split(',')[1].split('.')[0].strip()


Createtitle = udf(extract_title, StringType())
df = df.withColumn('Title', Createtitle(df['Name']))

In [10]:
#df.show(3)

In [11]:
# Cast the numeric columns (and the Survived target) to double so every
# downstream numeric step sees a uniform type, whatever inferSchema chose.
for numeric_col in ['Age', 'SibSp', 'Parch', 'Fare', 'Survived']:
    df = df.withColumn(numeric_col, df[numeric_col].cast('double'))
df.printSchema()

In [12]:
# Creationg a function to count the null values in the features and filling the null values with mean where applicable 

def countofnull(df, feature):
    """Return the number of rows of `df` whose `feature` column is null."""
    is_missing = df[feature].isNull()
    return df.filter(is_missing).count()


# Creating a function to count the null values in the features, then filling
# the null values of Age and Fare with the column mean.
listofnumfeat = ['Survived', 'Age', 'SibSp', 'Parch', 'Fare']
nulls = dict((feature, countofnull(df, feature)) for feature in listofnumfeat)

# A single aggregation yields both means (mean ignores nulls).
means_row = df.groupBy().mean('Age', 'Fare').first()
age_m, fare_m = means_row[0], means_row[1]
df = df.na.fill({'Age': age_m, 'Fare': fare_m})

In [13]:


# Re-count missing values for every column of interest after imputation.
listofvar1 = ['Survived', 'Age', 'SibSp', 'Parch', 'Fare', 'PassengerId', 'Pclass',
              'Name', 'Sex', 'Ticket', 'Cabin', 'Embarked', 'Title']
nulls = {}
for var in listofvar1:
    nulls[var] = countofnull(df, var)
print(nulls)

In [1]:

df = df.drop('Name')  # Name was only needed to derive the Title column


NameError: name 'df' is not defined

In [15]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["Pclass", "Sex", "Title"]
stages = []  # pipeline stages; nothing runs until pipeline.fit() later on

# Each categorical column is label-encoded (StringIndexer -> <col>Index) and
# the index is then one-hot encoded (OneHotEncoder -> <col>classVec).
for categoricalCol in categoricalColumns:
    indexCol = categoricalCol + "Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexCol)
    encoder = OneHotEncoder(inputCol=indexCol, outputCol=categoricalCol + "classVec")
    stages.extend([stringIndexer, encoder])

In [16]:
# Index the Survived target into the "label" column used by the classifiers.
# NOTE(review): StringIndexer's default ordering is by frequency, so label 0.0
# is the most frequent Survived value; confirm that this maps to Survived=0.
label_stringIdx = StringIndexer(outputCol="label", inputCol="Survived")
stages.append(label_stringIdx)

In [17]:
numericCols = ["Age", "SibSp", "Parch", "Fare"]
# Concatenate the one-hot vectors and numeric columns into one "features" vector.
# A list comprehension (not `map(...) + list`) keeps this working on Python 3,
# where map returns an iterator and `iterator + list` raises TypeError.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


In [21]:

cols = df.columns
# Create a Pipeline from the accumulated stages.
pipeline = Pipeline(stages=stages)
# fit() computes the statistics the stages need (e.g. StringIndexer level
# order); transform() applies every stage, adding "label" and "features".
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# Keep the model inputs plus all original columns.
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)

# Preview a few rows only; toPandas() on the full frame collects every row
# to the driver.
dataset.limit(10).toPandas()

In [22]:
dataset.show(n=5)  # peek at the first transformed rows

In [23]:
#Seprating the trained and test data which we combined earlier 
train_df = dataset.where(df.ident =='train')
test_df = dataset.where(df.ident =='test')


In [24]:
train_df.show(n=19)  # inspect the labelled rows

In [25]:
#specifing the random split ratio for the train and test data 
(trainingData, ValidateData) = train_df.randomSplit([0.65, 0.35],seed = 120)
print trainingData.count()
print ValidateData.count()

In [26]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Scorer reused by the binary models below; it reads the rawPrediction column.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Baseline logistic regression on the assembled feature vector.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 3 regParam x 3 elasticNetParam x 4 maxIter = 36 candidate settings.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.5, 2.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [1, 5, 10, 15]) \
    .build()

# 5-fold cross-validation over the grid, on the training split only.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)
cvModel = cv.fit(trainingData)

In [27]:
#evaluator.getMetricName()

In [28]:
#from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
#lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
#rModel = lr.fit(train_df)

In [29]:
# Score the validation split with the best cross-validated logistic-regression
# model and list the columns it appends.
predictions = cvModel.transform(ValidateData)
predictions.printSchema()


In [32]:
lr_score = evaluator.evaluate(predictions)  # validation score of the best LR model
lr_score

In [34]:
#predictions = lrModel.transform(ValidateData)
#Output_Predictions = lrModel.transform(test_df)
#predictions.printSchema()
#predictions.show(60)

In [35]:
# Intercept of the best logistic-regression model (print() works on Python 2 and 3).
print('Model Intercept: {}'.format(cvModel.bestModel.intercept))

In [36]:
# One row per coefficient of the best LR model. float() turns numpy scalars
# into plain Python floats, and each value is wrapped in a 1-tuple (one row).
weights = [(float(w),) for w in cvModel.bestModel.coefficients]
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
weightsDF.toPandas()

In [37]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree on the same label/feature columns; tuned with CV in the next cell.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
# Same 65/35 ratio and seed as the logistic-regression split.
(trainingData, ValidateData) = train_df.randomSplit([0.65, 0.35], seed=120)
# print() with a single argument behaves the same on Python 2 and Python 3.
print(trainingData.count())
print(ValidateData.count())

In [38]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 4 maxDepth x 3 maxBins = 12 candidate trees, scored with 5-fold CV using
# the binary evaluator created for logistic regression.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Cross-validate on the training split only. Fitting on all of train_df would
# include ValidateData, so the later validation score would be computed on
# rows the model was trained on (leakage).
cvModel = cv.fit(trainingData)
# Previously noted to take ~5 minutes.

In [39]:
# Size of the best decision tree (print() works on Python 2 and 3).
print("numNodes = {}".format(cvModel.bestModel.numNodes))
print("depth = {}".format(cvModel.bestModel.depth))
predictions_Dt = cvModel.transform(ValidateData)
# Score the decision-tree predictions, not the earlier logistic-regression
# `predictions`.
evaluator.evaluate(predictions_Dt)

In [41]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the same label/feature columns; tuned with CV in the next cell.
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
# Same 65/35 ratio and seed as the earlier splits.
trainingData, ValidateData = train_df.randomSplit([0.65, 0.35], seed=120)

In [42]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 maxDepth x 3 maxBins x 4 numTrees = 36 settings; with 5 folds that is
# 180 forests of up to 20 trees each, so expect a long run.
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 4, 6]) \
    .addGrid(rf.maxBins, [20, 40, 60]) \
    .addGrid(rf.numTrees, [5, 10, 15, 20]) \
    .build()
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

cvModel = cv.fit(trainingData)

In [43]:
# Best random forest: predictions on the validation split (for scoring) and on
# the test rows (whose Survived is only the placeholder 0).
predictions_rf = cvModel.transform(ValidateData)
Output_predictions = cvModel.transform(test_df)

In [44]:
# Score the random-forest validation predictions, not the earlier
# logistic-regression `predictions`.
evaluator.evaluate(predictions_rf)

In [46]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [47]:
# Fresh 65/35 split for the multilayer perceptron (note the different seed).
(trainingData, ValidateData) = train_df.randomSplit([0.65, 0.35], seed=121)

# The input layer must equal the length of the assembled "features" vector.
# Deriving it from the data (instead of hard-coding 24) keeps the network valid
# if the encodings change, e.g. a new Title level. Hidden layers: 5 and 4 units;
# output: 2 units, one per label class.
num_features = len(trainingData.first().features)
layers = [num_features, 5, 4, 2]
mp = MultilayerPerceptronClassifier(layers=layers)



In [48]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Score MLP candidates by accuracy on the prediction column. This rebinds
# `evaluator`, replacing the BinaryClassificationEvaluator used earlier.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# 3 maxIter x 3 blockSize x 2 seed = 18 candidate settings.
# NOTE(review): tuning over `seed` selects the luckiest initialisation rather
# than a better configuration; consider fixing the seed instead.
paramGrid = ParamGridBuilder() \
    .addGrid(mp.maxIter, [50, 100, 200]) \
    .addGrid(mp.blockSize, [128, 256, 512]) \
    .addGrid(mp.seed, [1234, 3223]) \
    .build()

# numFolds is not set, so CrossValidator's default is used (5 folds are used above).
cv = CrossValidator(estimator=mp, estimatorParamMaps=paramGrid, evaluator=evaluator)

# Run cross validations.
cvModel = cv.fit(trainingData)

In [49]:
predictions_mc = cvModel.transform(ValidateData)  # best MLP on the validation split

In [51]:
mlp_accuracy = evaluator.evaluate(predictions_mc)  # validation accuracy of the best MLP
mlp_accuracy

In [52]:
# Pull out the best MLP found by cross-validation and score the validation
# split with it directly (only ValidateData is transformed, not the full dataset).
bestModel = cvModel.bestModel
finalPredictions = bestModel.transform(ValidateData)
finalPredictions.printSchema()

# Accuracy of the best model on the validation split.
evaluator.evaluate(finalPredictions)

In [53]:
#OutputPredictions = bestModel.transform(test_df)

In [54]:
#OutputPredictions['PassengerId','prediction'].write.csv('/FileStore/Output_pred/pre4')

In [55]:
#print (type(OutputPredictions))