In [1]:
import pandas as pd

import numpy as np
import numpy.linalg as lin
from numpy.linalg import LinAlgError
from itertools import combinations_with_replacement, combinations

Polynomial Features

In [2]:
#쓰는 메소드만 만든다.
class PolynomialFeatures:
    '''
    다항식으로 feature space 변환
    쓰여진 모듈
    1> itertools.combinations_with_replacement, combinations
    2> numpy
    '''
    __slots__ = ['degree', 'interaction_only', 'include_bias', 'self_only']
    def __init__(self, degree = 2, interaction_only = False, self_only = False, include_bias = True):
        self.degree = degree
        self.interaction_only = interaction_only
        self.include_bias = include_bias
        self.self_only = self_only
        
        if (self.self_only == True) and (self.interaction_only == True):
            raise Exception("either self_only or interaction_only can be True")
        
        if self.degree > 8: 
            raise Exception("Too high degree!!")
        
            
    
    def fit_transform(self, X):
        #1. 열 별로 추출
        col_list = [col for col in X.T]
        
        #2. 열 별로 deg에 맞춰 새로운 X를 생성한다.
        result_X = None
        
        for deg in range(1,self.degree+1): #self.degree까지 포함되어야 한다.
            if deg == 1:
                result_X = self.__combination(col_list, deg)
            else:
                result_X = np.vstack( (result_X, self.__combination(col_list, deg)) ) 
        
       # model에서 bias를 따로 할당해준다.
       # if self.include_bias == True:
       #     result_X = np.vstack( (np.ones(col_list[0].shape),result_X) )
        return result_X.T
    
    def __combination(self, col_list, n): #deg n 의 조합들 생성
        
        combination_cols = []
        result = None
        
        if self.interaction_only == True:
            combination_cols = combinations(col_list, n)
        
        elif self.self_only == True:
            for col in col_list:
                temp = [col]*n
                combination_cols.append(temp)
        
        else:
            combination_cols = combinations_with_replacement(col_list, n)
        
        
        for idx, combination in enumerate(combination_cols):
            current_col = np.full(col_list[0].shape, 1.0)
            for col in combination:
                current_col*=col
        
            if idx == 0:
                result = current_col
            else :
                result = np.vstack((result, current_col))        
        return result #return = ndarray type

1. KNN Regression

In [3]:
class KNNRegressor:
    def __init__(self, n=2, copy_train = True):
        self.n = n
        self.copy_train = copy_train
    
    def fit(self, X, y):
        if self.copy_train == True:
            self.train_X = np.copy(X)
            self.train_y = np.copy(y)
        else:
            self.train_X = X
            self.train_y = y
    
    def predict(self, X):
        distance = lambda x : (x).dot(x)
        neighbors = [(distance(X - data), self.train_y[idx]) for idx, data in enumerate(self.train_X)]
        #neighbors.sort() #거리순 정렬
        selected = []
        for _ in range(self.n):
            min_idx = np.argmin(test, axis = 0)[0]
            selected.append(neighbors[min_idx])
            del neighbors[min_idx]
        
        neighbors = selected
        
        if self.n == 1:
            return neighbors[0][1]
        else:
            neighbors = neighbors[:self.n]
            #거리 구하기 + 예측하기
            dist = np.array([d for d, _ in neighbors])
            label = np.array([label for _, label in neighbors])
        
            prop = dist/np.sum(dist) #weighted average를 위해
            predict_label = prop.dot(label)
        
            return predict_label
    
    def score(self, X, y):
        y_pred = np.array([self.predict(data) for data in X])
        
        return self.__r2_score(y,y_pred)
    
    def __r2_score(self, y_true, y_pred):
        SS_total = 0
        SS_reg = 0
    
        for i in range(len(y_true)):
            x = (y_true[i] - y_pred[i])**2
            SS_reg = SS_reg+x
    
        y_true_mean = np.mean(y_true)
        for i in range(len(y_true)):
            x = (y_true[i]-y_true_mean)**2
            SS_total = SS_total+x
        return 1 - (SS_reg/SS_total)

2. Linear Regression Model

