In [7]:
import pandas as pd
import numpy as np

In [8]:
# Standardization formula: z = (x - mean) / standard_deviation

class Standardiser():
    def __init__(self) -> None:
        pass

    def fit(self, X):
        self.size = X.shape[1]
        self.mean = X.mean(axis=0)
        self.std = X.std(axis=0)
        return self     #returns self, which allows method chaining

    def transform(self,X):
        if X.shape[1] != self.size:
            raise Exception("Wrong array dimensions!")
        return (X-self.mean)/self.std


In [9]:
def shuffle_data(X, Y):
    data_num = np.arange(X.shape[0])
    np.random.shuffle(data_num)
    return X[data_num], Y[data_num]

def train_test_split(X, Y, test_size=0.5, shuffle=True):
    """
    Splits dataset into training and test sets.

    Parameters:
    ----------
    X : array-like
        Feature dataset.
    y : array-like
        Target values.
    test_size : float or int, default=0.5
        - If float (0 < test_size < 1), it represents the proportion of the dataset to include in the test split.
        - If int (1 <= test_size < len(y)), it represents the absolute number of test samples.
    shuffle : bool, default=True
        If True, shuffles data before splitting.

    Returns:
    -------
    X_train, X_test, y_train, y_test : array-like
        Split feature and target sets for training and testing."""

    if shuffle:
        X, Y = shuffle_data(X, Y)
    if test_size <1 :
        train_ratio = len(Y) - int(len(Y) *test_size)
        X_train, X_test = X[:train_ratio], X[train_ratio:]
        Y_train, Y_test = Y[:train_ratio], Y[train_ratio:]
        return X_train, X_test, Y_train, Y_test
    elif test_size in range(1,len(Y)):
        X_train, X_test = X[test_size:], X[:test_size]
        Y_train, Y_test = Y[test_size:], Y[:test_size]
        return X_train, X_test, Y_train, Y_test
    

In [10]:
class LWLR():
    def __init__(self,tau):
        self.tau = tau

    def fit(self,X, Y):
        self.dim = X.ndim
        self.X = self._add_ones(X)
        self.X_T = self.X.T
        self.Y = Y

    def predict(self, P):
        P = self._add_ones(P)
        out = []
        for p in P:
            # Vectorized calculation of weights
            W = np.exp(-np.sum((self.X - p) ** 2, axis=1) / (2 * self.tau ** 2))    #Do the sume because we have many predictors
            W_diag = np.diag(W)

            # Calculate theta using vectorized operations
            XT_W_X = self.X_T @ W_diag @ self.X
            XT_W_Y = self.X_T @ W_diag @ self.Y
            theta = np.linalg.solve(XT_W_X, XT_W_Y) #Solving XT_W_X @ theta = XT_W_Y is faster and more stable than calculating np.linalg.inv(XT_W_X) @ XT_W_Y

            # Make prediction
            out.append((theta @ p).squeeze())
        return np.array(out)

    def change_tau(self,tau):
        self.tau = tau

    def _add_ones(self, X):
        if type(X) != np.ndarray:
            X = np.array(X)
        return np.concatenate((np.ones((len(X),1)), X.reshape(len(X),-1)), axis=1)

In [11]:
def cross_val_split(X, Y, num_folds):
    fold_size = len(X)//num_folds
    X_splits, Y_splits = [], []

    for i in range(num_folds):
        start_index = i * fold_size
        if i == num_folds-1:
            X_splits.append(X[start_index:])
            Y_splits.append(Y[start_index:])
        else:
            X_splits.append(X[start_index:start_index + fold_size])
            Y_splits.append(Y[start_index:start_index + fold_size])

    return X_splits, Y_splits

def cross_val_score(model, X, Y, num_folds=5, scoring='f1_macro'):

    X_splits, Y_splits = cross_val_split(X, Y, num_folds)

    errors_train = []  
    errors_valid = []

    for i in range(num_folds):
        X_train_data = np.concatenate([fold for i, fold in enumerate(X_splits) if i != 3])
        Y_train_data = np.concatenate([fold for i, fold in enumerate(Y_splits) if i != 3])
        X_valid_data = X_splits[i]
        Y_valid_data = Y_splits[i]

        model.fit(X_train_data, Y_train_data)
        prediction_train = model.predict(X_train_data)
        prediction_test = model.predict(X_valid_data)
        # Calculating mean square error
        mse_train = np.sqrt((np.square(prediction_train - Y_train_data)).mean())
        mse_valid = np.sqrt((np.square(prediction_test - Y_valid_data)).mean())

        errors_train.append(mse_train)
        errors_valid.append(mse_valid) 

    return errors_train, errors_valid
    

In [12]:
#Read the data
dile_name = 'data-reg.csv'
# dile_name = 'C:/Users/USER/Desktop/Masinsko/Domaci/1/data.csv'
raw_data = pd.read_csv(dile_name, header=None).to_numpy()

X = raw_data[:,:5]
Y = raw_data[:,-1]

# print(f"Mean of all predictors:\n{X.mean(axis=0)}")
# print(f"Mean of all predictors standardised:\n{X_scaled.mean(axis=0)}")

# print(raw_data[1:5,:])

X_train, X_test, Y_train, Y_test = train_test_split(X,Y, test_size=0.2, shuffle=False)

scaler = Standardiser().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# X_split, Y_split = cross_val_split(X_train_scaled, Y_train, 4)


tau_opt = 1.7
lwlr = LWLR(tau_opt)
# lwlr.fit(X_train_scaled,Y_train)

errors_train, errors_valid = cross_val_score(lwlr, X_train_scaled, Y_train, num_folds=5)

# Generating prediction
prediction_train = lwlr.predict(X_train_scaled)
prediction_test = lwlr.predict(X_test_scaled)

# Calculating mean square error
mse_train = np.sqrt((np.square(prediction_train - Y_train)).mean())
mse_test = np.sqrt((np.square(prediction_test - Y_test)).mean())

print("Mean square error on training data:")
print("{:.2f}".format(mse_train))
print("Mean square error on test data:")
print("{:.2f}".format(mse_test))


Mean square error on training data:
5.29
Mean square error on test data:
5.85
