### Helper for loading the datasets

In [1]:
import pandas as pd
from usp_stream_datasets import *

# Ask pandas to treat +/-inf as missing values (NA) for the rest of the session.
# NOTE(review): `mode.use_inf_as_na` is reported as deprecated in recent pandas
# releases (2.1+) -- confirm against the installed version; the replacement would
# be converting inf to NaN explicitly after loading the data.
pd.set_option('mode.use_inf_as_na', True)

# Names of the 33 feature columns that get KS-tested: 'Att1' through 'Att33'.
attr_columns = [f"Att{position}" for position in range(1, 34)]

# Number of worker processes given to multiprocessing.Pool in async_ks_test
# (also recorded in the run metadata under "pools").
NUMBER_OF_POOLS = 8
# Quick-run switch: when set to a positive integer, each attribute's run is limited
# to the first DEBUG_SIZE window start positions; None (or 0) runs every window.
DEBUG_SIZE = None

### KS-test helpers: sliding-window tests (sequential and multiprocessing) and the per-dataset driver

In [4]:
import json
import time
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from scipy import stats


def test_class_attr(df_baseline: pd.DataFrame, df_stream: pd.DataFrame, window_size, attr: str):
    print(f"Testing: {attr} - Window Size: {window_size}")
    results = []
    windows = df_stream[attr].rolling(window=window_size)
    count = window_size
    for window in windows:
        ks = stats.kstest(df_baseline[attr], window)
        results.append({"attr": attr, "start_index": count - window_size, "end_index": count, "p_value": ks.pvalue})
        count += 1
    return results


def write_metadata(metadata: dict, dataset_prefix: str):
    """Dump ``metadata`` as 4-space-indented JSON to ``<dataset_prefix>_metadata.json``.

    The dictionary is serialised before the file is opened, so a serialisation
    error does not leave behind a truncated file. An existing file at the
    target path is overwritten.
    """
    target_path = f"{dataset_prefix}_metadata.json"
    serialized = json.dumps(metadata, indent=4)
    with open(target_path, "w") as handle:
        handle.write(serialized)


#######################################################
################ MULTIPROCESSING STUFF ################
#######################################################

# https://stackoverflow.com/questions/63096168/how-to-apply-multiprocessing-to-a-sliding-window
from multiprocessing import Process
from multiprocessing import Pool
from timeit import default_timer as timer
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support


def window(start_idx, df_baseline, df_stream, attr, window_size):
    """KS-test one slice of the stream column against the whole baseline column.

    The tested slice begins one position after ``start_idx`` and spans
    ``window_size`` rows: ``df_stream[attr][start_idx + 1 : start_idx + 1 + window_size]``.

    Returns a dict with keys ``attr``, ``start`` and ``end`` (the slice bounds
    actually used) and ``p_value`` (p-value reported by ``stats.kstest``).
    """
    first = start_idx + 1
    last = first + window_size
    ks_result = stats.kstest(df_baseline[attr], df_stream[attr][first:last])
    return dict(attr=attr, start=first, end=last, p_value=ks_result.pvalue)


def async_ks_test(df_baseline, df_stream, attr, DEBUG_SIZE=None):
    """Run the sliding-window KS test for one attribute across a process pool.

    The window length equals the number of baseline rows. Window start
    positions ``0 .. len(stream) - window - 2`` are handed to ``window`` in
    parallel using ``NUMBER_OF_POOLS`` worker processes.

    Parameters
    ----------
    df_baseline : pd.DataFrame
        Reference sample; only column ``attr`` is used.
    df_stream : pd.DataFrame
        Data to scan; only column ``attr`` is used.
    attr : str
        Column name present in both frames.
    DEBUG_SIZE : int or None
        When truthy, only the first ``DEBUG_SIZE`` start positions are tested.
        The value is capped at the number of positions of a full run, so a
        debug run is always a prefix of the full run and never asks for
        windows that run past the end of the stream.

    Returns
    -------
    pd.DataFrame
        One row per window with columns ``attr``, ``start``, ``end``,
        ``p_value`` (empty, with the same columns, when there is nothing to test).
    """
    result_columns = ["attr", "start", "end", "p_value"]

    num_el = len(df_stream[attr])
    window_size = len(df_baseline[attr])

    # Same number of start positions as before; never negative.
    n_starts = max(0, num_el - window_size - 1)
    if DEBUG_SIZE:
        n_starts = min(DEBUG_SIZE, n_starts)

    if n_starts <= 0:
        # Nothing to test: skip starting worker processes altogether.
        return pd.DataFrame(columns=result_columns)

    # Task arguments are sent to the workers for every window, so ship only the
    # single column that `window` reads instead of the full frames.
    baseline_col = df_baseline[[attr]]
    stream_col = df_stream[[attr]]

    with Pool(NUMBER_OF_POOLS) as p:
        # https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments
        result_multi = p.starmap(
            window,
            zip(range(n_starts), repeat(baseline_col), repeat(stream_col), repeat(attr), repeat(window_size)),
        )

    return pd.DataFrame(result_multi, columns=result_columns)


