In [None]:
# Clear the interactive namespace (IPython `%reset` magic, run with the -f flag)
# so the cells below start from a clean state.
%reset -f

In [1]:
from sklearn.datasets import fetch_california_housing

# Fetch the California housing dataset object.
california_housing = fetch_california_housing()

# Feature matrix and target vector used by the cells further down.
X, y = california_housing.data, california_housing.target

# Report shapes, column names and the bundled description, one item per line.
print(
    X.shape,
    y.shape,
    california_housing.feature_names,
    california_housing.target_names,
    california_housing.DESCR,
    sep="\n",
)


(20640, 8)
(20640,)
['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
['MedHouseVal']
.. _california_housing_dataset:

California Housing dataset
--------------------------

**Data Set Characteristics:**

    :Number of Instances: 20640

    :Number of Attributes: 8 numeric, predictive attributes and the target

    :Attribute Information:
        - MedInc        median income in block group
        - HouseAge      median house age in block group
        - AveRooms      average number of rooms per household
        - AveBedrms     average number of bedrooms per household
        - Population    block group population
        - AveOccup      average number of household members
        - Latitude      block group latitude
        - Longitude     block group longitude

    :Missing Attribute Values: None

This dataset was obtained from the StatLib repository.
https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html

The target variable 

In [2]:
import pandas as pd
import numpy as np
#import jsonpickle

class Node:
    """A node of a binary decision tree.

    Children (``left`` / ``right``) may be other ``Node`` objects, bare
    scalars (leaf predictions) or ``None``.
    """

    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
        self.feature_index = feature_index    # index of the feature used for the split
        self.threshold = threshold            # split threshold
        self.left = left                      # left subtree
        self.right = right                    # right subtree
        self.value = value                    # value held by a leaf node

    def to_dict(self):
        """Return a nested-dict representation of this node and its subtrees.

        ``Node`` children are converted recursively; any other child
        (scalar leaf or ``None``) is stored unchanged.
        """
        return {
            'feature_index': self.feature_index,
            'threshold': self.threshold,
            'left': self.left.to_dict() if isinstance(self.left, Node) else self.left,
            'right': self.right.to_dict() if isinstance(self.right, Node) else self.right,
            'value': self.value
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a node (and its subtrees) from the output of ``to_dict``.

        Anything that is not a dict -- ``None`` or a bare scalar leaf, which is
        exactly what ``to_dict`` emits for non-``Node`` children -- is returned
        unchanged, so ``from_dict(node.to_dict())`` round-trips.
        """
        if not isinstance(data, dict):
            return data
        return cls(
            feature_index=data['feature_index'],
            threshold=data['threshold'],
            left=cls.from_dict(data['left']),
            right=cls.from_dict(data['right']),
            value=data['value']
        )

# Standard CART (Classification And Regression Tree)
class RegressionTree:
    """Regression tree that greedily splits on the largest variance reduction.

    Internal nodes are ``Node`` objects; leaves are stored as bare scalars
    holding the mean target of the training samples that reached them.
    A sample is routed to the left child when
    ``x[feature_index] < threshold`` and to the right child otherwise.

    Parameters
    ----------
    max_depth : int or None
        Depth at which growth stops; ``None`` means no depth limit.
    min_samples_split : int
        A node with fewer samples than this becomes a leaf.
    """

    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    def fit(self, X, y):
        """Build the tree from features ``X`` (2-D) and targets ``y``.

        Accepts NumPy arrays, pandas objects or nested sequences.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.tree = self._build_tree(X, y, depth=0)

    def _variance_reduction(self, y, left_y, right_y):
        """Variance of ``y`` minus the size-weighted variance of the two parts."""
        variance = np.var(y)

        left_var = np.var(left_y)
        right_var = np.var(right_y)

        frac_left = len(left_y) / len(y)
        frac_right = len(right_y) / len(y)

        return variance - (frac_left * left_var + frac_right * right_var)

    def _build_tree(self, X, y, depth):
        """Recursively grow the subtree for the samples ``(X, y)``.

        Returns a ``Node`` for a split, or a plain ``float`` (mean of ``y``)
        for a leaf.
        """
        n_samples, n_features = X.shape

        if np.var(y) == 0 or depth == self.max_depth or n_samples < self.min_samples_split:
            # Cast to a built-in float so leaves look the same whatever the
            # dtype of y (np.float32, for example, is not a float subclass).
            return float(np.mean(y))

        best_feature, best_threshold = None, None
        best_gain = -np.inf

        for feature_index in range(n_features):
            column = X[:, feature_index]
            for threshold in np.unique(column):
                left_mask = column < threshold
                n_left = np.count_nonzero(left_mask)
                # Skip degenerate splits that leave one side empty.
                if n_left == 0 or n_left == n_samples:
                    continue
                gain = self._variance_reduction(y, y[left_mask], y[~left_mask])
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature_index
                    best_threshold = threshold

        if best_feature is None:            # no valid split was found
            return float(np.mean(y))

        left_mask = X[:, best_feature] < best_threshold
        right_mask = ~left_mask
        left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right = self._build_tree(X[right_mask], y[right_mask], depth + 1)

        return Node(feature_index=best_feature, threshold=best_threshold, left=left, right=right)

    def predict(self, X):
        """Return an array with one prediction per row of ``X``."""
        X = np.asarray(X)  # also converts DataFrames to NumPy arrays
        return np.array([self._predict_tree(x, self.tree) for x in X])

    @staticmethod
    def _is_leaf(node):
        """True when ``node`` is a bare scalar, i.e. a leaf prediction."""
        return np.isscalar(node)

    def _predict_tree(self, x, node):
        """Walk from ``node`` down to a leaf for sample ``x``; return the leaf value."""
        while not self._is_leaf(node):
            if x[node.feature_index] < node.threshold:
                node = node.left
            else:
                node = node.right
        return node

    def to_dict(self):
        """Return the tree as nested dicts.

        A tree that is a single leaf (or not fitted yet) is returned as-is,
        because there is no root ``Node`` to convert.
        """
        if self.tree is None or self._is_leaf(self.tree):
            return self.tree
        return self.tree.to_dict()

    def _max_feature_index(self, node):
        """Largest ``feature_index`` used in the subtree, or -1 if there is none."""
        if node is None or self._is_leaf(node):
            return -1
        own = -1 if node.feature_index is None else node.feature_index
        return max(own,
                   self._max_feature_index(node.left),
                   self._max_feature_index(node.right))

    def export_tree_as_dot(self, feature_names=None):
        """Return the tree as Graphviz DOT source.

        ``feature_names`` is indexed by feature index; when omitted, generic
        names ``"Feature i"`` are generated for every index used by the tree.
        """
        if feature_names is None:
            n_names = self._max_feature_index(self.tree) + 1
            feature_names = [f"Feature {i}" for i in range(n_names)]

        dot_data = "digraph Tree {\nnode [shape=box] ;\n"
        dot_data += self._build_dot(self.tree, feature_names)
        dot_data += "}"
        return dot_data

    def _build_dot(self, node, feature_names, _node_id=0, _next_id=None):
        """Emit DOT statements for ``node`` and everything below it.

        Nodes are numbered sequentially (``N0``, ``N1``, ...) instead of by
        ``hash()``: hashing made equal-valued leaves share one id and could
        yield negative numbers, which are not valid unquoted DOT identifiers.
        ``_node_id`` / ``_next_id`` are recursion bookkeeping; callers pass
        only ``node`` and ``feature_names``.
        """
        if node is None:
            return ""
        if _next_id is None:
            _next_id = [_node_id + 1]   # one-element list: counter shared across the recursion

        if self._is_leaf(node):
            return f'N{_node_id} [label="value: {node:.2f}"] ;\n'

        index = node.feature_index
        if index is None:
            # Node object carrying only a value (no split).
            value_text = "None" if node.value is None else f"{node.value:.2f}"
            label = f"value: {value_text}"
        else:
            name = feature_names[index] if index < len(feature_names) else None
            if name is None:
                name = f"Feature {index}"
            # "<" (not "<=") mirrors the routing rule used when predicting.
            label = f"{name} < {node.threshold:.2f}"
            if node.value is not None:
                label += f"\\nvariance reduction: {node.value:.4f}"

        dot = f'N{_node_id} [label="{label}"] ;\n'
        for child, edge_label in ((node.left, "True"), (node.right, "False")):
            if child is None:
                continue
            child_id = _next_id[0]
            _next_id[0] += 1
            dot += f'N{_node_id} -> N{child_id} [label="{edge_label}"] ;\n'
            dot += self._build_dot(child, feature_names, child_id, _next_id)

        return dot


In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error

# k-fold cross-validation setup: 10 shuffled folds with a fixed seed
k = 10
kf = KFold(n_splits=k, shuffle=True, random_state=42)
mse_scores = []

# Tree hyperparameter
tree_depth = 5

# Per-fold results
predictions = []   # one (y_test, y_pred) tuple per fold
reg_tree = []      # the tree fitted on each fold

# Actual and predicted targets pooled over all held-out folds
all_y_true = []
all_y_pred = []

for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]

    # Split the targets the same way as the inputs
    y_train, y_test = y[train_index], y[test_index]

    # Create and train the model
    regression_tree = RegressionTree(max_depth=tree_depth, min_samples_split=5)
    regression_tree.fit(X_train, y_train)
    reg_tree.append(regression_tree)

    # Predict on the held-out fold
    y_pred = regression_tree.predict(X_test)

    # Compute and store the fold's MSE
    mse = mean_squared_error(y_test, y_pred)
    mse_scores.append(mse)
    print(f'fold RMSE: {np.sqrt(mse)}')

    # Store this fold's actual and predicted values as a tuple.
    # (Previously the training features were stored next to y_pred, which
    # paired arrays that do not correspond to each other.)
    predictions.append((y_test, y_pred))
    all_y_true.extend(y_test)
    all_y_pred.extend(y_pred)

# Report the cross-validation results
mean_mse = np.mean(mse_scores)
std_mse = np.std(mse_scores)
print(f"\nRMSE: {np.sqrt(mean_mse)}")
print(f"Mean MSE: {mean_mse}")
print(f"Std MSE: {std_mse}")
