<a href="https://colab.research.google.com/github/https-deeplearning-ai/tensorflow-1-public/blob/main/C4/W1/ungraded_labs/C4_W1_Lab_1_time_series.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Time Series Plots

This notebook aims to show different terminologies and attributes of a time series by generating and plotting synthetic data.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

## Plot Utilities

 The following code will visualize numpy arrays into a graph using Pyplot's [plot()](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.plot.html) method.

In [None]:
def plot_series(time, series, format="-", start=0, end=None, label=None):
    """
    Visualizes time series data

    Args:
      time (array of int) - contains the time steps
      series (array of int) - contains the measurements for each time step
      format (string) - line style when plotting the graph
      start (int) - first time step to plot
      end (int) - last time step to plot
      label (list of strings)- tag for the line
    """

    # setup dimensions of the graph figure
    plt.figure(figsize=(10, 6))

    # plot the time series data
    plt.plot(time[start:end], series[start:end], format)

    # label the x-axis
    plt.xlabel("Time")

    # label the y-axis
    plt.ylabel("Value")

    if label:
      plt.legend(fontsize=14, labels=label)

    # overlay a grid on the graph
    plt.grid(True)

    # draw the graph on screen
    plt.show()

In [None]:
def trend(time, slope=0):
    """
    Generates synthetic data that follows a straight line given a slope value.

    Args:
      time (array of int) - contains the time steps
      slope (float) - determines the direction and steepness of the line

    Returns:
      series (array of float) - measurements that follow a straight line
    """

    # compute the linear series given the slope
    series = slope * time

    return series

In [None]:
# Generate time steps. Assume 1 per day for one year (365 days)
time = np.arange(365)

# define the slope
slope = 0.1

# generate measurements with the defined slope
series = trend(time, slope)

# results
plot_series(time, series, label=[f'slope={slope}'])

## Seasonality


In [None]:
def seasonal_pattern(season_time):
    """
    Just an arbitrary pattern, you can change it if you wish

    Args:
      season_time (array of float) - contains the measurements per time step

    Returns:
      data_pattern (array of float) -  contains revised measurement values according
                                  to the defined pattern
    """

    # generate the values using an arbitrary pattern
    data_pattern = np.where(season_time < 0.4,
                    np.cos(season_time * 2 * np.pi),
                    1 / np.exp(3 * season_time))

    return data_pattern

def seasonality(time, period, amplitude=1, phase=0):
    """
    Repeats the same pattern at each period

    Args:
      time (array of int) - contains the time steps
      period (int) - number of time steps before the pattern repeats
      amplitude (int) - peak measured value in a period
      phase (int) - number of time steps to shift the measured values

    Returns:
      data_pattern (array of float) - seasonal data scaled by the defined amplitude
    """

    # Define the measured values per period
    season_time = ((time + phase) % period) / period

    # Generates the seasonal data scaled by the defined amplitude
    data_pattern = amplitude * seasonal_pattern(season_time)

    return data_pattern

In [None]:
# generate time steps
time = np.arange(4 * 365 + 1)

# define the parameters of the seasonal data
period = 365
amplitude = 40

# generate the seasonal data
series = seasonality(time, period=period, amplitude=amplitude)

# plot the results
plot_series(time, series)

In [None]:
# Define seasonal parameters
slope = 0.05
period = 365
amplitude = 40

# Generate the data
series = trend(time, slope) + seasonality(time, period=period, amplitude=amplitude)

# Plot the results
plot_series(time, series)

## Noise

The next cells will show what a noisy signal looks like:

In [None]:
def noise(time, noise_level=1, seed=None):
    """Generates a normally distributed noisy signal

    Args:
      time (array of int) - contains the time steps
      noise_level (float) - scaling factor for the generated signal
      seed (int) - number generator seed for repeatability

    Returns:
      noise (array of float) - the noisy signal

    """

    # initialize the random number generator
    rnd = np.random.RandomState(seed)

    # generate a random number for each time step and scale by the noise level
    noise = rnd.randn(len(time)) * noise_level

    return noise


In [None]:
# define noise level
noise_level = 5

# generate noisy signal
noise_signal = noise(time, noise_level=noise_level, seed=42)

# plot the results
plot_series(time, noise_signal)

In [None]:
# add the noise to the time series
series += noise_signal

# plot the results
plot_series(time, series)

## Autocorrelation



