# 기계학습 실습 3: PyTorch로 구현하는 신경망 (24.11.28)

**학번**: 2019160102

이름: 이윤영

**주요일정**:

- 12월 5일(목): 실습 4: 차원축소 및 군집
- 12월 17일(화): 기말고사
- 12월 23일(월): 텀프로젝트 마감


**목차:**

1. PyTorch의 Tensor
2. PyTorch 신경망 기본
3. 활성화 함수
4. 신경망 분류 모델
5. 미분, 역전파, Autograd
6. 신경망 훈련
7. 정규화

In [1]:
#@title 시각화 함수 실행

import torch
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error, mean_absolute_error
from torchvision import utils
import matplotlib.pyplot as plt
plt.style.use('ggplot')
plt.rcParams.update({'font.size': 16, 'axes.labelweight': 'bold'})

def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def gradient(x, y, w):
    """MSE gradient."""
    y_hat = x @ w
    error = y - y_hat
    gradient = -(1.0 / len(x)) * 2 * x.T @ error
    mse = (error ** 2).mean()
    return gradient, mse


def gradient_descent(
    x,
    y,
    w,
    alpha,
    tolerance: float = 2e-5,
    max_iterations: int = 1000,
    verbose: bool = False,
    print_progress: int = 10,
    history: bool = False,
):
    """MSE gradient descent."""
    iterations = 1
    if verbose:
        print(f"Iteration 0.", "Weights:", [f"{_:.2f}" for _ in w])
    if history:
        ws = []
        mses = []
    while True:
        g, mse = gradient(x, y, w)
        if history:
            ws.append(list(w))
            mses.append(mse)
        w_new = w - alpha * g
        if sum(abs(w_new - w)) < tolerance:
            if verbose:
                print(f"Converged after {iterations} iterations!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        if iterations % print_progress == 0:
            if verbose:
                print(
                    f"Iteration {iterations}.",
                    "Weights:",
                    [f"{_:.2f}" for _ in w_new],
                )
        iterations += 1
        if iterations > max_iterations:
            if verbose:
                print(f"Reached max iterations ({max_iterations})!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        w = w_new
    if history:
        w = w_new
        _, mse = gradient(x, y, w)
        ws.append(list(w))
        mses.append(mse)
        return ws, mses


def stochastic_gradient_descent(
    x,
    y,
    w,
    alpha,
    tolerance: float = 2e-5,
    max_iterations: int = 1000,
    verbose: bool = False,
    print_progress: int = 10,
    history: bool = False,
    seed=None,
):
    """MSE stochastic gradient descent."""
    if seed is not None:
        np.random.seed(seed)
    iterations = 1
    if verbose:
        print(f"Iteration 0.", "Weights:", [f"{_:.2f}" for _ in w])
    if history:
        ws = []
        mses = []
    while True:
        i = np.random.randint(len(x))
        g, mse = gradient(x[i, None], y[i, None], w)
        if history:
            ws.append(list(w))
            mses.append(mse)
        w_new = w - alpha * g
        if sum(abs(w_new - w)) < tolerance:
            if verbose:
                print(f"Converged after {iterations} iterations!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        if iterations % print_progress == 0:
            if verbose:
                print(
                    f"Iteration {iterations}.",
                    "Weights:",
                    [f"{_:.2f}" for _ in w_new],
                )
        iterations += 1
        if iterations > max_iterations:
            if verbose:
                print(f"Reached max iterations ({max_iterations})!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        w = w_new
    if history:
        w = w_new
        _, mse = gradient(x, y, w)
        ws.append(list(w))
        mses.append(mse)
        return ws, mses


def minibatch_gradient_descent(
    x,
    y,
    w,
    alpha,
    batch_size,
    tolerance: float = 2e-5,
    max_iterations: int = 1000,
    verbose: bool = False,
    print_progress: int = 10,
    history: bool = False,
    seed=None,
):
    """MSE stochastic gradient descent."""
    if seed is not None:
        np.random.seed(seed)
    iterations = 1
    if verbose:
        print(f"Iteration 0.", "Weights:", [f"{_:.2f}" for _ in w])
    if history:
        ws = []
        mses = []
    while True:
        i = np.random.choice(
            range(len(x)), batch_size, replace=False
        )  # no replacement
        g, mse = gradient(x[i], y[i], w)
        if history:
            ws.append(list(w))
            mses.append(mse)
        w_new = w - alpha * g
        if sum(abs(w_new - w)) < tolerance:
            if verbose:
                print(f"Converged after {iterations} iterations!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        if iterations % print_progress == 0:
            if verbose:
                print(
                    f"Iteration {iterations}.",
                    "Weights:",
                    [f"{_:.2f}" for _ in w_new],
                )
        iterations += 1
        if iterations > max_iterations:
            if verbose:
                print(f"Reached max iterations ({max_iterations})!")
                print("Final weights:", [f"{_:.2f}" for _ in w_new])
            break
        w = w_new
    if history:
        w = w_new
        _, mse = gradient(x, y, w)
        ws.append(list(w))
        mses.append(mse)
        return ws, mses


def plot_pokemon(
    x, y, y_hat=None, x_range=[10, 130], y_range=[10, 130], dx=20, dy=20
):
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=x, y=y, mode="markers", marker=dict(size=10), name="data")
    )
    if y_hat is not None:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=y_hat,
                line_color="red",
                mode="lines",
                line=dict(width=3),
                name="Fitted line",
            )
        )
        width = 550
        title_x = 0.46
    else:
        width = 500
        title_x = 0.5
    fig.update_layout(
        width=width,
        height=500,
        title="Pokemon stats",
        title_x=title_x,
        title_y=0.93,
        xaxis_title="defense",
        yaxis_title="attack",
        margin=dict(t=60),
    )
    fig.update_xaxes(range=x_range, tick0=x_range[0], dtick=dx)
    fig.update_yaxes(range=y_range, tick0=y_range[0], dtick=dy)
    return fig


def plot_logistic(
    x,
    y,
    y_hat=None,
    threshold=None,
    x_range=[-3, 3],
    y_range=[-0.25, 1.25],
    dx=1,
    dy=0.25,
):
    fig = go.Figure()
    fig.update_xaxes(range=x_range, tick0=x_range[0], dtick=dx)
    fig.update_yaxes(range=y_range, tick0=y_range[0], dtick=dy)
    if threshold is not None:
        threshold_ind = (np.abs(y_hat - threshold)).argmin()
        fig.add_trace(
            go.Scatter(
                x=[x_range[0], x_range[0], x[threshold_ind], x[threshold_ind]],
                y=[y_range[0], y_range[1], y_range[1], y_range[0]],
                mode="lines",
                fill="toself",
                fillcolor="limegreen",
                opacity=0.2,
                line=dict(width=0),
                name="0 prediction",
            )
        )
        fig.add_trace(
            go.Scatter(
                x=[x[threshold_ind], x[threshold_ind], x_range[1], x_range[1]],
                y=[y_range[0], y_range[1], y_range[1], y_range[0]],
                mode="lines",
                fill="toself",
                fillcolor="lightsalmon",
                opacity=0.3,
                line=dict(width=0),
                name="1 prediction",
            )
        )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=y,
            mode="markers",
            marker=dict(
                size=10,
                color="#636EFA",
                line=dict(width=1, color="DarkSlateGrey"),
            ),
            name="data",
        )
    )
    if y_hat is not None:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=y_hat,
                line_color="red",
                mode="lines",
                line=dict(width=3),
                name="Fitted line",
            )
        )
        width = 650
        title_x = 0.46
    else:
        width = 600
        title_x = 0.5
    if threshold is not None:
        fig.add_trace(
            go.Scatter(
                x=[x[threshold_ind]],
                y=[threshold],
                mode="markers",
                marker=dict(
                    size=18,
                    color="gold",
                    line=dict(width=1, color="DarkSlateGrey"),
                ),
                name="Threshold",
            )
        )
    fig.update_layout(
        width=width,
        height=500,
        title="Pokemon stats",
        title_x=title_x,
        title_y=0.93,
        xaxis_title="defense",
        yaxis_title="legendary",
        margin=dict(t=60),
    )
    return fig


def plot_gradient_m(x, y, m, slopes, mse, grad_func):
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mse,
            line_color="#1ac584",
            line=dict(width=3),
            mode="lines",
            name="MSE",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mean_squared_error(y, m * x) + grad_func(x, y, m) * (slopes - m),
            line_color="red",
            mode="lines",
            line=dict(width=2),
            name="gradient",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[m],
            y=[mean_squared_error(y, m * x)],
            line_color="red",
            marker=dict(size=14, line=dict(width=1, color="DarkSlateGrey")),
            mode="markers",
            name=f"slope {m}",
        )
    )
    fig.update_layout(
        width=520,
        height=450,
        xaxis_title="slope (w)",
        yaxis_title="MSE",
        title=f"slope {m:.1f}, gradient {grad_func(x, y, m):.1f}",
        title_x=0.46,
        title_y=0.93,
        margin=dict(t=60),
    )
    fig.update_xaxes(range=[0.4, 1.6], tick0=0.4, dtick=0.2)
    fig.update_yaxes(range=[0, 2500])
    return fig


def plot_grid_search(
    x,
    y,
    slopes,
    loss_function,
    title="Mean Squared Error",
    y_range=[0, 2500],
    y_title="MSE",
):
    mse = []
    df = pd.DataFrame()
    for m in slopes:
        df[f"{m:.2f}"] = m * x  # store predictions for plotting later
        mse.append(loss_function(y, m * x))  # calc MSE
    mse = pd.DataFrame({"slope": slopes, "squared_error": mse})
    fig = make_subplots(
        rows=1, cols=2, subplot_titles=("Data & Fitted Line", title)
    )
    fig.add_trace(
        go.Scatter(x=x, y=y, mode="markers", marker=dict(size=10), name="Data"),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=df.iloc[:, 0],
            line_color="red",
            mode="lines",
            line=dict(width=3),
            name="Fitted line",
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=mse["slope"],
            y=mse["squared_error"],
            mode="markers",
            marker=dict(size=7),
            name="MSE",
        ),
        row=1,
        col=2,
    )
    fig.add_trace(
        go.Scatter(
            x=mse.iloc[[0]]["slope"],
            y=mse.iloc[[0]]["squared_error"],
            line_color="red",
            mode="markers",
            marker=dict(size=14, line=dict(width=1, color="DarkSlateGrey")),
            name="MSE for line",
        ),
        row=1,
        col=2,
    )
    fig.update_layout(width=900, height=475)
    fig.update_xaxes(
        range=[10, 130],
        tick0=10,
        dtick=20,
        row=1,
        col=1,
        title="defense",
        title_standoff=0,
    )
    fig.update_xaxes(
        range=[0.3, 1.6],
        tick0=0.3,
        dtick=0.2,
        row=1,
        col=2,
        title="slope",
        title_standoff=0,
    )
    fig.update_yaxes(
        range=[10, 130],
        tick0=10,
        dtick=20,
        row=1,
        col=1,
        title="attack",
        title_standoff=0,
    )
    fig.update_yaxes(
        range=y_range, row=1, col=2, title=y_title, title_standoff=0
    )
    frames = [
        dict(
            name=f"{slope:.2f}",
            data=[
                go.Scatter(x=x, y=y),
                go.Scatter(x=x, y=df[f"{slope:.2f}"]),
                go.Scatter(x=mse["slope"], y=mse["squared_error"]),
                go.Scatter(
                    x=mse.iloc[[n]]["slope"], y=mse.iloc[[n]]["squared_error"]
                ),
            ],
            traces=[0, 1, 2, 3],
        )
        for n, slope in enumerate(slopes)
    ]

    sliders = [
        {
            "currentvalue": {
                "font": {"size": 16},
                "prefix": "slope: ",
                "visible": True,
            },
            "pad": {"b": 10, "t": 30},
            "steps": [
                {
                    "args": [
                        [f"{slope:.2f}"],
                        {
                            "frame": {
                                "duration": 0,
                                "easing": "linear",
                                "redraw": False,
                            },
                            "transition": {"duration": 0, "easing": "linear"},
                        },
                    ],
                    "label": f"{slope:.2f}",
                    "method": "animate",
                }
                for slope in slopes
            ],
        }
    ]
    fig.update(frames=frames), fig.update_layout(sliders=sliders)
    return fig


