In [1]:
!pip install prettytable


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import numpy as np
import pandas as pd
from math import sqrt
import json

def sigmoid(X):
    """Sigmoid activation function"""
    return 1 / (1 + np.exp(-X))

def relu(X):
    """ReLU activation function"""
    return np.maximum(0, X)

def tanh(X):
    """TanH activaction function"""
    return np.tanh(X)

In [3]:
class Layer:
    input_shape = None
    output_shape = None
    
    def forward():
        pass

In [4]:
from typing import Literal

class LSTM(Layer):
    class Weight:
        """Weight class for LSTM"""
        def __init__(self, f, i, c, o) -> None:
            self.f = f
            self.i = i
            self.c = c
            self.o = o

    def __init__(self, units: int) -> None:
        super().__init__()
        self.units = units

    def _compile(self, input_shape: int, from_load: bool = False):
        self.input_shape = input_shape # (Row, Feature)
        self.output_shape = self.units
        
        self.weight = self.Weight(
            np.random.randn(self.units, self.input_shape[1]),
            np.random.randn(self.units, self.input_shape[1]),
            np.random.randn(self.units, self.input_shape[1]),
            np.random.randn(self.units, self.input_shape[1]))
        
        self.bias = self.Weight(
            np.random.randn(self.units),
            np.random.randn(self.units),
            np.random.randn(self.units),
            np.random.randn(self.units)) # b
        
        self.reccurent_weight = self.Weight(
            np.random.randn(self.output_shape, self.output_shape),
            np.random.randn(self.output_shape, self.output_shape),
            np.random.randn(self.output_shape, self.output_shape),
            np.random.randn(self.output_shape, self.output_shape)) # W
        
        self.timestep = np.zeros(self.units)
        self.ct = np.zeros(self.units)
        self.ht = np.zeros(self.units)

        return self.output_shape

    def forward(self, input_data: np.ndarray):
        """Forward propagation"""
        
        # Loop through all units:
        for unit_idx in range(self.units):
            # Loop through all rows
            for row_idx in range(self.input_shape[0]):
                # Calculate forget gate
                ft = sigmoid(np.dot(self.weight.f[unit_idx], input_data[row_idx]) + np.dot(self.reccurent_weight.f[unit_idx], self.ht) + self.bias.f[unit_idx])
                # Calculate input gate
                it = sigmoid(np.dot(self.weight.i[unit_idx], input_data[row_idx]) + np.dot(self.reccurent_weight.i[unit_idx], self.ht) + self.bias.i[unit_idx])
                # Calculate candidate
                ct_ = tanh(np.dot(self.weight.c[unit_idx], input_data[row_idx]) + np.dot(self.reccurent_weight.c[unit_idx], self.ht) + self.bias.c[unit_idx])
                # Calculate output gate
                ot = sigmoid(np.dot(self.weight.o[unit_idx], input_data[row_idx]) + np.dot(self.reccurent_weight.o[unit_idx], self.ht) + self.bias.o[unit_idx])
                
                # Calculate cell state
                self.ct[unit_idx] = ft * self.ct[unit_idx] + it * ct_

                # Calculate hidden state
                self.ht[unit_idx] = ot * tanh(self.ct[unit_idx])
                self.timestep[unit_idx] += 1

        return self.ht

    def get_params_count(self):
        return self.bias.f.size * 4 + self.weight.f.size * 4 + self.reccurent_weight.f.size * 4

    def set_params(self, params):
        self.weight.f = np.array(params["W_f"])
        self.weight.i = np.array(params["W_i"])
        self.weight.c = np.array(params["W_c"])
        self.weight.o = np.array(params["W_o"])
        self.reccurent_weight.f = np.array(params["U_f"])
        self.reccurent_weight.i = np.array(params["U_i"])
        self.reccurent_weight.c = np.array(params["U_c"])
        self.reccurent_weight.o = np.array(params["U_o"])
        self.bias.f = np.array(params["b_f"])
        self.bias.i = np.array(params["b_i"])
        self.bias.c = np.array(params["b_c"])
        self.bias.o = np.array(params["b_o"])

    def get_params(self):
        params = {
            "units": self.units,
            "W_i": np.transpose(self.weight.i).tolist(),
            "W_f": np.transpose(self.weight.f).tolist(),
            "W_c": np.transpose(self.weight.c).tolist(),
            "W_o": np.transpose(self.weight.o).tolist(),
            "U_i": self.reccurent_weight.i.tolist(),
            "U_f": self.reccurent_weight.f.tolist(),
            "U_c": self.reccurent_weight.c.tolist(),
            "U_o": self.reccurent_weight.o.tolist(),
            "b_i": self.bias.i.tolist(),
            "b_f": self.bias.f.tolist(),
            "b_c": self.bias.c.tolist(),
            "b_o": self.bias.o.tolist(),
        }
        return params


