In [1]:
# Prepare import of modules from parent directory.
import os
import sys
module_path = os.path.abspath(os.path.join('../../'))
if module_path not in sys.path:
    sys.path.append(module_path)

import multiprocessing
from IPython.display import display, clear_output
from os import getpid

from preparations import things_provider

def _filtered_datastream_ids(apply_filter):
    """Return the datastream ids of a fresh ThingsProvider after `apply_filter` ran on it."""
    provider = things_provider.ThingsProvider()
    apply_filter(provider)
    return provider.get_datastream_ids()


# Each id list comes from its own provider instance with exactly one filter applied.
primary_signal_datastream_ids = _filtered_datastream_ids(
    lambda provider: provider.filter_only_primary_signal_datastreams()
)
cycle_second_datastream_ids = _filtered_datastream_ids(
    lambda provider: provider.filter_only_cycle_second_datastreams()
)

# Primary signal ids first, then cycle second ids.
datastream_ids = primary_signal_datastream_ids + cycle_second_datastream_ids

# The lookup table (datastream id -> (thing name, layer name)) is read from an
# unfiltered provider.
tp = things_provider.ThingsProvider()
things_and_layer_names_by_datastream_id = tp.get_map_from_datastream_ids_to_thing_names_and_layer_names()

# Number of worker processes the datastreams are distributed over.
PROCESSES = 4

# Group the datastream ids by the thing they belong to, so that all datastreams
# of one thing end up in the same worker process.
datastream_ids_by_things = {}
for datastream_id in datastream_ids:
    thing_name, layer_name = things_and_layer_names_by_datastream_id[datastream_id]
    if layer_name not in ('primary_signal', 'cycle_second'):
        continue
    datastream_ids_by_things.setdefault(thing_name, []).append(datastream_id)

# Deal the things out round-robin: thing number k goes to process k % PROCESSES.
datastream_ids_for_processes = [[] for _ in range(PROCESSES)]
for thing_index, datastreams in enumerate(datastream_ids_by_things.values()):
    datastream_ids_for_processes[thing_index % PROCESSES].extend(datastreams)

# Safety check: the partitions together must hold exactly as many ids as the input ...
total_datastreams = sum(len(datastreams) for datastreams in datastream_ids_for_processes)
if total_datastreams != len(datastream_ids):
    raise Exception('Total datastreams is not equal to number of datastreams: ' + str(total_datastreams) + ' != ' + str(len(datastream_ids)))

# ... and every input id must be assigned to some process. A set makes each
# membership test constant-time instead of scanning every partition list per id
# (the ids are already used as dict keys above, so they are hashable).
assigned_datastream_ids = set()
for datastreams in datastream_ids_for_processes:
    assigned_datastream_ids.update(datastreams)
for datastream in datastream_ids:
    if datastream not in assigned_datastream_ids:
        raise Exception('Datastream not found in any process: ' + str(datastream))
    

# Test with one thing (disabled: the code below is wrapped in a string literal and is never executed):
""" thing_name = None
for datastream_id in things_and_layer_names_by_datastream_id:
    if thing_name is None and (
        things_and_layer_names_by_datastream_id[datastream_id][1] == 'primary_signal' or
        things_and_layer_names_by_datastream_id[datastream_id][1] == 'cycle_second'
    ):
        datastream_ids.append(datastream_id)
        thing_name = things_and_layer_names_by_datastream_id[datastream_id][0]
    elif thing_name == things_and_layer_names_by_datastream_id[datastream_id][0] and (
        things_and_layer_names_by_datastream_id[datastream_id][1] == 'primary_signal' or
        things_and_layer_names_by_datastream_id[datastream_id][1] == 'cycle_second'
    ):
        datastream_ids.append(datastream_id)
        
if len(datastream_ids) != 2:
    raise Exception('Number of datastream ids is not 2: ' + str(len(datastream_ids))) """

def run(datastream_ids):
    """Feed the observations of the given datastreams into per-thing `Thing` objects.

    Used as the worker function of the multiprocessing pool. Each call opens its
    own database client and closes it again, also when processing fails.

    Args:
        datastream_ids: Datastream ids handed to
            `query_builder.get_relevant_observations` to select the rows to process.

    Returns:
        A dict mapping each thing name encountered in the rows to its `Thing`.

    Raises:
        KeyError: If a row's datastream id is missing from the provider's map.
        Exception: If the thing name or layer name mapped to a row's datastream
            id is None.
    """
    # Project modules are imported inside the worker function.
    from studies import db
    from studies.cycles.lib import query_builder
    from studies.cycles.lib.thing import Thing
    from preparations import things_provider

    WINDOW_SIZE_DB = 50000
    WINDOW_SIZE_THINGS = 500
    LIMIT = 1000000
    PROGRESS_INTERVAL = 50000  # print a progress line every this many rows

    VALIDATION = False
    RETRIEVE_ALL_CLEANUP_STATS = False

    # NOTE(review): `id` is the Python builtin function here, not a value defined
    # in this function -- confirm what DBClient expects as its second argument.
    db_client = db.DBClient(WINDOW_SIZE_DB, id)
    try:
        relevant_observations_query_bike = query_builder.get_relevant_observations(datastream_ids, LIMIT)
        relevant_observation_rows_bike_generator = db_client.execute_query(relevant_observations_query_bike)

        tp = things_provider.ThingsProvider()
        things_and_layer_names_by_datastream_id = tp.get_map_from_datastream_ids_to_thing_names_and_layer_names()

        things: dict[str, Thing] = {}
        print(f"{getpid()}: lets go \n")
        for i, row in enumerate(relevant_observation_rows_bike_generator):
            # row[2] is the datastream id; row[0] and row[1] are passed through to
            # Thing.add_observation unchanged (presumably the observation's
            # time/value columns -- TODO confirm against the query).
            datastream_id = row[2]
            thing_name, layer_name = things_and_layer_names_by_datastream_id[datastream_id]
            if thing_name is None:
                raise Exception('Thing name is None for datastream id: ' + str(datastream_id))
            if layer_name is None:
                raise Exception('Layer name is None for datastream id: ' + str(datastream_id))
            if thing_name not in things:
                things[thing_name] = Thing(thing_name, WINDOW_SIZE_THINGS, VALIDATION, RETRIEVE_ALL_CLEANUP_STATS)

            things[thing_name].add_observation(layer_name, row[0], row[1])
            if i % PROGRESS_INTERVAL == 0:
                print(str(getpid()) + ': Iteration: ' + str(i) +' - Number of things ' + str(len(things)) + "\n")
    finally:
        # Release the database client even if a row could not be processed.
        db_client.close()

    return things

