In [1]:
import numpy as np
import os

from tinytorch.core.tensor import Tensor
from tinytorch.core.autograd import enable_autograd
from tinytorch.core.optimizers import Optimizer, SGD, Adam, AdamW

# Called once, before any Tensor is created in the cells below.
# NOTE(review): presumably switches on gradient tracking for Tensor; confirm in tinytorch.core.autograd.
enable_autograd()

## Unit Test - Base Optimizer

In [2]:
def test_unit_optimizer_base():
    """Exercise the base ``Optimizer``: parameter storage, step counter, ``zero_grad``.

    Raises:
        AssertionError: if any of the bookkeeping checks fails.
    """
    print("ðŸ”¬ Unit Test: Base Optimizer...")

    # Two trainable tensors of different rank, each carrying a gradient.
    vector_param = Tensor([1.0, 2.0], requires_grad=True)
    matrix_param = Tensor([[3.0, 4.0], [5.0, 6.0]], requires_grad=True)
    vector_param.grad = Tensor([0.1, 0.2])
    matrix_param.grad = Tensor([[0.3, 0.4], [0.5, 0.6]])

    tracked = (vector_param, matrix_param)
    optimizer = Optimizer([vector_param, matrix_param])

    # The optimizer must hold the very same objects, in order, and start at step 0.
    assert len(optimizer.params) == 2
    for stored, given in zip(optimizer.params, tracked):
        assert stored is given
    assert optimizer.step_count == 0

    # zero_grad() is expected to reset every gradient to None.
    optimizer.zero_grad()
    for tensor in tracked:
        assert tensor.grad is None

    # A tensor created without requires_grad is accepted too (no validation).
    plain_tensor = Tensor([1.0])
    plain_optimizer = Optimizer([plain_tensor])
    assert len(plain_optimizer.params) == 1

    print("âœ… Base Optimizer works correctly!")


if __name__ == "__main__":
    test_unit_optimizer_base()

ðŸ”¬ Unit Test: Base Optimizer...
âœ… Base Optimizer works correctly!


## Unit Test - Stochastic Gradient Descent

In [3]:
def test_unit_sgd_optimizer():
    """Unit test for ``SGD``: plain update, momentum accumulation, weight decay.

    Every expected value is derived from literal start/gradient values so the
    checks do not depend on the optimizer leaving ``param.grad`` untouched.

    Raises:
        AssertionError: if any update differs from the hand-computed value.
    """
    print("ðŸ”¬ Unit Test: SGD Optimizer...")

    lr = 0.1
    start_values = np.array([1.0, 2.0])
    grad_values = np.array([0.1, 0.2])

    # --- Plain SGD (no momentum) -------------------------------------------
    param = Tensor([1.0, 2.0], requires_grad=True)
    param.grad = Tensor([0.1, 0.2])

    optimizer = SGD([param], lr=lr)
    optimizer.step()

    # param - lr * grad = [1.0, 2.0] - 0.1 * [0.1, 0.2] = [0.99, 1.98]
    expected = start_values - lr * grad_values
    assert np.allclose(param.data, expected)
    assert optimizer.step_count == 1

    # --- SGD with momentum -------------------------------------------------
    param2 = Tensor([1.0, 2.0], requires_grad=True)
    param2.grad = Tensor([0.1, 0.2])

    optimizer_momentum = SGD([param2], lr=lr, momentum=0.9)

    # First step: v = 0.9 * 0 + [0.1, 0.2] = [0.1, 0.2]
    optimizer_momentum.step()
    expected_first = start_values - lr * grad_values
    # This comparison was previously evaluated without `assert`, so it could never fail.
    assert np.allclose(param2.data, expected_first)

    # Second step with the same gradient:
    # v = 0.9 * [0.1, 0.2] + [0.1, 0.2] = [0.19, 0.38]
    param2.grad = Tensor([0.1, 0.2])
    optimizer_momentum.step()
    expected_momentum = np.array([0.19, 0.38])
    expected_second = expected_first - lr * expected_momentum
    assert np.allclose(param2.data, expected_second, rtol=1e-5)

    # --- Weight decay --------------------------------------------------------
    param3 = Tensor([1.0, 2.0], requires_grad=True)
    # Gradients are plain tensors elsewhere in this file; no requires_grad needed.
    param3.grad = Tensor([0.1, 0.2])

    optimizer_wd = SGD([param3], lr=lr, weight_decay=0.01)
    optimizer_wd.step()

    # grad + weight_decay * param = [0.1, 0.2] + 0.01 * [1.0, 2.0] = [0.11, 0.22]
    expected_wd = start_values - lr * np.array([0.11, 0.22])
    assert np.allclose(param3.data, expected_wd)

    print("âœ… SGD optimizer works correctly!")