In [4]:
from abc import *
class base_linear_model(ABC):
    def __init__(self, alpha = 0.01, copy_X = True, normalize = True, fit_intercept = True, max_iter = None):
        self.alpha = alpha
        self.copy_X = copy_X
        self.normalize = normalize
        self.fit_intercept = fit_intercept
        self.max_iter = max_iter
        self.train_mean = None
        self.train_std = None
        
    @abstractmethod
    def fit(self,X,y):
        pass
    @abstractmethod
    def predict(self, X):
        pass
    @abstractmethod
    def score(self, X,y):
        pass        
    
    @abstractmethod
    def gradient_descent(self, X, y):
        pass
    
    @abstractmethod
    def gradient(self, X, y, w):
        pass
    def normalize(self, data):
        data = (data - self.train_mean)/self.train_std
        return data
    
    def r2_score(self, y_true, y_pred):
        SS_total = 0
        SS_reg = 0
    
        for i in range(len(y_true)):
            x = (y_true[i] - y_pred[i])**2
            SS_reg = SS_reg+x
    
        y_true_mean = np.mean(y_true)
        for i in range(len(y_true)):
            x = (y_true[i]-y_true_mean)**2
            SS_total = SS_total+x
        return 1 - (SS_reg/SS_total)

2-1: Ridge Regression

In [5]:
class Ridge(base_linear_model):
    '''
    alpha: 제약 정도
    normalize: data를 정규화 할 것인가
    max_iter: 경사하강법 사용시, 반복의 정도
    '''
    def __init__(self, alpha = 0.01, copy_X = True, normalize = False, max_iter = 1000):
        super().__init__(alpha = alpha, copy_X = copy_X, normalize = normalize, max_iter = max_iter)
    
    
    def fit(self, X, y):
        self.weight = None
        
        if self.copy_X == True:
            self.X_train = np.copy(X)
        else:
            self.X_train = X
        
        if self.normalize == True:
            self.train_mean = self.X_train.mean()
            self.train_std = self.X_train.std()
            
            self.X_train = super().normalize(self.X_train)
        
        self.X_train = np.concatenate((self.X_train, np.ones((self.X_train.shape[0],1))), axis = 1)
        
        try :
            XT = self.X_train.T
            step1 = lin.inv(XT@self.X_train + self.alpha*(np.eye(XT.shape[0])))
            self.weight = step1@XT@y
        except LinAlgError as e: #inv를 구할 수 없다. ==> 경사하강법 사용
            
            self.weight = np.random.randn(self.X_train.shape[1]) #weight initialize
            self.weight = self.gradient_descent(self.X_train,y) #경사하강법으로 찾는다.
        
        return self
    
    def predict(self, X):
        if self.normalize == True:
            X = super().normalize(X)
        
        if len(X.shape) <= 1:
            X = np.hstack((X, np.array([1])))
        else:
            X = np.concatenate((X, np.ones((X.shape[0],1))), axis = 1)
        
        return np.dot(X,self.weight)
    
    def score(self, X, y):
        predict_y = self.predict(X)
        score = super().r2_score(y, predict_y)
        return score
    
    def gradient_descent(self, X,y):
        #heavy ball method
        #1. 초기값 정하기
        tolerence = 1.0e-2
        bethas = np.logspace(-2,0,3) #중력
        steps = np.logspace(-8,0,11) #이동 거리
        prv_weight = np.copy(self.weight) #중력 방향
        
        for _ in range(self.max_iter):
            gradient = self.gradient(X,y, self.weight)
            direction = (-1) * (gradient/np.sqrt(gradient.dot(gradient)))
            
            #2. 종료조건 탐색 ==> 기울기가 0에 수렴하는가.
            if gradient.dot(gradient) < tolerence:
                return self.weight
            
            #3. 다음 위치 찾기
            next_weights = [(self.weight + step*direction + betha*(self.weight - prv_weight)) for step in steps for betha in bethas]
            
            next_gradients = [self.gradient(X,y,w) for w in next_weights]
            
            next_gradients = [g.dot(g) for g in next_gradients]
            
            min_idx = np.argmin(next_gradients)
            
            #4.update
            prv_weight = np.copy(self.weight) #중력 방향
            self.weight = next_weights[min_idx]
                
        return self.weight
            
    
    def gradient(self, X, y, w):
        X_T = X.T
        #print('X_T@X: ',X_T@X)
        #print('X_T@y: ', X_T@y)
        return -2*( X_T@y - ( X_T@X + self.alpha*np.eye(len(w)) )@w )    

In [6]:
#poly = PolynomialFeatures(degree = 5, interaction_only = False, self_only = False)
#model = Ridge(normalize = True, alpha = 0.01, max_iter = 10000)
model = KNNRegressor(n=5)

In [7]:
train_data = pd.read_csv('~/ml/data/test3_data/test3_data_train.csv')
test_data = pd.read_csv('~/ml/data/test3_data/test3_data_test.csv')