In [None]:
def autocorrelation(time, amplitude, seed=None):
    """
    Generates autocorrelated data

    Args:
      time (array of int) - contains the time steps
      amplitude (float) - scaling factor
      seed (int) - number generator seed for repeatability

    Returns:
      ar (array of float) - autocorrelated data
    """

    # initialize random number generator
    rnd = np.random.RandomState(seed)

    # initialize array of random numbers equal to the length
    # of the given time steps plus 50
    ar = rnd.randn(len(time) + 50)

    # set first 50 elements to a constant
    ar[:50] = 100

    # define scaling factors
    phi1 = 0.5
    phi2 = -0.1

    # autocorrelate element 51 onwards with the measurement at
    # (t-50) and (t-30), where t is the current time step
    for step in range(50, len(time) + 50):
        ar[step] += phi1 * ar[step - 50]
        ar[step] += phi2 * ar[step - 33]

    # get the autocorrelated data and scale with the given amplitude.
    # the first 50 elements of the original array is truncated because
    # those are just constant and not autocorrelated.
    ar = ar[50:] * amplitude

    return ar

In [None]:
# use time steps from previous section and generate autocorrelated data
series = autocorrelation(time, amplitude=10, seed=42)

# plot the first 200 elements to see the pattern more clearly
plot_series(time[:200], series[:200])

In [None]:
def autocorrelation(time, amplitude, seed=None):
    """
    Generates autocorrelated data

    Args:
      time (array of int) - contains the time steps
      amplitude (float) - scaling factor
      seed (int) - number generator seed for repeatability

    Returns:
      ar (array of float) - generated autocorrelated data
    """

    # initialize random number generator
    rnd = np.random.RandomState(seed)

    # initialize array of random numbers equal to the length
    # of the given time steps plus an additional step
    ar = rnd.randn(len(time) + 1)

    # define scaling factor
    phi = 0.8

    # autocorrelate element 11 onwards with the measurement at
    # (t-1), where t is the current time step
    for step in range(1, len(time) + 1):
        ar[step] += phi * ar[step - 1]

    # get the autocorrelated data and scale with the given amplitude.
    ar = ar[1:] * amplitude

    return ar

In [None]:
# use time steps from previous section and generate autocorrelated data
series = autocorrelation(time, amplitude=10, seed=42)

# plot the results
plot_series(time[:200], series[:200])

In [None]:
# with decay

def impulses(time, num_impulses, amplitude=1, seed=None):
    """
    Generates random impulses

    Args:
      time (array of int) - contains the time steps
      num_impulses (int) - number of impulses to generate
      amplitude (float) - scaling factor
      seed (int) - number generator seed for repeatability

    Returns:
      series (array of float) - array containing the impulses
    """

    # initialize random number generator
    rnd = np.random.RandomState(seed)

    # generate random numbers
    impulse_indices = rnd.randint(len(time), size=num_impulses)

    # initialize series
    series = np.zeros(len(time))

    # insert random impulses
    for index in impulse_indices:
        series[index] += rnd.rand() * amplitude

    return series

In [None]:
# generate random impulses
impulses_signal = impulses(time, num_impulses=10, seed=42)

# plot the results
plot_series(time, impulses_signal)

In [None]:
def autocorrelation_impulses(source, phis):
    """
    Generates autocorrelated data from impulses

    Args:
      source (array of float) - contains the time steps with impulses
      phis (dict) - dictionary containing the lag time and decay rates

    Returns:
      ar (array of float) - generated autocorrelated data
    """

    # copy the source
    ar = source.copy()

    # compute new series values based on the lag times and decay rates
    for step, value in enumerate(source):
        for lag, phi in phis.items():
            if step - lag > 0:
              ar[step] += phi * ar[step - lag]

    return ar

In [None]:
# use the impulses from the previous section and generate autocorrelated data
series = autocorrelation_impulses(impulses_signal, {1: 0.99})

# plot the results
plot_series(time, series)

In [None]:
# use the impulses from the previous section and generate autocorrelated data
series = autocorrelation_impulses(impulses_signal, {1: 0.70, 50: 0.2})

# plot the results
plot_series(time, series)

In [None]:
# generate autocorrelated data with an upward trend
series = autocorrelation(time, 10, seed=42) + trend(time, 2)

# plot the results
plot_series(time[:200], series[:200])

In [None]:
# generate autocorrelated data with an upward trend + seasonality
series = autocorrelation(time, 10, seed=42) + seasonality(time, period=50, amplitude=150) + trend(time, 2)

# plot the results
plot_series(time[:200], series[:200])


## Non-stationary Time Series



In [None]:
# generate data with positive trend
series = autocorrelation(time, 10, seed=42) + seasonality(time, period=50, amplitude=150) + trend(time, 2)

# generate data with negative trend
series2 = autocorrelation(time, 5, seed=42) + seasonality(time, period=50, amplitude=2) + trend(time, -1) + 550

# splice the downward trending data into the first one at time step = 200
series[200:] = series2[200:]

# plot the result
plot_series(time[:300], series[:300])