### Входные данные

У вас имеется поток данных (генератор data_stream). Поля это случайные величины - так сделано для упрощения генерации данных. Есть три поля (названы по уровню сложности задания)

### Задание
##### Мотивация:
У вас есть куча временных рядов, вы хотите научиться предсказывать следующее значение по 1000 предыдущим. 1000 признаков окна это слишком много, однако вы решили заменить их 5ю: средним, дисперсией, минимумом, медианой и максимумом. Однако, все эти признаки надо подсчитать, причём хочется уметь это делать быстро (в течение часа)
##### Для каждого поля нужно сделать следующее:

1. Пробежаться по данным окном размера 1000 (окно сдвигается на 1, то есть следующее окно пересекается с предыдущим по 999 элементам).

2. Для каждого окна посчитайте среднее значение поля и его дисперсию. Делайте yield этих значений, получая генератор tuple. 

3. Для каждого окна найдине минимум, медиану и максимум в нём. Делайте yield этих значений, получая генератор tuple. 

Ответом, который нужно будет засабмитить в гугл форму, является среднее значение tuple по получившемуся потоку, округлённое до 2го знака.

### Замечания

1. Обратите внимания как генерируются поля. Постарайтесь понять особенность каждого поля и как это можно использовать. Желательно, чтобы для каждого поля у вас было своё решение, максимально эффективно использующее знание об этом поле.
2. Полезные библиотеки: itertools, numpy, collections + всё что найдёте в интернете и можно поставить через pip install
3. **Медианой отсортированного массива arr считайте значение arr[len(arr) // 2]**



Если измерять время работы функций временем работы функции example, то примерное время работы такое:
Одновременно среднее, дисперсия - 1.17
Одновременно минимум, максимум и медиана:easy - 0.87
medium - 2.11
nightmare - 2.85


#### Генерация данных

In [196]:
from collections import namedtuple, deque
import random, copy
from decimal import Decimal

Record = namedtuple('Record', 'easy medium nightmare')

def data_stream():
    random_generator = random.Random(42)
    easy = 0
    for _ in range(10000000):
        easy += random_generator.randint(0, 2) 
        medium = random_generator.randint(0, 256 - 1)
        nightmare = random_generator.randint(0, 1000000000 - 1)
        
        yield Record(
            easy=easy,
            medium=medium,
            nightmare=nightmare
        )
        
def easy_stream():
    for record in data_stream():
        yield record.easy
        
def medium_stream():
    for record in data_stream():
        yield record.medium
        
def nightmare_stream():
    for record in data_stream():
        yield record.nightmare

#### Подсчёт среднего значения tuple по потоку

In [79]:
import numpy as np

def get_tuple_stream_mean(stream, number_of_values):
    result = np.zeros(number_of_values, dtype=np.dtype(Decimal))
    count = Decimal(0) 
    for streamed_tuple in stream:
        result += streamed_tuple
        count += 1
    #return ['{:0.2f}'.format(x) for x in result / count]
    return result / count

In [None]:
%%time
def example(stream):
    for value in stream:
        yield (value, value + 10)
print(get_tuple_stream_mean(example(medium_stream()), 2))

In [108]:
# Генератор среднего значения и дисперсии для потока easy
def easy_MV(window_size): # расчет "в лоб", с помощью Numpy
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = easy_stream()
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    mean = 0
    variance = 0
    for d in gen:
        count +=1
        window.append(d)
        if count < window_size: 
            continue # если окно не заполнено, то продолжаем его заполнение
        
        mean = np.mean(window)
        variance = np.var(window)
        yield (mean, variance)

def easy_MV_custom(window_size=1000): # оптимизированный расчет
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = easy_stream()
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    n0 = 0 # первое число в очереди, которое будет вытесняся следующим числом из потока
    summ = 0 # сумма элементов в окне
    summ_square = 0 # сумма квадратов элементов в окне, для расчета дисперсии 
# Будем уточнять сумму элементов в окне, после добавления нового элемента. 
# Для этого из предыдущей суммы (суммы элементов предыдущего окна) вычтем элемент очереди, который удалили
# и добавим элемент, который добавили в очередь. Таким образом нам не нужно будет пересчитывать среднее и 
# использовать np.mean() на выборке из 1000 элементов
    for _ in gen:
        count +=1
        window.append(next(gen))
        print(window[-1])
        if count < window_size: continue # если окно не заполнено, продолжаем его заполнять
        
        if count == window_size: # если окно только что заполнилось, то первый раз:
            summ = np.sum(window) # считаем сумму первый раз
            variance = np.var(window) # считаем дисперсию первый раз
            mean = np.mean(window) # считаем среднее первый раз
            n0 = window[0] # запоминаем первое число в очереди, потом оно вытеснится новым число из потока
            n0_power = window[0]**2
            summ_square = np.sum(np.square(window))
            yield (mean, variance)
            continue

# обновляем значение суммы, вычитаем самый элемент из предыдущего окна и 
# добавляем новый - самый правый в текущем окне 
        summ = summ - n0 + window[-1] 
        summ_square = summ_square - n0_power + window[-1]**2
# пересчитываем новое значение среднего для нового (текущего) окна. Не суммируя снова все элементы
        mean = summ / Decimal(window_size)
        #mean = np.mean(window) # для проверки используем стандартную функцию np.mean()
        #variance = np.var(window) # для проверки используем стандартную функцию np.var()
        #variance = (summ_square - window_size * (mean**2)) / Decimal(window_size)
        n0 = window[0]
        n0_power = window[0]**2
        yield (mean, variance)    

In [24]:
%%time
stat = get_tuple_stream_mean(easy_MV(1000), 2)

1007.954 340324.613884
1009.974 340344.959324
1011.997 340363.224991
1014.021 340379.380559
1016.045 340395.438975
1018.07 340409.3811
1020.095 340419.171975
1022.12 340428.8616
1024.146 340436.426684
1026.173 340445.915071
CPU times: user 19min 38s, sys: 107 ms, total: 19min 38s
Wall time: 19min 38s


In [25]:
print('Средние значения дисперсии и среднего арфиметического на потоке easy:\n')
print('Среднее значение mean -\t\t', stat[0])
print('Среднее значение variance -\t', stat[1])

Средние значения дисперсии и среднего арфиметического на потоке easy:

Среднее значение mean -		 4999675.76
Среднее значение variance -	 333535.65


CPU times: user 1min 20s, sys: 49 ms, total: 1min 20s
Wall time: 1min 20s

In [None]:
print('Средние значения дисперсии и среднего арфиметического на потоке easy:\n')
print('Среднее значение mean -\t\t', stat[0])
print('Среднее значение variance -\t', stat[1])

In [98]:
# Генератор минимума, медианы и максимума для потока easy
def MMM_easy(window_size):
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = easy_stream()
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    for d in gen:
        count +=1
        window.append(d)
        if count >= window_size: # если окно заполнено, то начинаем считать статистики
            easy_min = window[0]
            easy_median = window[(len(window))//2]
            easy_max = window[-1]
            yield (easy_min, easy_median, easy_max)

In [99]:
%%time
stat = get_tuple_stream_mean(MMM_easy(1000), 3)

CPU times: user 1min 44s, sys: 20.3 ms, total: 1min 44s
Wall time: 1min 44s


In [100]:
print('Средние значения минимума, медианы и максимума на потоке easy:\n')
print('Средний значение min -\t\t', stat[0])
print('Среднее значение median -\t', stat[1])
print('Среднее значение max -\t\t', stat[2])
print('Округленные значнения:', ['{:0.2f}'.format(x) for x in stat])

Средние значения минимума, медианы и максимума на потоке easy:

Средний значение min -		 4999175.792842704986228124190
Среднее значение median -	 4999675.776641486484499801530
Среднее значение max -		 5000174.759596183658747508876
Округленные значнения: ['4999175.79', '4999675.78', '5000174.76']


In [42]:
def test_gen(size):
    for i in range(1, size):
        yield i*random.randint(0, 2)



In [71]:
# Генератор среднего значения и дисперсии для всех потоков
def MV_custom(stream, window_size): # оптимизированный расчет
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = stream
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    X_0 = 0 # первое число в очереди, которое будет вытесняся следующим числом из потока
    Mean_prev = 0 # первое значение среднего
    Variance_prev = 0 # первое значение дисперсии
# Будем уточнять сумму элементов в окне, после добавления нового элемента. 
# Для этого из предыдущей суммы (суммы элементов предыдущего окна) вычтем элемент очереди, который удалили
# и добавим элемент, который добавили в очередь. Таким образом нам не нужно будет пересчитывать среднее и 
# использовать np.mean() на выборке из 1000 элементов
    for number in gen:
        count +=1
        window.append(number)
        if count < window_size: continue # если окно не заполнено, продолжаем его заполнять
        
        if count == window_size: # если окно только что заполнилось, то первый раз:
            Variance = Decimal(np.var(window)) # считаем дисперсию первый раз
            Mean = Decimal(np.mean(window)) # считаем среднее первый раз
            X_0 = Decimal(window[0]) # запоминаем первое число в очереди, потом оно вытеснится новым число из потока
                 
            yield (Mean, Variance)
            continue
            

# пересчитываем новое значение среднего для нового (текущего) окна. Не суммируя снова все элементы
        Mean_prev = Mean # сохраняем пердыдущее значение среднего
        Variance_prev = Variance # сохраняем предыдущее значение дисперсии
        Mean = Mean_prev + Decimal(window[-1] - X_0)/ Decimal(window_size)
        Variance = Variance_prev + (Decimal(window[-1]) - X_0) * (Decimal(window[-1]) + X_0 - Mean - Mean_prev) / Decimal(window_size)
        
        # расчет в лоб, с помощью Numpy
        #Mean = np.mean(window)
        #Variance = np.var(window)
        
            
        #print("Window:", window)
        #print("X_0:", X_0, "\tX_n:", window[-1])
        #print("Mean_prev:", Mean_prev, "\tMean:", Mean)
        #print("Variance_prev:", Variance_prev, "\tVariance:", Variance, "Real Variance:", np.var(window), "\n")
        
        
        X_0 = window[0] # сохраняем значение самого левого элемента, перед тем как он будет вытеснен из очереди
        yield (Mean, Variance)    
    

In [96]:
%%time
stat = get_tuple_stream_mean(MV_custom(easy_stream(), 1000), 2)
#stat = MV_custom(test_gen(100), 10)


CPU times: user 2min 24s, sys: 13.8 ms, total: 2min 24s
Wall time: 2min 24s


In [97]:
print('Средние значения дисперсии и среднего арфиметического на потоке easy:\n')
print('Среднее значение mean -\t\t', stat[0])
print('Среднее значение variance -\t', stat[1])
print('Округленные значнения:', ['{:0.2f}'.format(x) for x in stat])
print(type(stat[0]))

Средние значения дисперсии и среднего арфиметического на потоке easy:

Среднее значение mean -		 4999675.276494780928603468009
Среднее значение variance -	 83439.33665582540527690643697
Округленные значнения: ['4999675.28', '83439.34']
<class 'decimal.Decimal'>


Средние значения дисперсии и среднего арфиметического на потоке easy до использования типа Decimal:

Среднее значение mean -		 4999675.28
Среднее значение variance -	 83444.67

In [313]:
# Расчет Min, Median, Max (МММ) для потоков medium и nightmare. Эти потоки не сортированы по возрастанию, поэтому
# нужно включить сортировку и оптимизировать поиск медианы.

def MMM_brute(stream, window_size):
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = stream
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    Min = 0
    Mean = 0
    Max = 0
    for d in stream:
        window.append(d)
        count += 1
        if count < window_size: 
            continue
        if count == window_size:
            sort_list = np.sort(window)
            Min = sort_list[0]
            Mean = sort_list[len(sort_list) // 2]
            Max = sort_list[-1]
            yield (Min, Mean, Max)
               
        sort_list = np.sort(window)
        Min = sort_list[0]
        Mean = sort_list[len(sort_list) // 2]
        Max = sort_list[-1]
# Отладочный блок, можно удалить
        #print("Window:", window)
        #print("Sorted window:", sort_list)
        #print("Min:", Min, "\tMean:", Mean, "\tMax:", Max)
        
        yield (Min, Mean, Max)
        
def MMM_custom(stream, window_size):
# Суть метода - создаем "зеркало" "окна", в котором выполняем сортировку и хранием его все время отсортированным.
# При поступлении из потока в "окно" нового значения, мы вставляем его в "зеркало" в соотвествии с ранжировкой,
# в свою очередь вбывшее из "окна" значение удаляем из "зеркала", сохраняя упорядоченность элементов в "зеркале".
# Таким образом, "окно" нам нужно только для того, что бы знать значение выбывшего и прибывшего элементов, 
# а минимум, максиммум, и медиану мы находим в "зеркале".
    
    window = deque(range(0, window_size), window_size) # создаем очередь для заполнения окна
    gen = stream
    count = 0 # счетчик для подсчета количества элементов в потоке и для отслеживания наполнения окна
    Min = 0
    Mean = 0
    Max = 0
    X_lost = 0 # значение, которое будет вытесняться из очереди следующим числом из потока
    for d in stream:
        window.append(d)
        count += 1
        if count < window_size: 
            continue
        if count == window_size: # если окно заполнилось, делаем его сортировку и сохраняем в зеркале
            mirrow = deque(np.sort(window), len(window)) # создаем массив для хранения "зеркала".
            Min = mirrow[0]
            Mean = mirrow[len(window) // 2]
            Max = mirrow[-1]
            X_lost = window[0] # сохраняем значение, которое будет вытесняться следующим числом из потока
            yield (Min, Mean, Max)
            continue
        
        mirrow.remove(X_lost)
        if window[-1] >= mirrow[-1]: 
            mirrow.append(window[-1])
        elif window[-1] <= mirrow[0]:
            mirrow.appendleft(window[-1])
        else:
            for i, _ in enumerate(mirrow):
                if mirrow[i] <= window[-1] <= mirrow[i+1]: 
                    mirrow.insert(i+1, window[-1])
                    break 
        X_lost = window[0]
        Min = mirrow[0]
        Mean = mirrow[len(mirrow) // 2]
        Max = mirrow[-1]
        yield (Min, Mean, Max)

In [316]:
%%time
stat = get_tuple_stream_mean(MMM_custom(nightmare_stream(),1000), 3)

CPU times: user 19min 21s, sys: 208 ms, total: 19min 21s
Wall time: 19min 21s


In [317]:
print('Средние значения Min, Median, Max на потоке nightmare:\n')
print('Среднее значение Min -\t\t', stat[0])
print('Среднее значение Mean -\t', stat[1])
print('Среднее значение Max -\t', stat[2])
print('Округленные значнения:', ['{:0.2f}'.format(x) for x in stat])
print(type(stat[0]))

Средние значения Min, Median, Max на потоке nightmare:

Среднее значение Min -		 1017512.293608431482305082278
Среднее значение Mean -	 500438415.6358754239548530898
Среднее значение Max -	 999017359.9740516077556147859
Округленные значнения: ['1017512.29', '500438415.64', '999017359.97']
<class 'decimal.Decimal'>


In [177]:
%%time
stat = get_tuple_stream_mean(MMM_custom(easy_stream(),1000), 3)

KeyboardInterrupt: 

In [None]:
print('Средние значения (MMM_custom) Min, Median, Max на потоке medium:\n')
print('Среднее значение Min -\t\t', stat[0])
print('Среднее значение Mean -\t', stat[1])
print('Среднее значение Max -\t', stat[2])
print('Округленные значения:', ['{:0.2f}'.format(x) for x in stat])
print(type(stat[0]))

In [182]:

for d in MMM_custom(test_gen(15), 5):
    print(d)
    #print("Window:", d[0])
    #print("Sorted window:", d[1])
    #print("Min:", d[2], "\tMean:", d[3], "\tMax:", d[4])



(0, 0, 2)
(0, 0, 2)
(0, 0, 6)
(0, 0, 6)
(0, 0, 6)
(0, 0, 18)
(0, 0, 18)
(0, 0, 18)
(0, 11, 18)
(0, 12, 18)
(0, 11, 13)


In [166]:
d = next(MMM_custom(test_gen(15), 5))
print(type(d))
print(len(d))
print(d)

<class 'tuple'>
3
(0, 1, 4)


###### print('Средние значения дисперсии и среднего арфиметического на потоке easy:\n')
print('Среднее значение mean -\t\t', stat[0])
print('Среднее значение variance -\t', stat[1])

In [None]:
%%time
stat = get_tuple_stream_mean(MV_custom(easy_stream(), 1000), 2)

In [27]:
print('Средние значения дисперсии и среднего арфиметического на потоке easy:\n')
print('Среднее значение mean -\t\t', stat[0])
print('Среднее значение variance -\t', stat[1])

Средние значения дисперсии и среднего арфиметического на потоке easy:

Среднее значение mean -		 4999675.28
Среднее значение variance -	 83444.67


CPU times: user 18min 41s, sys: 140 ms, total: 18min 41s
Wall time: 18min 42s

Cредние значения дисперсии и среднего арфиметического на потоке medium:

Среднее значение mean -		 4999674.76
Среднее значение variance -	 333535.65

In [307]:
window = deque(range(5), 5)
#print(window)
for _ in range(5):
    window.append(random.randint(1,20))
print("Исходное окно", window)
mirrow = deque(np.sort(window), len(window))

print("Сортированное зеркало окна", mirrow)
X_lost = window[0]
window.append(random.randint(1, 20))
print("Окно с новым элементом", window)
print("Элемент окна, который был вытеснен", X_lost)
print("Новый элемент окна, который зашел из потока:", window[-1])
mirrow.remove(X_lost)
print("Зеркало с удаленным элементом, который был вытеснен из окна:", mirrow)
if window[-1] >= mirrow[-1]: 
    mirrow.append(window[-1])
elif window[-1] <= mirrow[0]:
    mirrow.appendleft(window[-1])
else:
    for i, _ in enumerate(mirrow):
        if mirrow[i] <= window[-1] <= mirrow[i+1]: 
            mirrow.insert(i+1, window[-1])
            break
print("Зеркало с новым элементом", mirrow)

Исходное окно deque([15, 18, 16, 14, 11], maxlen=5)
Сортированное зеркало окна deque([11, 14, 15, 16, 18], maxlen=5)
Окно с новым элементом deque([18, 16, 14, 11, 7], maxlen=5)
Элемент окна, который был вытеснен 15
Новый элемент окна, который зашел из потока: 7
Зеркало с удаленным элементом, который был вытеснен из окна: deque([11, 14, 16, 18], maxlen=5)
Зеркало с новым элементом deque([7, 11, 14, 16, 18], maxlen=5)


In [304]:
window[-1]

4

In [31]:
stream = deque(range(1,16), 15)
D_prev = np.var(stream)
X_0 = stream[0]
Mean_prev = np.mean(stream)
print(stream)
for i in range(16, 25):
    stream.append(i*2)
    print(stream)
    Mean_curr = np.mean(stream)
    Mean_calc = Mean_prev + (stream[-1] - X_0) / len(stream)
    X_n = stream[-1]
    D_curr = np.var(stream)
    D_calc = D_prev + (X_n - X_0)*(X_n + X_0 - Mean_prev - Mean_curr)
    print(D_curr, D_calc)
    print(Mean_curr, Mean_calc)

deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=15)
deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 32], maxlen=15)
49.5288888889 481.6
10.0666666667 10.0666666667
deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 32, 34], maxlen=15)
78.8266666667 507.066666667
12.2 10.2
deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 32, 34, 36], maxlen=15)
106.106666667 529.666666667
14.4 10.3333333333
deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 32, 34, 36, 38], maxlen=15)
130.888888889 549.0
16.6666666667 10.4666666667
deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 32, 34, 36, 38, 40], maxlen=15)
152.666666667 564.666666667
19.0 10.6
deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 32, 34, 36, 38, 40, 42], maxlen=15)
170.906666667 576.266666667
21.4 10.7333333333
deque([8, 9, 10, 11, 12, 13, 14, 15, 32, 34, 36, 38, 40, 42, 44], maxlen=15)
185.048888889 583.4
23.8666666667 10.8666666667
deque([9, 10, 11, 12, 13, 14, 15, 32, 34, 36, 38, 40, 42, 44, 46], maxlen=15)
194.506666667 585.66

