In [3]:
import findspark
findspark.init('/home/mint/spark-2.1.0-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('logistic').getOrCreate()
data = spark.read.csv('titanic.csv', inferSchema=True, header=True)
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [6]:
# from pyspark.ml.feature import 
my_columns = data.select(['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked'])
## Drop missing data
final_data = my_columns.na.drop()

In [7]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [8]:
gender_indexer = StringIndexer(inputCol = 'Sex', outputCol = 'SexIndex')
gender_encoder = OneHotEncoder(inputCol = 'SexIndex', outputCol = 'SexVec')
embark_indexer = StringIndexer(inputCol = 'Embarked', outputCol = 'EmbarkedIndex')
embark_encoder = OneHotEncoder(inputCol = 'EmbarkedIndex', outputCol = 'EmbarkedVec')
assembler = VectorAssembler(inputCols = ['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkedVec'],
                            outputCol = 'features')

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
model = LogisticRegression(featuresCol = 'features', labelCol = 'Survived')
pipline = Pipeline(stages = [gender_indexer, gender_encoder,
                             embark_indexer, embark_encoder,
                             assembler, model])

In [10]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])
fit_model = pipline.fit(train_data)

In [13]:
results = fit_model.transform(test_data)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
myEval = BinaryClassificationEvaluator(rawPredictionCol = 'prediction', labelCol = 'Survived')
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [14]:
AUC = myEval.evaluate(results)
AUC

0.7989266547406082