def plot_grid_search_2d(x, y, slopes, intercepts):
    mse = np.zeros((len(slopes), len(intercepts)))
    for i, slope in enumerate(slopes):
        for j, intercept in enumerate(intercepts):
            mse[i, j] = mean_squared_error(y, x * slope + intercept)
    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=("Surface Plot", "Contour Plot"),
        specs=[[{"type": "surface"}, {"type": "contour"}]],
    )
    fig.add_trace(
        go.Surface(
            z=mse, x=intercepts, y=slopes, name="", colorscale="viridis"
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Contour(
            z=mse,
            x=intercepts,
            y=slopes,
            name="",
            showscale=False,
            colorscale="viridis",
        ),
        row=1,
        col=2,
    )
    fig.update_layout(
        scene=dict(
            zaxis=dict(title="MSE"),
            yaxis=dict(title="slope (w<sub>1</sub>)"),
            xaxis=dict(title="intercept (w<sub>0</sub>)"),
        ),
        scene_camera=dict(eye=dict(x=2, y=1.1, z=1.2)),
        margin=dict(l=0, r=0, b=60, t=90),
    )
    fig.update_xaxes(
        title="intercept (w<sub>0</sub>)",
        range=[intercepts.max(), intercepts.min()],
        tick0=intercepts.max(),
        row=1,
        col=2,
        title_standoff=0,
    )
    fig.update_yaxes(
        title="slope (w<sub>1</sub>)",
        range=[slopes.min(), slopes.max()],
        tick0=slopes.min(),
        row=1,
        col=2,
        title_standoff=0,
    )
    fig.update_layout(width=900, height=475, margin=dict(t=60))
    return fig


def plot_gradient_descent(x, y, w, alpha, tolerance=2e-4, max_iterations=5000):
    if x.ndim == 1:
        x = np.array(x).reshape(-1, 1)
    slopes, losses = gradient_descent(
        x, y, [w], alpha, tolerance, max_iterations, history=True
    )
    slopes = [_[0] for _ in slopes]
    x = x.flatten()
    mse = []
    df = pd.DataFrame()
    for w in slopes:
        df[f"{w:.2f}"] = w * x  # store predictions for plotting later
    slope_range = np.arange(0.4, 1.65, 0.05)
    for w in slope_range:
        mse.append(mean_squared_error(y, w * x))  # calc MSE
    mse = pd.DataFrame({"slope": slope_range, "squared_error": mse})

    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=("Data & Fitted Line", "Mean Squared Error"),
    )
    fig.add_trace(
        go.Scatter(x=x, y=y, mode="markers", marker=dict(size=10), name="Data"),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=df.iloc[:, 0],
            line_color="red",
            mode="lines",
            line=dict(width=3),
            name="Fitted line",
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=mse["slope"],
            y=mse["squared_error"],
            line_color="#1ac584",
            line=dict(width=3),
            mode="lines",
            name="MSE",
        ),
        row=1,
        col=2,
    )
    fig.add_trace(
        go.Scatter(
            x=np.array(slopes[:1]),
            y=np.array(losses[:1]),
            line_color="salmon",
            line=dict(width=4),
            marker=dict(size=10, line=dict(width=1, color="DarkSlateGrey")),
            mode="markers+lines",
            name="Slope history",
        ),
        row=1,
        col=2,
    )
    fig.add_trace(
        go.Scatter(
            x=np.array(slopes[0]),
            y=np.array(losses[0]),
            line_color="red",
            mode="markers",
            marker=dict(size=18, line=dict(width=1, color="DarkSlateGrey")),
            name="MSE for line",
        ),
        row=1,
        col=2,
    )
    fig.add_trace(
        go.Scatter(
            x=[30.3],
            y=[120],
            mode="text",
            text=f"<b>Slope {slopes[0]:.2f}<b>",
            textfont=dict(size=16, color="red"),
            showlegend=False,
        ),
        row=1,
        col=1,
    )

    fig.update_layout(width=900, height=475, margin=dict(t=60))
    fig.update_xaxes(
        range=[10, 130], tick0=10, dtick=20, title="defense", title_standoff=0, row=1, col=1
    ), fig.update_xaxes(range=[0.4, 1.6], tick0=0.4, dtick=0.2, title="slope (w)", title_standoff=0, row=1, col=2)
    fig.update_yaxes(
        range=[10, 130], tick0=10, dtick=20, title="attack", title_standoff=0, row=1, col=1
    ), fig.update_yaxes(range=[0, 2500], title="MSE", title_standoff=0, row=1, col=2)

    frames = [
        dict(
            name=n,
            data=[
                go.Scatter(x=x, y=y),
                go.Scatter(x=x, y=df[f"{slope:.2f}"]),
                go.Scatter(x=mse["slope"], y=mse["squared_error"]),
                go.Scatter(
                    x=np.array(slopes[: n + 1]),
                    y=np.array(losses[: n + 1]),
                    mode="markers" if n == 0 else "markers+lines",
                ),
                go.Scatter(x=np.array(slopes[n]), y=np.array(losses[n])),
                go.Scatter(text=f"<b>Slope {slope:.2f}<b>"),
            ],
            traces=[0, 1, 2, 3, 4, 5],
        )
        for n, slope in enumerate(slopes)
    ]

    sliders = [
        {
            "currentvalue": {
                "font": {"size": 16},
                "prefix": "Iteration: ",
                "visible": True,
            },
            "pad": {"b": 10, "t": 30},
            "steps": [
                {
                    "args": [
                        [n],
                        {
                            "frame": {
                                "duration": 0,
                                "easing": "linear",
                                "redraw": False,
                            },
                            "transition": {"duration": 0, "easing": "linear"},
                        },
                    ],
                    "label": n,
                    "method": "animate",
                }
                for n in range(len(slopes))
            ],
        }
    ]
    fig.update(frames=frames), fig.update_layout(sliders=sliders)
    return fig


def plot_gradient_descent_2d(
    x,
    y,
    w,
    alpha,
    m_range,
    b_range,
    tolerance=2e-5,
    max_iterations=5000,
    step_size=1,
    markers=False,
    stochastic=False,
    batch_size=None,
    seed=None,
):
    if x.ndim == 1:
        x = np.array(x).reshape(-1, 1)
    if stochastic:
        if batch_size is None:
            weights, losses = stochastic_gradient_descent(
                np.hstack((np.ones((len(x), 1)), x)),
                y,
                w,
                alpha,
                tolerance,
                max_iterations,
                history=True,
                seed=seed,
            )
            title = "Stochastic Gradient Descent"
        else:
            weights, losses = minibatch_gradient_descent(
                np.hstack((np.ones((len(x), 1)), x)),
                y,
                w,
                alpha,
                batch_size,
                tolerance,
                max_iterations,
                history=True,
                seed=seed,
            )
            title = "Minibatch Gradient Descent"
    else:
        weights, losses = gradient_descent(
            np.hstack((np.ones((len(x), 1)), x)),
            y,
            w,
            alpha,
            tolerance,
            max_iterations,
            history=True,
        )
        title = "Gradient Descent"
    weights = np.array(weights)
    intercepts, slopes = weights[:, 0], weights[:, 1]
    mse = np.zeros((len(m_range), len(b_range)))
    for i, slope in enumerate(m_range):
        for j, intercept in enumerate(b_range):
            mse[i, j] = mean_squared_error(y, x * slope + intercept)

    fig = make_subplots(
        rows=1,
        subplot_titles=[title],  # . Iterations = {len(intercepts) - 1}."],
    )
    fig.add_trace(
        go.Contour(z=mse, x=b_range, y=m_range, name="", colorscale="viridis")
    )
    mode = "markers+lines" if markers else "lines"
    fig.add_trace(
        go.Scatter(
            x=intercepts[::step_size],
            y=slopes[::step_size],
            mode=mode,
            line=dict(width=2.5),
            line_color="coral",
            marker=dict(
                opacity=1,
                size=np.linspace(19, 1, len(intercepts[::step_size])),
                line=dict(width=2, color="DarkSlateGrey"),
            ),
            name="Descent Path",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[intercepts[0]],
            y=[slopes[0]],
            mode="markers",
            marker=dict(size=20, line=dict(width=2, color="DarkSlateGrey")),
            marker_color="orangered",
            name="Start",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[intercepts[-1]],
            y=[slopes[-1]],
            mode="markers",
            marker=dict(size=20, line=dict(width=2, color="DarkSlateGrey")),
            marker_color="yellowgreen",
            name="End",
        )
    )
    fig.update_layout(
        width=700,
        height=600,
        margin=dict(t=60),
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
    )
    fig.update_xaxes(
        title="intercept (w<sub>0</sub>)",
        range=[b_range.min(), b_range.max()],
        tick0=b_range.min(),
        row=1,
        col=1,
        title_standoff=0,
    )
    fig.update_yaxes(
        title="slope (w<sub>1</sub>)",
        range=[m_range.min(), m_range.max()],
        tick0=m_range.min(),
        row=1,
        col=1,
        title_standoff=0,
    )
    return fig


def plot_random_gradients(x, y, w, num_of_points=10, seed=None):
    if seed is not None:
        np.random.seed(seed)
    randn = np.random.randint(0, len(x), num_of_points)
    fig = go.Figure()
    slopes = np.arange(-30, 60, 1)
    mse = np.array([mean_squared_error(y, m * x) for m in slopes])
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mse,
            line_color="#1ac584",
            line=dict(width=3),
            mode="lines",
            name="MSE",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mean_squared_error(y, w * x)
            + gradient(x[randn[0], None], y[randn[0], None], [w])[0]
            * (slopes - w),
            line_color="black",
            mode="lines",
            line=dict(width=2),
            name="gradient for one data point",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mean_squared_error(y, w * x)
            + gradient(x[:, None], y, [w])[0][0] * (slopes - w),
            line_color="red",
            mode="lines",
            line=dict(width=3),
            name="gradient for all data",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[w],
            y=[mean_squared_error(y, w * x)],
            line_color="red",
            marker=dict(size=14, line=dict(width=2, color="DarkSlateGrey")),
            mode="markers",
            name=f"slope {w}",
        )
    )

    frames = [
        dict(
            name=f"{n}",
            data=[
                go.Scatter(x=slopes, y=mse),
                go.Scatter(
                    x=slopes,
                    y=mean_squared_error(y, w * x)
                    + gradient(x[n, None], y[n, None], [w])[0] * (slopes - w),
                ),
                go.Scatter(
                    x=slopes,
                    y=mean_squared_error(y, w * x)
                    + gradient(x[:, None], y, [w])[0][0] * (slopes - w),
                ),
                go.Scatter(x=[w], y=[mean_squared_error(y, w * x)]),
            ],
            traces=[0, 1, 2, 3],
        )
        for n in randn
    ]

    sliders = [
        {
            "currentvalue": {
                "font": {"size": 16},
                "prefix": "Data point: ",
                "visible": True,
            },
            "pad": {"b": 10, "t": 30},
            "steps": [
                {
                    "args": [
                        [f"{n}"],
                        {
                            "frame": {
                                "duration": 0,
                                "easing": "linear",
                                "redraw": False,
                            },
                            "transition": {"duration": 0, "easing": "linear"},
                        },
                    ],
                    "label": f"{n}",
                    "method": "animate",
                }
                for n in randn
            ],
        }
    ]
    fig.update_layout(
        width=650,
        height=500,
        xaxis_title="slope",
        yaxis_title="MSE",
        title_x=0.46,
        title_y=0.93,
        margin=dict(b=20, t=60),
    )
    fig.update(frames=frames), fig.update_layout(sliders=sliders)
    fig.update_xaxes(range=[-30, 60], dtick=10, title_standoff=0)
    fig.update_yaxes(range=[6500, 9000], dtick=500, title_standoff=0)
    return fig


def plot_gradient_histogram(x, y, w):
    gradients = [
        gradient(x[i, None], y[i, None], [w])[0] for i in range(len(x))
    ]
    fig = go.Figure(data=[go.Histogram(x=gradients)])
    fig.update_layout(
        width=600,
        height=400,
        margin=dict(t=60),
        title=f"Histogram of gradients at slope {w}",
        title_x=0.5,
        title_y=0.9,
    )
    fig.update_xaxes(title="gradient", title_standoff=0)
    fig.update_yaxes(title="frequency", title_standoff=0)
    return fig


