### iteration마다 일정한 batch 크기만큼의 데이터를 random하게 가져와서 수행하는 Mini-Batch GD

In [11]:
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import pandas as pd

# Load ./boston_price.csv (relative to the working directory) into a DataFrame.
boston_df = pd.read_csv("./boston_price.csv")


def get_scaled_train_test_feature_target_ts(data_df):
    """Return scaled train/test features and targets of ``data_df`` as tensors.

    The 'RM' and 'LSTAT' columns are min-max scaled, then the scaled features
    and the 'PRICE' values are split with ``test_size=0.3`` and a fixed
    ``random_state`` of 2025.

    Args:
        data_df: DataFrame providing the 'RM', 'LSTAT' and 'PRICE' columns.

    Returns:
        Tuple ``(tr_ftr_ts, tr_tgt_ts, test_ftr_ts, test_tgt_ts)`` of tensors
        created with ``torch.from_numpy``.
    """
    # NOTE(review): the scaler is fitted on every row before the split, so the
    # test rows influence the scaling range -- confirm this is intended.
    scaled_ftrs = MinMaxScaler().fit_transform(data_df[['RM', 'LSTAT']])
    price_values = data_df['PRICE'].values

    # train_test_split returns: train X, test X, train y, test y.
    train_x, test_x, train_y, test_y = train_test_split(
        scaled_ftrs, price_values, test_size=0.3, random_state=2025
    )

    # Re-order to (train X, train y, test X, test y) while converting to tensors.
    return tuple(torch.from_numpy(arr) for arr in (train_x, train_y, test_x, test_y))

tr_ftr_ts, tr_tgt_ts, test_ftr_ts, test_tgt_ts = get_scaled_train_test_feature_target_ts(data_df=boston_df)

print(f"tr_ftr_ts shape:{tr_ftr_ts.shape} tr_tgt_ts shape:{tr_tgt_ts.shape}")
print(f"test_ftr_ts shape:{test_ftr_ts.shape} test_tgt_ts shape: {test_tgt_ts.shape}")

# Sampling approach: draw 30 random row indexes (with replacement) from the
# training set. The exclusive upper bound is taken from the tensor itself
# instead of a hard-coded 300, so every training row can be picked.
batch_indexes = torch.randint(0, tr_ftr_ts.shape[0], size=(30,))
print(batch_indexes)

# Values of feature column 0 for the sampled rows (shown as the cell result).
tr_ftr_ts[batch_indexes, 0]

tr_ftr_ts shape:torch.Size([354, 2]) tr_tgt_ts shape:torch.Size([354])
test_ftr_ts shape:torch.Size([152, 2]) test_tgt_ts shape: torch.Size([152])
tensor([193,  86, 232, 157,  68, 248, 112, 281, 288, 118, 162,  80, 225, 226,
         54, 238, 184,   2,  20, 291, 139, 210, 230, 199, 226, 299, 166,   3,
         60, 293])


tensor([0.2780, 0.5250, 0.3784, 0.4917, 0.5662, 0.6930, 0.4871, 0.5133, 0.5725,
        0.5281, 0.6049, 0.6848, 0.9075, 0.4949, 0.3451, 0.5739, 0.2705, 0.5880,
        0.5405, 0.8055, 0.5857, 0.6551, 0.6603, 0.3754, 0.4949, 0.5499, 0.4681,
        0.4263, 0.5509, 0.2935], dtype=torch.float64)

In [12]:
import torch
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# Min-max scale the RM and LSTAT columns of the full DataFrame.
scaler = MinMaxScaler()
scaled_features_np = scaler.fit_transform(boston_df[['RM', 'LSTAT']])

# Show the scaled feature array.
print(scaled_features_np)

# Split the scaled features and the PRICE values into train/test sets
# (test_size=0.3, fixed random_state).
tr_features, test_features, tr_target, test_target = train_test_split(scaled_features_np, boston_df['PRICE'].values, 
                                                                      test_size=0.3, random_state=2025)

[[0.57750527 0.08967991]
 [0.5479977  0.2044702 ]
 [0.6943859  0.06346578]
 ...
 [0.65433991 0.10789183]
 [0.61946733 0.13107064]
 [0.47307913 0.16970199]]


