In [5]:
import logging
import multiprocessing
import os
import pickle
import time
from math import ceil, floor
from typing import List, Optional

import pymongo


# quickfix of an older class with a typo in the attributes
from time_utils_legacy import Counters
from time_utils_legacy import Timebar

In [6]:
# Root logger at DEBUG so every record from this notebook (and its workers) is emitted.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# The module-level logging.debug() implicitly calls logging.basicConfig() when the
# root logger has no handler yet; that is what guarantees handlers[0] below exists.
logging.debug("Log test")

# Replace the default "LEVEL:name:msg" layout with a compact "LEVEL \t: msg" one.
formatter = logging.Formatter('%(levelname)s \t: %(message)s')
sh = logger.handlers[0]
sh.setFormatter(formatter)

logging.debug("Log test")

DEBUG:root:Log test
DEBUG 	: Log test


In [7]:
def init_arrays(slots_per_timebar: int = 86400):
    """
    Create the per-day list of Timebars for one year, indexable by ``tm_yday``.

    ``time.struct_time.tm_yday`` ranges over [1, 366], and callers index the
    result directly with it (``database[t.tm_yday]``). The list therefore has
    367 entries: index 0 is an unused placeholder and indices 1..366 map
    one-to-one onto days of the year. The previous 366-entry list raised
    IndexError for 31 December of a leap year (tm_yday == 366).

    :param slots_per_timebar: number of slots each day's Timebar is split into
    :return: list of 367 independent Timebar objects
    """
    # 366 possible year days + 1 placeholder at index 0 (tm_yday is 1-based)
    return [Timebar(slots_per_timebar) for _ in range(366 + 1)]


def _accumulate_packet(database, packet, delta_t: int, tag: str):
    """
    Add one packet to the Counters cell at its (day-of-year, slot) position.

    :param database: list of per-day Timebars, indexed by ``time.struct_time.tm_yday``
    :param packet: Mongo document; ``packet['timestamp']`` (epoch seconds) locates
        the cell, and the whole packet is passed to ``Counters.inc_stats``
    :param delta_t: slot width in seconds
    :param tag: "<pid> <process name>" prefix for log lines
    """
    # Where are we?
    epoch_ref = packet['timestamp']
    t = time.gmtime(epoch_ref)  # type: time.struct_time

    if t.tm_yday >= len(database):
        # Indexing would raise IndexError (not caught by the caller) if the day
        # list is shorter than tm_yday requires, e.g. day 366 with 366 entries.
        logging.warning(f"{tag} - no Timebar for day {t.tm_yday}; packet skipped")
        return

    # 1. Find the working day slot
    # day relative second (0.. 86399), bucketed into delta_t-second slots
    drs = (t.tm_hour * 60 * 60) + (t.tm_min * 60) + t.tm_sec
    drs = int(drs / delta_t)

    day = database[t.tm_yday]
    if day[drs] is None:
        # first packet in this slot: create its Counters and update the hint
        c = Counters()
        c.inc_stats(packet)
        day[drs] = c
        day.hint(epoch_ref)
    else:
        day[drs].inc_stats(packet)


def _process_collection(coll, collection: str, database, delta_t: int, tag: str):
    """
    Fold every packet of one Mongo collection into ``database`` in place,
    logging progress roughly every 1 %.

    :param coll: pymongo Collection to read
    :param collection: its name (for log messages)
    :param database: per-day Timebars, mutated in place
    :param delta_t: slot width in seconds
    :param tag: "<pid> <process name>" prefix for log lines
    """
    # number of items in the probe's collection
    # (Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0)
    how_many = coll.count_documents({})
    # floor(how_many / 100) is 0 for collections with < 100 packets, which made
    # `loop % percent` raise ZeroDivisionError; log at least once per packet then.
    percent = max(1, how_many // 100)

    logging.info(f"Processing {collection}. {how_many} packets")

    for loop, packet in enumerate(coll.find(), start=1):
        try:
            _accumulate_packet(database, packet, delta_t, tag)
        except RuntimeError as e:
            # NOTE(review): presumably raised by Timebar/Counters for packets they
            # reject; logged and skipped so one bad packet doesn't stop the probe.
            logging.debug(f"{tag} -{e}")

        if loop % percent == 0:
            # max() guards against documents inserted after the count was taken
            logging.info(f"{tag} - {ceil((loop / max(how_many, 1)) * 100)}")


def collection_worker(collections_names: list, delta_t: int = 1, mongo_uri: Optional[str] = None):
    """
    A worker per lab probe. It aggregates packet counters from Mongo and pickles them.

    Meant to run in its own process (one per probe) so extractions run in
    parallel. All collections in ``collections_names`` are folded into the SAME
    per-day database. That database is written to
    ``counters_<last collection name>.raw`` in the current directory.

    :param collections_names: Mongo collection name(s) holding the probe's packets
    :param delta_t: Timebar's delta t granularity (slot width in seconds)
    :param mongo_uri: MongoDB connection URI. Defaults to the ``MONGO_URI``
        environment variable so credentials are not stored in the notebook.
    :raises RuntimeError: if no URI is given and ``MONGO_URI`` is unset
    """
    proc = multiprocessing.current_process()
    tag = f"{proc.pid} {proc.name}"

    if not collections_names:
        # previously this crashed with NameError on `collection` at dump time
        logging.warning(f"{tag} - no collections given, nothing to do")
        return

    mongo_uri = mongo_uri or os.environ.get("MONGO_URI")
    if not mongo_uri:
        raise RuntimeError(
            "No MongoDB URI: pass mongo_uri or set the MONGO_URI environment variable "
            "(e.g. mongodb://user:password@host:port/)")

    # hardcoded for one year
    database = init_arrays()

    mongo = pymongo.MongoClient(mongo_uri)
    try:
        mongo_db = mongo['thesis']
        for collection in collections_names:
            _process_collection(mongo_db[collection], collection, database, delta_t, tag)
    finally:
        mongo.close()

    # All collections were merged into one database; the dump is named after
    # the last collection, as before.
    out_name = collections_names[-1]
    logging.info(f"{tag} \nDumping {out_name}... ")
    with open("counters_" + out_name + ".raw", "wb") as fh:
        pickle.dump(database, fh)
    logging.info(f"{tag} Dumping of {out_name} done.")





In [None]:
# One worker process per lab probe. Each inner list holds the Mongo collection
# names folded into that probe's single output file (the x58 probe apparently
# has its data spread over three differently spelled collections).
probe_collections = [['windows10x86'], ['freebsd'], ['nuc'], ['toshiba'], ['ubuntu'], ['x58-PC', 'x58-pc', 'x58pc']]

workers = []

for collection in probe_collections:
    w = multiprocessing.Process(
        name=f"{collection}",
        target=collection_worker,
        args=(collection,)
    )

    workers.append(w)

# Start every worker before waiting on any of them. Joining right after each
# start() (as this cell used to) ran the probes strictly one after another.
for i in workers:
    i.start()

for i in workers:
    i.join()

# An exception inside a child process only surfaces here as a non-zero exit code.
failed = [i.name for i in workers if i.exitcode != 0]
if failed:
    logging.warning(f"Workers exited with errors: {failed}")