def plot_minibatch_gradients(x, y, w, batch_sizes=[1], seed=None):
    if seed is not None:
        np.random.seed(seed)
    batches = []
    for _ in batch_sizes:
        batches.append(np.random.choice(range(len(x)), _, replace=False))
    fig = go.Figure()
    slopes = np.arange(-30, 60, 1)
    mse = np.array([mean_squared_error(y, m * x) for m in slopes])
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mse,
            line_color="#1ac584",
            line=dict(width=3),
            mode="lines",
            name="MSE",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mean_squared_error(y, w * x)
            + gradient(x[batches[0]], y[batches[0]], [w])[0] * (slopes - w),
            line_color="black",
            mode="lines",
            line=dict(width=2),
            name=f"gradient for batch",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=slopes,
            y=mean_squared_error(y, w * x)
            + gradient(x[:, None], y, [w])[0][0] * (slopes - w),
            line_color="red",
            mode="lines",
            line=dict(width=3),
            name="gradient for all data",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[w],
            y=[mean_squared_error(y, w * x)],
            line_color="red",
            marker=dict(size=14, line=dict(width=2, color="DarkSlateGrey")),
            mode="markers",
            name=f"slope {w}",
        )
    )

    frames = [
        dict(
            name=f"{len(batch)}",
            data=[
                go.Scatter(x=slopes, y=mse),
                go.Scatter(
                    x=slopes,
                    y=mean_squared_error(y, w * x)
                    + gradient(x[batch, None], y[batch], [w])[0] * (slopes - w),
                ),
                go.Scatter(
                    x=slopes,
                    y=mean_squared_error(y, w * x)
                    + gradient(x[:, None], y, [w])[0][0] * (slopes - w),
                ),
                go.Scatter(x=[w], y=[mean_squared_error(y, w * x)]),
            ],
            traces=[0, 1, 2, 3],
        )
        for batch in batches
    ]

    sliders = [
        {
            "currentvalue": {
                "font": {"size": 16},
                "prefix": "Batch size: ",
                "visible": True,
            },
            "pad": {"b": 10, "t": 30},
            "steps": [
                {
                    "args": [
                        [f"{len(batch)}"],
                        {
                            "frame": {
                                "duration": 0,
                                "easing": "linear",
                                "redraw": False,
                            },
                            "transition": {"duration": 0, "easing": "linear"},
                        },
                    ],
                    "label": f"{len(batch)}",
                    "method": "animate",
                }
                for batch in batches
            ],
        }
    ]
    fig.update_layout(
        width=650,
        height=500,
        xaxis_title="slope",
        yaxis_title="MSE",
        title_x=0.46,
        title_y=0.93,
        margin=dict(b=20, t=60),
    )
    fig.update(frames=frames), fig.update_layout(sliders=sliders)
    fig.update_xaxes(range=[-30, 60], dtick=10, title_standoff=0)
    fig.update_yaxes(range=[6500, 9000], dtick=500, title_standoff=0)
    return fig


def plot_panel(f1, f2, f3):
    fig = make_subplots(
        rows=1,
        cols=3,
        subplot_titles=(
            "Gradient Descent",
            "Stochastic Gradient Descent",
            "Minibatch Gradient Descent",
        ),
    )
    for n, f in enumerate((f1, f2, f3)):
        for _ in range(len(f.data)):
            fig.add_trace(f.data[_], row=1, col=n + 1)
        fig.update_xaxes(range=[-40, 138], row=1, col=n + 1)
        fig.update_yaxes(range=[-30, 58], row=1, col=n + 1)
    fig.update_layout(
        width=1000, height=400, margin=dict(t=60), showlegend=False
    )
    return fig


def plot_regression(
    x, y, y_hat=None, x_range=[-3, 3], y_range=[-150, 150], dx=1, dy=30
):
    if x.ndim > 1:
        x = np.squeeze(x)
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=x, y=y, mode="markers", marker=dict(size=10), name="data")
    )
    if y_hat is not None:
        if y_hat.ndim > 1:
            y_hat = np.squeeze(y_hat)
        fig.add_trace(
            go.Scatter(
                x=x,
                y=y_hat,
                line_color="red",
                mode="lines",
                line=dict(width=3),
                name="Fitted line",
            )
        )
        width = 550
        title_x = 0.46
    else:
        width = 500
        title_x = 0.5
    fig.update_layout(
        width=width,
        height=500,
        # title="Pokemon stats",
        title_x=title_x,
        title_y=0.93,
        xaxis_title="x",
        yaxis_title="y",
        margin=dict(t=60),
    )
    fig.update_xaxes(range=x_range, tick0=x_range[0], dtick=dx)
    fig.update_yaxes(range=y_range, tick0=y_range[0], dtick=dy)
    return fig


def plot_nodes(
    x,
    y,
    model,
    x_range=[-3, 3],
    y_range=[-25, 25],
    dx=1,
    dy=5,
):
    m = model.main.state_dict()
    y_nodes = (
        sigmoid((x * m["0.weight"].numpy()).T + m["0.bias"].numpy())
        * m["2.weight"].numpy()
    )
    nodes = [str(_) for _ in range(1, y_nodes.shape[1] + 1)] + ["Bias"]
    nodes = [nodes[: n + 1] for n, _ in enumerate(nodes)]
    y_nodes = np.concatenate(
        (y_nodes, np.ones((len(y_nodes), 1)) * m["2.bias"].numpy()), axis=1
    )
    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=("Node Decomposition", "Interactive Re-composition"),
    )
    # PLOT 1
    for i in range(y_nodes.shape[1] - 1):
        fig.add_trace(
            go.Scatter(
                x=x,
                y=y_nodes[:, i],
                mode="lines",
                name=f"Node {i+1}",
            ),
            row=1,
            col=1,
        )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=y_nodes[:, -1],
            mode="lines",
            name="Output Bias",
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=y,
            mode="lines",
            name="Prediction",
            line_color="red",
            line=dict(width=4, dash="dot"),
        ),
        row=1,
        col=1,
    )

    # SUBPLOT 2
    fig.add_trace(
        go.Scatter(
            x=x,
            y=y,
            mode="lines",
            showlegend=False,
            line_color="red",
            line=dict(width=4, dash="dot"),
        ),
        row=1,
        col=2,
    )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=y_nodes[:, :1].sum(axis=1),
            line=dict(width=2),
            mode="lines",
            line_color="darkslategrey",
            name="Node Sum",
        ),
        row=1,
        col=2,
    )

    frames = [
        dict(
            name=f"{', '.join(node)}",
            data=[
                go.Scatter(x=x, y=y_nodes[:, i])
                for i in range(y_nodes.shape[1] - 1)
            ]
            + [
                go.Scatter(x=x, y=y_nodes[:, -1]),
                go.Scatter(x=x, y=y),
                go.Scatter(x=x, y=y),
                go.Scatter(x=x, y=y_nodes[:, : n + 1].sum(axis=1)),
            ],
            traces=list(range(y_nodes.shape[1] - 1 + 4)),
        )
        for n, node in enumerate(nodes)
    ]

    sliders = [
        {
            "currentvalue": {
                "font": {"size": 16},
                "prefix": "Sum of: ",
                "visible": True,
            },
            "pad": {"b": 10, "t": 40, "l": 430},
            "steps": [
                {
                    "args": [
                        [f"{', '.join(node)}"],
                        {
                            "frame": {
                                "duration": 0,
                                "easing": "linear",
                                "redraw": False,
                            },
                            "transition": {
                                "duration": 0,
                                "easing": "linear",
                            },
                        },
                    ],
                    "label": f"{', '.join(node)}",
                    "method": "animate",
                }
                for node in nodes
            ],
        }
    ]
    fig.update(frames=frames), fig.update_layout(sliders=sliders)

    fig.update_layout(width=1000, height=500, margin=dict(t=60))
    fig.update_xaxes(range=x_range, tick0=x_range[0], dtick=dx, row=1, col=1)
    fig.update_xaxes(range=x_range, tick0=x_range[0], dtick=dx, row=1, col=2)
    fig.update_yaxes(range=y_range, tick0=y_range[0], dtick=dy, row=1, col=1)
    fig.update_yaxes(range=y_range, tick0=y_range[0], dtick=dy, row=1, col=2)
    return fig


def plot_activations(x, functions, rows=2, cols=3, width=800, height=500):
    names = [_.__name__ for _ in functions]
    fig = make_subplots(
        rows=rows,
        cols=cols,
        subplot_titles=names,
        # horizontal_spacing=None,
        vertical_spacing=0.12,
    )
    i, j = 1, 1
    for f in functions:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=f()(x),
                mode="lines",
                name=f"{f.__name__}",
                line=dict(width=3),
            ),
            row=i,
            col=j,
        )
        j += 1
        if j > cols:
            i += 1
            j = 1
    fig.update_layout(
        width=width, height=height, margin=dict(b=30, t=60), showlegend=False
    )
    return fig


def plot_classification_2d(X, y, model=None, transform="Sigmoid"):
    c = [
        "#636EFA",
        "#EF553B",
        "#00CC96",
        "#AB63FA",
        "#FFA15A",
        "#19D3F3",
        "#FF6692",
        "#B6E880",
        "#FF97FF",
        "#FECB52",
    ]
    yy = [c[_] for _ in y]
    fig = go.Figure()
    if model is None:
        for class_ in np.unique(y):
            mask = y == class_
            fig.add_trace(
                go.Scatter(
                    x=X[mask, 0],
                    y=X[mask, 1],
                    mode="markers",
                    marker=dict(size=10),
                    name=f"Class {class_}",
                )
            )
        fig.update_layout(
            width=500,
            height=500,
            title="Binary Classification",
            title_x=0.5,
            title_y=0.93,
            xaxis_title="x",
            yaxis_title="y",
            margin=dict(t=60),
            legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
        )
        fig.update_xaxes(range=[-1.5, 1.5], tick0=-1.5, dtick=0.5)
        fig.update_yaxes(range=[-1.5, 1.5], tick0=-1.5, dtick=0.5)
    else:
        yy = [c[_] for _ in y]
        x_t = torch.tensor(X, dtype=torch.float32, requires_grad=True)
        xx1, xx2 = torch.meshgrid(
            torch.linspace(-1.5, 1.5, 25),
            torch.linspace(-1.5, 1.5, 25),
        )
        if transform == "Sigmoid":
            Z = model(
                torch.cat((xx1.reshape(1, -1).T, xx2.reshape(1, -1).T), dim=1)
            ).reshape(xx1.shape)
            Z = torch.nn.Sigmoid()(Z)
            fig = go.Figure(
                data=go.Contour(
                    z=Z.detach(),
                    x=xx1[:, 0].detach(),
                    y=xx2[0, :].detach(),
                    colorscale="rdbu",
                    reversescale=True,
                    name="Predictions",
                    contours=dict(start=0, end=1, size=0.1),
                    colorbar=dict(title="Probability of Class 1"),
                )
            )
        elif transform == "Softmax":
            Z = torch.nn.Softmax(dim=0)(
                model(
                    torch.cat(
                        (xx1.reshape(1, -1).T, xx2.reshape(1, -1).T), dim=1
                    )
                )
            ).argmax(dim=1)
            ZZ = [c[_] for _ in Z]
            fig.add_trace(
                go.Scatter(
                    x=xx1.reshape(1, -1).T[:, 0].detach(),
                    y=xx2.reshape(1, -1).T[:, 0].detach(),
                    mode="markers",
                    marker=dict(size=15, color=ZZ, opacity=0.3),
                    showlegend=False,
                )
            )
        for class_ in np.unique(y):
            mask = y == class_
            fig.add_trace(
                go.Scatter(
                    x=X[mask, 0],
                    y=X[mask, 1],
                    mode="markers",
                    marker=dict(
                        size=10,
                        color=c[class_],
                        line=dict(width=1, color="DarkSlateGrey"),
                    ),
                    name=f"Class {class_}",
                ),
            )
        fig.update_layout(
            width=550,
            height=500,
            title="Binary Classification",
            title_x=0.5,
            title_y=0.93,
            xaxis_title="x",
            yaxis_title="y",
            margin=dict(t=60),
            legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
        )
        fig.update_xaxes(range=[-1.5, 1.5], tick0=-1.5, dtick=0.5)
        fig.update_yaxes(range=[-1.5, 1.5], tick0=-1.5, dtick=0.5)
    return fig