In [13]:
def get_update_weights_value_batch(bias, w1, w2, rm_batch, lstat_batch, target_batch, learning_rate=0.01):
    """Compute the update amounts for bias, w1 and w2 from one batch.

    The prediction is ``w1 * rm_batch + w2 * lstat_batch + bias``. Each returned
    value is ``learning_rate`` times the gradient of the batch mean squared
    error with respect to that parameter; the callers in this file subtract
    the returned values from the current parameters.

    Args:
        bias, w1, w2: current parameter tensors.
        rm_batch, lstat_batch: 1-D feature tensors for the batch.
        target_batch: 1-D tensor of target values for the batch.
        learning_rate: step size applied to the gradient.

    Returns:
        Tuple ``(bias_update, w1_update, w2_update)``.
    """
    # Number of rows in this batch.
    sample_count = target_batch.shape[0]

    # Residual = actual value minus predicted value.
    residual = target_batch - (w1 * rm_batch + w2 * lstat_batch + bias)

    # Factor shared by all three updates: -(2/N) * learning_rate.
    step_scale = -(2/sample_count) * learning_rate

    bias_update = step_scale * residual.sum()
    w1_update = step_scale * (rm_batch @ residual)
    w2_update = step_scale * (lstat_batch @ residual)

    return bias_update, w1_update, w2_update

In [14]:
def batch_random_gradient_descent(features, target, iter_epochs=5000, batch_size=30, verbose=True,
                                  learning_rate=0.01):
    """Fit ``w1 * RM + w2 * LSTAT + bias`` using randomly sampled mini-batches.

    Each iteration draws ``batch_size`` row indexes with ``torch.randint``
    (sampling with replacement, so a row can appear more than once in a batch)
    and applies one update computed by ``get_update_weights_value_batch``.

    Args:
        features: 2-D tensor; column 0 is used as RM and column 1 as LSTAT.
        target: 1-D tensor of target values, one per row of ``features``.
        iter_epochs: number of update iterations to run.
        batch_size: number of row indexes drawn per iteration.
        verbose: when True, print the weights and the MSE over all of
            ``features``/``target`` every 100 iterations.
        learning_rate: step size of each update. Defaults to 0.01, the value
            that used to be hard-coded in the function body.

    Returns:
        Tuple ``(w1, w2, bias)`` of the final parameter tensors.
    """
    # Fix the global torch seed so the sampled batches are reproducible.
    torch.manual_seed(2025)
    w1 = torch.zeros(1, dtype=torch.float32)
    w2 = torch.zeros(1, dtype=torch.float32)
    bias = torch.ones(1, dtype=torch.float32)
    print('최초 w1, w2, bias:', w1.item(), w2.item(), bias.item())

    rm = features[:, 0]
    lstat = features[:, 1]

    # Repeat the weight/bias update iter_epochs times.
    for i in range(1, iter_epochs+1):
        # Indexes are drawn independently, so duplicates within a batch are possible.
        batch_indexes = torch.randint(0, target.shape[0], size=(batch_size,))
        rm_batch = rm[batch_indexes]
        lstat_batch = lstat[batch_indexes]
        target_batch = target[batch_indexes]

        # Update amounts computed from this batch only.
        bias_update, w1_update, w2_update = get_update_weights_value_batch(bias, w1, w2,
                                                                           rm_batch, lstat_batch,
                                                                           target_batch, learning_rate)

        # Apply the update amounts.
        w1 = w1 - w1_update
        w2 = w2 - w2_update
        bias = bias - bias_update

        # Report progress every 100 iterations.
        if verbose and i % 100 == 0:
            print(f'Epoch: {i}/{iter_epochs}')
            # The reported loss is computed over all of features/target, not just the batch.
            predicted = w1 * rm + w2*lstat + bias
            diff = target - predicted
            loss = torch.mean(diff ** 2)
            print(f'w1: {w1.item()}, w2: {w2.item()}, bias: {bias.item()}, loss: {loss.item()}')

    return w1, w2, bias

In [15]:
# Run random mini-batch gradient descent on the training features and target.
w1, w2, bias = batch_random_gradient_descent(tr_ftr_ts, tr_tgt_ts, iter_epochs=5000, batch_size=30, verbose=True)
print('##### 최종 w1, w2, bias #######')
print(w1, w2, bias)

