In [75]:
import numpy as np
from numba import njit, jit
from itertools import product
from multiprocessing import Pool

import time


class Point:
    def __init__(self, real_value, predictions_set, predicted_value, is_virgin, is_completed):
        self.real_value = real_value
        self.predictions_set = predictions_set
        self.predicted_value = predicted_value
        self.is_virgin = is_virgin
        self.is_completed = is_completed

    def info(self):
        print('{ ', self.predictions_set.size, self.real_value, self.predicted_value, '}', end=' ')


def normalize(arr):
    return (arr - arr.min()) / (arr.max() - arr.min())

LORENZ = (np.genfromtxt("lorenz.txt"))  # последние k элементов ряда - тестовая выборка
# train = (np.genfromtxt("lorenz.txt", skip_footer=90000))  # ряд без последних k элементов - тренировочная выборка

TEST_BEGIN = 99900
TEST_END = 100000

CLAWS_MAX_DIST = 9
NUMBER_OF_CLAWS = 4

TRAIN_GAP = 1000
TEST_GAP = 100

MAX_NORM_DELTA = 0.015  # было 0.015
MAX_ABS_ERROR = 0.05  # изначально было 0.05

S = 34  # количество предшедствующих точек ряда, необходимое для прогнозирования точки

K_MAX = 26

In [76]:
def reforecast(points, first_not_completed):
    for template_number in range(len(templates_by_distances)):
        x, y, z = templates_by_distances[template_number]
        for middle_point in range(first_not_completed, len(points)):  # middle_point - это индекс в points
            if middle_point + z + 1 >= len(points) or points[middle_point].is_virgin or points[middle_point + z + 1].is_completed:
                continue

            left_part = np.array(
                [points[middle_point - y - x - 2].predicted_value,
                 points[middle_point - y - 1].predicted_value,
                 points[middle_point].predicted_value]
            )

            if np.isnan(np.sum(left_part)):
                # print("template", template_number, "can't be used")
                continue

            for shifted_template in shifts_for_each_template[template_number]:
                if np.linalg.norm(left_part - shifted_template[:3]) <= MAX_NORM_DELTA:
                    points[middle_point + z + 1].predictions_set = np.append(points[middle_point + z + 1].predictions_set, shifted_template[3])
                    points[middle_point + z + 1].is_virgin = False

    for middle_point in range(first_not_completed, len(points)):
        # print("  recalculating point", middle_point, )
        point_obj = points[middle_point]

        if point_obj.predictions_set.size:
            point_obj.predicted_value = sum(point_obj.predictions_set) / len(point_obj.predictions_set)
        else:
            continue

        cur_error = abs(point_obj.real_value - point_obj.predicted_value)

        if np.isnan(point_obj.predicted_value) or (cur_error > MAX_ABS_ERROR and middle_point != len(points) - 1):
            point_obj.predicted_value = np.nan
            # print("%d-th point is unpredictable, error = %f" % (middle_point, cur_error))

        # print("%d-th point is predictable, predicted_value: %f, error = %f" % (middle_point, predicted_value, cur_error))
    # for printed_point_index in range(S, len(points)):
    #     points[printed_point_index].info()
    # print('\n')

    points[first_not_completed].is_completed = True


In [93]:
@njit
def jit_reforecast(points, first_not_completed):
    for template_number in range(len(templates_by_distances)):
        x, y, z = templates_by_distances[template_number]
        for middle_point in range(first_not_completed, len(points)):  # middle_point - это индекс в points
            if middle_point + z + 1 >= len(points) or points[middle_point].is_virgin or points[middle_point + z + 1].is_completed:
                continue

            left_part = np.array(
                [points[middle_point - y - x - 2].predicted_value,
                 points[middle_point - y - 1].predicted_value,
                 points[middle_point].predicted_value]
            )

            if np.isnan(np.sum(left_part)):
                # print("template", template_number, "can't be used")
                continue

            for shifted_template in shifts_for_each_template[template_number]:
                if np.linalg.norm(left_part - shifted_template[:3]) <= MAX_NORM_DELTA:
                    points[middle_point + z + 1].predictions_set = np.append(points[middle_point + z + 1].predictions_set, shifted_template[3])
                    points[middle_point + z + 1].is_virgin = False

    for middle_point in range(first_not_completed, len(points)):
        # print("  recalculating point", middle_point, )
        point_obj = points[middle_point]

        setSum = 0
        for cnt in range(len(point_obj.predictions_set)):
            setSum += point_obj.predictions_set[cnt]
            
        if point_obj.predictions_set.size:
            point_obj.predicted_value = setSum / len(point_obj.predictions_set)
        else:
            continue

        cur_error = abs(point_obj.real_value - point_obj.predicted_value)

        if np.isnan(point_obj.predicted_value) or (cur_error > MAX_ABS_ERROR and middle_point != len(points) - 1):
            point_obj.predicted_value = np.nan

    # for printed_point_index in range(S, len(points)):
    #     points[printed_point_index].info()
    # print('\n')

    points[first_not_completed].is_completed = True


In [1]:
def predict(i, k):
    complete_points = [Point(_, np.array([]), _, 0, 1) for _ in LORENZ[i - k - 33: i - k + 1]]  # правая граница не включена => это список из 34 + k точек
    new_points = [Point(_, np.array([]), np.nan, 1, 0) for _ in LORENZ[i - k + 1: i + 1]]
    points = complete_points + new_points
    
    reforecast(points, S - 10)
    for cur_point in range(1, k):
        reforecast(points, S + cur_point)
        
    return abs(LORENZ[i] - points[-1].predicted_value), not np.isnan(points[-1].predicted_value)

In [79]:
%%time
# Generating templates
templates_by_distances = np.array(list(
    product(range(CLAWS_MAX_DIST + 1), range(CLAWS_MAX_DIST + 1), range(CLAWS_MAX_DIST + 1)))
)

Wall time: 2 ms


In [80]:
%%time
# Training - FIT
shifts_for_each_template = []
for template_number in range(len(templates_by_distances)):
    [x, y, z] = templates_by_distances[template_number]
    cur_claws_indexes = np.array([0, x + 1, x + y + 2, x + y + z + 3])
    tmp = cur_claws_indexes + np.arange(TRAIN_GAP - cur_claws_indexes[3]).reshape(-1, 1)
    shifts_for_each_template.append(LORENZ[tmp])

Wall time: 76 ms


In [91]:
i, k = 99950, 1
complete_points = [Point(_, np.array([]), _, 0, 1) for _ in LORENZ[i - k - 33: i - k + 1]]  # правая граница не включена => это список из 34 + k точек
new_points = [Point(_, np.array([]), np.nan, 1, 0) for _ in LORENZ[i - k + 1: i + 1]]
points = complete_points + new_points

In [88]:
%%time
reforecast(points, S - 10)

Wall time: 9.26 s


In [94]:
%%time
jit_reforecast(points, S - 10)

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1mnon-precise type pyobject[0m
[0m[1mDuring: typing of argument at <ipython-input-93-2a895ce4a275> (3)[0m
[1m
File "<ipython-input-93-2a895ce4a275>", line 3:[0m
[1mdef jit_reforecast(points, first_not_completed):
[1m    for template_number in range(len(templates_by_distances)):
[0m    [1m^[0m[0m

This error may have been caused by the following argument(s):
- argument 0: Cannot type list element type <class '__main__.Point'>