def plot_loss(train_loss, validation_loss=None, train_acc=None, valid_acc=None):
    # Make figure
    if train_acc is not None:
        fig = make_subplots(rows=1, cols=2)
    else:
        fig = go.Figure()
    # Add losses
    fig.add_trace(
        go.Scatter(x=np.arange(len(train_loss)), y=train_loss, mode="lines", line=dict(width=2), name="Training loss")
    )
    if validation_loss is not None:
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(validation_loss)),
                y=validation_loss,
                mode="lines",
                line=dict(width=2),
                name="Validation loss",
            )
        )
        width = 400
    else:
        width = 400
    # add accuracy
    if train_acc is not None:
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(train_acc)),
                y=train_acc,
                mode="lines",
                line=dict(width=2),
                name="Training accuracy",
            ),
            row=1,
            col=2
        )
        width = 550
        title_x = 0.46
        if valid_acc is not None:
            fig.add_trace(
                go.Scatter(
                    x=np.arange(len(valid_acc)),
                    y=valid_acc,
                    mode="lines",
                    line=dict(width=2),
                    name="Validation accuracy",
                ),
                row=1,
                col=2
            )
        fig.update_layout(
            width=800,
            height=400,
            title_y=0.93,
            margin=dict(t=60),
        )
        fig.update_xaxes(title_text="Epochs", title_standoff=0, row=1, col=1)
        fig.update_xaxes(title_text="Epochs", title_standoff=0, row=1, col=2)
        fig.update_yaxes(title_text="Loss", title_standoff=0, row=1, col=2)
        fig.update_yaxes(title_text="Accuracy", title_standoff=0, row=1, col=2)

    else:
        fig.update_layout(
            width=width,
            height=400,
            title_y=0.93,
            xaxis_title="Epochs",
            yaxis_title="Loss",
            margin=dict(t=60),
            legend=dict(
                yanchor="top",
                y=0.99,
                xanchor="left",
                x=0.434
            )
        )
    return fig


def plot_image(sample_batch, rgb=False):
    plt.figure(figsize=(10, 8))
    plt.axis("off")
    plt.title("Sample Training Images")
    if rgb:
        plt.imshow(np.transpose(utils.make_grid(sample_batch, padding=1, normalize=True),(1,2,0)));
    else:
        plt.imshow(np.transpose(utils.make_grid(sample_batch[0], padding=1, normalize=True),(1,2,0)));


def plot_bitmoji(image, label):
    plt.figure(figsize=(4, 4))
    plt.axis("off")
    plt.title(f"Prediction: {['not_tom', 'tom'][label]}", pad=10)
    plt.imshow(image[0, 0], cmap='gray');


def plot_conv(image, filter):
    """Plot convs with matplotlib."""
    d = filter.shape[-1]
    conv = torch.nn.Conv2d(1, 1, kernel_size=(d, d), padding=1)
    conv.weight[:] = filter
    fig, (ax1, ax2) = plt.subplots(figsize=(8, 4), ncols=2)
    ax1.imshow(image, cmap='gray')
    ax1.axis('off')
    ax1.set_title("Original")
    ax2.imshow(conv(image[None, None, :]).detach().squeeze(), cmap='gray')
    ax2.set_title("Filtered")
    ax2.axis('off')
    plt.tight_layout();

def plot_convs(image, conv_layer, axis=False):
    """Plot convs with matplotlib. Sorry for this lazy code :D"""
    filtered_image = conv_layer(image[None, None, :])
    n = filtered_image.shape[1]
    if n == 1:
        fig, (ax1, ax2) = plt.subplots(figsize=(8, 4), ncols=2)
        ax1.imshow(image, cmap='gray')
        ax1.set_title("Original")
        ax2.imshow(filtered_image.detach().squeeze(), cmap='gray')
        ax2.set_title("Filter 1")
        ax1.grid(False)
        ax2.grid(False)
        if not axis:
            ax1.axis(False)
            ax2.axis(False)
        plt.tight_layout();
    elif n == 2:
        filtered_image_1 = filtered_image[:,0,:,:]
        filtered_image_2 = filtered_image[:,1,:,:]
        fig, (ax1, ax2, ax3) = plt.subplots(figsize=(10, 4), ncols=3)
        ax1.imshow(image, cmap='gray')
        ax1.set_title("Original")
        ax2.imshow(filtered_image_1.detach().squeeze(), cmap='gray')
        ax2.set_title("Filter 1")
        ax3.imshow(filtered_image_2.detach().squeeze(), cmap='gray')
        ax3.set_title("Filter 2")
        ax1.grid(False)
        ax2.grid(False)
        ax3.grid(False)
        if not axis:
            ax1.axis(False)
            ax2.axis(False)
            ax3.axis(False)
        plt.tight_layout();
    elif n == 3:
        filtered_image_1 = filtered_image[:,0,:,:]
        filtered_image_2 = filtered_image[:,1,:,:]
        filtered_image_3 = filtered_image[:,2,:,:]
        fig, (ax1, ax2, ax3, ax4) = plt.subplots(figsize=(12, 4), ncols=4)
        ax1.imshow(image, cmap='gray')
        ax1.set_title("Original")
        ax2.imshow(filtered_image_1.detach().squeeze(), cmap='gray')
        ax2.set_title("Filter 1")
        ax3.imshow(filtered_image_2.detach().squeeze(), cmap='gray')
        ax3.set_title("Filter 2")
        ax4.imshow(filtered_image_3.detach().squeeze(), cmap='gray')
        ax4.set_title("Filter 3")
        ax1.grid(False)
        ax2.grid(False)
        ax3.grid(False)
        ax4.grid(False)
        if not axis:
            ax1.axis(False)
            ax2.axis(False)
            ax3.axis(False)
            ax4.axis(False)
        plt.tight_layout();

def plot_scatter3D(X, y):
    fig = go.Figure(data=[go.Scatter3d(x=X[:,0], y=X[:,1], z=X[:,2], mode='markers', marker=dict(size=4, color=y, colorscale='rdylbu'))])
    fig.update_layout(margin=dict(b=30, t=60), showlegend=False, scene=dict(
        zaxis=dict(title="X3"),
        yaxis=dict(title="X2"),
        xaxis=dict(title="X1"),
    )
    )
    return fig

def plot_scatter2D(X, y):
    fig = go.Figure(data=[go.Scatter(x=X[:,0].detach(), y=X[:,1].detach(), mode='markers', marker=dict(color=y, size=4, colorscale='rdylbu'))])
    fig.update_layout(width=500, height=400, margin=dict(b=30, t=60), showlegend=False
)
    fig.update_xaxes(title="Encoded feature 1", title_standoff=0)
    fig.update_yaxes(title="Encoded feature 2", title_standoff=0)
    return fig


def plot_eights(X, noise=0.5):
    fig, (ax1, ax2) = plt.subplots(2, 3, figsize=(7, 5))
    noise = noise * torch.randn(*X[1, 0, :, :].shape)
    ax1[0].imshow(X[1, 0, :, :], cmap="gray")
    ax2[0].imshow(X[1, 0, :, :] + noise, cmap="gray")
    ax1[1].imshow(X[2, 0, :, :], cmap="gray")
    ax1[1].set_title("Original 8's")
    ax2[1].imshow(X[2, 0, :, :] + noise, cmap="gray")
    ax2[1].set_title("Noisy 8's")
    ax1[2].imshow(X[3, 0, :, :], cmap="gray")
    ax2[2].imshow(X[3, 0, :, :] + noise, cmap="gray")
    plt.tight_layout();


def plot_eight_pair(input_8, output_8):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(6, 4))
    ax1.imshow(input_8.squeeze().detach(), cmap="gray")
    ax1.set_title("Input")
    ax2.imshow(output_8.squeeze().detach(), cmap="gray")
    ax2.set_title("Output")
    plt.tight_layout();


def plot_gan_loss(dis_loss, gen_loss):
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=np.arange(1, 1+len(dis_loss)), y=dis_loss, mode="lines", line=dict(width=2), name="Discriminator loss")
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(1, 1+len(gen_loss)),
            y=gen_loss,
            mode="lines",
            line=dict(width=2),
            name="Generator loss",
        )
    )
    fig.update_layout(
        width=550,
        height=400,
        title_y=0.93,
        xaxis_title="Epochs",
        yaxis_title="Loss",
        margin=dict(t=60),
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.434
        )
    )
    return fig

In [2]:
# imports
import sys
import numpy as np
import pandas as pd
import torch
from torchsummary import summary
from torch import nn, optim
from torchvision import transforms, datasets, utils
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_regression, make_circles, make_blobs, fetch_openml
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# 1. PyTorch의 `Tensor`

PyTorch는 과학 계산을 위한 Python 기반 도구로, 다음과 같은 주요 기능을 제공한다:

- `torch.Tensor`: NumPy와 유사한 n차원 배열로, GPU에서 실행 가능하다.
- 계산 그래프 및 자동 미분 엔진: 신경망을 구축하고 학습시키는 데 사용된다.

## 1.1. `ndarray` vs `tensor`

`torch.Tensor`: GPU 및 PyTorch의 computational graph와 통합되도록 설계되었다.
- `torch.tensor()`를 사용하여 텐서를 생성할 수 있으며, 사용법은 NumPy의 ndarray와 비슷하다.

In [3]:
# TODO 1:
tensor_1 = torch.tensor([1, 2, 3])

In [4]:
tensor_2 = torch.tensor([1, 2, 3], dtype=torch.float32)
tensor_3 = torch.tensor(np.array([1, 2, 3]))

for t in [tensor_1, tensor_2, tensor_3]:
    print(f"{t}, dtype: {t.dtype}")

tensor([1, 2, 3]), dtype: torch.int64
tensor([1., 2., 3.]), dtype: torch.float32
tensor([1, 2, 3]), dtype: torch.int64


In [5]:
# zeroes
torch.zeros(2, 2)

tensor([[0., 0.],
        [0., 0.]])

In [6]:
# ones
torch.ones(2, 2)

tensor([[1., 1.],
        [1., 1.]])

In [7]:
# random normal
torch.randn(3, 2)

tensor([[ 0.7184, -1.4936],
        [-0.6761, -0.7584],
        [-0.1686,  1.6210]])

In [8]:
# TODO 2: rand uniform
torch.rand(2, 3, 2)

tensor([[[0.5636, 0.3571],
         [0.2857, 0.7381],
         [0.7887, 0.6892]],

        [[0.9711, 0.9077],
         [0.9444, 0.4997],
         [0.5369, 0.6002]]])

In [9]:
# TODO 3:
x = torch.rand(2, 3, 2, 2)
x.shape

torch.Size([2, 3, 2, 2])

In [10]:
# TODO 4:
x.ndim

4

## 1.2. Tensors and Data Types

- 신경망을 구현할 경우 PyTorch 수천 또는 수백만 개의 floating point 계산을 수행하기 때문에 데이터 유형에 따라 메모리 및 속도에 미치는 영향이 다르다.
- `float32`와 같은 작은 데이터 타입을 사용하여 계산 속도를 크게 높이고 메모리 요구량을 줄일 수 있다.
- PyTorch의 기본 floating point 데이터 타입은 `float32` (NumPy 디폴트는 `float64`)

In [11]:
print(np.array([3.14159]).dtype)
print(torch.tensor([3.14159]).dtype)

float64
torch.float32


In [12]:
print(torch.tensor([3.14159], dtype=torch.float64).dtype)

torch.float64


## 1.3. Operations on Tensors

- 텐서는 ndarray처럼 작동하며, NumPy 메서드와 동일한 이름의 메서드를 사용하여 텐서의 연산과 처리를 할 수 있다.
- broadcasting: 서로 다른 크기의 텐서를 자동으로 확장하여 연산을 수행할 수 있도록 돕는 기능

<img src="https://miro.medium.com/v2/resize:fit:1400/format:webp/1*MItSaXr1p6UEfHuM43Ubzw.png" width=500>

