In [None]:
# Load the semicolon-separated event file. No header option is passed, so Spark
# assigns default column names (_c0, _c1, ...); rename them positionally.
# inferSchema=True asks Spark to infer column types from the data.
raw_events = sqlContext.read.csv('hdfs://hdfs-mesos/data.csv', sep=';', inferSchema=True)
for default_name, readable_name in [('_c0', 'time'),
                                    ('_c1', 'browser'),
                                    ('_c2', 'os'),
                                    ('_c3', 'deviceType')]:
    raw_events = raw_events.withColumnRenamed(default_name, readable_name)
# Cached because all later cells derive from this DataFrame.
events = raw_events.cache()
events.first()

In [None]:
# Keep only the two categorical inputs (os, browser) and the target (deviceType).
model_columns = ['os', 'browser', 'deviceType']
all_data = events.select(*model_columns)
all_data.take(5)

In [None]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the
# trained tree and the reported test accuracy, reproducible across runs.
(training, test) = all_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Peek at the first five training rows (DataFrame.head(n) is take(n)).
training.head(5)

In [None]:
# Class balance of the test split: number of rows per deviceType.
per_device_counts = test.groupBy("deviceType").count()
per_device_counts.show()

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# One StringIndexer per categorical column. Each maps the column's string
# values to numeric indices, written to a new *_idx column.
os_idx = StringIndexer().setInputCol("os").setOutputCol("os_idx")
browser_idx = StringIndexer().setInputCol("browser").setOutputCol("browser_idx")
device_idx = StringIndexer().setInputCol("deviceType").setOutputCol("device_idx")

In [None]:
# Fit the label indexer ONCE, on the full dataset, and reuse that fitted model
# for two things: decoding predictions (device_labels -> IndexToString) and
# indexing labels inside the Pipeline.
#
# StringIndexer numbers labels by frequency. An indexer re-fitted by the
# Pipeline on the 80% training split could therefore order the classes
# differently from one fitted on all_data, and IndexToString would then
# decode predictions with the wrong names.
#
# Pipeline.fit passes already-fitted Transformers through unchanged, so
# rebinding device_idx here makes both uses share a single mapping. It also
# stops test rows whose deviceType never occurs in the training split from
# failing at transform time.
#
# The isinstance check keeps the cell safe to re-run: a fitted
# StringIndexerModel has no .fit().
if isinstance(device_idx, StringIndexer):
    device_idx = device_idx.fit(all_data)
device_labels = device_idx.labels

In [None]:
# List the index -> deviceType mapping used for the label column.
# Formatting into one string keeps the output identical on Python 2 and 3.
for i, label in enumerate(device_labels):
    print("%d %s" % (i, label))

In [None]:
# Assemble the indexed categorical columns into one feature vector. The order
# matters: os becomes "feature 0" and browser "feature 1" in the tree dump.
assembled_inputs = [os_idx.getOutputCol(), browser_idx.getOutputCol()]
vec = VectorAssembler(inputCols=assembled_inputs, outputCol="features")

In [None]:
# Gini decision tree, at most 5 levels deep, predicting the indexed device type.
cls = DecisionTreeClassifier(featuresCol="features",
                             labelCol="device_idx",
                             impurity="gini",
                             maxDepth=5)

In [None]:
# Turn the numeric "prediction" column back into a deviceType string, using
# the label ordering captured in device_labels.
pred_rev = (IndexToString(labels=device_labels)
            .setInputCol("prediction")
            .setOutputCol("prediction_label"))

In [None]:
# Full workflow: index os/browser/deviceType, assemble features, train the
# tree, then decode predictions.
#
# The os and browser indexers are fitted on all_data up front. With a random
# 80/20 split, a value that only occurs in the test split would otherwise make
# the training-fitted StringIndexerModel raise an "unseen label" error when
# the test set is transformed.
#
# Pipeline.fit passes already-fitted Transformers through unchanged, so
# model.stages[0] / model.stages[1] are still the os / browser indexer models.
pipeline = Pipeline(stages=[
    os_idx.fit(all_data),
    browser_idx.fit(all_data),
    device_idx,
    vec,
    cls,
    pred_rev,
])

In [None]:
# Train every pipeline stage on the training split -> PipelineModel.
model = pipeline.fit(dataset=training)

In [None]:
# Score the held-out split with the fitted pipeline.
result = model.transform(dataset=test)

In [None]:
# Grab one scored row for inspection (DataFrame.first() is head() with no n).
first = result.head()

In [None]:
# Display the first scored test row.
first

In [None]:
# Accuracy: the fraction of test rows whose predicted index equals device_idx.
evaluator = MulticlassClassificationEvaluator(labelCol="device_idx", metricName="accuracy")
test_accuracy = evaluator.evaluate(result)
print("Test set accuracy = " + str(test_accuracy))