if __name__ == "__main__":
    model = LSTM(1)
    model._compile((2, 2))
    model.weight.f = np.array([[0.5, 0.75]])
    model.weight.i = np.array([[0.81, 0.2]])
    model.weight.c = np.array([[0.35, 0.45]])
    model.weight.o = np.array([[0.4, 0.6]])
    model.reccurent_weight.f = np.array([[0.3]])
    model.reccurent_weight.i = np.array([[0.7]])
    model.reccurent_weight.c = np.array([[0.35]])
    model.reccurent_weight.o = np.array([[0.4]])
    model.bias.f = np.array([0.4])
    model.bias.i = np.array([0.55])
    model.bias.c = np.array([0.25])
    model.bias.o = np.array([0.5])
    print(model.forward(np.array([[0.5, 3], [1, 2]])))

    new_model = LSTM(64)
    new_model._compile((4, 5))
    print(new_model.weight.f.shape)
    print(new_model.output_shape)

[0.83602558]
(64, 5)
64


In [5]:
class Dense(Layer):
    def __init__(self, units: int, activation: Literal["sigmoid", "relu"] = "relu") -> None:
        """
        Dense layer

        Parameters
        ----------
        units : int
            Number of neuron
        activation : str
            Activation function ("sigmoid" or "relu")
        """
        self.units = units
        self.activation = activation

        self.output_shape = units

    def _compile(self, shape, from_load: bool = False):
        # Initialize weight & bias
        self.input_size = shape

        if not from_load:
            self.weights = np.random.randn(shape, self.units)
            self.bias = np.random.randn(self.units)
        
        return self.output_shape

    @staticmethod
    def __activate(X: np.ndarray, activation: Literal["sigmoid", "relu"] = "relu") -> np.ndarray:
        """Activation function

        Parameters
        ----------
        X : np.ndarray
            Input data
        activation : str
            Activation function to be used
        """
        if activation == "sigmoid":
            f = lambda x: sigmoid(x)
        elif activation == "relu":
            f = lambda x: relu(x)
        else:
            raise ValueError("Activation function not supported")
        return f(X)

    def forward(self, input_data: np.ndarray) -> np.ndarray:
        """Forward pass of Dense layer

        Parameters
        ----------
        input_data : np.ndarray
            Input data to be passed through the layer
        """
        
        # Linear transformation (dot product input x weights)
        self.input_data = input_data
        self.output = np.dot(input_data, self.weights) + self.bias
        
        # Activation function
        output = Dense.__activate(self.output, self.activation)
        
        return output

    def get_params_count(self):
        return self.input_size * self.units + self.units

    def set_params(self, params):
        self.weights = np.array(params["kernel"])
        self.bias = np.array(params["bias"])

    def get_params(self):
        params = {
            "units": self.units,
            "activation": self.activation,
            "kernel": self.weights.tolist(),
            "bias": self.bias.tolist()
        }
        return params

    def set_params(self, params):
        self.units = params["units"]
        self.activation = params["activation"]
        self.weights = np.array(params["weights"])
        self.bias = np.array(params["bias"])

In [6]:
from prettytable import PrettyTable

