In [2]:
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt
import os
import random
import datetime
import pandas as pd
from tensorflow.keras import layers
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, precision_recall_fscore_support
from sklearn.preprocessing import MinMaxScaler, StandardScaler

In [3]:
# Callbacks

def tensorboard_cb(dirpath, model_name):
    """
    Build a TensorBoard callback that logs into a timestamped run directory.

    Parameters:
    - dirpath: str, root directory for the logs.
    - model_name: str, sub-directory placed under dirpath.

    Returns:
    - tf.keras.callbacks.TensorBoard writing to dirpath/model_name/<YYYYmmdd-HHMMSS>.
    """
    # A fresh timestamp per call keeps separate runs in separate directories.
    run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    log_dir = os.path.join(dirpath, model_name, run_stamp)
    return tf.keras.callbacks.TensorBoard(log_dir)

def checkpoint_cb(dirpath, model_name, save_format=None, save_weights=False):
    """
    Build a ModelCheckpoint callback that keeps only the best model (lowest val_loss).

    Parameters:
    - dirpath: str, directory in which the checkpoint is written.
    - model_name: str, base file name of the checkpoint.
    - save_format: str or None, suffix appended to model_name (e.g. ".keras", ".h5").
      None means "no suffix".
      NOTE(review): recent Keras releases appear to require specific suffixes
      (".keras" / ".weights.h5") — confirm against the installed version.
    - save_weights: bool, forwarded as save_weights_only.

    Returns:
    - tf.keras.callbacks.ModelCheckpoint monitoring "val_loss".
    """
    # The default save_format=None used to hit `model_name + None` and raise TypeError.
    suffix = "" if save_format is None else save_format
    checkpoint_path = os.path.join(dirpath, model_name + suffix)

    return tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                              save_best_only=True,
                                              save_weights_only=save_weights,
                                              monitor="val_loss",
                                              verbose=1)

def early_stopping_cb(patience):
    """
    Build an EarlyStopping callback watching "val_loss".

    Parameters:
    - patience: forwarded unchanged as the callback's `patience` argument.

    Returns:
    - tf.keras.callbacks.EarlyStopping with restore_best_weights=True and verbose=1.
    """
    stopping_options = {
        "monitor": "val_loss",
        "restore_best_weights": True,
        "patience": patience,
        "verbose": 1,
    }
    return tf.keras.callbacks.EarlyStopping(**stopping_options)

def reduce_lr_cb(patience, factor):
    """
    Build a ReduceLROnPlateau callback watching "val_loss".

    Parameters:
    - patience: forwarded unchanged as the callback's `patience` argument.
    - factor: forwarded unchanged as the callback's `factor` argument.

    Returns:
    - tf.keras.callbacks.ReduceLROnPlateau with min_lr=1e-7 and verbose=1.
    """
    plateau_options = {
        "monitor": "val_loss",
        "patience": patience,
        "factor": factor,
        "min_lr": 1e-7,
        "verbose": 1,
    }
    return tf.keras.callbacks.ReduceLROnPlateau(**plateau_options)

def lr_scheduler_cb(lr_init, lr_div):
    """
    Build a LearningRateScheduler callback with an exponentially growing rate.

    Parameters:
    - lr_init: learning rate used at epoch 0.
    - lr_div: number of epochs over which the rate is multiplied by 10.

    Returns:
    - tf.keras.callbacks.LearningRateScheduler applying lr_init * 10 ** (epoch / lr_div).
    """
    def schedule(epoch):
        # Grows tenfold every `lr_div` epochs.
        return lr_init * 10 ** (epoch/lr_div)

    return tf.keras.callbacks.LearningRateScheduler(schedule)

In [4]:
def univariate_trailing_window(data, window_size, horizon):
    """
    Slice a series into (window, horizon) pairs, where each horizon
    immediately follows its window.

    Parameters:
    - data: list or array, the input time series.
    - window_size: int, number of consecutive points in each window.
    - horizon: int, number of points following the window used as target.

    Returns:
    - windows: list of slices of `data`, each of length window_size.
    - horizons: list of slices of `data`, each of length horizon.
    """
    # Every start index for which window + horizon still fits inside the data.
    starts = range(len(data) - window_size - horizon + 1)

    windows = [data[start:start + window_size] for start in starts]
    horizons = [data[start + window_size:start + window_size + horizon] for start in starts]

    return windows, horizons