if __name__ == "__main__":
    test_unit_sgd_optimizer()

ðŸ”¬ Unit Test: SGD Optimizer...
âœ… SGD optimizer works correctly!


## Unit Test - Adam Optimizer

In [4]:
def test_unit_adam_optimizer():
    """Unit test for ``Adam``: first-step value, buffer creation, weight decay.

    The first update is compared against a manual computation that uses the
    same hyper-parameters passed to the optimizer (lr=0.01, betas=(0.9, 0.999),
    eps=1e-8).

    Raises:
        AssertionError: if a check fails.
    """
    print("ðŸ”¬ Unit Test: Adam Optimizer...")

    # --- First step against a hand computation ------------------------------
    param = Tensor([1.0, 2.0], requires_grad=True)
    param.grad = Tensor([0.1, 0.2])

    optimizer = Adam([param], lr=0.01, betas=(0.9, 0.999), eps=1e-8)
    start = param.data.copy()

    optimizer.step()

    g = np.array([0.1, 0.2])
    first_moment = 0.1 * g            # manual first-moment estimate after one step
    second_moment = 0.001 * (g ** 2)  # manual second-moment estimate after one step

    # Bias corrections for step 1.
    first_hat = first_moment / (1 - 0.9 ** 1)
    second_hat = second_moment / (1 - 0.999 ** 1)

    expected = start - 0.01 * first_hat / (np.sqrt(second_hat) + 1e-8)
    assert np.allclose(param.data, expected, rtol=1e-6)
    assert optimizer.step_count == 1

    # --- Second step: moment buffers exist and the counter advances ---------
    param.grad = Tensor([0.1, 0.2])
    optimizer.step()

    assert optimizer.m_buffers[0] is not None
    assert optimizer.v_buffers[0] is not None
    assert optimizer.step_count == 2

    # --- Weight decay --------------------------------------------------------
    decayed_param = Tensor([1.0, 2.0], requires_grad=True)
    decayed_param.grad = Tensor([0.1, 0.2])

    optimizer_wd = Adam([decayed_param], lr=0.01, weight_decay=0.01)
    optimizer_wd.step()

    # Only checks that the parameter moved; the exact decayed value is not pinned.
    assert not np.array_equal(decayed_param.data, np.array([1.0, 2.0]))

    print("âœ… Adam optimizer works correctly!")


if __name__ == "__main__":
    test_unit_adam_optimizer()

ðŸ”¬ Unit Test: Adam Optimizer...
âœ… Adam optimizer works correctly!


## Unit Test - AdamW Optimizer

