In [None]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
## Init data
# Build a binary iris problem (setosa vs. the two other species) on two features,
# fit a reference scikit-learn tree, plot its decision surface on the test split,
# and export the train/test split as CSV for the Spark implementation below.

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split

from sklearn.datasets import load_iris
# Aliased so that re-running this cell never shadows the Spark
# `DecisionTreeClassifier` class defined in a later cell.
from sklearn.tree import DecisionTreeClassifier as SklearnDecisionTreeClassifier

# Parameters
n_classes = 2
plot_colors = "ryb"
plot_step = 0.02

# Load data
iris = load_iris()
pair = [0, 1]

# We only take the two corresponding features
X = iris.data[:, pair]
# Binary target: False (0) for setosa, True (1) for versicolor and virginica.
y = iris.target > 0
class_labels = [iris.target_names[0], ' / '.join(iris.target_names[1:])]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train (seeded so tie-breaking between equally good splits is reproducible)
clf = SklearnDecisionTreeClassifier(max_depth=3, min_samples_leaf=1, random_state=42).fit(X_train, y_train)

x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, plot_step),
                     np.arange(y_min, y_max, plot_step))

Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
cs = plt.contourf(xx, yy, Z, cmap=plt.cm.RdYlBu)

plt.xlabel(iris.feature_names[pair[0]])
plt.ylabel(iris.feature_names[pair[1]])

# Plot the test points, one scatter call (and legend entry) per binary class.
for i, color in zip(range(n_classes), plot_colors):
    mask = y_test == i
    plt.scatter(X_test[mask, 0], X_test[mask, 1], c=color, label=class_labels[i],
                edgecolor='black', s=15)

plt.suptitle("Decision surface of a decision tree using paired features")
plt.legend(loc='lower right', borderpad=0, handletextpad=0)
plt.axis("tight")
# Lay out only once everything has been drawn.
plt.tight_layout(h_pad=0.5, w_pad=0.5, pad=2.5)
plt.show()


def save_split(features, target, path):
    """Write one split as CSV: sepal_length, sepal_width and an integer 0/1 target.

    The target is written as an int ("1", not "1.0") so the Spark job's
    ``cast('int')`` does not depend on lenient string-to-int parsing.
    """
    pd.DataFrame({
        'sepal_length': features[:, 0],
        'sepal_width': features[:, 1],
        'target': target.astype(int),
    }).to_csv(path, index=False)


save_split(X_train, y_train, 'train_data.csv')
save_split(X_test, y_test, 'test_data.csv')

In [None]:
# -*- coding: utf-8 -*-
import json
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
import argparse
from pyspark.ml import feature

