In [114]:
import numpy as np
import matplotlib.pyplot as plt

In [115]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
cancer = load_breast_cancer()

x = cancer.data
y = cancer.target

x_train_all, x_test, y_train_all, y_test = train_test_split(x, y, stratify=y, 
                                                           test_size=0.2, random_state=42)

In [116]:
class SingleLayer:

    def __init__(self, learning_rate=0.1, l1=0, l2=0):
        self.w = None
        self.b = None
        self.losses = []
        self.val_losses = []       # 추가 05_02
        self.w_history = []        # 추가 05_01
        self.lr = learning_rate    # 추가 05_01, 가중치의 업데이트 양을 조절
                                   # 손실 함수의 표면을 천천히 이동하며 전역 최솟값을 찾는다.
        self.l1 = l1
        self.l2 = l2        

    def forpass(self, x):
        z = np.sum(x * self.w) + self.b
        return z

    def backprop(self, x, err):
        w_grad = x * err
        b_grad = 1 * err
        return w_grad, b_grad

    def activation(self, z):
        z = np.clip(z, -100, None)
        a = 1 / (1 + np.exp(-z))
        return a

    def fit(self, x, y, epochs=100, x_val=None, y_val=None):# 검증 세트를 전달받을 수 있도록 매개변수 추가
        self.w = np.ones(x.shape[1])
        self.b = 0
        self.w_history.append(self.w.copy())
        np.random.seed(42)
        for i in range(epochs):
            loss = 0
            # 인덱스를 섞는다
            indexes = np.random.permutation(np.arange(len(x)))
            for i in indexes:                               # 모든 샘플에 대해 반복한다.
                z = self.forpass(x[i])                      # 정방향 계산
                a = self.activation(z)                      # 활성화 함수 적용
                err = -(y[i] - a)                           # 오차 계산
                w_grad, b_grad = self.backprop(x[i], err)   # 역방향 계산
                w_grad += self.l1 * np.sign(self.w) + self.l2 * self.w
                self.w -= self.lr * w_grad                  # 가중치 업데이트(학습률 적용) 
                self.b -= b_grad                            # 절편 업데이트
                # 가중치를 기록한다
                self.w_history.append(self.w.copy())
                # 안전한 로그 계산을 위해 클리핑한 후 손실을 누적한다.
                a = np.clip(a, 1e-10, 1-1e-10)
                loss += -(y[i]*np.log(a)+(1-y[i])*np.log(1-a))
            # 에포크마다 평균 손실을 저장한다
            self.losses.append(loss/len(y) + self.reg_loss())
            # 검증 세트에 대한 손실을 계산한다
            self.update_val_loss(x_val, y_val)

    def predict(self, x):
        z = [self.forpass(x_i) for x_i in x]        
        return np.array(z) > 0

    def score(self, x, y):
        return np.mean(self.predict(x) == y)
    
    # 검증 손실 계산하기
    def update_val_loss(self, x_val, y_val):
        if x_val is None:
            return
        val_loss = 0
        for i in range(len(x_val)):
            z = self.forpass(x_val[i])                       # 정방향 계산
            a = self.activation(z)                           # 활성화 함수 적용
            a = np.clip(a, 1e-10, 1-1e-10)
            val_loss += -(y_val[i]*np.log(a)+(1-y_val[i])*np.log(1-a))    # 손실 함수 
        self.val_losses.append(val_loss/len(y_val) + self.reg_loss())
        
    def reg_loss(self):
        return self.l1 * np.sum(np.abs(self.w) + self.l2 / 2 * np.sum(self.w**2))    
    

In [None]:
## k-폴드 교차 검증을 구현한다
# 훈련 세트 사용하기

In [117]:
validation_scores = []

In [None]:
# k-폴드 교차 검증 구현

In [118]:
k = 10
bins = len(x_train_all) // k

for i in range(k):
    start = i*bins
    end = (i+1)*bins
    val_fold = x_train_all[start:end]
    val_target = y_train_all[start:end]
    
    train_index = list(range(0, start)) + list(range(end, len(x_train_all)))
    train_fold = x_train_all[train_index]
    train_target = y_train_all[train_index]
    
    train_mean = np.mean(train_fold, axis=0)
    train_std = np.std(train_fold, axis=0)
    train_fold_scaled = (train_fold - train_mean) / train_std
    val_fold_scaled = (val_fold - train_mean) / train_std
    
    lyr = SingleLayer(l2=0.01, l1=0)
    lyr.fit(train_fold_scaled, train_target, epochs=50)
    score = lyr.score(val_fold_scaled, val_target)
    validation_scores.append(score)
    
print(np.mean(validation_scores))

0.9711111111111113


In [None]:
## 사이킷런으로 교차 검증을 한다.
# cross_validate() 함수로 교차 검증 점수 계산

In [119]:
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import cross_validate
sgd = SGDClassifier(loss='log', penalty='l2', alpha=0.001, random_state=42)
scores = cross_validate(sgd, x_train_all, y_train_all, cv=10)
print(np.mean(scores['test_score']))

0.850096618357488


In [None]:
## 전처리 단계를 포함해서 교차 검증을 수행한다.
# Pipeline 클래스 사용 교차 검증 수행

In [120]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
pipe = make_pipeline(StandardScaler(), sgd)
scores = cross_validate(pipe, x_train_all, y_train_all, cv=10, return_train_score=True)
print(np.mean(scores['test_score']))

0.9694202898550724


In [121]:
print(np.mean(scores['train_score']))

0.9875478561631581