In [None]:
# The fitted decision-tree model is the second-to-last pipeline stage (only
# the IndexToString stage follows it); grab its textual dump.
tree_model = model.stages[-2]
tree_str = tree_model.toDebugString

In [None]:
# Function-call form works on both Python 2 and 3 (single argument).
print(tree_str)

In [None]:
# Index -> os mapping learned by the fitted os indexer (pipeline stage 0);
# these are the values behind "feature 0 in {N.0}" in the tree dump.
os_labels = model.stages[0].labels
for i, label in enumerate(os_labels):
    print("%d %s" % (i, label))

In [None]:
# Index -> browser mapping learned by the fitted browser indexer (pipeline
# stage 1); these are the values behind "feature 1 in {N.0}" in the tree dump.
browser_labels = model.stages[1].labels
for i, label in enumerate(browser_labels):
    print("%d %s" % (i, label))

In [None]:
import json

def parse_labels(line):
    """Replace Spark's numeric indices in one tree line with readable labels.

    ``line`` is a single node description from the tree dump, with the leading
    ``If``/``Else`` keyword and the parentheses already removed.

    * ``feature 0 ...``: the feature name becomes ``os`` and every index
      ``N.0`` is replaced by ``os_labels[N]``.
    * ``feature 1 ...``: the feature name becomes ``browser`` and indices are
      replaced from ``browser_labels``.
    * ``Predict ...``: indices are replaced by ``device_labels[N]``.

    Any other line is returned unchanged. Indices are matched as whole numbers
    and substituted in a single pass. So ``1.0`` is never matched inside
    ``11.0``, and text inside an already-inserted label is never rewritten.
    An index outside the label list is left as-is.

    Reads the module-level globals ``os_labels``, ``browser_labels`` and
    ``device_labels``.
    """
    import re

    if line.startswith('feature 0'):
        line = line.replace('feature 0', 'os')
        labels = os_labels
    elif line.startswith('feature 1'):
        line = line.replace('feature 1', 'browser')
        labels = browser_labels
    elif line.startswith('Predict'):
        labels = device_labels
    else:
        # Nothing to translate (previously `labels` was unbound here).
        return line

    def _label_for(match):
        idx = int(match.group(1))
        return labels[idx] if idx < len(labels) else match.group(0)

    # The lookarounds anchor the match to a whole number like "11.0", so
    # "1.0" is not found inside "11.0" or "21.05".
    return re.sub(r'(?<![\d.])(\d+)\.0(?!\d)', _label_for, line)

# Mostly copied from https://github.com/hechemeljed/Decision-Tree-Visualization-Spark/blob/master/DT.py:
def _branch_text(raw):
    """Drop the leading If/Else keyword and all parentheses from a split line."""
    return ' '.join(raw.split()[1:]).replace('(', '').replace(')', '')


def parse(lines):
    """Convert flattened tree-dump lines into nested ``{'name', 'children'}`` dicts.

    ``lines`` is a list of stripped lines and is CONSUMED in place: each
    (recursive) call pops the lines it handles.

    * An ``If ...`` line becomes a node whose children are the lines parsed up
      to the matching ``Else``.
    * The following ``Else ...`` line, if present, becomes its sibling with
      its own subtree.
    * Any other non-``Else`` line becomes a childless node.
    * Parsing at one level stops at an ``Else`` belonging to an enclosing
      ``If``, or when the input runs out.

    Every node name is passed through ``parse_labels``.
    """
    block = []
    while lines:
        head = lines[0]
        if head.startswith('If'):
            name = parse_labels(_branch_text(lines.pop(0)))
            block.append({'name': name, 'children': parse(lines)})
            # `lines and` guards truncated input: an If subtree that ran to the
            # end of the input used to raise IndexError here.
            if lines and lines[0].startswith('Else'):
                name = parse_labels(_branch_text(lines.pop(0)))
                block.append({'name': name, 'children': parse(lines)})
        elif head.startswith('Else'):
            # This Else closes an enclosing If; let the caller consume it.
            break
        else:
            block.append({'name': parse_labels(lines.pop(0))})
    return block

def tree_json(tree):
    """Render a decision-tree debug string as an indented JSON string.

    Collects the stripped lines of ``tree`` up to the first blank line. It
    skips the first of them (in ``toDebugString`` output that is the model
    header, not a tree node) and nests the rest under a single ``Root`` node
    via ``parse``. Returns ``json.dumps(..., indent=4)`` of that root.
    """
    lines = []
    for raw in tree.splitlines():
        text = raw.strip()
        if not text:
            break  # the tree description ends at the first blank line
        lines.append(text)
    root = {'name': 'Root', 'children': parse(lines[1:])}
    return json.dumps(root, indent=4)

# Function-call form works on both Python 2 and 3 (single argument).
print(tree_json(tree_str))