최초 w1, w2, bias: 0.0 0.0 1.0
Epoch: 100/5000
w1: 9.532495498657227, w2: 1.938775897026062, bias: 15.881488800048828, loss: 78.11859371095792
Epoch: 200/5000
w1: 11.555778503417969, w2: -0.21187061071395874, bias: 16.998315811157227, loss: 68.93165545311753
Epoch: 300/5000
w1: 12.718533515930176, w2: -2.56358003616333, bias: 16.81089210510254, loss: 62.173577384915774
Epoch: 400/5000
w1: 13.90304183959961, w2: -4.585747241973877, bias: 16.800025939941406, loss: 56.79182986963833
Epoch: 500/5000
w1: 14.967144012451172, w2: -6.40493106842041, bias: 16.75732421875, loss: 52.39103174204248
Epoch: 600/5000
w1: 15.9356050491333, w2: -8.12226390838623, bias: 16.799663543701172, loss: 48.65957528994758
Epoch: 700/5000
w1: 16.72344398498535, w2: -9.702632904052734, bias: 16.679140090942383, loss: 45.647002626302246
Epoch: 800/5000
w1: 17.431224822998047, w2: -11.052210807800293, bias: 16.579404830932617, loss: 43.32511744180553
Epoch: 900/5000
w1: 18.30120849609375, w2: -12.218840599060059, bias

In [17]:
from sklearn.metrics import mean_squared_error

# Predict on the test data and collect the results in a DataFrame.
test_predicted_ts = test_ftr_ts[:, 0]*w1 + test_ftr_ts[:, 1]*w2 + bias

# Build every column from the test tensors, converted to NumPy arrays, so the
# frame does not mix in the separate `test_features` array and pandas receives
# plain arrays rather than tensors.
boston_test_df = pd.DataFrame({
    'RM': test_ftr_ts[:, 0].cpu().numpy(),
    'LSTAT': test_ftr_ts[:, 1].cpu().numpy(),
    'PRICE': test_tgt_ts.cpu().numpy(),
    'PREDICTED_PRICE_RANDOM_BATCH': test_predicted_ts.cpu().numpy()
})

# Mean squared error between actual and predicted prices on the test set.
test_total_mse = mean_squared_error(boston_test_df['PRICE'], boston_test_df['PREDICTED_PRICE_RANDOM_BATCH'])
print("test 데이터 세트의 MSE:", test_total_mse)

boston_test_df.head(20)

test 데이터 세트의 MSE: 28.42650203860407


Unnamed: 0,RM,LSTAT,PRICE,PREDICTED_PRICE_RANDOM_BATCH
0,0.504311,0.546082,11.0,16.242677
1,0.727534,0.082781,31.5,33.231256
2,0.442422,0.348786,22.0,19.422623
3,0.44338,0.197296,50.0,23.116687
4,0.51964,0.139349,24.1,26.490353
5,0.511401,0.309051,20.1,22.167069
6,0.425752,0.450607,22.5,16.525711
7,0.612569,0.049669,32.4,31.063288
8,0.623683,0.061258,31.6,31.069676
9,0.571757,0.53394,10.9,18.27916


### iteration 시에 순차적으로 일정한 batch 크기만큼의 데이터를 전체 학습데이터에 걸쳐서 가져오는 Mini-Batch GD 수행

In [19]:
# Step through row offsets 30 at a time, like this.
# NOTE(review): 506 is hard-coded here -- presumably the total row count of the
# dataset; confirm.
for batch_step in range(0, 506, 30):
    print(batch_step)

0
30
60
90
120
150
180
210
240
270
300
330
360
390
420
450
480


In [21]:
def batch_gradient_descent(features, target, epochs=300, batch_size=30, verbose=True,
                           learning_rate=0.01):
    """Fit ``w1 * RM + w2 * LSTAT + bias`` using sequential mini-batches.

    In every epoch the rows are visited in order, ``batch_size`` rows at a
    time, and one update computed by ``get_update_weights_value_batch`` is
    applied per batch. The last batch of an epoch is shorter when the row
    count is not a multiple of ``batch_size``.

    Args:
        features: 2-D tensor; column 0 is used as RM and column 1 as LSTAT.
        target: 1-D tensor of target values, one per row of ``features``.
        epochs: number of full passes over the data.
        batch_size: number of consecutive rows per update.
        verbose: when True, print the weights and the MSE over all of
            ``features``/``target`` after every epoch.
        learning_rate: step size of each update. Defaults to 0.01, the value
            that used to be hard-coded in the function body.

    Returns:
        Tuple ``(w1, w2, bias)`` of the final parameter tensors.
    """
    # Resets the global torch seed; the sequential batching below does not
    # draw random numbers itself.
    torch.manual_seed(2025)

    w1 = torch.zeros(1, dtype=torch.float32)
    w2 = torch.zeros(1, dtype=torch.float32)
    bias = torch.ones(1, dtype=torch.float32)
    print('최초 w1, w2, bias:', w1.item(), w2.item(), bias.item())

    rm = features[:, 0]
    lstat = features[:, 1]

    # Repeat the weight/bias updates for the requested number of epochs.
    for i in range(1, epochs+1):
        # Walk over all rows in consecutive slices of batch_size rows.
        for batch_step in range(0, target.shape[0], batch_size):
            batch_end = batch_step + batch_size
            rm_batch = rm[batch_step:batch_end]
            lstat_batch = lstat[batch_step:batch_end]
            target_batch = target[batch_step:batch_end]

            bias_update, w1_update, w2_update = get_update_weights_value_batch(bias, w1, w2,
                                                                               rm_batch, lstat_batch, target_batch,
                                                                               learning_rate)
            # Apply the update amounts computed from this batch.
            w1 = w1 - w1_update
            w2 = w2 - w2_update
            bias = bias - bias_update

        if verbose:
            print(f'Epoch: {i}/{epochs}')
            # The reported loss is computed over all of features/target, not a single batch.
            predicted = w1 * rm + w2*lstat + bias
            diff = target - predicted
            loss = torch.mean(diff ** 2)
            print(f'w1: {w1.item()}, w2: {w2.item()}, bias: {bias.item()}, loss: {loss.item()}')

    return w1, w2, bias

