In [1]:
%load_ext autoreload
%autoreload 3

In [2]:
import numpy as np
from minitorch.tensor.tensor import Tensor
from minitorch.optimizers.optim import SGD, Adam, AdamW
from minitorch.nn.layers import Linear
from minitorch.losses.losses import MSE

In [3]:
def sgd_unit_test():
    """Test SGD optimizer implemetation"""
    print('Unit Test: SGD Optimizer ....')
    
    #* basic optimizer test
    param = Tensor(np.array([1.0,2.0], dtype=np.float32), requires_grad=True)
    optimizer = SGD([param], lr=0.1)
    param.grad = Tensor(np.array([2.0, 1.0], dtype=np.float32))
    original_data = param.data.copy()
    grad = param.grad.data
    optimizer.step()
    
    expected = original_data - optimizer.learning_rate * grad
    print(expected, param)
    assert np.allclose(expected, param.data)
    assert optimizer.step_count == 1
    print('Basic SGD optimizer works correctly')
    
    # optimizer with momentum test
    param2 = Tensor(np.array([1.0, 2.0]), requires_grad=True)
    optimizer_momentum = SGD([param2], lr=0.1, momentum=0.9)
    param2.grad = Tensor(np.array([2.0, 1.0]))
    original_data = param2.data.copy()
    grad = param2.grad.data
    optimizer_momentum.step()
    
    expected = original_data - optimizer_momentum.learning_rate * grad
    assert np.allclose(expected.data, param2.data)
    assert optimizer_momentum.step_count == 1, f'step count expected to be 1 got {optimizer_momentum.step_count}'
    print('SGD Oprimizer with momentum works correctly ')
    
    # test weight decay
    param3 = Tensor(np.array([1.0, 2.0]), requires_grad=True)
    optimizer_weight_decay = SGD([param3], weight_decay=0.1)
    param3.grad = Tensor(np.array([3.0, 4.0]))
    
    optimizer_weight_decay.step()

    
    expected = param3.data - optimizer_weight_decay.learning_rate * (param3.grad.data + optimizer_weight_decay.weight_decay * param3.data)
    assert np.allclose(expected, param3.data, rtol=0.05)
    print('SGD Optimizer with weight decay works correctly')
    
    print("SGD optimizer works correctly!")

    
if __name__ == '__main__':
    sgd_unit_test()

Unit Test: SGD Optimizer ....
[0.8 1.9] Tensor(data=[0.8 1.9])
Basic SGD optimizer works correctly
SGD Oprimizer with momentum works correctly 
SGD Optimizer with weight decay works correctly
SGD optimizer works correctly!


In [4]:
x = Tensor(np.array([[2.0, 3.0, 4.6,7.0],
                    [4.0,5.0,8.0,10.0],
                    [5.6,7.0, 11.1,1.0],
                    [2.0, 3.0,0.0,-1.0],
                    [4.0,5.0,-2.0, -10.0],
                    [5.6,7.0, 11.9,12.0]]), requires_grad=True)
y = Tensor(np.array([1.0, 2.0, 3.0, 3.0, 4.0,5.0]), requires_grad=True)

weight = Tensor(np.array([0.1, 0.2, 0.3,0.4]), requires_grad=True)
bias = Tensor(np.array([0.0,0.0,0.0,0.0,0.0,0.0]), requires_grad=True)

loss_fn = MSE()
optimizer = SGD([weight, bias],lr=0.001, momentum=0.9, weight_decay=0.01)

for i in range(100):
    y_hat = x @ weight.transpose() + bias
    loss = loss_fn(y_hat, y)
    if i % 10 ==0:
        print(f'Iteration {i}, loss {loss.data}')
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