# Run one worker per partition. pool.map blocks until every worker has returned,
# so leaving the `with` block (which terminates the pool) cannot cut work short,
# and the worker processes are cleaned up even if a worker raises.
with multiprocessing.Pool(processes=PROCESSES) as pool:
    returns = pool.map(run, datastream_ids_for_processes)

# Merge the per-process results into one dict keyed by thing name. All
# datastreams of a thing are assigned to the same process, so the per-process
# dicts do not share keys.
things = {}
for things_of_process in returns:
    things.update(things_of_process)


Amount of datastreams: 19824
Amount of datastream ids: 19824
Amount of datastreams: 19825
Amount of datastream ids: 19825
442444: lets go 

442445: lets go 

442447: lets go 

442446: lets go 

442445: Iteration: 0 - Number of things 1

442444: Iteration: 0 - Number of things 1
442446: Iteration: 0 - Number of things 1


442447: Iteration: 0 - Number of things 1

442447: Iteration: 50000 - Number of things 4014

442445: Iteration: 50000 - Number of things 4014
442444: Iteration: 50000 - Number of things 4027


442446: Iteration: 50000 - Number of things 4016

442447: Iteration: 100000 - Number of things 4051

442444: Iteration: 100000 - Number of things 4064

442446: Iteration: 100000 - Number of things 4048

442445: Iteration: 100000 - Number of things 4053

442444: Iteration: 150000 - Number of things 4072

442447: Iteration: 150000 - Number of things 4055

442446: Iteration: 150000 - Number of things 4052

442445: Iteration: 150000 - Number of things 4054

442444: Iteration: 200000 

TypeError: 'NoneType' object is not iterable

In [5]:
import numpy as np
from juliacall import Main as jl
jl.seval("using OnlineStats")

# total_distances[day][hour] collects one median per thing for that
# day-of-week / hour-of-day slot (7 x 24 slots).
total_distances = [[[] for _ in range(24)] for _ in range(7)]

for thing in things.values():
    for day_idx, day in enumerate(thing.metrics):
        for hour_idx, hour in enumerate(day):
            # The median is read from position [0][2] of the OnlineStats value
            # (presumably the middle entry of a quantile estimate -- TODO confirm).
            quantiles = jl.OnlineStats.value(hour)
            median = quantiles[0][2]
            total_distances[day_idx][hour_idx].append(median)
   
# median_distances[day][hour] holds the median over all things of the values
# collected in total_distances[day][hour].
median_distances = [[None for _ in range(24)] for _ in range(7)]

for day_idx, day in enumerate(total_distances):
    for hour_idx, hour in enumerate(day):
        np_distances = np.array(hour)
        try:
            median = np.median(np_distances)
        except Exception as error:
            # Show the offending slot, then re-raise with the original error
            # chained so the real cause stays visible in the traceback.
            # KeyboardInterrupt/SystemExit are deliberately not intercepted.
            print(hour)
            print(np_distances)
            raise Exception('Error') from error
        median_distances[day_idx][hour_idx] = median
    
# Print the 7 x 24 table of medians, one line per hour, grouped by day.
for day_idx in range(7):
    print(f'Day: {day_idx!s}')
    for hour_idx in range(24):
        median_distance = median_distances[day_idx][hour_idx]
        print(f'Hour: {hour_idx!s} - Median distance: {median_distance!s}')

Day: 0
Hour: 0 - Median distance: 0.0
Hour: 1 - Median distance: 0.0
Hour: 2 - Median distance: 0.0
Hour: 3 - Median distance: 0.0
Hour: 4 - Median distance: 0.0
Hour: 5 - Median distance: 0.0
Hour: 6 - Median distance: 0.0
Hour: 7 - Median distance: 0.0
Hour: 8 - Median distance: 0.0
Hour: 9 - Median distance: 0.0
Hour: 10 - Median distance: 0.0
Hour: 11 - Median distance: 0.0
Hour: 12 - Median distance: 0.0
Hour: 13 - Median distance: 0.0
Hour: 14 - Median distance: 0.0
Hour: 15 - Median distance: 0.0
Hour: 16 - Median distance: 0.0
Hour: 17 - Median distance: 0.0
Hour: 18 - Median distance: 0.0
Hour: 19 - Median distance: 0.0
Hour: 20 - Median distance: 0.0
Hour: 21 - Median distance: 0.0
Hour: 22 - Median distance: 0.0
Hour: 23 - Median distance: 0.0
Day: 1
Hour: 0 - Median distance: 0.0
Hour: 1 - Median distance: 0.0
Hour: 2 - Median distance: 0.0
Hour: 3 - Median distance: 0.0
Hour: 4 - Median distance: 0.0
Hour: 5 - Median distance: 0.0
Hour: 6 - Median distance: 0.0
Hour: 7 - M