In [5]:
def univariate_centered_window(data, window_size, horizon):
    """
    Creates centered windows and corresponding horizons.

    Each window covers `window_size` consecutive points; its horizon starts
    at the window's center index (start + window_size // 2), so the horizon
    overlaps the second half of the window.

    Parameters:
    - data: list or array, the input time series data.
    - window_size: int, size of the centered window (even or odd).
    - horizon: int, number of points taken from the center index onward.

    Returns:
    - windows: list of slices of `data`, each of length window_size.
    - horizons: list of slices of `data`, each of length horizon.
    """
    windows = []
    horizons = []
    half_window = window_size // 2

    for start in range(len(data) - window_size - horizon + 1):
        center_index = start + half_window

        # Slice by start + window_size rather than center +/- half: the
        # symmetric form dropped one element whenever window_size was odd.
        # For even sizes both forms select exactly the same points.
        windows.append(data[start:start + window_size])
        horizons.append(data[center_index:center_index + horizon])

    return windows, horizons

In [6]:
def create_trailing_window(data, num_targets, window_size, horizon):
    """
    Generate trailing windows for time series prediction.

    The last `num_targets` columns of `data` are treated as targets and all
    preceding columns as features.

    Parameters:
    - data: pandas DataFrame, input time series data with feature columns first and target columns last
    - num_targets: int, number of trailing columns that hold target variables
    - window_size: int, size of the trailing window
    - horizon: int, number of steps to forecast into the future

    Returns:
    - X: numpy array, input features (shape: [num_samples - window_size - horizon + 1, window_size, num_features])
    - y: numpy array, target values (shape: [num_samples - window_size - horizon + 1, horizon, num_targets])

    Raises:
    - ValueError: if num_targets is not between 1 and the number of columns.
    """
    # `:-0` would select no feature columns at all, so reject it up front
    # instead of failing later inside a reshape.
    if not 1 <= num_targets <= data.shape[1]:
        raise ValueError("num_targets must be between 1 and the number of columns in data.")

    # Convert once; slicing the DataFrame inside the loop repeats this work per window.
    feature_values = data.iloc[:, :-num_targets].values
    target_values = data.iloc[:, -num_targets:].values

    X, y = [], []

    for start in range(len(data) - window_size - horizon + 1):
        split = start + window_size
        X.append(feature_values[start:split])
        y.append(target_values[split:split + horizon])

    return np.array(X), np.array(y)

In [7]:
def create_centered_window(data, num_targets, window_size, horizon):
    """
    Generate centered windows for time series prediction.

    The last `num_targets` columns of `data` are treated as targets and all
    preceding columns as features. For a center index i the window covers rows
    [i - window_size/2, i + window_size/2) and the targets cover rows
    [i, i + horizon), so targets overlap the second half of the window.

    Parameters:
    - data: pandas DataFrame, input time series data with feature columns first and target columns last
    - num_targets: int, number of trailing columns that hold target variables
    - window_size: int, size of the centered window (even number)
    - horizon: int, number of rows taken from the center index onward

    Returns:
    - X: numpy array, input features (shape: [num_samples - window_size - horizon + 1, window_size, num_features])
    - y: numpy array, target values (shape: [num_samples - window_size - horizon + 1, horizon, num_targets])

    Raises:
    - ValueError: if window_size is odd, or num_targets is not between 1 and the number of columns.
    """
    # Ensure the window size is even
    if window_size % 2 != 0:
        raise ValueError("Window size must be an even number for a centered window.")

    # `:-0` would select no feature columns at all, so reject it up front.
    if not 1 <= num_targets <= data.shape[1]:
        raise ValueError("num_targets must be between 1 and the number of columns in data.")

    half_window = window_size // 2

    # Convert once; slicing the DataFrame inside the loop repeats this work per window.
    feature_values = data.iloc[:, :-num_targets].values
    target_values = data.iloc[:, -num_targets:].values

    X, y = [], []

    for center in range(half_window, len(data) - half_window - horizon + 1):
        X.append(feature_values[center - half_window:center + half_window])
        y.append(target_values[center:center + horizon])

    return np.array(X), np.array(y)

