In [33]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from numpy import array
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [2]:
# Read the raw text file and break every line into its tab-separated fields.
# NOTE(review): `sc` is not defined in this notebook — presumably the
# SparkContext injected by the PySpark kernel; confirm before a fresh run.
rdd = (
    sc.textFile("/HDFS.home/data.txt")
      .map(lambda line: line.split("\t"))
)

In [40]:
# Peek at the first two parsed records (each one is a list of string fields).
rdd.take(2)

[[u'35,5', u'no', u'yes', u'no', u'no', u'no', u'no', u'no'],
 [u'35,9', u'no', u'no', u'yes', u'yes', u'yes', u'yes', u'no']]

In [3]:
def binary(YN):
    """Encode a yes/no answer as an integer flag.

    Returns 1 only when ``YN`` equals the exact string "yes"; every other
    value (including "no") yields 0.
    """
    return 1 if YN == "yes" else 0

In [9]:
def createLb(fields, label_index=7):
    """Turn one parsed record into a ``LabeledPoint`` for MLlib.

    Parameters
    ----------
    fields : sequence of str
        ``fields[0]`` is the temperature written with a decimal separator
        (e.g. ``"35,5"``); ``fields[1]`` .. ``fields[5]`` are "yes"/"no"
        answers; ``fields[6]`` and ``fields[7]`` are the two "yes"/"no"
        diagnosis columns.
    label_index : int, optional
        Column used as the label. Defaults to 7, which is what this function
        always used before the parameter existed. Pass 6 to label with the
        other diagnosis column.

    Returns
    -------
    LabeledPoint
        Label 0/1 plus seven features: whole degrees, first decimal digit of
        the temperature, and the five answers encoded as 0/1.

    Raises
    ------
    ValueError
        If the temperature field does not start with an integer.
    IndexError
        If the record has fewer columns than required.
    """
    # Split on the decimal separator instead of relying on fixed character
    # positions, so temperatures that are not exactly "DD,D" still parse.
    whole, _, frac = fields[0].replace(",", ".").partition(".")
    f1 = int(whole)
    f2 = int(frac[:1] or 0)  # first decimal digit; 0 when there is none
    # Index explicitly (not by slice) so a short record still raises IndexError.
    answers = [binary(fields[i]) for i in range(1, 6)]
    label = binary(fields[label_index])
    return LabeledPoint(label, array([f1, f2] + answers))

#### Training and Testing splits

In [5]:
# Hold out roughly 30% of the records for testing. The fixed seed makes the
# split, and every count or prediction derived from it, repeatable across
# kernel restarts.
(trainingData, testData) = rdd.randomSplit([0.7, 0.3], seed=42)

#### Creating Labeled points for training set 1 (Inflammation of urinary bladder)

In [6]:
# Training set 1 must be labeled with the first diagnosis column, fields[6].
# A plain `map(createLb)` labels with fields[7] (the set-2 target), so take the
# feature vector from createLb and attach the fields[6] label explicitly.
train_lb1 = trainingData.map(
    lambda fields: LabeledPoint(binary(fields[6]), createLb(fields).features)
)

In [7]:
# Show the first five labeled points of training set 1.
train_lb1.take(5)