In [5]:
def test_unit_adamw_optimizer():
    """Unit test for ``AdamW``: differs from Adam with decay, matches it without.

    Raises:
        AssertionError: if a check fails.
    """
    print("ðŸ”¬ Unit Test: AdamW Optimizer...")

    # --- Same start, same gradient, same weight_decay: results must differ ---
    param_adam = Tensor([1.0, 2.0], requires_grad=True)
    param_adamw = Tensor([1.0, 2.0], requires_grad=True)
    param_adam.grad = Tensor([0.1, 0.2])
    param_adamw.grad = Tensor([0.1, 0.2])

    adam = Adam([param_adam], lr=0.01, weight_decay=0.01)
    adamw = AdamW([param_adamw], lr=0.01, weight_decay=0.01)

    adam.step()
    adamw.step()

    assert not np.allclose(param_adam.data, param_adamw.data, rtol=1e-6)

    # --- Basic AdamW behaviour ----------------------------------------------
    param = Tensor([1.0, 2.0], requires_grad=True)
    param.grad = Tensor([0.1, 0.2])

    optimizer = AdamW([param], lr=0.01, weight_decay=0.01)
    before_step = param.data.copy()

    optimizer.step()

    # One step must move the parameter, bump the counter and create both buffers.
    assert not np.array_equal(param.data, before_step)
    assert optimizer.step_count == 1
    assert optimizer.m_buffers[0] is not None
    assert optimizer.v_buffers[0] is not None

    # --- With weight_decay=0 the two optimizers must agree -------------------
    param1 = Tensor([1.0, 2.0], requires_grad=True)
    param2 = Tensor([1.0, 2.0], requires_grad=True)
    param1.grad = Tensor([0.1, 0.2])
    param2.grad = Tensor([0.1, 0.2])

    adam_no_wd = Adam([param1], lr=0.01, weight_decay=0.0)
    adamw_no_wd = AdamW([param2], lr=0.01, weight_decay=0.0)

    adam_no_wd.step()
    adamw_no_wd.step()

    # Equal within numerical precision.
    assert np.allclose(param1.data, param2.data, rtol=1e-10)

    print("âœ… AdamW optimizer works correctly!")


if __name__ == "__main__":
    test_unit_adamw_optimizer()

ðŸ”¬ Unit Test: AdamW Optimizer...
âœ… AdamW optimizer works correctly!


In [6]:
def analyze_optimizer_memory_usage():
    """Print a table of per-tensor element counts held by SGD, Adam and AdamW.

    The figures are analytic (``size * 2`` for SGD with momentum, ``size * 3``
    for Adam/AdamW); they are not read back from the optimizer objects. Each
    optimizer is still built and stepped once per size.
    """
    print("ðŸ“Š Analyzing Optimizer Memory Usage...")

    rule_width = 60
    print("Optimizer Memory Analysis (per parameter tensor):")
    print("=" * rule_width)
    print(f"{'Size':<10} {'SGD':<10} {'Adam':<10} {'AdamW':<10} {'Ratio':<10}")
    print("-" * rule_width)

    # 1K, 10K and 100K elements.
    for size in (1000, 10000, 100000):
        # SGD with momentum: parameter + one momentum buffer.
        sgd_param = Tensor(np.random.randn(size), requires_grad=True)
        sgd_param.grad = Tensor(np.random.randn(size))
        sgd = SGD([sgd_param], momentum=0.9)
        sgd.step()  # one step so the optimizer allocates its state
        sgd_memory = 2 * size

        # Adam: parameter + first- and second-moment buffers.
        adam_param = Tensor(np.random.randn(size), requires_grad=True)
        adam_param.grad = Tensor(np.random.randn(size))
        adam = Adam([adam_param])
        adam.step()  # one step so the optimizer allocates its state
        adam_memory = 3 * size

        # AdamW is counted the same as Adam.
        adamw_memory = adam_memory

        adam_to_sgd = adam_memory / sgd_memory
        print(f"{size:<10} {sgd_memory:<10} {adam_memory:<10} {adamw_memory:<10} {adam_to_sgd:.1f}x")

    for line in (
        "\nðŸ’¡ Key Insights:",
        "- SGD: 2Ã— parameter memory (momentum buffer)",
        "- Adam/AdamW: 3Ã— parameter memory (two moment buffers)",
        "- Memory scales linearly with model size",
        "- Trade-off: More memory for better convergence",
    ):
        print(line)