- broadcasting 규칙:
  - 두 텐서의 각 차원에서 크기가 같거나 둘 중 하나가 1이어야 broadcasting이 가능하다.
  - 차원 수가 다르면 더 작은 텐서의 앞쪽에 1 크기의 차원을 추가하여 맞춰야 한다.

- broadcasting이 되지 않는 경우:
  ```python
    a = torch.randn(2, 3)  # Shape: (2, 3)
    b = torch.randn(3, 2)  # Shape: (3, 2)
  ```
- 해결 방법:
  ```python
    b_t = b.T  # b를 전치하여 크기 맞추기 (2, 3)
    a + b_t  # Broadcasting 성공
  ```

In [13]:
# TODO 5:
a = torch.rand(1, 3)
b = torch.rand(3, 1)

a + b

tensor([[1.4373, 0.8325, 1.4070],
        [0.9911, 0.3863, 0.9608],
        [1.6068, 1.0020, 1.5765]])

In [14]:
# TODO 6:
a * b

tensor([[0.4883, 0.1552, 0.4716],
        [0.0926, 0.0294, 0.0895],
        [0.6385, 0.2029, 0.6167]])

In [15]:
a.mean()

tensor(0.6749)

In [16]:
a.sum()

tensor(2.0248)

## 1.4. Indexing

In [17]:
X = torch.rand(5, 2)
print(X)

tensor([[0.7147, 0.1026],
        [0.8557, 0.6395],
        [0.0897, 0.6182],
        [0.8083, 0.6129],
        [0.9317, 0.8399]])


In [18]:
# TODO 7:
print(X[0, :])
print(X[:, 0])

tensor([0.7147, 0.1026])
tensor([0.7147, 0.8557, 0.0897, 0.8083, 0.9317])


## 1.5. NumPy Bridge

- 텐서를 다시 NumPy로 변환해야 할 경우가 있으며, `.numpy()` 메서드를 사용하여 변환 가능하다.

In [19]:
X = torch.rand(3,3)
print(type(X))

<class 'torch.Tensor'>


In [20]:
# TODO 8:
X_np = X.numpy()

In [21]:
print(type(X_np))

<class 'numpy.ndarray'>


## 1.6. GPU and CUDA Tensors

- GPU(Graphical Processing Unit) 대량의 데이터를 병렬로 처리하는 데 매우 빠른 성능을 보여준다.
- 신경망은 보통 더 작은 계산들로 나뉘어질 수 있어, 이를 GPU에서 병렬로 처리 가능하다.
- PyTorch는 CUDA을 통한 GPU 연산 구현을 지원한다.
  - CUDA: GPU에서 병렬 연산을 수행할 수 있도록 하는 NVIDIA가 개발한 툴킷

In [22]:
# TODO 9:
torch.cuda.is_available()

False

- GPU가 있는 머신에서 학습할 때, PyTorch에 GPU를 사용하겠다고 명시해야 하며, PyTorch 코드에서는 아래와 같은 코드를 상단에 작성한다.

In [23]:
# GPU를 사용할 수 있으면 `"cuda"`를, 그렇지 않으면 `"cpu"`를 사용하도록 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


- 텐서를 생성할 때 `device` 파라미터를 사용하여 CPU 또는 GPU를 사용할지 지정할 수 있다.
- 텐서를 CPU와 GPU 간에 이동하려면 `.to()` 메서드를 사용할 수 있다.

In [24]:
X = torch.rand(2, 2, 2, device=device) # 혹은 X = X.to(device)
print(X.device)

cpu


In [25]:
# 텐서를 cpu로 이동하는 예시:
X.to('cpu')

tensor([[[0.6442, 0.3104],
         [0.0185, 0.7192]],

        [[0.4090, 0.8499],
         [0.7907, 0.4297]]])

# 2. PyTorch 신경망 기본

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-6.png">

## 2.1. 선형 회귀 구현으로 알아보는 PyTorch

선형 회귀를 모방하는 가장 간단한 모델을 구현하여 PyTorch의 작동방식을 익힌다.


In [26]:
# 간단한 회귀 데이터셋 생성:
X, y = make_regression(n_samples=500, n_features=1, random_state=0, noise=10.0)
plot_regression(X, y)

In [27]:
# sklearn을 사용하여 이 데이터에 간단한 선형 회귀를 적용
sk_model = LinearRegression().fit(X, y)
plot_regression(X, y, sk_model.predict(X))

In [28]:
# 훈련된 선형 회귀 모델의 weight 출력:
print(f"w_0: {sk_model.intercept_:.2f} (bias/intercept)")
print(f"w_1: {sk_model.coef_[0]:.2f}")

w_0: -0.77 (bias/intercept)
w_1: 45.50


선형 회귀 모델의 등식 형태:

$$\hat{y}=-0.77 + 45.50X$$

행렬 형태:

$$\begin{bmatrix} \hat{y_1} \\ \hat{y_2} \\ \vdots \\ \hat{y_n} \end{bmatrix}=\begin{bmatrix} 1 & x_1 \\ 1 & x_2 \\ \vdots & \vdots \\ 1 & x_n \end{bmatrix} \begin{bmatrix} -0.77 \\ 45.55 \end{bmatrix}$$

그래프 형태:

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-1.png">

## 2.2. PyTorch 신경망으로 구현하는 선형 회귀

대부분의 PyTorch 신경망 모델은 `torch.nn.Module`을 상속받아 작성하게 된다.

PyTorch로 `linearRegression` 모델을 구현하는 형태를 살펴본다.

In [29]:
class linearRegression(nn.Module):  # nn.Module 상속
    def __init__(self, input_size, output_size):
        super().__init__()      # torch.nn.Module에서 모든 것을 상속받도록 함

        # 단순 linear 레이어: wX + b
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        out = self.linear(x)
        return out

위 코드의 설명이다.:

```python
class linearRegression(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
```

- 여기서는 `linearRegression`이라는 클래스를 생성하고 `nn.Module`의 메서드와 속성을 상속받는다.
- `help(linearRegression)`을 통해 `nn.Module`에서 상속받은 모든 내용을 확인할 수 있다.

```python
        self.linear = nn.Linear(input_size, output_size)
```

- 여기서는 Linear 레이어를 정의한다.
- 이는 단순히 $wX+b$를 의미하며, 모델의 weight가 input feature에 곱해지고 bias가 더해지는 과정을 나타낸다.

```python
    def forward(self, x):
        out = self.linear(x)
        return out
```

- `nn.Module`로 생성된 PyTorch 모델은 반드시 `forward()` 메서드를 포함해야 한다.
- 이 메서드는 입력 데이터 `x`를 받아 정의된 연산을 통과시킨다.
- 위 코드에서는 `x`를 linear 레이어에 전달하고 출력 `out`을 반환한다.

In [30]:
# TODO 10:
model = linearRegression(input_size=1, output_size=1).to(device)

In [31]:
# 모델 구조 출력:
print(model)

linearRegression(
  (linear): Linear(in_features=1, out_features=1, bias=True)
)


In [32]:
# 구체적인 모델 디테일 출력:
summary(model, (1,));

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                    [-1, 1]               2
Total params: 2
Trainable params: 2
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.00
Estimated Total Size (MB): 0.00
----------------------------------------------------------------


- 선형 회귀 모델의 `Param #`: 하나는 weight(`w1`), 다른 하나는 bias(`w0`)  
- 이들은 우리가 모델을 생성할 때 PyTorch에서 임의로 초기화한 값들이다.
- 이 값들은 `model.state_dict()`를 사용하여 확인할 수 있다.

In [33]:
model.state_dict()

OrderedDict([('linear.weight', tensor([[0.8482]])),
             ('linear.bias', tensor([-0.6387]))])

생성한 `x`와 `y` 데이터는 현재 NumPy `ndarray`타입이라 PyTorch `tensor`로 변환해야 한다.:

In [34]:
X_t = torch.tensor(X, dtype=torch.float32, device=device)
y_t = torch.tensor(y, dtype=torch.float32, device=device)

In [35]:
# 임의로 초기화된 weight으로 y_pred 계산 예시:
y_p = model(X_t[0]).item()
print(f"Predicted: {y_p:.2f}")
print(f"   Actual: {y[0]:.2f}")

Predicted: -0.11
   Actual: 31.08


모델이 아직 학습되지 않았기 때문에 예측이 정확하지 않다. 모델을 학습시키기 위해서는 다음이 필요하다.:

1. Loss Function
   - 예측이 얼마나 좋은지/나쁜지를 알려주는 기준. PyTorch에서는 "`criterion`"이라고 불림.  
   - 회귀 모델이기에 평균 제곱 오차(mean squared error)를 사용하며, PyTorch에서는 `torch.nn.MSELoss()`로 작성된다.

2. 최적화 알고리즘
   - 모델의 매개변수를 최적화하는 알고리즘
   - 여기서는 SGD(Stochastic Gradient Descent)을 사용하며, `torch.optim.SGD()`로 작성된다.

In [36]:
LEARNING_RATE = 0.1
# loss function
criterion = nn.MSELoss()
# optimization algorithm is SGD
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

훈련을 시작하기 전에 데이터를 배치 처리하는 데 도움을 줄 "데이터 로더"를 생성할 것이다.요청 시 데이터를 제공하는 생성기라고 생각하면 된다.

<img src="https://api.wandb.ai/files/srishti-gureja-wandb/images/views/2431735/8ebab91c.png">

`BATCH_SIZE = 50`로 설정한다. 데이터가 500개이므로 10개의 배치가 생성될 것이다.

In [37]:
BATCH_SIZE = 50
dataset = TensorDataset(X_t, y_t)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

In [38]:
# 10개의 batch 확인:
print(f"Total number of batches: {len(dataloader)}")

Total number of batches: 10


In [39]:
# 다음 코드로 배치 하나의 디멘션 확인:
XX, yy = next(iter(dataloader))
print(f"Shape of feature data (X) in batch: {XX.shape}")
print(f"Shape of response data (y) in batch: {yy.shape}")

Shape of feature data (X) in batch: torch.Size([50, 1])
Shape of response data (y) in batch: torch.Size([50])


In [40]:
# TODO 11:
def trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True):

    for epoch in range(epochs):
        losses = 0
        for X, y in dataloader:
            optimizer.zero_grad()       # 모델 파라미터에 대한 gradient 초기화
            y_hat = model(X).flatten()   # Forward pass
            loss = criterion(y_hat, y)  # loss 계산
            loss.backward()             # 모델 파라미터에 대한 gradient 계산
            optimizer.step()            # 모델 parameter 업데이트
            losses += loss.item()       # batch loss을 전체 loss에 추가
        if verbose: print(f"epoch: {epoch + 1}, loss: {losses / len(dataloader):.4f}")

trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True)

epoch: 1, loss: 642.7272
epoch: 2, loss: 99.9036
epoch: 3, loss: 93.9393
epoch: 4, loss: 93.3983
epoch: 5, loss: 93.7733


- `.flatten()`: 다차원 텐서를 1차원 텐서로 변환

In [41]:
trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True)

epoch: 1, loss: 93.7024
epoch: 2, loss: 93.6954
epoch: 3, loss: 93.3688
epoch: 4, loss: 93.7643
epoch: 5, loss: 93.6020


In [42]:
# 훈련된 모델 파라미터 확인:
model.state_dict()

OrderedDict([('linear.weight', tensor([[45.7813]])),
             ('linear.bias', tensor([-1.0001]))])

In [43]:
# scikit-learn 모델과 비교:
pd.DataFrame({"w0": [sk_model.intercept_, model.state_dict()['linear.bias'].item()],
              "w1": [sk_model.coef_[0], model.state_dict()['linear.weight'].item()]},
             index=['sklearn', 'pytorch']).round(2)

Unnamed: 0,w0,w1
sklearn,-0.77,45.5
pytorch,-1.0,45.78


- epoch을 늘리거나 learning rate을 조정하여 더 좋은 결과를 얻어볼 수 있다.
- `trainer()`을 다시 실행하면 모델은 현재 파라미터에서부터 시작하여 다시 학습을 시작한다.
- 따라서 처음부터 다시 훈련을 하려면 model이나 optimizer을 재선언하여 초기화시킬 수 있다.

In [44]:
trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True)