def test_dataset(dataset: str):
    """Run the sliding-window KS tests for every attribute of one dataset and persist the results.

    Steps:
      1. Load the dataset registered under ``dataset`` in ``insects_datasets``.
      2. Build the BASELINE frame from the first ``window_size`` rows of each
         class, where ``window_size`` is 25% of the smallest class count.
      3. Use all remaining rows as the STREAM frame (original index kept in an
         ``original_index`` column).
      4. For each column in ``attr_columns`` call ``async_ks_test`` and record
         the elapsed wall-clock time.
      5. Write ``results/<prefix>_metadata.json`` and ``results/<prefix>.csv``.

    Parameters
    ----------
    dataset : str
        Key into ``insects_datasets``; also used (sanitised) as the output
        file prefix.

    Returns
    -------
    pd.DataFrame
        The concatenated per-attribute results that were written to the CSV.
    """
    import os  # local import: `os` is not among this notebook's top-level imports

    results_folder = "results/"
    # Create the output folder up front: the files are only written at the very
    # end, and a missing folder would otherwise discard the whole computation.
    os.makedirs(results_folder, exist_ok=True)

    dataset_prefix = results_folder + dataset.lower().replace(".","").replace("(","").replace(")","").replace(" ","-")
    metadata = {
        "dataset": dataset,
        "execution_time": {},
        "pools": NUMBER_OF_POOLS
    }

    if DEBUG_SIZE:
        metadata["debug_size"] = DEBUG_SIZE

    # open dataframe TOTAL and fetch the minimal class size W
    total_df = load_insect_dataset(insects_datasets[dataset]["filename"])
    classes = total_df['class'].unique().tolist()
    minimal_class = total_df["class"].value_counts().min()
    window_size = int(minimal_class * 0.25)

    # fetch the leading slice of every species and concat them into the BASELINE dataframe
    baseline_parts = [
        total_df[total_df["class"] == species].iloc[:window_size]
        for species in classes
    ]
    df_baseline = pd.concat(baseline_parts)
    index_as_list = df_baseline.index.tolist()

    # remove the ids located on the BASELINE dataframe from the TOTAL dataframe, creating a STREAM dataframe
    df_stream = total_df.loc[~total_df.index.isin(index_as_list)]
    df_stream = df_stream.rename_axis('original_index').reset_index()

    print(f'DF Total: {total_df.shape}')
    print(f'DF baseline: {df_baseline.shape}')
    print(f'DF stream: {df_stream.shape}')

    # For all attributes, test the corresponding attribute from baseline using a window with size (N * W)
    results = []

    for attr in attr_columns:
        print(f"Testing for attr: {attr}")
        attr_start_time = time.time()
        attr_results = async_ks_test(df_baseline, df_stream, attr, DEBUG_SIZE=DEBUG_SIZE)
        attr_end_time = time.time()
        metadata["execution_time"][attr] = attr_end_time - attr_start_time
        results.append(attr_results)

    metadata["Window size"] = window_size
    metadata["Baseline size"] = df_baseline.shape[0]
    metadata["Stream size"] = df_stream.shape[0]
    write_metadata(metadata, dataset_prefix)

    dataset_results = pd.concat(results)
    dataset_results.to_csv(dataset_prefix + '.csv', index=False)
    # Hand the results back so they can be inspected without re-reading the CSV.
    return dataset_results

## Run KS tests for all datasets

In [3]:
# Process every registered dataset in turn; iterating the mapping directly
# walks over its keys (the dataset names).
for dataset in insects_datasets:
    print(f"Testing KS on dataset: {dataset}")
    test_dataset(dataset)

