In [1]:
import pandas as pd
# Both CSV files are read with identical reader settings: the first row is the
# header and column types are inferred.
csv_read_options = {
    'format': 'com.databricks.spark.csv',
    'header': 'true',
    'inferSchema': 'true',
}

train_df = sqlContext.read.load('/FileStore/tables/train.csv', **csv_read_options)
test_df = sqlContext.read.load('/FileStore/tables/test.csv', **csv_read_options)



In [2]:
# Print the first two rows of each raw frame.
for raw_frame in (train_df, test_df):
    raw_frame.show(2)

In [3]:
## Tag each frame with its origin; set a constant Survived column on test
from pyspark.sql.functions import lit, col

# 'Type' records which file a row came from so the frames can be separated again.
train_df = train_df.withColumn('Type', lit('train'))
train_df.show(3)

# Survived is set to the literal 0 on every test row (a placeholder value).
test_df = test_df.withColumn('Survived', lit(0))
test_df = test_df.withColumn('Type', lit('test'))
test_df.show(3)



In [4]:
# Put the test columns in the same order as the train columns before stacking
# (unionAll pairs columns by position, not by name).
test_df = test_df.select(train_df.columns)
test_df.show(3)
# Stack train rows first, then test rows
df = train_df.unionAll(test_df)

In [5]:
df.show(3)
# Cast each numeric column (and Survived) to float, one column at a time.
for float_col in ('Age', 'SibSp', 'Parch', 'Fare', 'Survived'):
    df = df.withColumn(float_col, df[float_col].cast('float'))
df.printSchema()

In [6]:
#Impute missing Age and Fare with the Average

def nullCount(df,var):
    """Return how many rows of `df` have a null in column `var`."""
    is_missing = df[var].isNull()
    rows_with_null = df.filter(is_missing)
    return rows_with_null.count()
  
numVars = ['Survived','Age','SibSp','Parch','Fare']
# Null count per numeric column, taken before imputation. Print it so the
# result of these Spark jobs is actually visible in the notebook.
missing = {var: nullCount(df,var) for var in numVars}
print(missing)

# Compute both column means in a single aggregation job instead of one job each.
# NOTE(review): df is the train+test union here, so the means include test rows.
age_mean, fare_mean = df.groupBy().mean('Age', 'Fare').first()
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})
df.show(5)

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def _extract_title(name):
    """Return the text between the first ',' and the next '.' in `name`, stripped.

    Example: 'Braund, Mr. Owen Harris' -> 'Mr'.
    Returns None when `name` is None or contains no comma, instead of raising
    inside the UDF and failing the whole Spark job.
    """
    if name is None:
        return None
    parts = name.split(',')
    if len(parts) < 2:
        return None
    # strip() removes the space that follows the comma, so the value is 'Mr', not ' Mr'
    return parts[1].split('.')[0].strip()


splitFname = udf(_extract_title, StringType())
df = df.withColumn('Title',splitFname(df['Name']))
df.show(3)



In [8]:
###One-Hot Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["Sex", "Pclass","Title"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    indexed_name = categoricalCol + "Index"
    # First stage: map each category string to a numeric index
    stages.append(StringIndexer(inputCol=categoricalCol, outputCol=indexed_name))
    # Second stage: one-hot encode that index into a vector column
    stages.append(OneHotEncoder(inputCol=indexed_name, outputCol=categoricalCol+"classVec"))
# Nothing is executed yet; the stages run when the Pipeline is fit.

In [9]:
# Index the Survived column into a "label" column via StringIndexer.
# NOTE(review): StringIndexer orders indices by value frequency, and df still
# contains the placeholder Survived=0 test rows -- confirm the 0/1 mapping is as intended.
label_stringIdx = StringIndexer(inputCol = "Survived", outputCol = "label")
stages.append(label_stringIdx)

In [10]:
# Transform all features into a vector using VectorAssembler
numericCols = ["Age", "SibSp", "Parch", "Fare"]
# A list comprehension (not map) so the concatenation also works on Python 3,
# where map() returns an iterator that cannot be added to a list.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [11]:
# Print the first 20 rows (the default for show()) of the prepared frame.
df.show(20)

In [12]:
import pandas as pd
cols = df.columns
# Create a Pipeline from the stages collected in the earlier cells.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# Keep label, features and the original columns
selectedcols = ["label","features"] + cols
dataset = dataset.select(selectedcols)
dataset.show(3)
# Preview a handful of rows as a pandas frame; limit() first so the whole
# dataset is not collected to the driver just for display.
dataset.limit(5).toPandas()

In [13]:
#split back train/test data
# Filter on dataset's own Type column rather than a column object taken from
# the separate `df` frame, so the predicate refers to the frame being filtered.
train = dataset.filter(dataset['Type'] == 'train')
test = dataset.filter(dataset['Type'] == 'test')

In [14]:
### Randomly split the labelled rows 65/35 into training and hold-out sets; fixed seed for reproducibility
(trainingData, testData) = train.randomSplit([0.65, 0.35], seed = 110)
# print(...) with a single argument behaves the same on Python 2 and Python 3
print(trainingData.count())
print(testData.count())

In [15]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression estimator limited to 10 iterations
lr = LogisticRegression(maxIter=10, featuresCol="features", labelCol="label")

# Fit on the training portion of the split
lrModel = lr.fit(trainingData)

In [16]:
# Score the hold-out split with the fitted logistic-regression model,
# then list the columns that transform() appended.
predictions = lrModel.transform(testData)
predictions.printSchema()

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Binary evaluator reading the model's rawPrediction column
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Metric for the logistic-regression predictions (displayed as the cell output)
evaluator.evaluate(predictions)

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid: 3 x 3 x 3 = 27 combinations
lr_grid_builder = ParamGridBuilder()
lr_grid_builder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
lr_grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
lr_grid_builder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = lr_grid_builder.build()

