## Univariate CNN Models
Although traditionally developed for two-dimensional image data, CNNs can be used to model univariate time series forecasting problems.

Univariate time series are datasets comprised of a single series of observations with a temporal ordering and a model is required to learn from the series of past observations to predict the next value in the sequence.

We'll need to do:
1. Data Preparation
2. CNN Model



Data Preparation
Before a univariate series can be modeled, it must be prepared.

The CNN model will learn a function that maps a sequence of past observations as input to an output observation. As such, the sequence of observations must be transformed into multiple examples from which the model can learn.

Consider a given univariate sequence:

In [None]:
[10, 20, 30, 40, 50, 60, 70, 80, 90]

We can divide the sequence into multiple input/output patterns called samples, where three time steps are used as input and one time step is used as output for the one-step prediction that is being learned.



In [None]:
X,				y
10, 20, 30		40
20, 30, 40		50
30, 40, 50		60
...

The split_sequence() function below implements this behavior and will split a given univariate sequence into multiple samples where each sample has a specified number of time steps and the output is a single time step.



In [2]:

# univariate data preparation
from numpy import array
 
# split a univariate sequence into samples
def split_sequence(sequence, n_steps):
	X, y = list(), list()
	for i in range(len(sequence)):
		# find the end of this pattern
		end_ix = i + n_steps
		# check if we are beyond the sequence
		if end_ix > len(sequence)-1:
			break
		# gather input and output parts of the pattern
		seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
		X.append(seq_x)
		y.append(seq_y)
	return array(X), array(y)
 
# define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# choose a number of time steps
n_steps = 3
# split into samples
X, y = split_sequence(raw_seq, n_steps)
# summarize the data
for i in range(len(X)):
	print(X[i], y[i])

[10 20 30] 40
[20 30 40] 50
[30 40 50] 60
[40 50 60] 70
[50 60 70] 80
[60 70 80] 90


### CNN Model
A one-dimensional CNN is a CNN model that has a convolutional hidden layer that operates over a 1D sequence. This is followed by perhaps a second convolutional layer in some cases, such as very long input sequences, and then a pooling layer whose job it is to distill the output of the convolutional layer to the most salient elements.

The convolutional and pooling layers are followed by a dense fully connected layer that interprets the features extracted by the convolutional part of the model. A flatten layer is used between the convolutional layers and the dense layer to reduce the feature maps to a single one-dimensional vector.

We can define a 1D CNN Model for univariate time series forecasting as follows.




In [None]:
# define model
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=2, activation='relu', input_shape=(n_steps, n_features)))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(50, activation='relu'))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse')

Key in the definition is the shape of the input; that is what the model expects as input for each sample in terms of the number of time steps and the number of features.

We are working with a univariate series, so the number of features is one, for one variable.

The number of time steps as input is the number we chose when preparing our dataset as an argument to the split_sequence() function.

The input shape for each sample is specified in the input_shape argument on the definition of the first hidden layer.

We almost always have multiple samples, therefore, the model will expect the input component of training data to have the dimensions or shape:

[samples, timesteps, features]

Our split_sequence() function in the previous section outputs the X with the shape [samples, timesteps], so we can easily reshape it to have an additional dimension for the one feature.



The CNN does not actually view the data as having time steps, instead, it is treated as a sequence over which convolutional read operations can be performed, like a one-dimensional image.

In this example, we define a convolutional layer with 64 filter maps and a kernel size of 2. This is followed by a max pooling layer and a dense layer to interpret the input feature. An output layer is specified that predicts a single numerical value.

The model is fit using the efficient Adam version of stochastic gradient descent and optimized using the mean squared error, or ‘mse‘, loss function.

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Split a univariate sequence into samples
def split_sequence(sequence, n_steps):
    X, y = list(), list()
    for i in range(len(sequence)):
        end_ix = i + n_steps
        if end_ix > len(sequence) - 1:
            break
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)

# Define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
n_steps = 3

# Prepare data
X, y = split_sequence(raw_seq, n_steps)
X = X.reshape((X.shape[0], 1, X.shape[1]))  # PyTorch expects (batch, channel, seq_len)
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32).view(-1, 1)

# Define model
class CNN1D(nn.Module):
    def __init__(self):
        super(CNN1D, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=2)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * ((n_steps - 2 + 1) // 2), 50)
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

model = CNN1D()

# Compile model
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Fit model
epochs = 1000
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_tensor)
    loss = criterion(outputs, y_tensor)
    loss.backward()
    optimizer.step()