class Sequential:
    def __init__(self, layers: list[object] = []):
        self.layers = layers

    def add(self, layer: Layer) -> None:
        self.layers.append(layer)

    def forward(self, inp: np.ndarray) -> np.ndarray:
        for layer in self.layers:
            inp = layer.forward(inp)
        return inp

    def predict(self, img: np.ndarray) -> np.ndarray:
        return self.forward(img)

    def compile(self, input_shape, from_load: bool = False):
        assert len(self.layers) > 0, "Tambah layer terlebih dahulu"
        for layer in self.layers:
            layer._compile(input_shape, from_load)
            input_shape = layer.output_shape

    def summary(self):
        total_params = 0
        print("Model Summary:")

        table = PrettyTable(["Nama Layer", "Output Shape", "Parameter"])
        for i, layer in enumerate(self.layers):
            layer_name = layer.__class__.__name__
            output_shape = layer.output_shape if hasattr(layer, "output_shape") else None
            params = layer.get_params_count() if hasattr(layer, "get_params_count") else 0
            total_params += params
            
            table.add_row([layer_name, output_shape, params])

        print(table)
        print(f"Total Parameters: {total_params}")

    # Save Model
    def save_model(self, filepath):
        model_dict = {
            "layers": []
        }
        for layer in self.layers:
            layer_dict = {
                "type": layer.__class__.__name__,
                "params": layer.get_params()
            }
            # Ensure input_shape is saved for LSTM layers
            if isinstance(layer, LSTM):
                layer_dict["params"]["input_shape"] = layer.input_shape
            model_dict["layers"].append(layer_dict)

        with open(filepath, 'w') as file:
            json.dump(model_dict, file, indent=4)

    def load_model(self, filename):
        with open(filename, 'r') as json_file:
            model_params = json.load(json_file)

        layers = []
        for layer_params in model_params:
            layer_type = layer_params["type"]
            layer_args = layer_params["params"]

            if layer_type == "dense":
                layer = Dense(units=layer_args["units"], activation=layer_args["activation"])
            elif layer_type == "lstm":
                # Check if 'input_shape' is in layer_args before trying to use it
                if "input_shape" not in layer_args:
                    raise KeyError("input_shape is required for LSTM layers.")
                
                layer = LSTM(units=layer_args["units"])
                layer._compile(input_shape=layer_args["input_shape"])
                layer.set_params(layer_args)
            else:
                raise ValueError(f"Unsupported layer type: {layer_type}")

            if "input_shape" in layer_params:
                layer.input_shape = tuple(layer_params["input_shape"])

            layer.set_params(layer_args)
            layers.append(layer)

        return Sequential(layers)

In [7]:
# split for getting next day result
def split_sequence(sequence, n_steps):
	X, y = list(), list()
	for i in range(len(sequence)):
		# find the end of this pattern
		end_ix = i + n_steps
		# check if we are beyond the sequence
		if end_ix > len(sequence)-1:
			break
		# gather input and output parts of the pattern
		seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
		X.append(seq_x)
		y.append(seq_y)
	return np.array(X), np.array(y)

In [8]:
raw_train = pd.read_csv("TubesRNN/Train_stock_market.csv")
test = pd.read_csv("TubesRNN/Test_stock_market.csv")

train_date = raw_train["Date"]
train_date = pd.to_datetime(train_date)

train = raw_train.drop("Date", axis=1)

train['Date'] = train_date
test['Date'] = pd.to_datetime(test['Date'])

# print(test.head())
# print(train.head())

# Take relevant columns
train = train[['Date', 'Open', 'High', 'Low', 'Close']]
test = test[['Date', 'Open', 'High', 'Low', 'Close']]

In [9]:
# Experiment 1
n_steps = 10  # Number of days
X_train, y_train = split_sequence(train[['Open', 'High', 'Low', 'Close']].values, n_steps)
X_test, y_test = split_sequence(test[['Open', 'High', 'Low', 'Close']].values, n_steps)

# Create model
model = Sequential([])
model.add(LSTM(units=64))
model.add(Dense(units=32, activation="relu"))
model.add(Dense(units=4))  # 4 output

input_shape = (X_test.shape[1], X_test.shape[2])  # (n_steps, number of features)

# Compile
model.compile(input_shape, from_load=False)

In [None]:
# Experiment 2
n_steps_2 = 35  # Number of days
X_train_2, y_train_2 = split_sequence(train[['Open', 'High', 'Low', 'Close']].values, n_steps_2)
X_test_2, y_test_2 = split_sequence(test[['Open', 'High', 'Low', 'Close']].values, n_steps_2)