Iteration 0, loss 23.954233333333338
Iteration 10, loss 5.652310933047676
Iteration 20, loss 0.7633906755512504
Iteration 30, loss 1.008738277237916
Iteration 40, loss 0.7229959892219938
Iteration 50, loss 0.46474383421076865
Iteration 60, loss 0.4137194474611481
Iteration 70, loss 0.3795707359642195
Iteration 80, loss 0.34471157095081856
Iteration 90, loss 0.3204017834229631


In [5]:
linear = Linear(x.shape[1], y.size, bias=True)
loss_fn = MSE()
optimizer = SGD(linear.parameters(), lr=0.001, momentum=0.0, weight_decay=0.0)
# y_hat = x @ weight + bias

for i in range(100):
    y_hat = linear(x)
    loss = loss_fn(y_hat, y)
    if i % 10 == 0:
        print(f'Iteration {i}, Loss: {loss.data}')
        
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

Iteration 0, Loss: 47.21571033682408
Iteration 10, Loss: 25.476979983727873
Iteration 20, Loss: 15.794613473628292
Iteration 30, Loss: 11.10112486536548
Iteration 40, Loss: 8.550502515815717
Iteration 50, Loss: 6.980991399276939
Iteration 60, Loss: 5.905896981729324
Iteration 70, Loss: 5.111566859838638
Iteration 80, Loss: 4.496759592264123
Iteration 90, Loss: 4.008073169225105


In [6]:
def Unit_test_adam_update_moments():
    print('Unit Test: Adam Moments Updates ...')
    linear = Linear(x.shape[1], y.size, bias=True)
    optimizer = Adam([linear.weight])
    
    linear.weight.grad = np.random.random(linear.weight.shape)

    # for i, param in enumerate([linear.weight]):
    optimizer.step_count += 1
    m_hat, v_hat = optimizer._update_moments(i, linear.weight.grad)
    assert optimizer.m_buffers[0] is not None
    assert optimizer.v_buffers[0] is not None
    assert np.allclose(m_hat, linear.weight.grad)
    assert np.allclose(v_hat, linear.weight.grad ** 2)
    print('Adama update moments update works correctly')
    
Unit_test_adam_update_moments()

Unit Test: Adam Moments Updates ...


IndexError: list index out of range

In [7]:
def Unit_test_adam():
    print('Unit Test: Adam Moments Updates ...')
    linear = Linear(x.shape[1], y.size, bias=True)
    optimizer = Adam([linear.weight])
    print('Testing the first step')
    
    # set the gradient
    linear.weight.grad = np.random.random(linear.weight.shape)
    grad = linear.weight.grad.copy()
    original_data = linear.weight.data.copy()
    
    # first step
    optimizer.step()
    
    # first moment: m = 0.9 * 0 + 0.1 * grad = 0.1 * grad
    m = 0.1 * grad
    
    # second moment: v = 0.999 * 0 + 0.001 * grad ** 2 = 0.001 * grad ** 2
    v = 0.001 * (grad ** 2)
    
    # bais correction
    m_bias_correction = 1 - 0.9 ** optimizer.step_count
    v_bias_correction = 1 - 0.999 * optimizer.step_count
    
    m_hat = m / m_bias_correction
    v_hat = v / v_bias_correction
    
    #* paramter update
    expected = original_data - optimizer.lr * m_hat / (np.sqrt(v_hat) + optimizer.eps)
    assert np.allclose(expected, linear.weight.data)
    assert optimizer.step_count == 1, f'Expected 1 in the first step, got {optimizer.step_count}'
    print('Adam optimizer works for the first step')
    
    #* second step
    print('Testing the second step')
    
    # set the gradient
    optimizer = Adam([linear.bias])
    linear.bias.grad = np.random.random(linear.bias.shape)
    grad = linear.bias.grad.copy()
    original_data = linear.bias.data.copy()
    
    # first step
    optimizer.step()
    
    # first moment: m = 0.9 * 0 + 0.1 * grad = 0.1 * grad
    m = 0.1 * grad
    
    # second moment: v = 0.999 * 0 + 0.001 * grad ** 2 = 0.001 * grad ** 2
    v = 0.001 * (grad ** 2)
    
    # bais correction
    m_bias_correction = 1 - 0.9 ** optimizer.step_count
    v_bias_correction = 1 - 0.999 * optimizer.step_count
    
    m_hat = m / m_bias_correction
    v_hat = v / v_bias_correction
    
    #* paramter update
    expected = original_data - optimizer.lr * m_hat / (np.sqrt(v_hat) + optimizer.eps)
    assert np.allclose(expected, linear.bias.data), f'Excpected the parameter to be {expected}, got {linear.bias.data}'
    # assert optimizer.step_count == 2, f'Expcted 2 in the second step, got {optimizer.step_count}'
    print('Adam optimizer correctly on the second step')
    
    
    print('Adam optimizer with adaptive learning rate works correctly.')
    
    
