In [None]:
import pandas as pd
import numpy as np
import math
import random
from tqdm.auto import tqdm
import torch

In [None]:
# The first CSV column has no header (pandas shows it as "Unnamed: 0"): it is the
# row index written out when the file was saved. Read it as the DataFrame index so
# that it is not treated as a traffic feature further down.
train_df = pd.read_csv("./data/train.csv", index_col=0)
train_df.head()

Unnamed: 0,traffic(t-10),traffic(t-9),traffic(t-8),traffic(t-7),traffic(t-6),traffic(t-5),traffic(t-4),traffic(t-3),traffic(t-2),traffic(t-1),traffic(t),type
0,3.0,2.0,5.0,6.0,6.0,4.0,6.0,6.0,3.0,12.0,6.0,0
1,2.0,5.0,6.0,6.0,4.0,6.0,6.0,3.0,12.0,6.0,6.0,0
2,5.0,6.0,6.0,4.0,6.0,6.0,3.0,12.0,6.0,6.0,4.0,0
3,6.0,6.0,4.0,6.0,6.0,3.0,12.0,6.0,6.0,4.0,6.0,0
4,6.0,4.0,6.0,6.0,3.0,12.0,6.0,6.0,4.0,6.0,17.0,0


# Scale the training data and split it into 5 stratified folds

In [None]:
# Features: every column except the label, as a NumPy array.
X = train_df.drop(columns='type').to_numpy()
# Target: the `type` column, kept as a pandas Series.
y = train_df['type']

In [None]:
# Standardise with a single mean/std computed over *all* entries of X
# (one global statistic, not one per column).
X_scaled = (X - np.mean(X)) / np.std(X)

In [None]:
from sklearn.model_selection import StratifiedKFold
# Shuffled, stratified 5-way splitter with a fixed seed so the folds are reproducible.
skf = StratifiedKFold(
    n_splits=5,
    shuffle=True,
    random_state=38,
)
skf.get_n_splits(X_scaled, y)

5

In [None]:
# Map 'fold0'..'fold4' to the train/test row indices produced by the splitter.
# (A `global` statement is a no-op at module level, so a plain assignment suffices.)
fold_dict = {}

for i, (train_index, test_index) in enumerate(skf.split(X_scaled, y)):
    fold_dict[f'fold{i}'] = dict(train_indices=train_index, test_indices=test_index)

In [None]:
# Sanity check: show the fold names stored in fold_dict.
fold_dict.keys()

dict_keys(['fold0', 'fold1', 'fold2', 'fold3', 'fold4'])

In [None]:
from collections import Counter
# Print the class distribution of each fold's test split to confirm stratification.
for fold_no in range(5):
    held_out_rows = fold_dict[f'fold{fold_no}']['test_indices']
    print(Counter(y[held_out_rows]))

Counter({0: 1578, 1: 588, 2: 182, 3: 181})
Counter({0: 1577, 1: 589, 2: 182, 3: 181})
Counter({0: 1577, 1: 589, 3: 182, 2: 181})
Counter({0: 1577, 1: 589, 3: 182, 2: 181})
Counter({0: 1577, 1: 588, 2: 182, 3: 182})


# Model

In [None]:
import numpy as np

