In [1]:
import numpy as np 

data = np.array(
    [
        [.885, .725, .560, .735, .610, .260, .5, .32],
        [.33, .39, .5, .570, .63, .63, .68, .78], 
        [9.1, 10.9, 9.4, 9.8, 8.4, 11.8, 10.5, 10],
        [4,5,6,5,3,8,7,6]
    ]
).T

X = data[:,:-1]
Y = data[:,-1]

# Decision Tree Algorithm (JR Quinlan)

```python
def build_tree(data)
    if data.shape[0] == 1: 
        return [leaf, data.y, NA, NA] 
    if all data.y same: 
        return [leaf, data.y, NA, NA] 
    else
        # determine best feature i to split on
        SplitVal = data[:,i].median()
        lefttree = build_tree(data[data[:,i]<=SplitVal]) 
        righttree = build_tree(data[data[:,i]>SplitVal]) 
        root = [i, SplitVal, 1, lefttree.shape[0] + 1] 
        return append(root, lefttree, righttree)
```

How to determine the "best" Feature? 
- Goal: Divide and Conquer
- Group Data into most similar groups. 

Approaches: 
- Information Gain: Entropy
- Information Gain: Correlation 
- Information Gain: Gini Index


In [2]:
class DTLearner:
    def __init__(self, leaf_size = 1, verbose = False):
        self.leaf_size = leaf_size 
        self.verbose = verbose
        self.tree = None


    def author(self):
        return 'jachaibar3'
    
    def add_evidence(self, data_x, data_y):
        self.X = data_x
        self.y = data_y
        self.data = np.concatenate((data_x, data_y), axis = 1)

    def feature_selection(self, data):
        # Return the column index of the highest correlated feature. 
        # Transpose to get variables as rows. (see np.corcoef doc)
        # [:-1, -1]: -1 gives the last col. in correl matrix, :-1 excludes corr of y and itself (1). 
        # argmax returns the column index of the max correlation 
        return np.abs(np.corrcoef(data.T))[:-1, -1].argmax().astype(int)

    def build_tree(self, data):
        if data.shape[0] <= self.leaf_size: 
            stop1 = np.array([['Leaf', data[:, -1][0], np.nan, np.nan]], dtype = object)
            if self.verbose: 
                print(f'Stop Cond 1: Rows less than the leaf size: {data.shape}')
            return stop1
        if np.unique(data[:, -1]).shape[0] <= 1:
            stop2 = np.array([['Leaf', data[:, -1][0], np.nan, np.nan]], dtype = object)
            if self.verbose: 
                print(f'Stop Cond 2: All Y values are the same: {data.shape}, Y: {data[:, -1]}')
            return stop2

        else:
            x_ind = self.feature_selection(data)
            split_val = np.median(data[:, x_ind]).astype(float).round(3)
            if self.verbose:
                print(f'X vals: {data[:, x_ind][:5]} Split val: {split_val}')
            left_tree = self.build_tree(data[data[:, x_ind] <= split_val])
            right_tree = self.build_tree(data[data[:, x_ind] > split_val])
            root = np.array([[x_ind, split_val, 1, left_tree.shape[0] + 1]])
            self.tree = np.append(root, np.append(left_tree, right_tree, axis = 0), axis = 0)
            return self.tree
    
    def query(self, points):
        """
        Predict Y given the test set of X. 
        Given X (data points) evaluate the tree to return a leaf value for the prediction of Y. 
        """  
        for row in range(points.shape[0]):
            i = 0
            while self.tree[i, 0] != 'Leaf':
                if points[row, int(self.tree[i, 0])] <= float(self.tree[i, 1]):
                    i += int(self.tree[i, 2])
                else:
                    i += int(self.tree[i, 3])
            points[row, -1] = self.tree[i, 1]
        return points[:, -1]

learn = DTLearner(leaf_size=1)
learn.add_evidence(X, Y[:, np.newaxis])
ar = learn.build_tree(data)
# Test query
X_test = np.random.uniform(-1,1, size=(10,3)) 
learn.query(X_test)

array([3., 4., 3., 4., 3., 3., 3., 3., 3., 3.])

In [3]:
ar

array([[2.0, 9.9, 1.0, 8.0],
       [2.0, 9.25, 1.0, 4.0],
       [0.0, 0.748, 1.0, 2.0],
       ['Leaf', 3.0, nan, nan],
       ['Leaf', 4.0, nan, nan],
       [0.0, 0.648, 1.0, 2.0],
       ['Leaf', 6.0, nan, nan],
       ['Leaf', 5.0, nan, nan],
       [0.0, 0.41, 1.0, 4.0],
       [1.0, 0.705, 1.0, 2.0],
       ['Leaf', 8.0, nan, nan],
       ['Leaf', 6.0, nan, nan],
       [1.0, 0.535, 1.0, 2.0],
       ['Leaf', 5.0, nan, nan],
       ['Leaf', 7.0, nan, nan]], dtype=object)

# Random Tree Algorithim (A Cutler)

```python
def build_tree(data):
    if data.shape[0] == 1: 
        return [leaf, data.y, NA, NA] 
    if all data.y same: 
        return [leaf, data.y, NA, NA] 
    else
        # determine random feature i to split on
        SplitVal = (data[random,i] + data[random,i]) / 2 
        lefttree = build_tree(data[data[:,i]<=SplitVal]) 
        righttree = build_tree(data[data[:,i]>SplitVal]) 
        root = [i, SplitVal, 1, lefttree.shape[0] + 1] 
        return (append(root, lefttree, righttree))
```