epoch: 1, loss: 93.5794
epoch: 2, loss: 93.8167
epoch: 3, loss: 93.6106
epoch: 4, loss: 93.8083
epoch: 5, loss: 93.2512


## 2.3. 신경망으로 구현하는 다중 선형 회귀

feature이 3개로 늘어난 다중 선형 회귀 모델을 구현해본다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-3.png">


In [45]:
# 데이터 생성:
X, y = make_regression(n_samples=500, n_features=3, random_state=0, noise=10.0)
X_t = torch.tensor(X, dtype=torch.float32, device=device)
y_t = torch.tensor(y, dtype=torch.float32, device=device)

# dataloader 생성
dataset = TensorDataset(X_t, y_t)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

In [46]:
# TODO 12:
model = linearRegression(input_size=3, output_size=1).to(device)

In [47]:
summary(model, (3,));

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                    [-1, 1]               4
Total params: 4
Trainable params: 4
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.00
Estimated Total Size (MB): 0.00
----------------------------------------------------------------


모델을 훈련시키고 sklearn의 `LinearRegression()`과 비교:

In [48]:
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True)

epoch: 1, loss: 1008.3516
epoch: 2, loss: 111.1169
epoch: 3, loss: 101.7081
epoch: 4, loss: 101.8423
epoch: 5, loss: 101.7846


In [49]:
sk_model = LinearRegression().fit(X, y)
pd.DataFrame({"w0": [sk_model.intercept_, model.state_dict()['linear.bias'].item()],
              "w1": [sk_model.coef_[0], model.state_dict()['linear.weight'][0, 0].item()],
              "w2": [sk_model.coef_[1], model.state_dict()['linear.weight'][0, 1].item()],
              "w3": [sk_model.coef_[2], model.state_dict()['linear.weight'][0, 2].item()]},
             index=['sklearn', 'pytorch']).round(2)

Unnamed: 0,w0,w1,w2,w3
sklearn,0.43,0.62,55.99,11.14
pytorch,0.04,0.49,55.86,11.24


## 2.4. 신경망으로 구현하는 비선형 회귀


In [50]:
# 데이터셋 생성
np.random.seed(2020)
X = np.sort(np.random.randn(500))
y = X ** 2 + 15 * np.sin(X) **3
X_t = torch.tensor(X[:, None], dtype=torch.float32).to(device)
y_t = torch.tensor(y, dtype=torch.float32).to(device)

# dataloader 생성
dataset = TensorDataset(X_t, y_t)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
plot_regression(X, y, y_range=[-25, 25], dy=5)

이 데이터는 명백히 비선형적이므로, 네트워크에 비선형성(non-linearity)을 도입해야 한다. 이는 "활성화 함수"(activation function)을 통해 가능하다.

$$S(X)=\frac{1}{1+e^{-x}}$$

- Sigmoid 함수는 `x`를 비선형적으로 0과 1 사이의 값으로 매핑한다.

In [51]:
# TODO 13:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [52]:
xs = np.linspace(-15, 15, 100)
plot_regression(xs, [0], sigmoid(xs), x_range=[-5, 5], y_range=[0, 1], dy=0.2)

- 이제 다음 그림의 구조를 가진 모델을 구현해본다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-5.png">

- Hidden layer의 각 노드 값이 "활성화 함수"에 의해 변환되어, 모델에 비선형 요소를 도입하게 된다.
- 위 모델을 PyTorch에서 생성하는 두 가지 주요 방법이 있다.