class rbfSVMClassifier:
    """Binary classifier using an RBF-kernel expansion with perceptron-style updates.

    One coefficient per (class-balanced) training sample is kept in ``self.weight``.
    The decision value of a point ``x`` is ``sum_j weight[j] * K(train_x[j], x)``;
    ``predict`` returns 1 when that value is >= 0 and 0 otherwise.

    Uses the module-level names ``math``, ``np`` and ``tqdm``.
    """

    def __init__(self,n_iters=5, lr = 0.0001, random_seed=3, sigma=1):
        """Store hyper-parameters and seed NumPy's global random generator.

        Args:
            n_iters: number of passes over the balanced training set.
            lr: step size applied to every coefficient update.
            random_seed: seed passed to ``np.random.seed`` (affects the global RNG).
            sigma: bandwidth of the RBF kernel.
        """
        self.n_iters = n_iters
        self.lr = lr
        self.sigma = sigma
        self.random_seed = random_seed
        np.random.seed(self.random_seed)

    def rbf_kernel(self,x,z):
        """Return exp(-||x - z||^2 / (2 * sigma^2)) for two 1-D vectors."""
        numer = -(x-z).dot(x-z)
        denom = 2*self.sigma**2
        return math.exp(numer/denom)

    def sign(self,scalar):
        """Threshold a decision value: 1 if ``scalar >= 0`` else 0."""
        if scalar >= 0:
            return 1
        else:
            return 0

    def _decision_value(self, x):
        """Kernel expansion ``sum_j weight[j] * K(train_x[j], x)`` over all known weights.

        Vectorised equivalent of looping ``rbf_kernel`` over the training rows;
        returns 0.0 when no coefficient has been learned yet.
        """
        n_known = len(self.weight)
        if n_known == 0:
            return 0.0
        # Only the rows that already own a coefficient take part in the sum.
        diffs = np.asarray(self.train_x)[:n_known] - x
        sq_dists = (diffs * diffs).sum(axis=1)
        kernel_row = np.exp(-sq_dists / (2 * self.sigma ** 2))
        return float(np.dot(self.weight, kernel_row))

    def update_weight(self,x_i,y_i,i):
        """Apply one perceptron-style update for training sample ``i``.

        During the first pass (``i == len(self.weight)``) the new coefficient is
        appended, using only the samples seen so far. On later passes the existing
        coefficient ``weight[i]`` is adjusted in place, so ``self.weight`` always
        stays aligned with ``self.train_x``.

        Args:
            x_i: feature vector of the sample.
            y_i: its target (-1 or 1 after the conversion done in ``fit``).
            i: index of the sample in ``self.train_x``.

        Returns:
            The updated ``self.weight`` array.

        Raises:
            IndexError: if ``i`` skips ahead of the coefficients learned so far.
        """
        n_known = len(self.weight)
        if i > n_known:
            raise IndexError(f"sample index {i} is ahead of the {n_known} learned weights")
        # NOTE(review): targets are -1/+1 while sign() yields 0/1, so a correctly
        # classified negative sample still receives a -lr update. Kept as in the
        # original; confirm whether a -1/+1 sign was intended.
        delta = self.lr*(y_i-self.sign(self._decision_value(x_i)))
        if i == n_known:
            self.weight = np.append(self.weight,delta)
        else:
            self.weight[i] += delta
        return self.weight

    def _balance_classes(self, x_, y_):
        """Oversample the minority class until both classes have the same count.

        Returns the original rows followed by the resampled extra rows, together
        with the matching labels.

        Raises:
            ValueError: if one of the two classes has no sample at all.
        """
        negative_idx = np.where(y_ == -1)[0]
        positive_idx = np.where(y_ != -1)[0]
        if len(negative_idx) == 0 or len(positive_idx) == 0:
            raise ValueError("both classes must be present to balance the training set")
        deficit = len(negative_idx)-len(positive_idx)
        if deficit < 0:
            # Negatives are the minority. Sample without replacement when enough
            # negatives exist (as before); otherwise fall back to sampling with
            # replacement instead of failing.
            extra_idx = np.random.choice(
                negative_idx, size=-deficit, replace=(-deficit > len(negative_idx))
            )
        else:
            extra_idx = np.random.choice(positive_idx, size=deficit, replace=True)
        x = np.vstack([x_,x_[extra_idx,:]])
        y = np.concatenate([y_,y_[extra_idx]])
        assert np.sum(y == -1) == np.sum(y != -1)
        return x, y

    def fit(self, x_, y_orig):
        """Train on ``x_`` (2-D array) and binary targets ``y_orig``.

        Targets equal to 0 are converted to -1 on a copy (the caller's array is not
        modified); every other value is treated as the positive class. The minority
        class is oversampled once, then ``n_iters`` passes of per-sample updates are
        run over that fixed, balanced set.

        Returns:
            The learned coefficient array ``self.weight`` (one entry per balanced
            training row).
        """
        x_ = np.asarray(x_)
        y_ = np.array(y_orig)  # copy, so the caller's labels stay untouched
        y_[y_ == 0] = -1
        if x_.shape[0] != y_.shape[0]:
            raise ValueError("x_ and y_orig must contain the same number of samples")

        # Balance once, before the epochs: the coefficients are tied to specific
        # training rows, so the rows must not be resampled between passes.
        x, y = self._balance_classes(x_, y_)
        self.train_x = x
        self.train_y = y
        self.weight = np.array([])
        n_samples_oversampled = len(y)

        for _ in tqdm(range(self.n_iters),leave =False, colour = "cyan", desc = f"training for {self.n_iters} iterations"):
            for i in tqdm(range(n_samples_oversampled),leave = False,colour = "green",desc = "updating through all train data"):
                self.weight = self.update_weight(x[i],y[i],i)

        return self.weight

    def predict_(self,x):
        """Predict the 0/1 label of a single sample."""
        return self.sign(self._decision_value(x))

    def predict(self, x):
        """Predict 0/1 labels for every row of the 2-D array ``x``."""
        preds = [self.predict_(x[i,:]) for i in range(x.shape[0])]
        return np.array(preds)

    def sigmoid(self,z):
        """Logistic function 1 / (1 + exp(-z))."""
        return 1./(1.+np.exp(-z))

    def get_prob_(self,x_i):
        """Sigmoid of the decision value of a single sample."""
        return self.sigmoid(self._decision_value(x_i))

    def get_prob(self,X):
        """Sigmoid of the decision value for every row of the 2-D array ``X``."""
        probs = [self.get_prob_(X[i,:]) for i in range(X.shape[0])]
        return np.array(probs)


    def get_accuracy(self, y_true, y_pred):
        """Fraction of positions where ``y_true`` equals ``y_pred``.

        NumPy-only stand-in for sklearn's ``accuracy_score``: the number of
        matching entries divided by ``len(y_pred)``.
        """
        acc = np.sum(y_true == y_pred)/len(y_pred)
        return acc