# 5-fold cross-validation over the grid, scored with the current evaluator
cv = CrossValidator(numFolds=5, evaluator=evaluator, estimatorParamMaps=paramGrid, estimator=lr)

# Fitting trains one model per grid point per fold, so expect this to be slow
cvModel = cv.fit(trainingData)


In [20]:
# Score the hold-out split with the cross-validated model
# (cvModel applies the best model found during cross validation).
predictions = cvModel.transform(testData)
# Metric for the tuned logistic regression
evaluator.evaluate(predictions)

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision-tree estimator with depth capped at 3
dt = DecisionTreeClassifier(maxDepth=3, featuresCol="features", labelCol="label")

# Fit on the training portion of the split
dtModel = dt.fit(trainingData)

In [23]:
# %-formatting inside print(...) runs on both Python 2 and Python 3;
# the bare print statement is a syntax error on Python 3.
print("numNodes = %s" % dtModel.numNodes)
print("depth = %s" % dtModel.depth)

# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

predictions.printSchema()

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
# Only a cell's last expression is displayed, so print the metric explicitly;
# otherwise the evaluation result is computed and silently discarded.
print("%s = %s" % (evaluator.getMetricName(), evaluator.evaluate(predictions)))

# Impurity measure configured on the decision-tree estimator
dt.getImpurity()

In [25]:
# Hyper-parameter grid for the decision tree: 4 depths x 3 bin counts
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

dt_grid_builder = ParamGridBuilder()
dt_grid_builder.addGrid(dt.maxDepth, [1,2,6,10])
dt_grid_builder.addGrid(dt.maxBins, [20,40,80])
paramGrid = dt_grid_builder.build()

In [26]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

# Size of the best tree; print(...) with %-formatting works on Python 2 and 3
print("numNodes = %s" % cvModel.bestModel.numNodes)
print("depth = %s" % cvModel.bestModel.depth)

In [27]:
# Score the hold-out split with the cross-validated decision tree
# (cvModel applies the best model found during cross validation).
predictions = cvModel.transform(testData)

# Metric for the tuned decision tree
evaluator.evaluate(predictions)


In [28]:
from pyspark.ml.classification import RandomForestClassifier

# Random-forest estimator with library-default hyper-parameters
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Fit on the training portion of the split
rfModel = rf.fit(trainingData)

In [29]:
# Score the hold-out split with the fitted random forest,
# then list the columns that transform() appended.
predictions = rfModel.transform(testData)

predictions.printSchema()

In [30]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Fresh binary evaluator with default settings
evaluator = BinaryClassificationEvaluator()
# Metric for the random-forest predictions (displayed as the cell output)
evaluator.evaluate(predictions)

In [31]:
# Hyper-parameter grid for the random forest: 3 depths x 2 bin counts x 2 forest sizes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

rf_grid_builder = ParamGridBuilder()
rf_grid_builder.addGrid(rf.maxDepth, [2, 4, 6])
rf_grid_builder.addGrid(rf.maxBins, [20, 60])
rf_grid_builder.addGrid(rf.numTrees, [5, 20])
paramGrid = rf_grid_builder.build()

In [32]:
# 5-fold cross-validation of the random forest over the grid
cv = CrossValidator(numFolds=5, evaluator=evaluator, estimatorParamMaps=paramGrid, estimator=rf)

# Slow step (about 6 minutes per the original note): some grid points train 20 trees
cvModel = cv.fit(trainingData)

In [33]:
# Score the hold-out split with the cross-validated random forest
# (cvModel applies the best model found during cross validation).
predictions = cvModel.transform(testData)

# Metric for the tuned random forest
evaluator.evaluate(predictions)

In [35]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Layer sizes for the neural network.
# The input layer must match the length of the assembled "features" vector, so
# read it from the data instead of hard-coding it: the one-hot widths depend on
# how many distinct categories the indexers saw.
# Then two hidden layers of 5 and 4 units, and an output layer of 2 units.
num_features = len(trainingData.select("features").first()[0])
layers = [num_features, 5, 4, 2]

# create the trainer and set its parameters
mp = MultilayerPerceptronClassifier(layers=layers)

# Train model with Training Data
mpModel = mp.fit(trainingData)

In [36]:
# Score the hold-out split with the fitted perceptron,
# then list the columns that transform() appended.
predictions = mpModel.transform(testData)
predictions.printSchema()

In [37]:
# Multiclass evaluator reporting accuracy
# (MulticlassClassificationEvaluator is imported in the perceptron training cell).
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

In [38]:
# Hyper-parameter grid for the perceptron: 2 iteration caps x 2 block sizes, one fixed seed
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

mp_grid_builder = ParamGridBuilder()
mp_grid_builder.addGrid(mp.maxIter, [100,200])
mp_grid_builder.addGrid(mp.blockSize, [128, 256])
mp_grid_builder.addGrid(mp.seed, [1234])
paramGrid = mp_grid_builder.build()

In [39]:
# 5-fold cross-validation of the multilayer perceptron over the grid
cv = CrossValidator(numFolds=5, evaluator=evaluator, estimatorParamMaps=paramGrid, estimator=mp)

# Slow step: one perceptron is trained per grid point per fold
cvModel = cv.fit(trainingData)

In [40]:
# Score the hold-out split with the cross-validated perceptron
# (cvModel applies the best model found during cross validation).
predictions = cvModel.transform(testData)

# Metric for the tuned perceptron
evaluator.evaluate(predictions)