[LabeledPoint(1.0, [35.0,9.0,0.0,0.0,1.0,1.0,1.0]),
 LabeledPoint(0.0, [35.0,9.0,0.0,1.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [36.0,0.0,0.0,1.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [36.0,2.0,0.0,0.0,1.0,1.0,1.0]),
 LabeledPoint(1.0, [36.0,7.0,0.0,0.0,1.0,1.0,1.0])]

#### Building model for predicting Inflammation of urinary bladder

In [8]:
# Fit a binary decision tree on training set 1. Feature indices 2-6 are the
# 0/1 answer columns, declared categorical with two categories each; indices
# 0 and 1 (the temperature parts) are left continuous.
model_1 = DecisionTree.trainClassifier(
    train_lb1,
    numClasses=2,
    categoricalFeaturesInfo={2: 2, 3: 2, 4: 2, 5: 2, 6: 2},
    impurity='gini',
    maxDepth=5,
    maxBins=32
)

#### Creating Labeled points for training set 2 (Nephritis of renal pelvis origin)

In [10]:
# Training set 2: a plain map(createLb) labels each point with fields[7],
# the second diagnosis column.
train_lb2 = trainingData.map(createLb)

In [11]:
# Show the first five labeled points of training set 2.
train_lb2.take(5)

[LabeledPoint(0.0, [35.0,9.0,0.0,0.0,1.0,1.0,1.0]),
 LabeledPoint(0.0, [35.0,9.0,0.0,1.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [36.0,0.0,0.0,1.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [36.0,2.0,0.0,0.0,1.0,1.0,1.0]),
 LabeledPoint(0.0, [36.0,7.0,0.0,0.0,1.0,1.0,1.0])]

#### Building model for predicting Nephritis of renal pelvis origin

In [12]:
# Fit a binary decision tree on training set 2 with the same settings as the
# first model: feature indices 2-6 are two-category categoricals, indices 0
# and 1 are continuous.
model_2 = DecisionTree.trainClassifier(
    train_lb2,
    numClasses=2,
    categoricalFeaturesInfo={2: 2, 3: 2, 4: 2, 5: 2, 6: 2},
    impurity='gini',
    maxDepth=5,
    maxBins=32
)

#### Creating Test Dataset

In [13]:
def create_test(fields):
    """Convert one parsed record into a flat numeric array for evaluation.

    Parameters
    ----------
    fields : sequence of str
        ``fields[0]`` is the temperature written with a decimal separator
        (e.g. ``"35,5"``); ``fields[1]`` .. ``fields[7]`` are "yes"/"no"
        values (five answers followed by the two diagnosis columns).

    Returns
    -------
    numpy.ndarray
        Nine integers: whole degrees, first decimal digit of the temperature,
        the five answers as 0/1, then the two diagnosis labels as 0/1.

    Raises
    ------
    ValueError
        If the temperature field does not start with an integer.
    IndexError
        If the record has fewer than eight columns.
    """
    # Split on the decimal separator instead of relying on fixed character
    # positions, so temperatures that are not exactly "DD,D" still parse.
    whole, _, frac = fields[0].replace(",", ".").partition(".")
    temperature = [int(whole), int(frac[:1] or 0)]
    # Index explicitly (not by slice) so a short record still raises IndexError.
    flags = [binary(fields[i]) for i in range(1, 8)]
    return array(temperature + flags)

In [21]:
# Parse the held-out records, then separate the seven feature columns from
# the two label columns that follow them.
test = testData.map(create_test)
features = test.map(lambda row: row[:7])
labels = test.map(lambda row: row[7:9])

In [19]:
# Inspect one feature vector (the first seven columns produced by create_test).
features.take(1)

[array([35,  5,  0,  1,  0,  0,  0])]

In [22]:
# Inspect one label pair (columns 7 and 8 produced by create_test).
labels.take(1)

[array([0, 0])]

#### Testing predictions of Inflammation of urinary bladder

In [24]:
# Apply model_1 to every held-out feature vector.
pred_1 = model_1.predict(features)

In [26]:
# Total of model_1's predictions. The model was trained with numClasses=2, so
# this is the number of rows predicted as class 1. Equal totals alone do not
# show that the predictions agree with the labels row by row.
pred_1.sum()

16.0

In [28]:
# Total of the first label column (index 0 of each label pair) in the
# held-out set, for comparison with the prediction total.
labels.map(lambda x: x[0]).sum()

16

In [38]:
# Score model_1 row by row. Equal totals of predictions and labels can hide
# offsetting errors, so pair each true label with its prediction and report
# the fraction that agree. zip is usable here because pred_1 was derived from
# `features`, which comes from the same `test` RDD as `labels`.
labels_and_preds_1 = labels.map(lambda x: float(x[0])).zip(pred_1)
n_test_1 = labels_and_preds_1.count()
n_correct_1 = labels_and_preds_1.filter(lambda pair: pair[0] == pair[1]).count()
# float() avoids integer division; an empty test split yields NaN, not a crash.
n_correct_1 / float(n_test_1) if n_test_1 else float("nan")

#### Testing predictions of Nephritis of renal pelvis origin

In [29]:
# Apply model_2 to the same held-out feature vectors.
pred_2 = model_2.predict(features)

In [30]:
# Total of model_2's predictions. The model was trained with numClasses=2, so
# this is the number of rows predicted as class 1. Equal totals alone do not
# show that the predictions agree with the labels row by row.
pred_2.sum()

17.0

In [31]:
# Total of the second label column (index 1 of each label pair) in the
# held-out set, for comparison with the prediction total.
labels.map(lambda x: x[1]).sum()

17