**Running this notebook takes several hours and a large amount of RAM. <br> For that reason, file references and function calls are commented out.
<br> <br>
The code is provided for clarity and understanding.** <br>

## Import Libraries

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%capture
!pip install mplfinance
!pip install opencv-python-headless
!pip install pyts
!pip install tqdm
import pickle
import mplfinance as mpf
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
from io import BytesIO
from pyts.image import MarkovTransitionField
from pyts.image import GramianAngularField
from pyts.image import RecurrencePlot
from scipy.signal import spectrogram
import pywt
import gc
import time
from skimage.measure import block_reduce
from tqdm import tqdm

import warnings

warnings.filterwarnings("ignore", message="Some quantiles are equal.")

## Load Time Series (RAW)

This code segment loads the time series windows and labels generated in the previous notebook **Thesis_Data**.

In [None]:
# Define the file path
# file_path = "/content/drive/MyDrive/20230424_windows_and_labels.pkl"

# Load the variables
# with open(file_path, 'rb') as f:
#    windows, labels, max_years = pickle.load(f)

## Encode Time Series (RAW) as Images

The following code segments use the previously loaded time series data and encode them as images, <br>  applying the CND, MTF and GAF methods defined in the thesis paper.

# Candlestick Charts (CND)

In [None]:
# Used to visualize image encodings
def display_arrays(arrays):
    """Show every image array inline, printing a dashed separator after each one.

    Args:
        arrays: Iterable of image arrays accepted by ``cv2_imshow``.
    """
    # Blank line, 15 dashes, blank line: keeps consecutive images visually apart
    separator = '\n' + '-' * 15 + '\n'

    for image in arrays:
        cv2_imshow(image)
        print(separator)

In [None]:
def custom_greyscale_conversion(img):
    """Convert a colour image to greyscale while keeping red and green areas apart.

    Pixels whose red channel exceeds both the green and blue channels are
    brightened by a factor of 1.5; pixels whose green channel exceeds both the
    red and blue channels are darkened by a factor of 0.5. All other pixels
    keep the plain greyscale value.

    Args:
        img: Image array indexed as ``[row, column, channel]`` in OpenCV
            channel order (0 = blue, 1 = green, 2 = red).

    Returns:
        ``np.uint8`` greyscale array with values saturated to ``[0, 255]``.
    """
    blue_channel = img[:, :, 0]
    green_channel = img[:, :, 1]
    red_channel = img[:, :, 2]

    # Baseline greyscale image from OpenCV's built-in conversion
    grey_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # The two masks are mutually exclusive (red > green vs. green > red)
    red_mask = (red_channel > green_channel) & (blue_channel < red_channel)
    green_mask = (green_channel > red_channel) & (blue_channel < green_channel)

    # Scale in floating point. Writing the x1.5 values straight into a uint8
    # array wraps anything above 255 around before clipping can take effect,
    # which turns bright red pixels dark instead of saturating them.
    scaled = grey_img.astype(np.float32)
    scaled[red_mask] *= 1.5
    scaled[green_mask] *= 0.5

    # Saturate first, then cast back to 8-bit
    return np.clip(scaled, 0, 255).astype(np.uint8)

def max_pooling(img, pool_size):
    """Downsample a 2-D image by keeping the maximum of each square block.

    Args:
        img: 2-D image array.
        pool_size: Edge length of the square pooling block.

    Returns:
        The block-wise maximum as returned by ``block_reduce``.
    """
    window = (pool_size,) * 2
    pooled = block_reduce(img, block_size=window, func=np.max)
    return pooled

def generate_candlestick_array(window, target_size=(20, 20), pool_size=2):
    """Render one window as a candlestick chart and return it as a small greyscale image.

    The chart is drawn off-screen with mplfinance, serialised to PNG in memory,
    decoded with OpenCV, converted with ``custom_greyscale_conversion``,
    max-pooled and finally resized.

    Args:
        window: Tabular window with ``copy()``, ``shape`` and a settable
            ``index`` (presumably a pandas DataFrame holding the OHLC columns
            mplfinance needs; confirm against the data notebook).
        target_size: Output size handed to ``cv2.resize`` as ``dsize``.
        pool_size: Edge length of the max-pooling block applied before resizing.

    Returns:
        The resized greyscale image produced by ``cv2.resize``.
    """
    # mplfinance expects a DatetimeIndex, so plot a copy carrying a synthetic
    # daily index; the caller's window is left untouched.
    temp_window = window.copy()
    temp_window.index = pd.date_range(start='2000-01-01', periods=window.shape[0], freq='D')

    fig, _ = mpf.plot(temp_window, type='candle', style='charles', returnfig=True, axisoff=True)
    try:
        with BytesIO() as buf:
            fig.savefig(buf, format='png', bbox_inches='tight')
            png_bytes = buf.getvalue()
    finally:
        # Release the figure even if saving fails; this function is called
        # once per window and leaked figures accumulate in memory.
        plt.close(fig)

    # -1 decodes the PNG unchanged (all stored channels are kept)
    img = cv2.imdecode(np.frombuffer(png_bytes, np.uint8), -1)

    grey_img = custom_greyscale_conversion(img)
    pooled_grey_img = max_pooling(grey_img, pool_size)

    return cv2.resize(pooled_grey_img, target_size, interpolation=cv2.INTER_AREA)

In [None]:
# For Candlestick image generation
def split_list(lst, chunk_size):
    """Cut a sequence into consecutive slices of at most ``chunk_size`` items.

    Args:
        lst: Sliceable sequence to split.
        chunk_size: Maximum number of items per slice.

    Returns:
        List of slices in original order; the last one may be shorter.
    """
    chunks = []
    for start in range(0, len(lst), chunk_size):
        stop = start + chunk_size
        chunks.append(lst[start:stop])
    return chunks