Testing KS on dataset: Incremental (bal.)
DF Total: (57018, 34)
DF baseline: (14250, 34)
DF stream: (42768, 35)
Testing for attr: Att1
Testing for attr: Att2
Testing for attr: Att3
Testing for attr: Att4
Testing for attr: Att5
Testing for attr: Att6
Testing for attr: Att7
Testing for attr: Att8
Testing for attr: Att9
Testing for attr: Att10
Testing for attr: Att11
Testing for attr: Att12
Testing for attr: Att13
Testing for attr: Att14
Testing for attr: Att15
Testing for attr: Att16
Testing for attr: Att17
Testing for attr: Att18
Testing for attr: Att19
Testing for attr: Att20
Testing for attr: Att21
Testing for attr: Att22
Testing for attr: Att23
Testing for attr: Att24
Testing for attr: Att25
Testing for attr: Att26
Testing for attr: Att27
Testing for attr: Att28
Testing for attr: Att29
Testing for attr: Att30
Testing for attr: Att31
Testing for attr: Att32
Testing for attr: Att33
Testing KS on dataset: Incremental (imbal.)
DF Total: (452044, 34)
DF baseline: (19992, 34)
DF stream: (4

Process ForkPoolWorker-1856:
Process ForkPoolWorker-1849:
Process ForkPoolWorker-1854:
Process ForkPoolWorker-1853:
Process ForkPoolWorker-1855:
Process ForkPoolWorker-1850:
Process ForkPoolWorker-1852:
Process ForkPoolWorker-1851:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 51, in 

KeyboardInterrupt: 

In [None]:
# NOTE(review): within this notebook `dataset_results` is only assigned inside
# test_dataset (a local variable), so on a fresh kernel this cell presumably
# raises NameError unless the star import provides the name -- confirm, or load
# the results CSV written by test_dataset instead.
dataset_results

In [None]:
# def slice_species_dataframes(main_df, species: str):
#     df_species = main_df[main_df["class"] == species]
#     df_baseline = df_species.iloc[:window_size,]
#     df_stream = df_species.iloc[window_size:,]
#     return df_baseline, df_stream


# import time

# for dataset in insects_datasets.keys():
#     if not dataset == "Incremental-gradual (imbal.)":
#         print(f"Testing on dataset: {dataset}")
#         dataset_prefix = dataset.lower().replace(".","").replace("(","").replace(")","").replace(" ","-")
#         main_df = load_insect_dataset(insects_datasets[dataset]["filename"])
#         classes = main_df['class'].unique().tolist()
#         minimal_class = main_df["class"].value_counts().min()
#         window_size = int(minimal_class * 0.25)

#         for species in classes:
#             species_start_time = time.time()   
#             for attr in attr_columns:
#                 attr_start_time = time.time()
#                 print(f"Testing KS windows on species: {species} - {attr}")
#                 test_class_attr(species, dataset_prefix, attr)
#                 attr_end_time = time.time()
#                 elapsed_attr_time = attr_end_time - attr_start_time
#                 print(f'Execution time for {species} - {attr}: {elapsed_attr_time} seconds.')
            
#             species_end_time = time.time()
#             elapsed_time = species_end_time - species_start_time
#             print(f'Total execution time for {species}: {elapsed_time} seconds.')
#             print()

#         print()

# import time
# import math
# from multiprocessing import Pool
# from multiprocessing import freeze_support


# def run_multiprocessing_ks(func, baseline, windows, n_processors):
#     '''Define function to run multiple processors and pool the results together'''
#     with Pool(processes=n_processors) as pool:
#         return pool.map(func, baseline, windows)


# def ks_test(baseline, window, count, window_size):
#     ks = stats.kstest(baseline, window)
#     return {"attr": attr, "start_index": count - window_size, "end_index": count, "p_value": ks.pvalue}


# def test_class_attr(df_baseline: pd.DataFrame, df_stream: pd.DataFrame, window_size, attr: str):
#     results = []
#     windows = df_stream[attr].rolling(window=window_size)
#     count = window_size
#     for window in windows:
#         ks = stats.kstest(df_baseline[attr], window)
#         results.append({"attr": attr, "start_index": count - window_size, "end_index": count, "p_value": ks.pvalue})
#         count += 1
#     return results


# def main():
#     start = time.time()
    
#     # set up parameters required by the task
#     num_max = 1000000
#     x_ls = list(range(num_max))
    
#     # the windowing for goes here...
#     # pass the task function, followed by the parameters to processors
#     n_processors = 6
#     out = run_multiprocessing(ks_test, baseline, stream, n_processors)

#     # Infos about running time
#     print("Input length: {}".format(len(x_ls)))
#     print("Output length: {}".format(len(out)))
#     print("Mutiprocessing time: {}mins\n".format((time.time()-start)/60))
#     print("Mutiprocessing time: {}secs\n".format((time.time()-start)))
#     return out