class DecisionTreeClassifier:
    """Binary decision tree trained on a Spark DataFrame and stored as a nested dict.

    Each input column is discretized into at most ``max_bins`` buckets whose
    boundaries are the values found at evenly spaced positions of the sorted
    column. Splits are searched only at bucket boundaries and the one with the
    lowest weighted Gini impurity of the two children is kept.

    Fitted tree layout (``self.tree``):
      * internal node: ``{'attr', 'cutoff', 'left', 'right'}`` -- rows with
        ``row[attr] >= cutoff`` go right, all others go left;
      * leaf: ``{'weight'}`` -- mean of ``target_column`` over the training
        rows that reached the leaf.
    """

    def __init__(self, input_columns=None, target_column=None, max_depth=3, min_samples_leaf=1, max_bins=10):
        self.input_columns = input_columns
        self.target_column = target_column
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.max_bins = max_bins

        self.tree = {}
        # Per-column bucket boundaries; only populated while fit() is running.
        self._percentiles = {}

    def fit(self, df):
        """Grow the tree from ``df``.

        Args:
            df: Spark DataFrame with numeric ``input_columns`` and
                ``target_column`` (assumed 0/1: its mean is used as the
                positive-class share in the Gini computation).

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: if ``df`` has no rows.
        """
        # Start from a fresh tree: re-fitting must not inherit nodes from a
        # previous fit (a stale 'attr' key on the root would shadow a new leaf).
        self.tree = {}
        self._percentiles = {}

        df = df.cache()
        df_size = df.count()
        if df_size == 0:
            raise ValueError('cannot fit a decision tree on an empty DataFrame')

        # Sorted-row positions whose values become bucket boundaries.
        positions = {i * df_size // self.max_bins for i in range(1, self.max_bins)}

        try:
            for attr in self.input_columns:
                percentiles = self._column_percentiles(df, attr, positions)
                if percentiles:
                    bucketizer = feature.Bucketizer(
                        splits=[float('-inf')] + percentiles + [float('inf')],
                        inputCol=attr, outputCol='bucket')
                    bucketed = bucketizer.transform(df)
                    # Replace the raw column by its integer bucket index.
                    df = bucketed.withColumn(attr, bucketed['bucket'].cast('int')).drop('bucket')
                else:
                    # max_bins <= 1: Bucketizer needs an inner split, so put
                    # every row in bucket 0 (the column can then never split).
                    df = df.withColumn(attr, f.lit(0))
                self._percentiles[attr] = percentiles

            self.init_node(self.tree, 0, df)
        finally:
            # Boundaries are only needed while growing the tree.
            self._percentiles = {}

        return self

    @staticmethod
    def _column_percentiles(df, attr, positions):
        """Return the sorted distinct values of ``attr`` at the given sorted-row positions."""
        wanted = frozenset(positions)
        # One pass over the sorted column instead of one Spark job per lookup.
        values = (df.select(attr).rdd
                  .map(lambda row: row[0])
                  .sortBy(lambda value: value, ascending=True)
                  .zipWithIndex()
                  .filter(lambda pair: pair[1] in wanted)
                  .map(lambda pair: pair[0])
                  .collect())
        return sorted(set(values))

    def init_node(self, node, depth, part):
        """Recursively grow the subtree rooted at ``node`` from the rows in ``part``.

        ``part`` holds bucket indices (not raw values) in every input column.
        ``node`` is filled in place: with ``attr``/``cutoff``/``left``/``right``
        when an admissible split exists, otherwise with ``weight`` (the mean
        of the target column over ``part``).
        """
        n = part.count()
        best_gain = float('-inf')
        best_attr = None
        best_cutoff = None
        best_cutoff_index = None

        if depth < self.max_depth and n > self.min_samples_leaf:

            for attr in self.input_columns:
                # Row count and mean target per bucket, ordered by bucket index.
                stats = part.groupBy(attr).agg(
                    f.count('*').alias('count'),
                    f.avg(self.target_column).alias('avg')
                )

                stats = sorted(tuple(x) for x in stats.collect())
                cutoff = [x[0] for x in stats]
                count = [x[1] for x in stats]
                weight = [x[2] for x in stats]

                # Try every boundary between consecutive non-empty buckets:
                # buckets cutoff[:i] go left, cutoff[i:] go right.
                for i in range(1, len(stats)):
                    n_l = sum(count[:i])
                    n_r = sum(count[i:])
                    if n_l < self.min_samples_leaf or n_r < self.min_samples_leaf:
                        continue

                    p_l = sum(a * b for a, b in zip(count[:i], weight[:i])) / n_l
                    p_r = sum(a * b for a, b in zip(count[i:], weight[i:])) / n_r

                    q_l = 1 - p_l
                    q_r = 1 - p_r

                    # Negative size-weighted Gini impurity of the two children;
                    # larger is better.
                    gain = -(1 - p_l ** 2 - q_l ** 2) * n_l / n - (1 - p_r ** 2 - q_r ** 2) * n_r / n

                    if gain > best_gain:
                        best_gain = gain
                        best_attr = attr
                        # splits = [-inf] + percentiles + [inf], so the lower
                        # bound of bucket b is percentiles[b - 1]: this is the
                        # raw value where the right-hand buckets start.
                        best_cutoff = self._percentiles[attr][cutoff[i] - 1]
                        best_cutoff_index = cutoff[i]

        if best_attr is not None:
            node['attr'], node['cutoff'] = best_attr, best_cutoff
            node['left'], node['right'] = {}, {}

            left_part = part.filter(part[best_attr] < best_cutoff_index)
            self.init_node(node['left'], depth + 1, left_part)
            right_part = part.filter(part[best_attr] >= best_cutoff_index)
            self.init_node(node['right'], depth + 1, right_part)
        else:
            node['weight'] = part.groupBy().avg(self.target_column).collect()[0][0]

    def predict_proba(self, df):
        """Return an RDD with one ``(1 - weight, weight)`` tuple per row of ``df``.

        ``df`` must contain every column the tree splits on, on the same raw
        (un-bucketed) scale as the training data.
        """
        tree = self.tree  # capture only the dict so Spark does not pickle ``self``
        return df.rdd.map(lambda row: predict_node(tree, row))

    def save(self, filename, spark):
        """Write the tree as one JSON line into the Spark text output directory ``filename``."""
        # One slice -> one part file; json.dumps without indent emits no newlines.
        spark.sparkContext.parallelize([json.dumps(self.tree)], 1).saveAsTextFile(filename)
        return self

    def load(self, filename, spark):
        """Read a tree previously written by :meth:`save` and return ``self``."""
        self.tree = json.loads(spark.sparkContext.textFile(filename).first())
        return self
    
def predict_node(node, row):
    """Descend from ``node`` to a leaf for ``row`` and return ``(1 - weight, weight)``.

    At each internal node the row moves to ``right`` when
    ``row[attr] >= cutoff`` and to ``left`` otherwise; a leaf is any node
    without an ``attr`` key and carries the stored ``weight``.
    """
    current = node
    while 'attr' in current:
        goes_right = row[current['attr']] >= current['cutoff']
        current = current['right'] if goes_right else current['left']
    leaf_weight = current['weight']
    return (1 - leaf_weight, leaf_weight)
    
def parse_list(l):
    """Parse a comma-separated CLI value into a list of column names.

    Surrounding whitespace is stripped and empty items (e.g. from a trailing
    comma) are dropped, so ``"a, b,"`` yields ``['a', 'b']``.

    Raises:
        ValueError: if no name remains; argparse turns this into a clean
            usage error when the function is used as an argument ``type``.
    """
    names = [item.strip() for item in l.split(',') if item.strip()]
    if not names:
        raise ValueError('expected at least one comma-separated name')
    return names

def ftrain(spark, args):
    """Train a tree on the CSV at ``args.train_data`` and save it to ``args.tree_filename``.

    The target column is cast to int and every input column to float before
    fitting. Returns the fitted classifier.
    """
    raw = spark.read.csv(args.train_data, header=True)
    typed_columns = [raw[args.target_column].cast('int')]
    typed_columns.extend(raw[name].cast('float') for name in args.input_columns)
    training_frame = raw.select(*typed_columns)

    model = DecisionTreeClassifier(
        input_columns=args.input_columns,
        target_column=args.target_column,
        max_depth=args.max_depth,
        min_samples_leaf=args.min_samples_leaf,
        max_bins=args.max_bins,
    )
    model.fit(training_frame)
    model.save(args.tree_filename, spark)
    return model

def fpredict(spark, args):
    """Score the CSV at ``args.test_data`` with the tree stored at ``args.tree_filename``.

    Every CSV column is cast to float. Each prediction tuple is written as a
    comma-joined text line under ``args.predict_filename``, and the RDD of
    prediction tuples is returned.
    """
    raw = spark.read.csv(args.test_data, header=True)
    scoring_frame = raw.select(*[raw[name].cast('float') for name in raw.columns])

    model = DecisionTreeClassifier().load(args.tree_filename, spark)
    probabilities = model.predict_proba(scoring_frame)
    as_text = probabilities.map(lambda pair: ','.join(map(str, pair)))
    as_text.saveAsTextFile(args.predict_filename)
    return probabilities

def main(params=None):
    """Command-line entry point with ``train`` and ``predict`` sub-commands.

    Args:
        params: optional whitespace-separated argument string (handy from a
            notebook); when None, ``sys.argv`` is parsed.

    Returns:
        The selected sub-command's result: the fitted classifier for
        ``train``, the RDD of prediction tuples for ``predict``.
    """
    parser = argparse.ArgumentParser(
        description='Train or apply a binary decision tree classifier on Spark.')
    subparsers = parser.add_subparsers(dest='command', help='train or predict')
    # Sub-commands are optional by default in Python 3; without this, running
    # with no sub-command would crash later with AttributeError on args.func.
    subparsers.required = True

    train = subparsers.add_parser('train', help='fit a tree on a CSV file and save it')
    train.add_argument('train_data', help='CSV file with a header row')
    train.add_argument('input_columns', type=parse_list,
                       help='comma-separated feature column names')
    train.add_argument('target_column', help='target column name (cast to int)')
    train.add_argument('max_depth', type=int)
    train.add_argument('min_samples_leaf', type=int)
    train.add_argument('max_bins', type=int)
    train.add_argument('tree_filename', help='output location of the JSON tree')
    train.set_defaults(func=ftrain)

    predict = subparsers.add_parser('predict', help='score a CSV file with a saved tree')
    predict.add_argument('test_data', help='CSV file with a header row')
    predict.add_argument('tree_filename', help='location written by the train command')
    predict.add_argument('predict_filename', help='output location of the predictions')
    predict.set_defaults(func=fpredict)

    # parse_args(None) falls back to sys.argv[1:].
    args = parser.parse_args(params.split() if params is not None else None)

    # Arguments are validated before paying for a Spark session.
    spark = (SparkSession.builder.appName("Solution")
                .getOrCreate())

    return args.func(spark, args)

if __name__ == '__main__':
    import sys

    # Inside a Jupyter/IPython kernel __name__ is also '__main__', but sys.argv
    # then typically holds the kernel launcher's own arguments, so argparse
    # would abort the cell with SystemExit. Only dispatch the CLI when run as
    # a plain script (e.g. `python dt.py train ...`).
    if 'ipykernel' not in sys.modules:
        main()

# Notebook usage (arguments passed as one string instead of sys.argv):
# print(main('train train_data.csv sepal_length,sepal_width target 3 1 10 tree.json'))
# print(main('predict test_data.csv tree.json predict.csv'))

In [None]:
# NOTE(review): dt.py is not created anywhere in the visible notebook; presumably it
# contains the code of the decision-tree cell above (e.g. written with
# `%%writefile dt.py`). TODO confirm.
# Remove outputs of a previous run. Both are written as directories by Spark
# (saveAsTextFile), presumably cleared because Spark will not overwrite them.
!rm -rf predict.csv/ tree.json/
# train: data=train_data.csv, features=sepal_length,sepal_width, target=target,
# max_depth=3, min_samples_leaf=1, max_bins=10, tree written to tree.json.
!python dt.py train train_data.csv sepal_length,sepal_width target 3 1 10 tree.json
# predict: score test_data.csv with tree.json; one comma-joined probability pair per row goes to predict.csv.
!python dt.py predict test_data.csv tree.json predict.csv

In [None]:
# Plot the decision surface of the Spark tree saved to tree.json by the `train`
# command, with the held-out test points on top. X, X_test and y_test come from
# the "Init data" cell; DecisionTreeClassifier must be the Spark class.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StructType, StructField
from sklearn.datasets import load_iris

# Parameters
n_classes = 2
plot_colors = "ryb"
plot_step = 0.02

# Load data (only feature and class names are needed here)
iris = load_iris()
pair = [0, 1]

# scikit-learn's estimator of the same name has no `load`; fail loudly if it
# shadows the Spark implementation.
assert hasattr(DecisionTreeClassifier, 'load'), \
    "DecisionTreeClassifier must be the Spark implementation (run the dt.py cell first)"

spark = (SparkSession.builder.appName("Solution")
         .getOrCreate())

x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, plot_step),
                     np.arange(y_min, y_max, plot_step))