# Create model
model_2 = Sequential([])
model_2.add(LSTM(units=64))
model_2.add(LSTM(units=128))
model_2.add(LSTM(units=32))
model_2.add(Dense(units=32, activation="relu"))
model_2.add(Dense(units=4))  # 4 output

input_shape_2 = (X_test.shape[1], X_test.shape[2])  # (n_steps, number of features)

# Compile
model_2.compile(input_shape_2, from_load=False)

In [10]:
model.save_model('TubesRNN/exp1.json')
model_2.save_model('TubesRNN/exp2.json')
model_3.save_model('TubesRNN/exp3.json')

model_exp1 = Sequential([])
model_exp1 = model_exp1.load_model('TubesRNN/exp1.json')

model_exp2 = Sequential([])
model_exp2 = model_exp2.load_model('TubesRNN/exp2.json')

model_exp3 = Sequential([])
model_exp3 = model_exp3.load_model('TubesRNN/exp3.json')

TypeError: string indices must be integers

In [None]:
# model.summary()
model_exp1.summary()
model_exp2.summary()
model_exp2.summary()

Model Summary:


AttributeError: 'Dense' object has no attribute 'input_size'

In [None]:
predictions = []
rmses = []
for i in range(len(X_test)):
    print()
    # print("INPUT: ", X_test[i])
    prediction = model.forward(X_test[i])
    print("PREDICTION: ", prediction)
    print("ACTUAL: ", y_test[i])

    mse = (y_test[i] - prediction) ** 2
    rmse = np.sqrt(np.mean(mse))
    predictions.append(prediction)
    rmses.append(rmse)

print("Experiment 1 - Average Root Mean Squared Error (RMSE):", np.mean(rmses))


PREDICTION:  [24.06129135  2.29447884  0.          0.        ]
ACTUAL:  [3.   3.04 2.99 2.99]

PREDICTION:  [3.82425067 0.         0.         0.        ]
ACTUAL:  [3.01 3.03 2.97 3.  ]

PREDICTION:  [1.04086995 0.         0.         0.        ]
ACTUAL:  [3.  3.  2.9 2.9]

PREDICTION:  [14.86082602  0.          0.          0.        ]
ACTUAL:  [2.93 2.93 2.84 2.87]

PREDICTION:  [23.51393225  0.          0.          0.        ]
ACTUAL:  [2.82 2.85 2.8  2.8 ]

PREDICTION:  [24.33399972  0.          0.          0.        ]
ACTUAL:  [2.7  2.92 2.7  2.92]

PREDICTION:  [30.34738018  0.          0.          0.        ]
ACTUAL:  [2.92 3.   2.92 3.  ]

PREDICTION:  [6.03133162 0.         0.         0.        ]
ACTUAL:  [3.   3.   2.85 2.9 ]

PREDICTION:  [4.76813199 0.         0.         0.        ]
ACTUAL:  [2.84 3.   2.84 2.93]

PREDICTION:  [10.07627643  0.          0.          0.        ]
ACTUAL:  [3.   3.14 3.   3.  ]

PREDICTION:  [3.16104409e+01 1.58567852e-02 0.00000000e+00 0.00000000

In [None]:
loaded_model = Sequential()
loaded_model = loaded_model.load_model('TubesRNN/exp1.json')
loaded_model.compile((4, 5), True)
loaded_model.summary()

test = np.array([
    [1, 2, 3, 4, 5],
    [1, 2, 3, 4, 5],
    [1, 2, 3, 4, 5],
    [1, 2, 3, 4, 5],
])
print(loaded_model.layers[0].weight.f.shape)
print(model.forward(test))

loaded_model.save_model('TubesRNN/test.json')

Model Summary:
+------------+--------------+-----------+
| Nama Layer | Output Shape | Parameter |
+------------+--------------+-----------+
|    LSTM    |      64      |   17920   |
|   Dense    |      32      |    2080   |
|   Dense    |      4       |    132    |
+------------+--------------+-----------+
Total Parameters: 20132
(64, 5)


ValueError: shapes (4,) and (5,) not aligned: 4 (dim 0) != 5 (dim 0)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=61bb34cb-2316-475c-9595-96cf5c0e1564' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>