In [22]:
# Rebuild the scaled train/test tensors from the DataFrame.
tr_ftr_ts, tr_tgt_ts, test_ftr_ts, test_tgt_ts = get_scaled_train_test_feature_target_ts(data_df=boston_df)

# Run mini-batch gradient descent on the training features and target.
w1, w2, bias = batch_gradient_descent(tr_ftr_ts, tr_tgt_ts, epochs=300, batch_size=30, verbose=True)
print('##### 최종 w1, w2, bias #######')
print(w1, w2, bias)

최초 w1, w2, bias: 0.0 0.0 1.0
Epoch: 1/300
w1: 2.548649311065674, w2: 1.0510945320129395, bias: 5.483735084533691, loss: 324.92663814145334
Epoch: 2/300
w1: 4.426955699920654, w2: 1.7139111757278442, bias: 8.697149276733398, loss: 206.7792344029648
Epoch: 3/300
w1: 5.824299335479736, w2: 2.0995075702667236, bias: 11.000228881835938, loss: 145.65487506900436
Epoch: 4/300
w1: 6.876403331756592, w2: 2.2874581813812256, bias: 12.650941848754883, loss: 113.7959164674643
Epoch: 5/300
w1: 7.680532455444336, w2: 2.3347744941711426, bias: 13.834155082702637, loss: 96.95631171099302
Epoch: 6/300
w1: 8.306395530700684, w2: 2.2823007106781006, bias: 14.682345390319824, loss: 87.82619786802942
Epoch: 7/300
w1: 8.803953170776367, w2: 2.1592955589294434, bias: 15.290448188781738, loss: 82.65587677962309
Epoch: 8/300
w1: 9.209012985229492, w2: 1.9867151975631714, bias: 15.726494789123535, loss: 79.52265426532097
Epoch: 9/300
w1: 9.547245025634766, w2: 1.7795655727386475, bias: 16.03923797607422, loss: 

In [23]:
# Predict on the test data and collect the results in a DataFrame.
test_predicted_ts = test_ftr_ts[:, 0]*w1 + test_ftr_ts[:, 1]*w2 + bias

# Build every column from the test tensors, converted to NumPy arrays, so the
# frame does not mix in the separate `test_features` array and pandas receives
# plain arrays rather than tensors.
boston_test_df = pd.DataFrame({
    'RM': test_ftr_ts[:, 0].cpu().numpy(),
    'LSTAT': test_ftr_ts[:, 1].cpu().numpy(),
    'PRICE': test_tgt_ts.cpu().numpy(),
    'PREDICTED_PRICE_BATCH': test_predicted_ts.cpu().numpy()
})

# Mean squared error between actual and predicted prices on the test set.
test_total_mse = mean_squared_error(boston_test_df['PRICE'], boston_test_df['PREDICTED_PRICE_BATCH'])
print("test 데이터 세트의 MSE:", test_total_mse)

boston_test_df.head(20)

test 데이터 세트의 MSE: 28.330330879942494


Unnamed: 0,RM,LSTAT,PRICE,PREDICTED_PRICE_BATCH
0,0.504311,0.546082,11.0,16.511732
1,0.727534,0.082781,31.5,32.899692
2,0.442422,0.348786,22.0,19.593937
3,0.44338,0.197296,50.0,23.16414
4,0.51964,0.139349,24.1,26.41401
5,0.511401,0.309051,20.1,22.236692
6,0.425752,0.450607,22.5,16.796413
7,0.612569,0.049669,32.4,30.820619
8,0.623683,0.061258,31.6,30.825219
9,0.571757,0.53394,10.9,18.470453