# Score every grid point with the saved tree; FloatType mirrors the
# cast('float') applied to the features at training time.
grid_points = np.c_[xx.ravel(), yy.ravel()]
grid_df = spark.createDataFrame(grid_points.tolist(), StructType([
    StructField('sepal_length', FloatType(), False),
    StructField('sepal_width', FloatType(), False),
]))

clf = DecisionTreeClassifier().load('tree.json', spark)
# p(1) - p(0), in [-1, 1]: negative where the tree favours class 0.
proba = clf.predict_proba(grid_df).map(lambda x: x[1] - x[0]).collect()

Z = np.array(proba).astype(np.float32).reshape(xx.shape)
cs = plt.contourf(xx, yy, Z, cmap=plt.cm.RdYlBu)

plt.xlabel(iris.feature_names[pair[0]])
plt.ylabel(iris.feature_names[pair[1]])

# Plot the test points; the binary target is setosa (0) vs. the other species (1).
class_labels = [iris.target_names[0], ' / '.join(iris.target_names[1:])]
for i, color in zip(range(n_classes), plot_colors):
    mask = y_test == i
    plt.scatter(X_test[mask, 0], X_test[mask, 1], c=color, label=class_labels[i],
                edgecolor='black', s=15)

plt.suptitle("Decision surface of a decision tree using paired features")
plt.legend(loc='lower right', borderpad=0, handletextpad=0)
plt.axis("tight")
# Lay out only once everything has been drawn.
plt.tight_layout(h_pad=0.5, w_pad=0.5, pad=2.5)
plt.show()