class MultiClassSVM:
    """One-vs-rest wrapper that trains one ``rbfSVMClassifier`` per class.

    Parameter mapping onto the binary model: ``C`` is forwarded as its learning
    rate ``lr`` and ``gamma`` as its kernel bandwidth ``sigma``. ``kernel`` is
    stored but not used by this class.
    """

    def __init__(self, kernel='rbf', C=1, gamma=0.1, n_iters = 3, n_classes=4):
        """Store hyper-parameters.

        Args:
            kernel: stored only; the binary model always uses its RBF kernel.
            C: forwarded to the binary model as ``lr``.
            gamma: forwarded to the binary model as ``sigma``.
            n_iters: number of training passes of each binary model.
            n_classes: number of classes; labels are expected to be
                ``0 .. n_classes - 1``. Defaults to 4, the previously fixed value.
        """
        self.kernel = kernel
        self.C = C
        self.gamma = gamma
        self.svms = []
        self.n_iters = n_iters
        self.n_classes = n_classes

    def fit(self, X, y):
        """Fit one binary model per class on targets ``+1`` (class i) / ``-1`` (rest)."""
        self.svms = []
        y = np.asarray(y)
        for i in tqdm(range(self.n_classes),leave = False, colour="pink",desc = "exploring all classes.."):
            y_temp = np.where(y == i, 1, -1)
            svm = rbfSVMClassifier(lr=self.C, sigma=self.gamma, n_iters = self.n_iters)
            svm.fit(X, y_temp)
            self.svms.append(svm)

        assert len(self.svms) == self.n_classes

    def predict(self, X):
        """Return, for each row of ``X``, the index of the first class whose model votes 1.

        Each binary model contributes a hard 0/1 vote and ``np.argmax`` picks the
        lowest class index among the maximal votes, so rows claimed by several
        models, or by none, resolve to the lowest such index (class 0 when no
        model votes 1).

        Raises:
            RuntimeError: if the model has not been fitted for all classes.
        """
        if len(self.svms) != self.n_classes:
            raise RuntimeError("MultiClassSVM.predict called before fit")
        predictions = np.zeros((X.shape[0], self.n_classes))
        for i, svm in enumerate(self.svms):
            predictions[:, i] = svm.predict(X)
        return np.argmax(predictions, axis=1)

# Utils