In [8]:
def scale_2d_3d_data(data, scaler=None, norm=False, stand=False):
    """
    Scale 2D or 3D data with Min-Max scaling or standardization.

    3D input is flattened to (dim0 * dim1, dim2) for the scaler and restored
    to its original shape afterwards.

    Parameters:
    - data: numpy array, input data (2D or 3D)
    - scaler: object, already-fitted scaler to reuse; when None a new one is fitted on `data`
    - norm: bool, use Min-Max scaling (takes precedence over `stand`)
    - stand: bool, use standardization

    Returns:
    - Tuple (scaler, scaled_data). When neither norm nor stand is set,
      scaled_data is None and scaler is returned as passed in.
    """
    is_3d = len(data.shape) == 3
    shape_3d = data.shape if is_3d else None

    # Scalers work on 2D input, so fold the first two axes together.
    if is_3d:
        data = data.reshape((shape_3d[0] * shape_3d[1], shape_3d[2]))

    # Nothing requested: hand back whatever scaler we were given.
    if not (norm or stand):
        return scaler, None

    if scaler is None:
        if norm:
            # Min-Max scaling
            scaler = MinMaxScaler().fit(data)
        else:
            # Standardization
            scaler = StandardScaler().fit(data)

    scaled_data = scaler.transform(data)

    # Undo the flattening for 3D input.
    if is_3d and scaled_data is not None:
        scaled_data = scaled_data.reshape(shape_3d)

    return scaler, scaled_data

In [9]:
def create_performant_features_targets(features, targets, batch_size=32, pre=False, map_func=None, shuffle=False, shuffle_size=None):
    """
    Build a batched, prefetched tf.data pipeline from feature and target arrays.

    Parameters:
    - features: numpy array, input features
    - targets: numpy array, target values
    - batch_size: int, batch size for the dataset
    - pre: bool, whether to apply a preprocessing function
    - map_func: function, preprocessing function to be applied if pre=True
    - shuffle: bool, whether to shuffle the dataset
    - shuffle_size: int or None, size of the shuffle buffer; None means
      "use the number of samples"

    Returns:
    - tf.data.Dataset: dataset of (features, targets) batches, both cast to float32
    """

    features_tensor = tf.data.Dataset.from_tensor_slices(tf.cast(features, dtype=tf.float32))
    targets_tensor = tf.data.Dataset.from_tensor_slices(tf.cast(targets, dtype=tf.float32))
    dataset_tensor = tf.data.Dataset.zip((features_tensor, targets_tensor))

    if pre:
        dataset_tensor = dataset_tensor.map(map_func=map_func, num_parallel_calls=tf.data.AUTOTUNE)

    if shuffle:
        # shuffle=True with the default shuffle_size=None used to pass None
        # straight to Dataset.shuffle; fall back to a buffer that holds every
        # sample (at least 1 so an empty input does not request a zero-size buffer).
        if shuffle_size is None:
            shuffle_size = max(len(features), 1)
        dataset_tensor = dataset_tensor.shuffle(shuffle_size)

    dataset_tensor = dataset_tensor.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return dataset_tensor

In [10]:
def create_performant_dataset(dataset, batch_size=32, pre=False, map_func=None, shuffle=False, shuffle_size=None):
    """
    Add optional preprocessing and shuffling to a dataset, then batch and prefetch it.

    Parameters:
    - dataset: tf.data.Dataset, input (unbatched) dataset
    - batch_size: int, batch size for the dataset
    - pre: bool, whether to apply a preprocessing function
    - map_func: function, preprocessing function to be applied if pre=True
    - shuffle: bool, whether to shuffle the dataset
    - shuffle_size: int or None, size of the shuffle buffer; None means
      "use the dataset's cardinality"

    Returns:
    - tf.data.Dataset: TensorFlow dataset

    Raises:
    - ValueError: if shuffle=True, shuffle_size is None and the dataset's
      cardinality is unknown or infinite.
    """

    if pre:
        dataset = dataset.map(map_func=map_func, num_parallel_calls=tf.data.AUTOTUNE)

    if shuffle:
        # shuffle=True with the default shuffle_size=None used to pass None
        # straight to Dataset.shuffle. Derive a full-size buffer instead.
        if shuffle_size is None:
            cardinality = int(dataset.cardinality().numpy())
            # Negative values are tf.data's INFINITE / UNKNOWN sentinels.
            if cardinality < 0:
                raise ValueError("shuffle_size must be provided when the dataset size is unknown or infinite.")
            shuffle_size = max(cardinality, 1)
        dataset = dataset.shuffle(shuffle_size)

    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return dataset

In [None]:
# Whitespace-token count of every training sentence.
# NOTE(review): `train_sentences` is not defined in the visible cells — it must
# be created before this cell is run.
sentence_len = [len(sentence.split()) for sentence in train_sentences]
mean_sentence_len = np.mean(sentence_len)

max_vocab_length = 10000
# Sequence length that covers 95% of the training sentences.
max_length = int(np.percentile(sentence_len, 95))

# `TextVectorization` / `Embedding` were referenced as bare names but only
# `layers` is imported, so access them through that module.
text_vectorizer = layers.TextVectorization(max_tokens=max_vocab_length,
                                           output_mode="int",
                                           output_sequence_length=max_length)