# Maximum number of windows per chunk handed to split_list
chunk_size = 30000

# The three lists are split with the same chunk size so that chunk i of each
# list covers the same positions. Left commented out because `windows`,
# `labels` and `max_years` are only available once the pickle load above is
# re-enabled.
# windows_chunks = split_list(windows, chunk_size)
# labels_chunks = split_list(labels, chunk_size)
# max_years_chunks = split_list(max_years, chunk_size)

In [None]:
# Candlestick images for chunk 8, i.e. index 7 of `windows_chunks`
candlestick_arrays_chunk_8 = []

# `windows_chunks` is only created when the chunking cell above is
# uncommented. Look it up defensively so that a fresh "Run All" skips this
# multi-hour loop with a message instead of failing with a NameError.
pending_chunks = globals().get("windows_chunks")

if pending_chunks is None:
    print("windows_chunks is not defined; skipping candlestick encoding.")
else:
    for window in tqdm(pending_chunks[7], desc="Processing windows"):
        candlestick_arrays_chunk_8.append(generate_candlestick_array(window))

## Upload arrays to Google Drive

# Define the file path
# file_path = "/content/drive/MyDrive/20230425_candlestick_arrays_chunk_8.pkl"

# with open(file_path, 'wb') as f:
#   pickle.dump((candlestick_arrays_chunk_8), f)

Processing windows: 100%|██████████| 28678/28678 [2:25:21<00:00,  3.29it/s]


# Markov Transition Fields (MTF)

In [None]:
def generate_mtf_arrays(windows, n_bins=5, size=(20, 20)):
    """Encode the 'Close' series of every window as a greyscale Markov Transition Field.

    Args:
        windows: Iterable of windows supporting ``window['Close'].values``
            (presumably pandas DataFrames with a 'Close' column).
        n_bins: Number of bins passed to ``MarkovTransitionField``.
        size: Output size handed to ``cv2.resize`` as ``dsize``.

    Returns:
        List with one resized ``np.uint8`` image per window, in input order.
    """
    images = []
    mtf = MarkovTransitionField(n_bins=n_bins)

    for window in windows:
        # Extract the 'Close' column from the window
        close_prices = window['Close'].values

        # The transformer works on a batch of series: wrap the single series
        # in a list and take the first (only) image of the result.
        mtf_image = mtf.fit_transform([close_prices])[0]

        # Min-max normalise to [0, 1]. A field whose values are all equal has
        # zero range; dividing by it would yield NaNs and an undefined uint8
        # cast, so such a field is mapped to an all-zero image instead.
        lowest = mtf_image.min()
        value_range = mtf_image.max() - lowest
        if value_range > 0:
            mtf_image_normalized = (mtf_image - lowest) / value_range
        else:
            mtf_image_normalized = np.zeros_like(mtf_image, dtype=float)

        # Convert the normalized MTF image to an 8-bit greyscale image
        mtf_image_grayscale = (mtf_image_normalized * 255).astype(np.uint8)

        # Resize the MTF image
        mtf_image_resized = cv2.resize(mtf_image_grayscale, size, interpolation=cv2.INTER_LINEAR)

        images.append(mtf_image_resized)

    return images

# mtf_arrays = generate_mtf_arrays(windows)

## Upload arrays to Google Drive

# Define the file path
# file_path = "/content/drive/MyDrive/20230425_mtf_arrays.pkl"

# with open(file_path, 'wb') as f:
#   pickle.dump((mtf_arrays), f)

# display_arrays(mtf_arrays)

# Gramian Angular Fields (GAF)

In [None]:
def generate_gaf_difference_arrays(windows, method='difference', size=(20, 20)):
    """Encode the 'Close' series of every window as a greyscale Gramian Angular Field.

    Args:
        windows: Iterable of windows supporting ``window['Close'].values``
            (presumably pandas DataFrames with a 'Close' column).
        method: Method name passed to ``GramianAngularField``.
        size: Output size handed to ``cv2.resize`` as ``dsize``.

    Returns:
        List with one resized ``np.uint8`` image per window, in input order.
    """
    arrays = []
    gaf = GramianAngularField(method=method)

    for window in windows:
        close_prices = window['Close'].values

        # The transformer works on a batch of series: wrap the single series
        # in a list and take the first (only) image of the result.
        gaf_image = gaf.fit_transform([close_prices])[0]

        # Min-max normalise to [0, 1]. A field without any spread (or one
        # containing NaNs) has no usable range; dividing by it would feed NaNs
        # into the uint8 cast, so it is mapped to an all-zero image instead.
        lowest = gaf_image.min()
        value_range = gaf_image.max() - lowest
        if value_range > 0:
            gaf_image_normalized = (gaf_image - lowest) / value_range
        else:
            gaf_image_normalized = np.zeros_like(gaf_image, dtype=float)

        # Convert the normalized GAF image to an 8-bit greyscale image
        gaf_image_grayscale = (gaf_image_normalized * 255).astype(np.uint8)

        # Resize the greyscale GAF image
        gaf_image_resized = cv2.resize(gaf_image_grayscale, size, interpolation=cv2.INTER_LINEAR)

        arrays.append(gaf_image_resized)

    return arrays

# gaf_arrays = generate_gaf_difference_arrays(windows)

## Upload arrays to Google Drive

# Define the file path
# file_path = "/content/drive/MyDrive/20230425_gaf_arrays.pkl"

# with open(file_path, 'wb') as f:
#   pickle.dump((gaf_arrays), f)

# display_arrays(gaf_arrays)