In [None]:
from collections import Counter
def train_test_binary_model(model,label):
    """Cross-validate ``model`` over the folds stored in the global ``fold_dict``.

    For every fold the model is re-fitted on the fold's training rows of the
    globals ``X_scaled`` / ``y`` and scored on both the training and the held-out
    rows. As soon as one fold's held-out accuracy drops below 0.85 the run is
    abandoned and ``(0., 0.)`` is returned.

    Args:
        model: object exposing ``fit(X, y)`` and ``predict(X)``.
        label: not used by this function.

    Returns:
        Tuple ``(mean train accuracy, mean test accuracy)`` over all folds.
    """
    fit_scores, held_out_scores = [], []
    for split in tqdm(fold_dict.values(),leave=False,desc = "5 folds..."):
        fit_rows = split['train_indices']
        held_out_rows = split['test_indices']
        fit_X, fit_y = X_scaled[fit_rows], y[fit_rows]
        held_out_X, held_out_y = X_scaled[held_out_rows], y[held_out_rows]

        model.fit(fit_X, fit_y)

        # Score on the rows the model was trained on.
        fit_preds = model.predict(fit_X)
        print("predicted train dist:",Counter(fit_preds))
        assert fit_preds.shape[0] == np.sum(list(Counter(fit_preds).values()))
        fit_acc = np.mean(fit_preds == fit_y)

        # Score on the held-out rows.
        held_out_preds = model.predict(held_out_X)
        print("predicted test dist:",Counter(held_out_preds))
        held_out_acc = np.mean(held_out_preds == held_out_y)

        # Give up on this configuration as soon as one fold is clearly too weak.
        if held_out_acc < 0.85:
            print("skipping 5 folds training because acc too low!")
            print(f"test_acc: {held_out_acc}")
            return 0.,0.

        fit_scores.append(fit_acc)
        held_out_scores.append(held_out_acc)

    return np.mean(np.array(fit_scores)),np.mean(np.array(held_out_scores))

In [None]:
import copy
# Grid search over (C, gamma) for the one-vs-rest model.
# save_models keeps every configuration whose mean test accuracy exceeds 0.85,
# keyed by that accuracy (two configurations with the same accuracy overwrite
# each other).
save_models = dict()


C = [0.001,0.01,0.1,1.,5.]
gammas = [0.01,0.1,1.0]
best_param0 = dict()
best_acc_0 = 0
best_train_acc = 0

for C_ in C:
    for gamma in gammas:
        # The class is named MultiClassSVM (capital "C"); the previous spelling
        # `MulticlassSVM` raised NameError.
        svm0 = MultiClassSVM(C=C_,gamma = gamma,n_iters=2)
        train_acc,test_acc = train_test_binary_model(svm0,0)
        print(f"(C:{C_} gamma:{gamma}) acc: {round(train_acc,4)} val acc:{round(test_acc,3)}")
        # A configuration becomes the new best only if it improves the train
        # accuracy and does not lower the best test accuracy seen so far.
        if train_acc > best_train_acc and test_acc>=best_acc_0:
            print(">>best model updated\n")
            best_acc_0 = test_acc
            best_param0['C']=C_
            best_param0['gamma'] = gamma
            best_svm0 = copy.deepcopy(svm0)
            best_train_acc = train_acc
            # NOTE: torch.save does not create ./weights; the directory must exist.
            save_name = f"./weights/rbf_multiclass{str(best_acc_0)[:4]}.pt"
            best_param0['train_acc'] = best_train_acc
            best_param0['test_acc'] = best_acc_0
            best_param0['best_model'] = best_svm0
            torch.save(best_param0,save_name)
        if test_acc > 0.85:
            save_models[test_acc] = {'train_acc':train_acc,'model':copy.deepcopy(svm0)}

In [None]:
# Report the best accuracies found by the grid search, then display the selected parameters.
print("best_train_acc:",best_train_acc,"best test acc:",best_acc_0)
best_param0

In [None]:
# Persist the selected configuration (parameters, accuracies and fitted model).
# The model is stored under 'best_model' at the moment a configuration is
# selected; if that key is missing, no configuration was ever selected and
# there is nothing meaningful to save.
if 'best_model' not in best_param0:
    raise RuntimeError("grid search did not select any model; nothing to save")
save_name = f"./weights/oversampling_label0_{str(best_acc_0)[:4]}.pt"
best_param0['train_acc'] = best_train_acc
best_param0['test_acc'] = best_acc_0
torch.save(best_param0,save_name)