# text_vectorizer.adapt(train_sentences)

# Size the embedding from the vocabulary cap rather than from
# text_vectorizer.get_vocabulary(): with adapt() not yet called, the
# vectorizer has no learned vocabulary and the table would be far too small
# for the token ids produced after adapting.
text_embedding = layers.Embedding(input_dim=max_vocab_length,
                                  output_dim=128,
                                  input_length=max_length,
                                  mask_zero=True)

In [None]:
# Shared image augmentation generator; other cells in this notebook read the
# module-level `datagen` name.
datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,          # Multiply every pixel value by 1/255 (maps 0-255 inputs to [0, 1])
    rotation_range=20,       # Random rotation up to 20 degrees
    width_shift_range=0.2,   # Randomly shift images horizontally by up to 20% of the width
    height_shift_range=0.2,  # Randomly shift images vertically by up to 20% of the height
    shear_range=0.2,         # Shear intensity (shear angle in degrees, counter-clockwise)
    zoom_range=0.2,          # Random zoom up to 20%
    horizontal_flip=True,    # Randomly flip images horizontally
    fill_mode='nearest'      # Strategy for filling in newly created pixels after rotation or shifting
)

In [None]:
# Augment the whole array in one batch, keeping the original order.
# NOTE(review): `your_image_array` is a placeholder — replace it with the real
# image array before running this cell.
batch_size = len(your_image_array)
augmented_images_batch = datagen.flow(your_image_array, batch_size=batch_size, shuffle=False)
# Use the built-in next(): it relies only on the iterator protocol, whereas a
# `.next()` method is not available on every Keras iterator version.
augmented_images = next(augmented_images_batch)

In [9]:
def train_aug_img(dirpath, img_size, class_type, batch_size=32):
    """
    Stream augmented images from a directory using the module-level `datagen`.

    Parameters:
    - dirpath: str, directory passed to flow_from_directory.
    - img_size: int, images are resized to (img_size, img_size).
    - class_type: str, forwarded as class_mode (e.g. "binary", "categorical").
    - batch_size: int, number of images per batch.

    Returns:
    - The iterator returned by datagen.flow_from_directory.
    """
    flow_options = {
        "target_size": (img_size, img_size),  # square target size
        "batch_size": batch_size,
        "class_mode": class_type,
    }
    return datagen.flow_from_directory(dirpath, **flow_options)

In [10]:
def img_augmentation_layer(img_size):
    """
    Build a Sequential model of random image augmentations followed by a resize.

    Parameters:
    - img_size: int, output images are resized to (img_size, img_size).

    Returns:
    - tf.keras.Sequential named "img_augmentation_layer".
    """
    keras_layers = tf.keras.layers

    augmentation_steps = [
        keras_layers.RandomFlip("horizontal"),
        keras_layers.RandomHeight(0.2),
        keras_layers.RandomWidth(0.2),
        keras_layers.RandomRotation(0.2),
        keras_layers.RandomZoom(0.2),
        # Final resize so the output has a fixed (img_size, img_size) size.
        keras_layers.Resizing(img_size, img_size),
    ]

    return tf.keras.Sequential(augmentation_steps, name="img_augmentation_layer")

In [11]:
def take_image_dataset_from_directory(filepath, label_mode, img_size, batch_size, shuffle=False):
    """
    Load an image dataset from a directory tree.

    Parameters:
    - filepath: str, directory passed to image_dataset_from_directory.
    - label_mode: forwarded unchanged as label_mode.
    - img_size: int, images are resized to (img_size, img_size).
    - batch_size: int, number of images per batch.
    - shuffle: bool, whether the loader shuffles the data.

    Returns:
    - The dataset returned by tf.keras.preprocessing.image_dataset_from_directory.
    """
    loader_options = {
        "label_mode": label_mode,
        "batch_size": batch_size,
        "image_size": (img_size, img_size),
        "shuffle": shuffle,
    }
    return tf.keras.preprocessing.image_dataset_from_directory(filepath, **loader_options)

