# Decision Tree Classification in Parallel

This notebook demonstrates basic functionality for parallelizing decision tree classification using the Ray library in Python.

## Setup Ray Wrapper

#### Install ray if desired

In [None]:
import sys
!{sys.executable} -m pip install ray

#### Initialize Ray. The number of cores can be specified with `num_cpus`.

In [1]:
import ray
ray.init()

{'node_ip_address': '127.0.0.1',
 'raylet_ip_address': '127.0.0.1',
 'redis_address': '127.0.0.1:6379',
 'object_store_address': '/tmp/ray/session_2021-11-25_09-59-53_770709_747/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-11-25_09-59-53_770709_747/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2021-11-25_09-59-53_770709_747',
 'metrics_export_port': 52855,
 'node_id': '8ec3faf19d65473646a79ce6ca9b8b87d5c54a8b7ae0eb69de1f0ba1'}
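As a minimal sketch of specifying the core count, the helper below passes `num_cpus` to `ray.init`. The names `default_worker_count` and `start_ray`, and the choice to leave one core free for the notebook kernel, are assumptions for illustration and not part of the original notebook.

```python
import os


def default_worker_count(reserve=1):
    """Cores to hand to Ray: all detected cores minus `reserve`, never below 1."""
    detected = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, detected - reserve)


def start_ray(num_cpus=None):
    """Start (or reuse) a local Ray instance limited to `num_cpus` cores."""
    import ray  # imported lazily so the helper can be defined without Ray installed

    if num_cpus is None:
        num_cpus = default_worker_count()
    # ignore_reinit_error lets the cell be re-run without raising an error.
    return ray.init(num_cpus=num_cpus, ignore_reinit_error=True)
```

Calling `start_ray(4)` would cap Ray at four cores, while `start_ray()` falls back to the detected count minus one.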

## Obtain our data

To use decision tree classification, we need a dataset suited to classification. For our purposes,
we use the abalone dataset from the UCI Machine Learning Repository.

We also have code that generates a decision tree. First, we must obtain a dataframe and a dictionary of attributes to support our classification.

In [2]:
import induceC45
import time
import Classifier

df, attr = induceC45.csv_to_df('./data/abalone.csv')

df # contains our data

| Unnamed: 0 | Sex | Length | Diameter | Height | WW | SW | VW | SW.1 | Rings |
|---|---|---|---|---|---|---|---|---|---|
| 2 | M | 0.455 | 0.365 | 0.095 | 0.5140 | 0.2245 | 0.1010 | 0.1500 | 15.0 |
| 3 | M | 0.350 | 0.265 | 0.090 | 0.2255 | 0.0995 | 0.0485 | 0.0700 | 7.0 |
| 4 | F | 0.530 | 0.420 | 0.135 | 0.6770 | 0.2565 | 0.1415 | 0.2100 | 9.0 |
| 5 | M | 0.440 | 0.365 | 0.125 | 0.5160 | 0.2155 | 0.1140 | 0.1550 | 10.0 |
| 6 | I | 0.330 | 0.255 | 0.080 | 0.2050 | 0.0895 | 0.0395 | 0.0550 | 7.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 4174 | F | 0.565 | 0.450 | 0.165 | 0.8870 | 0.3700 | 0.2390 | 0.2490 | 11.0 |
| 4175 | M | 0.590 | 0.440 | 0.135 | 0.9660 | 0.4390 | 0.2145 | 0.2605 | 10.0 |
| 4176 | M | 0.600 | 0.475 | 0.205 | 1.1760 | 0.5255 | 0.2875 | 0.3080 | 9.0 |
| 4177 | F | 0.625 | 0.485 | 0.150 | 1.0945 | 0.5310 | 0.2610 | 0.2960 | 10.0 |


### Initializing a remote function
Ray runs a function in parallel only if it is declared as a remote function with the `@ray.remote` decorator. The functions below allow parallel and nonparallel generation of decision trees.

`rf_parallel` and `rf_nonparallel` both take a number of trees and generate a forest, one in parallel and one sequentially. Each returns the forest together with the elapsed time in seconds.

In [3]:
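@ray.remote
def c45_wrapper():
    # Remote task: builds one tree from the notebook-level df and attr.
    return induceC45.c45(df, attr, 0.3, True, './data/abalone.csv')

def rf_parallel(trees):
    """Build `trees` trees as Ray tasks; return (forest, elapsed seconds)."""
    begin = time.perf_counter()
    # Submit every task first, then block once until all of them finish.
    forest = ray.get([c45_wrapper.remote() for _ in range(trees)])
    end = time.perf_counter()
    return forest, end - begin

def rf_nonparallel(trees):
    """Build `trees` trees one after another; return (forest, elapsed seconds)."""
    begin = time.perf_counter()
    forest = [induceC45.c45(df, attr, 0.3, True, './data/abalone.csv') for _ in range(trees)]
    end = time.perf_counter()
    return forest, end - begin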
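The remote wrapper above reads `df` from the notebook scope. One possible alternative, sketched here under assumptions, stores the data in Ray's object store once with `ray.put` and passes the resulting reference to each task, where Ray resolves it back to the object. The names `build_forest_shared` and `build_tree` are hypothetical and not part of the original notebook; whether this is faster for this dataset is untested.

```python
def build_forest_shared(build_tree, data, n_trees):
    """Build `n_trees` trees in parallel from one shared copy of `data`.

    `build_tree` is any function that takes the dataset and returns a tree.
    """
    import ray  # imported lazily so the helper can be defined without Ray installed

    data_ref = ray.put(data)               # store the dataset once
    remote_build = ray.remote(build_tree)  # same effect as the @ray.remote decorator
    # Ray replaces the reference with the stored object inside each task.
    futures = [remote_build.remote(data_ref) for _ in range(n_trees)]
    return ray.get(futures)                # block until every tree is built
```

With the notebook's objects, a call might look like `build_forest_shared(make_tree, df, 100)`, where `make_tree` is a hypothetical function that calls `induceC45.c45` on the data it receives.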

We can generate 1 tree in a nonparallel environment and determine the time taken.

In [None]:
forest_nonparallel, time_nonparallel = rf_nonparallel(1)
time_nonparallel

Below is a wrapper function that displays timing results for generating a forest in either environment.

In [4]:
def test(num_trees, parallel=True):
    # `elapsed` avoids shadowing the imported `time` module.
    if parallel:
        forest, elapsed = rf_parallel(num_trees)
    else:
        forest, elapsed = rf_nonparallel(num_trees)
    print(f"{'Parallel' if parallel else 'Nonparallel'} timing results for {num_trees} trees: {elapsed}")
    return forest, elapsed
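Once timings for both environments are available, they are usually compared through speedup (serial time divided by parallel time) and efficiency (speedup divided by the number of workers). The sketch below shows that arithmetic; the helper names are assumptions, and the numbers in the usage lines are placeholders, not measurements from this notebook.

```python
def speedup(serial_seconds, parallel_seconds):
    """How many times faster the parallel run was than the serial run."""
    if parallel_seconds <= 0:
        raise ValueError("parallel_seconds must be positive")
    return serial_seconds / parallel_seconds


def efficiency(serial_seconds, parallel_seconds, workers):
    """Speedup per worker; 1.0 would mean perfect scaling."""
    if workers <= 0:
        raise ValueError("workers must be positive")
    return speedup(serial_seconds, parallel_seconds) / workers


# Placeholder timings, for illustration only.
example_speedup = speedup(40.0, 10.0)
example_efficiency = efficiency(40.0, 10.0, workers=8)
```

In the notebook, the two arguments would come from the second element returned by `test(n, parallel=False)` and `test(n, parallel=True)` for the same `n`.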
    

In [9]:
test(1)

Parallel timing results for 1 trees: 25.56762554200003


([{'dataset': './data/abalone.csv',
   'node': {'var': 'SW.1',
    'edges': [{'edge': {'value': 0.144,
       'direction': 'le',
       'node': {'var': 'Diameter',
        'edges': [{'edge': {'value': 0.22,
           'direction': 'le',
           'node': {'var': 'SW.1',
            'edges': [{'edge': {'value': 0.0215,
               'direction': 'le',
               'leaf': {'decision': 4.0, 'p': 0.4479166666666667}}},
             {'edge': {'value': 0.0215,
               'direction': 'gt',
               'leaf': {'decision': 5.0, 'p': 0.4146341463414634}}}]}}},
         {'edge': {'value': 0.22,
           'direction': 'gt',
           'leaf': {'decision': 7.0, 'p': 0.2991718426501035}}}]}}},
     {'edge': {'value': 0.144,
       'direction': 'gt',
       'leaf': {'decision': 9.0, 'p': 0.19385026737967914}}}]}}],
 25.56762554200003)
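The nested dictionary above can be read as follows: each `node` names a variable and lists its `edges`, each edge carries a threshold `value` and a `direction` (`le` or `gt`), and an edge ends in either another `node` or a `leaf` with a `decision`. The sketch below walks that structure for one row. It is an illustration of the format only, not the implementation of `Classifier.bfs`, which is not shown in this notebook; `walk_tree` is a hypothetical name, and treating an edge without a `direction` as an equality test is an assumption.

```python
def walk_tree(tree, row):
    """Follow matching edges from the root until a leaf; return its decision."""
    current = tree  # a dict that holds either a 'node' or a 'leaf' key
    while "leaf" not in current:
        node = current["node"]
        value = row[node["var"]]
        for wrapped in node["edges"]:
            edge = wrapped["edge"]
            direction = edge.get("direction")
            if direction == "le":
                matches = value <= edge["value"]
            elif direction == "gt":
                matches = value > edge["value"]
            else:
                # Assumption: edges without a direction are categorical.
                matches = value == edge["value"]
            if matches:
                current = edge
                break
        else:
            return None  # no edge matched this row
    return current["leaf"]["decision"]


# The tree printed above, copied verbatim.
sample_tree = {'dataset': './data/abalone.csv',
 'node': {'var': 'SW.1', 'edges': [
   {'edge': {'value': 0.144, 'direction': 'le',
     'node': {'var': 'Diameter', 'edges': [
       {'edge': {'value': 0.22, 'direction': 'le',
         'node': {'var': 'SW.1', 'edges': [
           {'edge': {'value': 0.0215, 'direction': 'le',
             'leaf': {'decision': 4.0, 'p': 0.4479166666666667}}},
           {'edge': {'value': 0.0215, 'direction': 'gt',
             'leaf': {'decision': 5.0, 'p': 0.4146341463414634}}}]}}},
       {'edge': {'value': 0.22, 'direction': 'gt',
         'leaf': {'decision': 7.0, 'p': 0.2991718426501035}}}]}}},
   {'edge': {'value': 0.144, 'direction': 'gt',
     'leaf': {'decision': 9.0, 'p': 0.19385026737967914}}}]}}

# Second displayed row of the dataframe (SW.1 = 0.07, Diameter = 0.265).
decision = walk_tree(sample_tree, {'SW.1': 0.07, 'Diameter': 0.265})
```

For that row the walk takes the `le` edge on `SW.1` and then the `gt` edge on `Diameter`, ending at the leaf with decision 7.0.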

In [4]:
@ray.remote
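def classify(tree, data):
    # Remote task: classify one row with one tree.
    return Classifier.bfs(data, tree)

def rf_classification_parallel(forest, data):
    """Classify every row with every tree via Ray; return elapsed seconds."""
    begin = time.perf_counter()
    for index, row in data.iterrows():
        # One task per tree; ray.get blocks until this row's tasks finish.
        # The predictions are discarded because only the timing is of interest.
        pred = ray.get([classify.remote(tree, row) for tree in forest])
    end = time.perf_counter()
    return end - begin

def rf_classification_nonparallel(forest, data):
    """Classify every row with every tree sequentially; return elapsed seconds."""
    begin = time.perf_counter()
    predictions = []
    for index, row in data.iterrows():
        for tree in forest:
            predictions.append(Classifier.bfs(row, tree))
    end = time.perf_counter()
    return end - begin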
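The functions above only time the per-tree predictions and never combine them. A forest normally turns the per-tree predictions for a row into a single answer by majority vote, which could be sketched as below. The name `majority_vote` is hypothetical, and the sketch assumes each tree returns a hashable label such as the `decision` values shown earlier.

```python
from collections import Counter


def majority_vote(predictions):
    """Return the most common label; ties go to the label seen first."""
    if not predictions:
        raise ValueError("need at least one prediction")
    return Counter(predictions).most_common(1)[0][0]


# Three hypothetical per-tree predictions for one row.
winner = majority_vote([9.0, 7.0, 9.0])
```

In the parallel version, the list returned by `ray.get` for each row could be passed straight to such a function.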


### Notes before we begin data classification experiments

Below, we generate a random forest (in parallel) and classify our dataset with it.
In practice, you do not want to test a classifier on the data it was trained on. However, we are not
concerned with the accuracy of our classifier here; we care only about the time it takes to classify
the data.

First, let's generate our forest.

In [8]:
forest = rf_parallel(100)  # returns (list_of_trees, elapsed_seconds), so the trees are forest[0]


In [27]:
ray.shutdown()

In [25]:
t_classify = rf_classification_parallel(forest[0][:5], df)

[2m[36m(classify pid=773)[0m 


In [24]:
t_classify

2.1994371030000366

In [20]:
print(forest[0][:10])

[{'dataset': './data/abalone.csv', 'node': {'var': 'SW.1', 'edges': [{'edge': {'value': 0.144, 'direction': 'le', 'node': {'var': 'Diameter', 'edges': [{'edge': {'value': 0.22, 'direction': 'le', 'node': {'var': 'SW.1', 'edges': [{'edge': {'value': 0.0215, 'direction': 'le', 'leaf': {'decision': 4.0, 'p': 0.4479166666666667}}}, {'edge': {'value': 0.0215, 'direction': 'gt', 'leaf': {'decision': 5.0, 'p': 0.4146341463414634}}}]}}}, {'edge': {'value': 0.22, 'direction': 'gt', 'leaf': {'decision': 7.0, 'p': 0.2991718426501035}}}]}}}, {'edge': {'value': 0.144, 'direction': 'gt', 'leaf': {'decision': 9.0, 'p': 0.19385026737967914}}}]}}, {'dataset': './data/abalone.csv', 'node': {'var': 'SW.1', 'edges': [{'edge': {'value': 0.144, 'direction': 'le', 'node': {'var': 'Diameter', 'edges': [{'edge': {'value': 0.22, 'direction': 'le', 'node': {'var': 'SW.1', 'edges': [{'edge': {'value': 0.0215, 'direction': 'le', 'leaf': {'decision': 4.0, 'p': 0.4479166666666667}}}, {'edge': {'value': 0.0215, 'dire