In [7]:
def analyze_optimizer_convergence_behavior():
    """Print the loss of four optimizer setups minimising f(x) = 0.5 * sum(x^2).

    Each setup starts from the same point, takes 10 update steps, and the loss
    is recorded before every step plus once after the last one (11 values).
    The gradient of this function is x itself, so it is supplied directly
    instead of going through autograd.
    """
    print("ðŸ“Š Analyzing Optimizer Convergence Behavior...")

    # Start well away from the optimum at [0, 0, 0].
    x_start = np.array([5.0, -3.0, 2.0])
    n_updates = 10

    configurations = (
        ("SGD", SGD, {"lr": 0.1}),
        ("SGD+Momentum", SGD, {"lr": 0.1, "momentum": 0.9}),
        ("Adam", Adam, {"lr": 0.1}),
        ("AdamW", AdamW, {"lr": 0.1, "weight_decay": 0.01}),
    )

    print("Convergence Analysis (quadratic function f(x) = 0.5 * xÂ²):")
    print("=" * 70)
    print(f"{'Optimizer':<15} {'Step 0':<12} {'Step 5':<12} {'Step 10':<12} {'Final Loss':<12}")
    print("-" * 70)

    for label, optimizer_cls, options in configurations:
        # Fresh parameter and optimizer for every configuration.
        param = Tensor(x_start.copy(), requires_grad=True)
        optimizer = optimizer_cls([param], **options)

        losses = []
        for step in range(n_updates + 1):
            # Loss and analytic gradient (df/dx = x) at the current point.
            losses.append(0.5 * (param.data ** 2).sum())
            param.grad = Tensor(param.data.copy())

            # The last iteration only evaluates; it does not update.
            if step != n_updates:
                optimizer.step()
                optimizer.zero_grad()

        # "Step 10" and "Final Loss" both show the last recorded loss.
        at_0, at_5, at_10, final = (format(losses[i], ".6f") for i in (0, 5, 10, 10))
        print(f"{label:<15} {at_0:<12} {at_5:<12} {at_10:<12} {final:<12}")

    print("\nðŸ’¡ Key Insights:")
    print("- SGD: Steady progress but can be slow")
    print("- SGD+Momentum: Faster convergence, less oscillation")
    print("- Adam: Adaptive rates help with different parameter scales")
    print("- AdamW: Similar to Adam with regularization effects")

In [8]:
def test_module():
    """Module-level integration test for the optimizers.

    Runs the four unit tests, then:
    - steps SGD, Adam and AdamW once each on the same four-tensor "network"
      (restoring the weights and drawing new random gradients in between) and
      checks that every tensor moved and that SGD and Adam disagree;
    - checks that ``zero_grad`` clears gradients while Adam keeps its moment
      buffers and step count.

    Gradients come from the unseeded global NumPy RNG, so the exact numbers
    differ from run to run.

    Raises:
        AssertionError: if any check fails.
    """
    print("ðŸ§ª RUNNING MODULE INTEGRATION TEST")
    print("=" * 50)

    print("Running unit tests...")
    for unit_test in (
        test_unit_optimizer_base,
        test_unit_sgd_optimizer,
        test_unit_adam_optimizer,
        test_unit_adamw_optimizer,
    ):
        unit_test()

    print("\nRunning integration scenarios...")

    print("ðŸ”¬ Integration Test: Multi-layer Network Optimization...")

    # Imported here so a missing earlier module fails at this point
    # (previous modules must be completed and exported).
    from tinytorch.core.layers import Linear
    from tinytorch.core.activations import ReLU
    from tinytorch.core.losses import MSELoss

    # Two-layer network: 3 inputs -> 4 hidden -> 2 outputs.
    W1 = Tensor(np.random.randn(3, 4) * 0.1, requires_grad=True)
    b1 = Tensor(np.zeros(4), requires_grad=True)
    W2 = Tensor(np.random.randn(4, 2) * 0.1, requires_grad=True)
    b2 = Tensor(np.zeros(2), requires_grad=True)

    params = [W1, b1, W2, b2]
    grad_shapes = [(3, 4), (4,), (4, 2), (2,)]

    def assign_random_grads():
        """Give every parameter a new small random gradient, in list order."""
        for tensor, shape in zip(params, grad_shapes):
            tensor.grad = Tensor(np.random.randn(*shape) * 0.01)

    def restore_network():
        """Put the saved weights back and draw new gradients."""
        for tensor, saved in zip(params, original_params):
            tensor.data = saved.copy()
        assign_random_grads()

    assign_random_grads()

    # Adam and AdamW get their own list objects wrapping the same tensors.
    sgd = SGD(params, lr=0.01, momentum=0.9)
    adam = Adam(list(params), lr=0.001)
    adamw = AdamW(list(params), lr=0.001, weight_decay=0.01)

    original_params = [tensor.data.copy() for tensor in params]

    # SGD
    sgd.step()
    sgd_params = [tensor.data.copy() for tensor in params]

    # Adam, from the restored starting point.
    restore_network()
    adam.params = params
    adam.step()
    adam_params = [tensor.data.copy() for tensor in params]

    # AdamW, from the restored starting point.
    restore_network()
    adamw.params = params
    adamw.step()
    adamw_params = [tensor.data.copy() for tensor in params]

    for start, after_sgd, after_adam, after_adamw in zip(
        original_params, sgd_params, adam_params, adamw_params
    ):
        # Every optimizer must have moved every tensor.
        assert not np.array_equal(after_sgd, start)
        assert not np.array_equal(after_adam, start)
        assert not np.array_equal(after_adamw, start)

        # SGD and Adam must not land on the same values.
        assert not np.allclose(after_sgd, after_adam, rtol=1e-6)

    print("âœ… Multi-layer network optimization works!")

    print("ðŸ”¬ Integration Test: Optimizer State Management...")

    param = Tensor([1.0, 2.0], requires_grad=True)
    param.grad = Tensor([0.1, 0.2])

    optimizer = Adam([param], lr=0.001)

    # The first step creates the moment buffers.
    optimizer.step()
    assert optimizer.m_buffers[0] is not None
    assert optimizer.v_buffers[0] is not None
    assert optimizer.step_count == 1

    # zero_grad clears the gradient but leaves optimizer state alone.
    optimizer.zero_grad()
    assert param.grad is None
    assert optimizer.m_buffers[0] is not None  # state preserved
    assert optimizer.step_count == 1  # step count preserved

    print("âœ… Optimizer state management works!")

    print("\n" + "=" * 50)
    print("ðŸŽ‰ ALL TESTS PASSED! Module ready for export.")
    print("Run: tito module complete 07_optimizers")