In [11]:
def evaluate_ts_rg_model(actual_values, predicted_values):
    """
    Evaluate a time series/regression model with common error metrics.

    Parameters:
    - actual_values: array or list of actual values.
    - predicted_values: array or list of predicted values (same number of elements).

    Returns:
    - evaluation_results: Dictionary with keys 'MAE', 'MSE', 'RMSE', 'MAPE', 'R2'.
      'MAPE' is a percentage computed only over non-zero actual values and is
      NaN when every actual value is zero.
    """
    actual_values = np.array(actual_values)
    predicted_values = np.array(predicted_values)

    # Mean Absolute Error (MAE)
    mae = mean_absolute_error(actual_values, predicted_values)

    # Mean Squared Error (MSE)
    mse = mean_squared_error(actual_values, predicted_values)

    # Root Mean Squared Error (RMSE)
    rmse = np.sqrt(mse)

    # Mean Absolute Percentage Error (MAPE)
    # Flatten first: subtracting an (n,) array from an (n, 1) array would
    # otherwise broadcast to (n, n) and silently produce a wrong value.
    actual_flat = actual_values.ravel()
    predicted_flat = predicted_values.ravel()
    # Select the non-zero actuals BEFORE dividing so no division by zero
    # (and no RuntimeWarning) ever happens.
    nonzero = actual_flat != 0
    if nonzero.any():
        relative_errors = (actual_flat[nonzero] - predicted_flat[nonzero]) / actual_flat[nonzero]
        mape = np.mean(np.abs(relative_errors)) * 100
    else:
        mape = np.nan

    # R-squared (R2)
    r2 = r2_score(actual_values, predicted_values)

    evaluation_results = {
        'MAE': mae,
        'MSE': mse,
        'RMSE': rmse,
        'MAPE': mape,
        'R2': r2
    }

    return evaluation_results

In [12]:
def evaluate_classification_model(y_true, y_preds):
    """
    Score classification predictions with accuracy and weighted precision/recall/F1.

    Parameters:
    - y_true: 1D array or list of actual labels.
    - y_preds: 1D array or list of predicted labels.

    Returns:
    - Dictionary with keys "acc", "pre", "rec" and "f1".
    """
    evaluation_results = {"acc": accuracy_score(y_true, y_preds)}

    # The fourth element (support) is not reported.
    weighted_scores = precision_recall_fscore_support(y_true,
                                                      y_preds,
                                                      average="weighted",
                                                      zero_division=1)
    evaluation_results["pre"] = weighted_scores[0]
    evaluation_results["rec"] = weighted_scores[1]
    evaluation_results["f1"] = weighted_scores[2]

    return evaluation_results

In [None]:
# `len(*preds*)` was a markdown-emphasised placeholder and is a syntax error;
# `preds` stands for the predictions array to be plotted.
# NOTE(review): 48 looks like the number of trailing points to display — confirm.
offset = np.abs(48 - len(preds))
offset

In [13]:
def plot_ts_rg_preds_interval(y_true, y_preds, offset, figsize=(10, 7)):
    """
    Plot actual vs. predicted values from `offset` onward, with a shaded band
    of +/- 1.96 standard deviations of the residuals around the predictions.

    The band is a 95% interval only if the residuals are roughly normal with
    constant spread.

    Parameters:
    - y_true: 1D array or list of actual values.
    - y_preds: 1D array or list of predicted values.
    - offset: int, index of the first point to draw.
    - figsize: tuple, figure size passed to plt.figure.
    """
    # Accept lists and (n, 1)-shaped arrays: plain lists do not support
    # subtraction, and fill_between needs 1D data.
    y_true = np.asarray(y_true).ravel()
    y_preds = np.asarray(y_preds).ravel()

    residuals = y_true - y_preds
    margin_of_errors = 1.96 * np.std(residuals)
    upper = y_preds + margin_of_errors
    lower = y_preds - margin_of_errors

    visible_true = y_true[offset:]
    visible_preds = y_preds[offset:]

    plt.figure(figsize=figsize)
    plt.plot(visible_true, label="Actual", color="green")
    plt.plot(visible_preds, label="Preds", color="blue")
    plt.fill_between(range(len(visible_true)), lower[offset:], upper[offset:], color="lightgrey", label="Preds Interval")
    plt.grid(True)
    plt.legend(fontsize=14)
    plt.xlabel("Duration")
    plt.ylabel("Targets")

In [14]:
def pred_timer(model, samples):
    """
    Measure the wall-clock time of a single model.predict(samples) call.

    Parameters:
    - model: object exposing predict(samples), e.g. a trained tf.keras.Model
    - samples: sized collection of input samples

    Returns:
    - tuple: (total seconds, seconds per sample); the second value is NaN
      when `samples` is empty.
    """
    tic = time.perf_counter()
    model.predict(samples)
    elapsed = time.perf_counter() - tic

    sample_count = len(samples)
    if sample_count > 0:
        per_sample = elapsed / sample_count
    else:
        per_sample = np.nan

    return elapsed, per_sample