# Predict
x_input = np.array([70, 80, 90]).reshape((1, 1, n_steps))
x_input_tensor = torch.tensor(x_input, dtype=torch.float32)
model.eval()
with torch.no_grad():
    yhat = model(x_input_tensor)
print(yhat.item())

101.60421752929688


In [15]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# ---- 1. Generate Synthetic Data ----
np.random.seed(42)

years = list(range(2018, 2026))  # Include 2026 for prediction
months = list(range(1, 13))
countries = [f"Country_{i}" for i in range(10)]
items = ["banana", "apple", "orange"]

records = []
for country in countries:
    for item in items:
        for year in years:
            monthly_feature = np.random.rand(12) * 10  # e.g., rainfall or temperature
            yield_value = 5 + 0.1 * monthly_feature.sum() + np.random.normal(0, 1)
            records.append({
                "country": country,
                "item": item,
                "year": year,
                **{f"month_{i+1}": monthly_feature[i] for i in range(12)},
                "yield": yield_value
            })

df = pd.DataFrame(records)

# ---- 2. Prepare Data ----
le_country = LabelEncoder()
le_item = LabelEncoder()

df["country_idx"] = le_country.fit_transform(df["country"])
df["item_idx"] = le_item.fit_transform(df["item"])

# Normalize year as numeric input
df["year_norm"] = (df["year"] - df["year"].min()) / (df["year"].max() - df["year"].min())

# Train/test split (predict 2026 only)
df_train = df[df.year < 2026]
df_test = df[df.year == 2026]

def to_tensor(df_part):
    X_seq = df_part[[f"month_{i}" for i in range(1, 13)]].values
    X_seq = torch.tensor(X_seq, dtype=torch.float32).unsqueeze(1)  # (B, 1, 12)
    country = torch.tensor(df_part["country_idx"].values, dtype=torch.long)
    item = torch.tensor(df_part["item_idx"].values, dtype=torch.long)
    year_norm = torch.tensor(df_part["year_norm"].values, dtype=torch.float32).unsqueeze(1)
    y = torch.tensor(df_part["yield"].values, dtype=torch.float32).unsqueeze(1)
    return X_seq, country, item, year_norm, y

X_train, c_train, i_train, y_train_norm, y_train = to_tensor(df_train)
X_test, c_test, i_test, y_test_norm, y_test = to_tensor(df_test)

# ---- 3. Define Model ----
class YieldCNN(nn.Module):
    def __init__(self, num_countries, num_items):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 64, kernel_size=3)
        self.pool = nn.MaxPool1d(2)
        self.flatten = nn.Flatten()
        self.emb_country = nn.Embedding(num_countries, 8)
        self.emb_item = nn.Embedding(num_items, 8)
        self.fc1 = nn.Linear(64 * 5 + 8 + 8 + 1, 50)
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x_seq, c, i, year_float):
        x = torch.relu(self.conv1(x_seq))  # (B, 64, 10)
        x = self.pool(x)                   # (B, 64, 5)
        x = self.flatten(x)                # (B, 320)
        emb_c = self.emb_country(c)
        emb_i = self.emb_item(i)
        x = torch.cat([x, emb_c, emb_i, year_float], dim=1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# ---- 4. Train Model ----
model = YieldCNN(num_countries=len(le_country.classes_),
                 num_items=len(le_item.classes_))
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train, c_train, i_train, y_train_norm)
    loss = criterion(y_pred, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Loss = {loss.item():.4f}")

# ---- 5. Predict 2026 ----
model.eval()
with torch.no_grad():
    y_pred_2026 = model(X_test, c_test, i_test, y_test_norm)
    print("\nSample predictions for 2026:")
    print(y_pred_2026[:5].squeeze())
    print("True values:")
    print(y_test[:5].squeeze())


Epoch 0: Loss = 112.1747
Epoch 10: Loss = 1.9400
Epoch 20: Loss = 6.6606
Epoch 30: Loss = 3.1378
Epoch 40: Loss = 1.8552
Epoch 50: Loss = 2.0000
Epoch 60: Loss = 1.7135
Epoch 70: Loss = 1.6185
Epoch 80: Loss = 1.5866
Epoch 90: Loss = 1.5448

Sample predictions for 2026:
tensor([])
True values:
tensor([])