In [None]:
summa = np.sum(window)
Decimal(summa)/Decimal(len(window))
    

In [133]:
?list.p

In [45]:
st = MV_custom(easy_stream(), 1000)
stats = []
mean_stats =[]
count = 0
for i in range(0,4):
    stats.append(next(st))
    count +=1
    mean

stats

[(503.78899999999999, 84541.220478999996),
 (504.803, 84550.759999999995),
 (505.81599999999997, 84561.275999999998),
 (506.82999999999998, 84570.760999999999)]

In [77]:
st = [23.34, 34.675, 89.87]
summ = 0
count = 0
for i in st:
    count += 1
    summ += Decimal((i + 2))
print(summ)
print(type(summ))

153.8850000000000015631940187
<class 'decimal.Decimal'>


In [275]:
dir(deque)

['__add__',
 '__bool__',
 '__class__',
 '__contains__',
 '__copy__',
 '__delattr__',
 '__delitem__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__imul__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__len__',
 '__lt__',
 '__mul__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__reversed__',
 '__rmul__',
 '__setattr__',
 '__setitem__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 'append',
 'appendleft',
 'clear',
 'copy',
 'count',
 'extend',
 'extendleft',
 'index',
 'insert',
 'maxlen',
 'pop',
 'popleft',
 'remove',
 'reverse',
 'rotate']

In [293]:
?deque.append