Unit_test_adam()

Unit Test: Adam Moments Updates ...
Testing the first step
Adam optimizer works for the first step
Testing the second step
Adam optimizer correctly on the second step
Adam optimizer with adaptive learning rate works correctly.


In [145]:
linear = Linear(x.shape[1], y.size, bias=True)
loss_fn = MSE()
optimizer = Adam(linear.parameters(), lr=0.1,weight_decay=0.001)
# y_hat = x @ weight + bias

for i in range(10000):
    y_hat = linear(x)
    loss = loss_fn(y_hat, y)
    if i % 1000 == 0:
        print(f'Iteration {i}, Loss: {loss.data}')
        
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

Iteration 0, Loss: 50.442521374401196
Iteration 1000, Loss: 0.10882119762703754
Iteration 2000, Loss: 0.08150233113642548
Iteration 3000, Loss: 0.08066181607154065
Iteration 4000, Loss: 0.08354036500605264
Iteration 5000, Loss: 0.08114620766283201
Iteration 6000, Loss: 0.08065318294872802
Iteration 7000, Loss: 0.08095588088298407
Iteration 8000, Loss: 0.0808139068152513
Iteration 9000, Loss: 0.0935180902535618


In [10]:
linear = Linear(x.shape[1], y.size, bias=True)
loss_fn = MSE()
optimizer = AdamW(linear.parameters(), lr=0.1,weight_decay=0.001)
# y_hat = x @ weight + bias

for i in range(100000):
    y_hat = linear(x)
    loss = loss_fn(y_hat, y)
    if i % 10000 == 0:
        print(f'Iteration {i}, Loss: {loss.data}')
        
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

Iteration 0, Loss: 69.4488174059303
Iteration 10000, Loss: 0.0005150130165146818
Iteration 20000, Loss: 0.0031929365709119344
Iteration 30000, Loss: 0.00013526409449169465
Iteration 40000, Loss: 0.00014145314344632132
Iteration 50000, Loss: 0.00034390985544437743
Iteration 60000, Loss: 0.0005161315429608722
Iteration 70000, Loss: 0.00015714950489616934
Iteration 80000, Loss: 0.00013737748793128595
Iteration 90000, Loss: 0.0001668512829105135


In [11]:
y_hat = linear(x)
y_hat,y

(Tensor(data=[[1.00490058 1.99482616 2.991096   2.99015039 4.11146404 4.98428141]
  [1.00826543 1.99253823 2.98519634 2.98510882 4.17548962 4.97520121]
  [1.01009757 1.9982266  2.99458421 2.99407155 4.17218911 4.99058666]
  [1.00250596 2.00081124 3.00137516 3.00073049 4.0370774  5.00175835]
  [0.99938528 2.00045078 3.00055027 3.00086048 3.98655643 5.0011929 ]
  [1.02404827 2.01596511 3.01855065 3.01893041 4.28539767 5.03175339]], shape=(6, 6), grad_info= requires_grad=True),
 Tensor(data=[1. 2. 3. 3. 4. 5.], shape=(6,), grad_info= requires_grad=True))