In [8]:
target = ['prop_log_historical_price']

1. 점수 내기

In [9]:
train_data_price = train_data.drop(target, axis = 'columns').values
train_target_price = train_data[target[0]].values

In [10]:
#train_data_price = poly.fit_transform(train_data_price.values)

In [11]:
model.fit(train_data_price, train_target_price)

In [12]:
test_data_price = test_data.drop(target, axis = 'columns').values
test_target_price = test_data[target[0]].values
#test_data_price = poly.fit_transform(test_data_price.values)

In [13]:
model.score(test_data_price, test_target_price)

NameError: name 'test' is not defined

<경사하강법>

시도 1: 단순 경사하강법<step 1개> << w <= w + step * direction >>
결과 1: optimal을 찾는데 실패했고, 0.32정도의 결과를 얻었다.
기타 1:
        > 기울기를 구하는데 있어, X가 정규화 되지 않으면, 적합한 해를 구할 수 없음을 알았다
             > normalize를 하지 않았을 때, 약 1.0e-300정도의 r2-score를 얻었고, normalize를 했을 때, 0.32정도의 r2-score를 얻었다.
        > 종료 조건을 w의 각 entry가 tolerance보다 작은지 비교하는 형태를 취하였다.
             > 종료 조건의 문제인지 iteration을 도는 동안 optimal을 얻지 못한 것 같다.
        > step을 키웠을 때, overflow error가 발생하였다.
             > gradient의 크기가 매우 커져, weight 자체를 구할 수 없는 상황이 있었다
        > max_iter를 증가해도, optimal을 찾지 못하는 것으로 보아, 구현 자체에 문제가 있다고 판단된다.
             
시도 2: 단순 경사하강법<step 여러개>
결과 2: optimal을 찾는데 실패했고, 시도 1보다 약간 더 좋은 결과를 얻었지만, 여전히 optimal 값을 얻지 못했다.
기타 2: 
        > step을 여러 개 두어, 다음 weight의 위치를 더 좋게 만들고자 했으며, 이를 위해 복잡도를 약간 희생했다.(11정도의 상수가 곱해짐)
        > overflow error가 발생하는 것을 해결하지 못했다.
        > 최저점을 찾는데 있어, 종료 조건으로 vector size < tolerance<약 1.0e-4>라고 해야 함을 알았다
            >이 때, g의 내적을 한 뒤, 제곱근을 취하는 대신, 그냥 tolerance를 1.0e-2로 두어 계산량을 줄였다.
        > max_iter를 증가해도, optimal을 찾지 못하는 것으로 보아, 구현 자체에 문제가 있다고 판단된다.
        
시도 3: heavy ball method<step여러개, betha 한개> << w <= w + step * direction + betha * (w - prv_w) >>
결과 3: optimal을 찾는데 실패했고, 시도 2보다 약간 더 좋은 결과를 얻었다.
기타 3: 
        > direction을 찾는데 있어 치명적인 실수가 있었음을 발견했다.
              > 단순히 gradient를 방향으로 치환하여 사용한 오류를 발견했고, 이후, 단위벡터로 치환하여 이용하였다.
              > overflow error가 사라졌다.
        > max_iter를 증가하면, r2 score가 올라가긴 하지만, 시간이 너무 오래 걸리는 단점이 있다.
              > step의 크기가 부정확한 문제가 있는 것으로 판단된다.

시도 4: heavy ball method<<step 여러개, betha 여러개>>
결과 4: optimal을 찾는데 성공했다.
기타 4: 
        > betha와 step을 여러개 두어, 다음의 최적 지점을 보다 잘 찾을 수 있도록 만들었다.
        > initial weight와 optimal weight를 보았을 때, 큰 크기의 수 변화가 일어나지 않아 step의 max를 1로 두었다.
        > max_iter를 전부 순회하기 전, optimal을 찾을 수 있었다.

<최종 분석>
1. 단순 경사하강법 보다, heavy ball method가 더 좋은 결과를 도출했다.
2. 실제 구현에 있어 데이터의 전처리<normalize>가 중요함을 알게 되었다.
3. 방향을 찾는데 있어, 단위 벡터로 해야 하는 이유를 알게 되었다<overflow error가 발생하고, 단순 gradient는 해당 방향으로 매번 일정하게 이동함을 보장하지 않는다>
4. ridge regression의 경우 loss function이 convex임을 미리 알고 있었기 때문에, FONC조건과 cvx의 성질을 이용하여 단순히 gradient size == 0일 때, optimal임을 받아들였다.