In [53]:
# `forward()` 메서드가 hidden layer 이후에 `nn.Sigmoid()` 함수를 통해 `x`를 전달하는 방식:
class nonlinRegression(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden = nn.Linear(input_size, hidden_size)
        self.output = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.hidden(x)       # input -> hidden layer
        x = self.sigmoid(x)      # sigmoid activation function in hidden layer
        x = self.output(x)       # hidden -> output layer
        return x

In [54]:
# 이번 실습에서는 아래 방식을 사용해본다.
# Constructor에서 nn.Sequential()을 통해 레이어를 결합하는 방식:
class nonlinRegression(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.main = torch.nn.Sequential(         # sequential container
            nn.Linear(input_size, hidden_size),  # input -> hidden layer
            nn.Sigmoid(),                        # sigmoid activation function in hidden layer
            nn.Linear(hidden_size, output_size)  # hidden -> output layer
        )

    def forward(self, x):
        x = self.main(x)
        return x

- 참고: [PyTorch의 AlexNet 구현](https://github.com/pytorch/vision/blob/main/torchvision/models/alexnet.py)

In [55]:
# 총 10개의 모델 파라미터(6개의 weight + 4개의 bias)가 있는지 확인:
model = nonlinRegression(1, 3, 1).to(device)
summary(model, (1,));

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                    [-1, 3]               6
           Sigmoid-2                    [-1, 3]               0
            Linear-3                    [-1, 1]               4
Total params: 10
Trainable params: 10
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.00
Estimated Total Size (MB): 0.00
----------------------------------------------------------------


In [56]:
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.3)
trainer(model, criterion, optimizer, dataloader, epochs=5, verbose=True)

epoch: 1, loss: 37.7588
epoch: 2, loss: 18.6707
epoch: 3, loss: 14.3016
epoch: 4, loss: 7.7737
epoch: 5, loss: 5.5433


In [57]:
# TODO 14:
y_p = model(X_t).cpu().detach().numpy().squeeze()

- `.cpu()`: 텐서를 cpu로 이동
- `.detach()`: PyTorch의 computational graph에서 분리
- `.numpy()`: 텐서를 NumPy의 ndarray로 변환
- `.squeeze()`: 차원의 크기가 1인 축을 제거하기 위해 사용

In [58]:
plot_regression(X, y, y_p, y_range=[-25, 25], dy=5)

In [59]:
# 각 hidden node가 예측에 사용될 비선형 특징을 만들어냄을 확인:
plot_nodes(X, y_p, model.cpu())

## 2.5. 두 개의 hidden layer을 가진 MLP 구현

다음 모델을 구현해본다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-6.png">

- 모델 이름: `deepRegression`
- lr: `0.3`
- epoch: `20`

In [60]:
# TODO 15:
class deepRegression(nn.Module):
    def __init__(self, input_size, hidden_size_1, hidden_size_2, output_size):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(input_size, hidden_size_1),
            nn.Sigmoid(),
            nn.Linear(hidden_size_1, hidden_size_2),
            nn.Sigmoid(),
            nn.Linear(hidden_size_2, output_size)
        )

    def forward(self, x):
        out = self.main(x)
        return out

In [61]:
# TODO 16:
model = deepRegression(1, 5, 3, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.3)
trainer(model, criterion, optimizer, dataloader, epochs=20, verbose=False)

In [62]:
plot_regression(X, y, model(X_t).detach(), y_range=[-25, 25], dy=5)

# 3. 활성화 함수

활성화 함수는 복잡하고 비선형적인 함수들을 모델링할 수 있게 해주는 요소이다. 활성화 함수에는 여러 가지 종류가 있다.
  - [PyTorch Non-linear Activations Documentation](https://pytorch.org/docs/stable/nn.html#non-linear-activations-weighted-sum-nonlinearity)

In [63]:
functions = [torch.nn.Sigmoid, torch.nn.Tanh, torch.nn.Softplus, torch.nn.ReLU, torch.nn.LeakyReLU, torch.nn.SELU]
plot_activations(torch.linspace(-6, 6, 100), functions)

활성화 함수는 비선형적이어야 하며, 보통 단조함수(monotonic)이며 연속적으로 미분 가능한(continuously differentiable) 특성을 가지나 ReLU 에서 볼 수 있듯이 항상 그런 것은 아니다.

- ReLU(Rectified Linear Unit):구현이 간단하고 계산 효율이 높아 훈련 시간을 줄일 수 있다.

- Softplus:ReLU 함수의 smooth appproximation이며, 머신의 출력을 항상 양수로 제한하는 데 사용할 수 있다.

- LeakyReLU: 음수 입력에 대해 아주 작은 non-zero 출력을 생성하여 vanishing gradient을 완화하고 노이즈나 이상치가 많은 데이터에 잘 작동할 수 있다.

활성화 함수는 훈련과정에서 최적화 할 하이퍼파라미터로 간주할 수 있다.

# 4. 신경망 분류 모델

## 4.1. 이진분류

In [64]:
# 이진 분류 데이터 생성
X, y = make_circles(n_samples=300, factor=0.5, noise=0.1, random_state=2020)
X_t = torch.tensor(X, dtype=torch.float32).to(device)
y_t = torch.tensor(y, dtype=torch.float32).to(device)

# dataloader 생성
dataset = TensorDataset(X_t, y_t)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
plot_classification_2d(X, y)

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-7.png">

현재 딥러닝 모델 구현 및 훈련에 많이 사용되는 활성화 함수로 [ReLU](https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html#torch.nn.ReLU)을 사용하고, optimizer로 [Adam](https://pytorch.org/docs/stable/generated/torch.optim.Adam.html#torch.optim.Adam)을 사용한다.

분류 모델 구현이 목표이므로 loss function을 binary cross entropy로 사용한다.

$$f(w) = \sum_{x,y \in D} -y log(\hat{y}) - (1-y)log(1-\hat{y})$$

PyTorch에서 이진 교차 엔트로피 손실 기준은 `torch.nn.BCELoss`이다. 이 공식은 확률값을 입력으로 받기 때문에, 네트워크의 끝에 시그모이드 함수를 추가한다.

In [65]:
class binaryClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
            nn.Sigmoid()
        )

    def forward(self, x):
        out = self.main(x)
        return out

위 방식으로 진행한 뒤 `torch.nn.BCELoss` 손실 함수를 사용하여 학습할 수도 있지만, 시그모이드 함수를 생략하고 `torch.nn.BCEWithLogitsLoss`를 사용한다(시그모이드와 BCELoss를 결합한 형태).

PyTorch `BCEWithLogitsLoss` [다큐멘테이션](https://pytorch.org/docs/stable/generated/torch.nn.BCEWithLogitsLoss.html#torch.nn.BCEWithLogitsLoss) 번역:
> 이 손실 함수는 하나의 클래스에 시그모이드 층과 `BCELoss`를 결합한 형태다. 단순히 시그모이드 함수와 `BCELoss`를 순서대로 사용하는 것보다 수치적으로 더 안정적인데, 이는 연산을 하나의 층으로 결합함으로써 log-sum-exp 트릭을 활용해 수치적 안정성을 높이기 때문이다.

In [66]:
# 다음 구현 방식으로 사용:
# 손실 함수에 이미 포함되어 있으므로 끝에 시그모이드 층이 없음을 확인
class binaryClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        out = self.main(x)
        return out

이번 실습에서는 위에 작성된 구현 방식으로 사용하며 시그모이드가 손실 함수에 이미 포함되어 있으므로 끝에 시그모이드 층이 없음을 확인한다.

In [67]:
model = binaryClassifier(2, 5, 1).to(device)
criterion = torch.nn.BCEWithLogitsLoss() # loss function
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)  # optimization algorithm
trainer(model, criterion, optimizer, dataloader, epochs=20, verbose=True)

epoch: 1, loss: 0.6875
epoch: 2, loss: 0.6271
epoch: 3, loss: 0.5573
epoch: 4, loss: 0.4732
epoch: 5, loss: 0.3967
epoch: 6, loss: 0.3321
epoch: 7, loss: 0.2774
epoch: 8, loss: 0.2428
epoch: 9, loss: 0.1819
epoch: 10, loss: 0.1655
epoch: 11, loss: 0.1422
epoch: 12, loss: 0.1159
epoch: 13, loss: 0.1078
epoch: 14, loss: 0.0978
epoch: 15, loss: 0.0852
epoch: 16, loss: 0.0768
epoch: 17, loss: 0.0690
epoch: 18, loss: 0.0684
epoch: 19, loss: 0.0653
epoch: 20, loss: 0.0605


In [68]:
plot_classification_2d(X, y, model.cpu())


torch.meshgrid: in an upcoming release, it will be required to pass the indexing argument. (Triggered internally at ../aten/src/ATen/native/TensorShape.cpp:3595.)



위에서 구현한 모델은 단순히 -∞에서 +∞ 사이의 숫자를 출력하고 있을 뿐이며, 모델 내부에서 시그모이드를 적용하지 않는다. 따라서:

- 확률을 얻으려면 출력을 시그모이드 함수에 통과시켜야 한다.
- 클래스(class)를 얻으려면 이 확률에 어떤 임계값(보통 0.5)을 적용하면 된다.

예를 들어, 위 시각화에서 (0,0)은 높은 확률을 가질 것으로 예상되며, (-1,-1)은 낮은 확률을 가질 것이다.

In [69]:
prediction = model(torch.tensor([[0, 0], [-1, -1]], dtype=torch.float32)).detach()
print(prediction)

tensor([[ 10.6038],
        [-11.5148]])


In [70]:
probability = nn.Sigmoid()(prediction)
print(probability)

tensor([[9.9998e-01],
        [9.9810e-06]])


In [71]:
# TODO 17:
classes = np.where(probability > 0.5, 1, 0)
print(classes)

[[1]
 [0]]


## 4.2. 다중 클래스 분류

In [72]:
X, y = make_blobs(n_samples=200, centers=4, center_box=(-1.2, 1.2), cluster_std=[0.15, 0.15, 0.15, 0.15], random_state=12345)
X_t = torch.tensor(X, dtype=torch.float32).to(device)
y_t = torch.tensor(y, dtype=torch.int64).to(device)

# Create dataloader
dataset = TensorDataset(X_t, y_t)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
plot_classification_2d(X, y)

다중 클래스 분류를 위해서는 소프트맥스(softmax) 함수가 필요하다.

$$\sigma(\vec{z})_i=\frac{e^{z_i}}{\sum_{j=1}^{K}e^{z_j}}$$

소프트맥스 함수는 우리가 예측하고자 하는 각 클래스에 대한 확률을 출력하며, 이 확률의 합은 항상 1이 된다.  
`torch.nn.CrossEntropyLoss`는 소프트맥스 함수와 교차 엔트로피 손실을 결합한 손실 함수다.  

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/nn-8.png">

<img src="https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/blte5e1674e3883fab3/65ef8ba4039fdd4df8335b7c/img_blog_image1_inline_(2).png?width=1024&disable=upscale&auto=webp" width=500>

위 구조의 모델을 사용해 4개의 클래스를 분류하는 문제를 구현해본다.

In [73]:
class multiClassifier(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.main = torch.nn.Sequential(
            torch.nn.Linear(input_size, hidden_size),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        out = self.main(x)
        return out

- PyTorch의 `CrossEntropyLoss()` [다큐멘테이션](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html) 확인

In [74]:
model = multiClassifier(2, 5, 4).to(device)
# CE Loss:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.2)

for epoch in range(10):
    losses = 0
    for X_batch, y_batch in dataloader:
        optimizer.zero_grad()
        y_hat = model(X_batch)
        loss = criterion(y_hat, y_batch)
        loss.backward()
        optimizer.step()
        losses += loss.item()
    print(f"epoch: {epoch + 1}, loss: {losses / len(dataloader):.4f}")

epoch: 1, loss: 1.2390
epoch: 2, loss: 0.6612
epoch: 3, loss: 0.2272
epoch: 4, loss: 0.0633
epoch: 5, loss: 0.0179
epoch: 6, loss: 0.0046
epoch: 7, loss: 0.0034
epoch: 8, loss: 0.0010
epoch: 9, loss: 0.0007
epoch: 10, loss: 0.0007


In [75]:
plot_classification_2d(X, y, model.cpu(), transform="Softmax")

모델은 단순히 -∞에서 +∞ 사이의 숫자를 출력할 뿐이다. 따라서:

- 확률을 얻으려면 출력을 소프트맥스 함수에 통과시켜야 한다.
- 클래스를 얻으려면 가장 큰 확률을 선택해야 한다.

예를 들어, 위 그래프에서 (-1,-1)은 클래스 1에 속할 확률이 높을 것이며, (0,0)은 클래스 2에 속할 확률이 가장 높을 것이다.

In [76]:
prediction = model(torch.tensor([[-1, -1], [0, 0]], dtype=torch.float32)).detach()
print(prediction)

tensor([[-12.1817,  17.5944,   5.4841, -20.7453],
        [ -9.5405,  -0.3583,   9.1486,  -6.4244]])


In [77]:
# 각 데이터 포인트에 대해 4개의 예측값(4개의 클래스 각각에 대한 예측)이 출력됨.
probability = nn.Softmax(dim=1)(prediction)
print(probability)

tensor([[1.1706e-13, 9.9999e-01, 5.5028e-06, 2.2350e-17],
        [7.6456e-09, 7.4335e-05, 9.9993e-01, 1.7246e-07]])


In [78]:
# 예측값의 합이 1임을 확인:
probability.sum(dim=1)

tensor([1., 1.])

In [79]:
# TODO 18: argmax()를 사용하여 최대 확률을 가진 클래스 출력:
classes = probability.argmax(dim=1)
print(classes)

tensor([1, 2])


# 5. 미분, 역전파, `Autograd`

최적화 알고리즘은 손실 함수에 대한 모델 파라미터의 gradient를 통해 모델 파라미터를 최적화한다.

$$\nabla \mathscr{L}(\mathbf{w}) = \begin{bmatrix} \frac{\partial \mathscr{L}}{\partial w_1} \\ \frac{\partial \mathscr{L}}{\partial w_2} \\ \vdots \\ \frac{\partial \mathscr{L}}{\partial w_d} \end{bmatrix}$$

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-1.png">

그 네트워크의 출력을 계산하는 식은 아래와 같다. 이는 linear layer와 활성화 함수(sigmoid)가 재귀적으로 결합된 형태이다:  

$$S(x)=\frac{1}{1+e^{-x}}$$

$$\hat{y}=w_3S(w_1x+b_1) + w_4S(w_2x+b_2) + b_3$$

MSE 손실(mean squared error loss)에 대해 모든 모델 파라미터에 대한 gradient를 역전파를 통해 계산하는 방법을 알아본다.

$$\mathscr{L}(\mathbf{w}) = \frac{1}{n}\sum^{n}_{i=1}(y_i-\hat{y_i})^2$$

$$\nabla \mathscr{L}(\mathbf{w}) = \begin{bmatrix} \frac{\partial \mathscr{L}}{\partial w_1} \\ \frac{\partial \mathscr{L}}{\partial w_2} \\ \vdots \\ \frac{\partial \mathscr{L}}{\partial w_d} \end{bmatrix}$$


## 5.1. 역전파

역전파(backpropagation)는 신경망의 매개변수를 훈련하기 위해 필요한 그래디언트를 계산하는 데 사용하는 알고리즘이다. 역전파의 핵심 아이디어는 네트워크를 **더 작은 연산들로 분해**하고, 각각의 연산에 대해 간단하고 코드로 구현 가능한 도함수(Derivative)를 사용하는 것이다. 그런 다음, **연쇄 법칙(chain rule)**을 통해 이러한 작은 연산들을 결합한다.

"역전파"라는 용어는 네트워크의 끝에서 시작하여 **뒤로 거슬러 올라가면서 그래디언트를 전파**한다는 사실에서 유래되었다.  

다음 모델을 바탕으로 역전파를 이해해본다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-2.png">

먼저 위 모델의 forward pass을 다음과 같이 분해한다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-3.png">

중간 상태를 저장하기 위해 $z_i$ (활성화 함수 적용 전 노드 출력)와 $a_i$ (활성화 함수 적용 후 노드 출력)를 사용한다.

하나의 샘플 데이터 `(x, y)` = `(1, 3)`를 입력하고, 중간 출력값은 **초록색**, 최종 loss은 **빨간색**으로 표시했다.

이 과정은 **forward pass** 단계라고 하며, 데이터를 입력하고 왼쪽에서 오른쪽으로 출력을 계산하는 것이다:  

이제 출력 노드에서 시작하여 해당 노드에 연결된 매개변수에 대한 그래디언트를 계산한다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-4.png">

다음 그림과 같이 정리될 수 있다.:

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-5.png">

이러한 결과를 활용해 **연쇄 법칙(chain rule)**을 사용하여 네트워크 초기 부분의 도함수도 쉽게 계산할 수 있다.

아래에서는 $z_1$과 $w_1$에 대해 이를 계산해본다.
복잡해 보일 수 있지만 작은 도함수들을 연쇄 법칙으로 결합한 형태이다.:  

 <img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-6.png">

나머지 모델 파라미터의 gradient 계산은 다음과 같다.:

 <img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-7.png">

요약:

1. 데이터를 네트워크를 통해 **"순전파(forward pass)"** 한다.  
2. 네트워크를 통해 **"역전파(backpropagate)"** 하여 loss를 전파하고 gradient를 계산한다.  

## 5.2. Autograd

`torch.autograd`는 PyTorch의 자동 미분 엔진 패키지([다큐멘테이션](https://pytorch.org/docs/stable/autograd.html))으로, 역전파(backpropagation)를 구현하는 데 도움을 준다.  
즉 `torch.autograd`는 네트워크의 derivative를 자동으로 계산하고 저장해준다.  

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-2.png">

위 모델을 구현해본다.

In [80]:
class network(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden = torch.nn.Linear(input_size, hidden_size)
        self.output = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.hidden(x)
        x = torch.sigmoid(x)
        x = self.output(x)
        return x

In [81]:
# 모델의 인스턴스 생성:
model = network(1, 2, 1).to(device)

# 위 그림과 동일하게 모델 파라미터 수정:
model.state_dict()['hidden.weight'][:] = torch.tensor([[1], [-1]])
model.state_dict()['hidden.bias'][:] = torch.tensor([1, 2])
model.state_dict()['output.weight'][:] = torch.tensor([[1, 2]])
model.state_dict()['output.bias'][:] = torch.tensor([-1])

In [82]:
# 데이터 인스턴스 생성:
x, y = torch.tensor([1.0]).to(device), torch.tensor([3.0]).to(device)

In [83]:
# 출력 노드의 bias에 대한 gradient 확인:
print(model.output.bias.grad)

None


`None`이 출력되는 이유는 모델 레이여 연산을 수행하고 gradient를 계산하는 방법이 구현되어 있으나 실제 연산은 진행되지 않은 상태이다.
이는 loss function을 정의하지 않았고, 손실을 계산하기 위한 forward pass를 수행하지 않았기 때문이다.
따라서 아직 역전파(backpropagation)할 데이터가 없다.

이제 loss function을 정의한다.:

In [84]:
criterion = torch.nn.MSELoss()

이제 모델에 오류를 **"역전파(backpropagate)"**하는 과정을 다음과 같이 구현할 수 있다.:

1. `(x, y)` 데이터를 사용하여 forward pass를 수행하고 `loss`를 계산한다.
2. `loss.backward()`를 호출하여 **"역전파(backpropagation)"**를 수행한다.

In [85]:
loss = criterion(model(x), y)
loss.backward()  # 역전파 실행

출력 노드의 bias $b_3$에 대한 gradient ($\frac{\partial \mathscr{L}}{\partial b_3}$)을 확인한다:

In [86]:
print(model.output.bias.grad)

tensor([-3.3142])


이전 계산과 일치함을 확인할 수 있다.

<img src="https://raw.githubusercontent.com/TomasBeuzen/deep-learning-with-pytorch/56fb575affa33b58c0621eb017a4839db924b566/chapters/img/backprop-8.png">

In [87]:
print("Hidden Layer Gradients")
print("Bias:", model.hidden.bias.grad)
print("Weights:", model.hidden.weight.grad.squeeze())
print()
print("Output Layer Gradients")
print("Bias:", model.output.bias.grad)
print("Weights:", model.output.weight.grad.squeeze())

Hidden Layer Gradients
Bias: tensor([-0.3480, -1.3032])
Weights: tensor([-0.3480, -1.3032])

Output Layer Gradients
Bias: tensor([-3.3142])
Weights: tensor([-2.9191, -2.4229])


다음 단계는 최적화 알고리즘을 사용해 모델 파라미터를 업데이트하는 것이다.

In [88]:
model.state_dict()

OrderedDict([('hidden.weight',
              tensor([[ 1.],
                      [-1.]])),
             ('hidden.bias', tensor([1., 2.])),
             ('output.weight', tensor([[1., 2.]])),
             ('output.bias', tensor([-1.]))])

가중치 최적화는 다음과 같이 구현된다.:

1. `optimizer`를 정의한다.  
2. `optimizer.step()`을 호출하여 gradient를 기반으로 가중치를 업데이트하도록 요청한다.  

In [89]:
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
optimizer.step()

In [90]:
# 변경된 weight 확인:
model.state_dict()

OrderedDict([('hidden.weight',
              tensor([[ 1.0348],
                      [-0.8697]])),
             ('hidden.bias', tensor([1.0348, 2.1303])),
             ('output.weight', tensor([[1.2919, 2.2423]])),
             ('output.bias', tensor([-0.6686]))])

**PyTorch는 그래디언트를 사용한 후 자동으로 초기화하지 않는다.**  
따라서 그래디언트는 `loss.backward()`를 호출할 때마다 누적된다.

In [91]:
optimizer.zero_grad()  # <- 다음 셀에서 설명
for _ in range(1, 6):
    loss = criterion(model(x), y)
    loss.backward()
    print(f"b3 gradient after call {_} of loss.backward():", model.hidden.bias.grad)

b3 gradient after call 1 of loss.backward(): tensor([-0.1991, -0.5976])
b3 gradient after call 2 of loss.backward(): tensor([-0.3983, -1.1953])
b3 gradient after call 3 of loss.backward(): tensor([-0.5974, -1.7929])
b3 gradient after call 4 of loss.backward(): tensor([-0.7966, -2.3906])
b3 gradient after call 5 of loss.backward(): tensor([-0.9957, -2.9882])


따라서 매 반복(iteration)마다 PyTorch에 **"그래디언트를 초기화(zero)"** 하도록 `optimizer.zero_grad()`를 사용해야 한다:  

In [92]:
for _ in range(1, 6):
    optimizer.zero_grad()
    loss = criterion(model(x), y)
    loss.backward()
    print(f"b3 gradient after call {_} of loss.backward():", model.hidden.bias.grad)

b3 gradient after call 1 of loss.backward(): tensor([-0.1991, -0.5976])
b3 gradient after call 2 of loss.backward(): tensor([-0.1991, -0.5976])
b3 gradient after call 3 of loss.backward(): tensor([-0.1991, -0.5976])
b3 gradient after call 4 of loss.backward(): tensor([-0.1991, -0.5976])
b3 gradient after call 5 of loss.backward(): tensor([-0.1991, -0.5976])


> Note: 몇몇 경우에는 gradient을 누적시키는 것이 유용하다. 예를 들어 가중치를 업데이트하기 전에 여러 배치(batch)에 대한 gradient을 계산하고 싶을 때가 그렇다. 대부분의 경우, 매 반복(iteration)마다 gradient을 초기화 하는 것이 필요하다.

# 6. 신경망 훈련

In [93]:
def trainer(model, criterion, optimizer, dataloader, epochs=5):
    """Simple training wrapper for PyTorch network."""

    train_loss = []
    for epoch in range(epochs):  # for each epoch
        losses = 0
        for X, y in dataloader:  # for each batch
            optimizer.zero_grad()       # 모델 파라미터에 대한 gradient 초기화
            y_hat = model(X).flatten()   # Forward pass to get output
            loss = criterion(y_hat, y)  # loss 계산
            loss.backward()             # 모델 파라미터에 대한 gradient 계산
            optimizer.step()            # 모델 파라미터 업데이트
            losses += loss.item()       # 이 배치의 loss을 누적 loss에 추가

        # loss = total loss in epoch / number of batches = loss per batch
        train_loss.append(losses / len(dataloader))
    return train_loss

각 epoch에 대해 loss을 계산할 때, 해당 epoch의 각 배치(batch)에서 loss을 합산한 뒤, 총 배치 수로 나누어 epoch 내 배치당 평균 loss을 구함을 확인한다(이 loss은 `running_losses`에 저장된다).

> 배치 수로 나누면 loss 값이 배치 크기(batch size)와 **"독립적(decoupled)"**이 된다. 따라서, 다른 배치 크기로 실험을 실행하더라도 이전 실험과 이번 실험의 손실을 비교할 수 있다.

모델이 제대로 훈련되고 있다면 loss은 시간이 지남에 따라 감소해야 한다. 샘플 데이터를 사용해 이를 시도해본다.

In [94]:
# dataset 생성
torch.manual_seed(0)
X = torch.arange(-3, 3, 0.15).to(device)
y = X ** 2 + X * torch.normal(0, 1, (40,)).to(device)
dataloader = DataLoader(TensorDataset(X[:, None], y), batch_size=1, shuffle=True)
plot_regression(X.cpu(), y.cpu(), y_range=[-1, 10], dy=1)

In [95]:
model = network(1, 3, 1).to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), 0.1)
train_loss = trainer(model, criterion, optimizer, dataloader, epochs=101)
plot_regression(X.cpu(), y.cpu(), model(X[:, None]).cpu().detach(), y_range=[-1, 10], dy=1)

In [96]:
# epoch 진행에 따른 loss 변화:
plot_loss(train_loss)

## 6.1. Validation Loss

지금까지는 **훈련 손실(training loss)**에 집중해 왔지만 모델을 새롭고 **"보지 못한(unseen)"** 데이터에서 잘 작동하는지 성능을 검증해야 한다.
이를 위해 검증 데이터(validation data)가 필요하며 데이터셋을 절반으로 나누어 `trainloader`와 `validloader`를 생성한다:  

In [97]:
# dataset 생성
torch.manual_seed(0)
X_valid = torch.arange(-3.0, 3.0).to(device)
y_valid = (X_valid ** 2).to(device)
trainloader = DataLoader(TensorDataset(X, y), batch_size=1, shuffle=True)
validloader = DataLoader(TensorDataset(X_valid, y_valid), batch_size=1, shuffle=True)

각 epoch 이후 검증 배치(validation batch)를 반복(iterate)하며 검증 손실(validation loss)을 기록한다.

In [98]:
def trainer(model, criterion, optimizer, trainloader, validloader, epochs=5):
    """Simple training wrapper for PyTorch network."""

    train_loss = []
    valid_loss = []
    for epoch in range(epochs):  # for each epoch
        train_batch_loss = 0
        valid_batch_loss = 0

        # Training
        model.train()   # 모델을 훈련 모드로 설정(디폴트)
                        # 검증에서는 evaluation mode로 전환

        for X, y in trainloader:
            optimizer.zero_grad()       # 모델 파라미터에 대한 gradient 초기화
            y_hat = model(X).flatten()   # Forward pass to get output
            loss = criterion(y_hat, y)  # loss 계산
            loss.backward()             # 모델 파라미터에 대한 gradient 계산
            optimizer.step()            # 모델 파라미터 업데이트
            train_batch_loss += loss.item()  # 이 배치의 loss을 누적 loss에 추가

        # loss = total loss in epoch / number of batches = loss per batch
        train_loss.append(train_batch_loss / len(trainloader))

        # Validation
        model.eval()  # 모델을 평가모드로 전환한다. 모델에 dropout와 같은 랜덤성을 포함한 레이어가 있을 경우,
                      # 검증 시 이러한 랜덤성을 비활성화하여 정확한 결과를 얻도록 한다.


        # TODO 19:
        with torch.no_grad():  # gradient 계산 비활성화
            for X_valid, y_valid in validloader:
                y_hat = model(X_valid).flatten()   # Forward pass to get output
                loss = criterion(y_hat, y_valid)  # loss 계산
                valid_batch_loss += loss.item()   # 이 배치의 loss을 누적 loss에 추가
        valid_loss.append(valid_batch_loss / len(validloader))
    return train_loss, valid_loss

In [99]:
model = network(1, 6, 1).to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
train_loss, valid_loss = trainer(model, criterion, optimizer, trainloader, validloader, epochs=201)
plot_loss(train_loss, valid_loss)

위 훈련 과정에서 과적합이 발생했음을 알 수 있다. (훈련 데이터의 loss은 감소하지만 검증 데이터에서는 loss가 증가)

## 6.2. Early stopping

오버피팅을 방지할 수 있는 방법의 PyTorch 구현을 이해해본다. 훈련이 진행되면서 훈련 손실은 감소하는 동안 validation loss이 증가하는 것은 오버피팅의 신호이다.  
Validation loss은 epoch마다 증가하거나 감소할 수 있으므로, 일반적으로 "patience"라는 값을 정의한다.  
patience는 validation loss이 연속적으로 증가하는 것을 몇 번의 epoch까지 허용할지를 나타낸다.  
**patience**를 초과하면 훈련을 중단한다.  

In [100]:
def trainer(model, criterion, optimizer, trainloader, validloader, epochs=5, patience=3):
    """Simple training wrapper for PyTorch network."""

    train_loss = []
    valid_loss = []
    for epoch in range(epochs):  # for each epoch
        train_batch_loss = 0
        valid_batch_loss = 0

        # Training
        for X, y in trainloader:
            optimizer.zero_grad()
            y_hat = model(X).flatten()
            loss = criterion(y_hat, y)
            loss.backward()
            optimizer.step()
            train_batch_loss += loss.item()
        train_loss.append(train_batch_loss / len(trainloader))

        # Validation
        with torch.no_grad():
            for X_valid, y_valid in validloader:
                y_hat = model(X_valid).flatten()
                loss = criterion(y_hat, y_valid)
                valid_batch_loss += loss.item()

        valid_loss.append(valid_batch_loss / len(validloader))

        # Early stopping 구현:
        if epoch > 0 and valid_loss[-1] > valid_loss[-2]:
            consec_increases += 1
        else:
            consec_increases = 0
        if consec_increases == patience:
            print(f"Stopped early at epoch {epoch + 1} - val loss increased for {consec_increases} consecutive epochs!")
            break

    return train_loss, valid_loss

In [101]:
model = network(1, 6, 1).to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
train_loss, valid_loss = trainer(model, criterion, optimizer, trainloader, validloader, epochs=201, patience=3)
plot_loss(train_loss, valid_loss)

Stopped early at epoch 37 - val loss increased for 3 consecutive epochs!


# 7. 정규화

정규화(regularization)은 과적합을 방지하는 데 도움을 주며, 딥러닝에서 사용할 수 있는 다양한 기법이 있지만 이번 실습에서는 두 가지만 다룬다.
1. Drop Out
2. L2 정규화

## 7.1. Drop Out

<img src="https://miro.medium.com/v2/resize:fit:1044/1*iWQzxhVlvadk6VAJjsgXgg.png" width=500>

Drop out 기법은 각 반복(iteration)마다 레이어에서 일부 노드를 랜덤하게 선택하고 해당 노드의 출력을 0으로 설정하여 가중치를 업데이트하지 않는다.

In [102]:
# TODO 20:
# 0.5: 노드가 0으로 설정되어 비활성화 될 확률이 50%
dropout_layer = torch.nn.Dropout(p=0.5)

In [103]:
inputs = torch.randn(5, 3)
inputs

tensor([[-0.9962,  0.1853,  1.2287],
        [-0.6026,  1.1602, -0.0921],
        [-2.1831,  1.0611, -1.5108],
        [-0.0180,  0.5990,  1.2808],
        [ 0.7645,  1.8589, -0.8471]])

In [104]:
dropout_layer(inputs)

tensor([[-0.0000,  0.3706,  2.4574],
        [-1.2051,  2.3204, -0.0000],
        [-0.0000,  0.0000, -0.0000],
        [-0.0000,  0.0000,  0.0000],
        [ 1.5289,  0.0000, -1.6941]])

50%의 노드가 0이 되었음을 확인할 수 있다.

## 7.2. L2 정규화

정규화는 손실 함수에 다음과 같은 패널티 항을 추가한다.
<img src='https://miro.medium.com/v2/resize:fit:1100/format:webp/1*ozLs-feHr73kJTfKL8figA.png' width=700>

- $\lambda$은 정규화 파라미터이다.
- L2 정규화는 PyTorch에서 `weight_decay`로 불린다. 최적화 알고리즘의 파라미터 형태로 설정할 수 있다.

In [105]:
# 다음 코드에서 weight_decay은 위 공식에서 λ 파라미터이다.
torch.optim.Adam(model.parameters(), lr=0.1, weight_decay=0.5)

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.1
    maximize: False
    weight_decay: 0.5
)