In [9]:
def demo_optimizers():
    """Show one SGD step moving a single weight against its gradient.

    Prints the weight before and after the step and how far it moved. The
    gradient is stored as a ``Tensor``, matching how every other cell in this
    notebook sets ``.grad``.
    """
    print("ðŸŽ¯ AHA MOMENT: Optimizers Update Weights")
    print("=" * 45)

    # Keep the start value in one place so the "Moved" line cannot drift from it.
    initial_value = 5.0

    # A single weight with a gradient pointing "uphill".
    weight = Tensor(np.array([initial_value]), requires_grad=True)
    weight.grad = Tensor(np.array([1.0]))

    print(f"Initial weight: {weight.data[0]:.2f}")
    print(f"Gradient:       {weight.grad.data[0]:.2f} (pointing uphill)")

    # SGD steps in the direction opposite to the gradient.
    optimizer = SGD([weight], lr=0.5)
    optimizer.step()

    print(f"\nAfter SGD step: {weight.data[0]:.2f}")
    print(f"Moved: {initial_value - weight.data[0]:.2f} (opposite to gradient)")

    print("\nâœ¨ Optimizer moves weights to reduce loss!")

In [10]:
# Script entry point: run the full integration test first, then the short demo.
if __name__ == "__main__":
    test_module()
    # Visual gap between the test report and the demo output.
    print("\n")
    demo_optimizers()

ðŸ§ª RUNNING MODULE INTEGRATION TEST
Running unit tests...
ðŸ”¬ Unit Test: Base Optimizer...
âœ… Base Optimizer works correctly!
ðŸ”¬ Unit Test: SGD Optimizer...
âœ… SGD optimizer works correctly!
ðŸ”¬ Unit Test: Adam Optimizer...
âœ… Adam optimizer works correctly!
ðŸ”¬ Unit Test: AdamW Optimizer...
âœ… AdamW optimizer works correctly!

Running integration scenarios...
ðŸ”¬ Integration Test: Multi-layer Network Optimization...
âœ… Multi-layer network optimization works!
ðŸ”¬ Integration Test: Optimizer State Management...
âœ… Optimizer state management works!

ðŸŽ‰ ALL TESTS PASSED! Module ready for export.
Run: tito module complete 07_optimizers


ðŸŽ¯ AHA MOMENT: Optimizers Update Weights
Initial weight: 5.00
Gradient:       1.00 (pointing uphill)

After SGD step: 4.50
Moved: 0.50 (opposite to gradient)

âœ¨ Optimizer moves weights to reduce loss!