In [4]:
class RTLearner:
    def __init__(self, leaf_size = 1, verbose = False):
        self.leaf_size = leaf_size 
        self.verbose = verbose
        self.tree = None # 0: feature ind, 1: split val, 2: left node, 3: right node


    def author(self):
        return 'jachaibar3'
    
    def add_evidence(self, data_x, data_y):
        self.X = data_x
        self.y = data_y
        self.data = np.concatenate((data_x, data_y), axis = 1)

    def feature_selection(self, data):
        # Return random column index of the highest correlated feature. 
        return np.random.randint(0, data.shape[1] - 1)

    def build_tree(self, data):
        if data.shape[0] <= self.leaf_size: 
            stop1 = np.array([['Leaf', data[:, -1], np.nan, np.nan]], dtype = object)
            if self.verbose: 
                print(f'Leaf found: {data.shape}, Y: {data[:, -1]}')
            return stop1
        if np.unique(data[:, -1]).shape[0] <= 1:
            stop2 = np.array([['Leaf', data[:, -1][0], np.nan, np.nan]], dtype = object)
            if self.verbose: 
                print(f'Leaf found: {data.shape}, Y: {data[:, -1]}')
            return stop2

        else:
            x_ind = self.feature_selection(data)
            split_val = np.median(data[:, x_ind]).astype(float).round(3)
            if self.verbose: 
                print(f'X vals: {data[:, x_ind][:5]} Split val: {split_val}')
            left_tree = self.build_tree(data[data[:, x_ind] <= split_val])
            right_tree = self.build_tree(data[data[:, x_ind] > split_val])
            root = np.array([[x_ind, split_val, 1, left_tree.shape[0] + 1]])
            self.tree = np.append(root, np.append(left_tree, right_tree, axis = 0), axis = 0)
            return self.tree
    
    def query(self, points):
        """
        Predict Y given the test set of X. 
        Given X (data points) evaluate the tree to return a leaf value for the prediction of Y. 
        """  
        for row in range(points.shape[0]): # For each row in the test set
            i = 0 # Start at the root 
    
            while self.tree[i, 0] != 'Leaf': # While the current node is not a leaf

                # Check the test value agianst the split value of the current node
                if points[row, int(self.tree[i, 0])] <= float(self.tree[i, 1]): 
    
                    # If the value is less, use the left node index
                    i += int(self.tree[i, 2])
                else:
    
                    # if the value is more use the right node index 
                    i += int(self.tree[i, 3])
    
            # Once the leaf is reached, assign the leaf value to the test set
            points[row, -1] = self.tree[i, 1]
        # return the leaf values for the test set 
        return points[:,-1]


random_learner = RTLearner(leaf_size=1, verbose=True)
random_learner.add_evidence(X, Y[:, np.newaxis])
ar = random_learner.build_tree(data)

# Test query
X_test = np.random.normal(-1,1, size=(1000,4))
np.unique(random_learner.query(X_test), return_counts=True)


X vals: [0.33 0.39 0.5  0.57 0.63] Split val: 0.6
X vals: [0.885 0.725 0.56  0.735] Split val: 0.73
X vals: [10.9  9.4] Split val: 10.15
Leaf found: (1, 4), Y: [6.]
Leaf found: (1, 4), Y: [5.]
X vals: [0.33 0.57] Split val: 0.45
Leaf found: (1, 4), Y: [4.]
Leaf found: (1, 4), Y: [5.]
X vals: [ 8.4 11.8 10.5 10. ] Split val: 10.25
X vals: [ 8.4 10. ] Split val: 9.2
Leaf found: (1, 4), Y: [3.]
Leaf found: (1, 4), Y: [6.]
X vals: [0.63 0.68] Split val: 0.655
Leaf found: (1, 4), Y: [8.]
Leaf found: (1, 4), Y: [7.]


  points[row, -1] = self.tree[i, 1]


(array([3., 4., 6.]), array([ 60,  38, 902]))

### Strengths and weaknesses of decision tree learners
• Cost of learning:
- Most: Decision Trees
- Medium: Linear Regression 
- Least: KNN

• Cost of query: 
- Most: KNN
- Medium: Decision Trees 
- Least: Linear Regression

• Trees: Dont have to normalize your data and can easily handle missing data. 

In [6]:
import pandas as pd 
import sys
sys.path.append("/Users/jerald/Documents/Dir/Python/Stocks")

from jetaa.sat.indicators import Indicators
from bin.main import Manager 

M = Manager('../../')

prices = M.Pricedb.ohlc('spy')
G = Indicators(prices)
df = G.get_states()['2023-01-01':]
# df = df.resample('w').last()

returns = pd.DataFrame({'returns_1d': df['Close'].pct_change(1),
                        'returns_3d': df['Close'].pct_change(3),
                        'returns_5d': df['Close'].pct_change(5),})

# 1 = Buy, 0 = Hold, 2 = Sell 
multi_class = lambda x: 1 if x > 0.01 else 0 if x < -0.01 else 2
# data = pandas.read_csv('Data/Istanbul.csv', index_col=0).reset_index(drop=True).to_numpy()
# X = data[:,:-1]
# Y = data[:,-1][:, np.newaxis]

X = df.iloc[:, 1:].to_numpy()
Y = returns.returns_1d.values[:, np.newaxis]
learner = DTLearner(leaf_size=1, verbose=True)
learner.add_evidence(X, Y)
ar = learner.build_tree(data)

Options db Connected: 2024-07-23 20:26:26.065501
Prices Connected: 2024-07-23 20:26:26.066470
