In [None]:
import math
import pickle
import random
import sys
import time
from functools import reduce
from pprint import pp
from typing import Callable

import pandas as pd

from algebra.graph.graph import Graph, MultipleType
from algebra.monoid.controller import MonoidController
from algos.graph_builder import military_algo
from algos.graph_processor.hclasses_searcher import search_Hclasses
from algos.graph_processor.idempotents_markupper import markup_idempotents
from algos.isom_builder.eco.eco_algo import IsomBuilderEcoAlgo
from algos.isom_builder.naive.naive_algo import IsomBuilderNaiveAlgo
from algos.isom_builder.shared.algo_config import AlgoConfig
from algos.isom_builder.shared.algo_init_set import AlgoInitSet
from samples import funny_samples, random_sample, simple_samples
from utils.events.eventsHandler import EH
from utils.tester.tester import (FuncSet, NamedConfig, TestCase, comboStats,
                                 inputType, inStats, intermediateType,
                                 postStats, resultType, run_tests, sumStats)

sys.setrecursionlimit(9999999)

### test run functions


In [None]:
def timer(func: Callable[[postStats], resultType], args: postStats):
    """Run ``func(args)`` once and measure how long the call takes.

    Args:
        func: Callable invoked with ``args`` as its single positional argument.
        args: Value forwarded unchanged to ``func``.

    Returns:
        A ``(elapsed_seconds, result)`` tuple, where ``result`` is whatever
        ``func`` returned.
    """
    # perf_counter is monotonic and high-resolution. time.time() follows the
    # wall clock, which can be adjusted mid-run and distort (or even negate)
    # the measured interval.
    started = time.perf_counter()
    res = func(args)
    elapsed = time.perf_counter() - started
    return elapsed, res


def get_events():
    """Return the result of ``EH.process_events()``."""
    events = EH.process_events()
    return events


def composer(args: tuple[float, dict[str, int | float]]):
    d = args[1]
    d['time'] = args[0]
    print(d)
    return d


def sumUpper(stats_list: list[comboStats]):
    """Turn a list of stats mappings into one mapping of per-key value lists.

    The keys are taken from the first element only; for each such key the
    result holds the list of that key's values across all elements, in order.
    Raises ``IndexError`` when ``stats_list`` is empty.
    """
    first = stats_list[0]  # type: ignore
    return {
        key: [stats[key] for stats in stats_list]  # type: ignore
        for key in first.keys()  # type: ignore
    }


def preparator(args: tuple[MonoidController, MonoidController, AlgoConfig]):
    """Build the ``AlgoInitSet`` for one run from ``(S1, S2, config)``.

    A graph is produced for each of the two inputs with ``military_algo``;
    ``H1`` and ``H2`` start out as empty sets.
    """
    S1, S2, config = args
    return AlgoInitSet(
        S1=S1,
        S2=S2,
        G1=military_algo(S1),
        G2=military_algo(S2),
        config=config,
        H1=set(),
        H2=set(),
    )


def f(init_set: AlgoInitSet):
    """Run the algorithm selected by ``init_set.config.use_eco_algo``.

    When the flag is ``True`` both graphs get their idempotents marked up and
    ``H1``/``H2`` are filled from ``search_Hclasses`` before
    ``IsomBuilderEcoAlgo`` runs. When it is ``False`` ``IsomBuilderNaiveAlgo``
    runs on the untouched init set. Returns the value of the chosen
    algorithm's ``run()``.
    """
    use_eco = init_set.config.use_eco_algo

    # Identity checks mirror `case True` / `case False`: only the bool
    # singletons select a branch.
    if use_eco is True:
        for graph in (init_set.G1, init_set.G2):
            markup_idempotents(graph)
        init_set.H1 = set(search_Hclasses(init_set.G1))
        init_set.H2 = set(search_Hclasses(init_set.G2))
        return IsomBuilderEcoAlgo(init_set).run()

    if use_eco is False:
        return IsomBuilderNaiveAlgo(init_set).run()

    # Any other flag value selects nothing and yields None.
    return None


# Bundle the helper functions defined above into a single FuncSet, wiring each
# helper to the FuncSet field of the matching role.
fs = FuncSet(
    preparator=preparator,
    f=f,
    inProcessor=timer,
    postProcessor=get_events,
    statsComposer=composer,
    sumUpper=sumUpper
)

### generate tests

In [None]:
def gen_tests(name: str, A: int, B: int, generators: int):
    """Randomly generate five test cases whose graph size lies in ``[A, B]``.

    Samples come from ``random_sample.gen_random_group`` (``set_size=5``). A
    sample is rejected when its generator count differs from ``generators``
    or when the graph built by ``military_algo`` has fewer than ``A`` or more
    than ``B`` nodes. Node counts and accepted samples are printed as they
    are seen.

    Returns:
        A list of five ``TestCase`` objects named ``{name}_sg_1`` ..
        ``{name}_sg_5``, ordered by ``len`` of the sample, each wrapping
        ``sample.mixed()``.
    """
    accepted = []
    while len(accepted) < 5:
        candidate = random_sample.gen_random_group(
            set_size=5, generators_num=generators, minimize=True)
        if len(candidate.generators) != generators:
            continue

        graph = military_algo(candidate)
        print(len(graph.nodes))
        if not A <= len(graph.nodes) <= B:
            continue

        print(candidate)
        accepted.append(candidate)

    accepted.sort(key=len)
    return [
        TestCase(name + "_sg_" + str(position), sample.mixed())
        for position, sample in enumerate(accepted, start=1)
    ]


def gen_tests2():
    """Generate five random test cases per (size range, generator count) pair.

    Size ranges are (200, 400), (400, 600), (600, 800) and (800, 1000);
    generator counts are 2, 3 and 4. A sample from
    ``random_sample.gen_random_sample`` is kept only when it has exactly the
    requested number of generators and its ``military_algo`` graph has between
    ``A`` and ``B`` nodes (inclusive).

    Returns:
        ``{(A, B): {generator_count: [TestCase, ...]}}`` with test names of
        the form ``tt-{A}-{B}-{gn}-{k}``.
    """
    lower_bounds = [200, 400, 600, 800]
    upper_bounds = [400, 600, 800, 1000]
    generator_counts = [2, 3, 4]
    cases_by_range = {}

    for A, B in zip(lower_bounds, upper_bounds):
        per_count = cases_by_range[(A, B)] = {}
        for gn in generator_counts:
            cases = []
            while len(cases) < 5:
                S = random_sample.gen_random_sample(
                    set_size=random.randint(4, 5), generators_num=gn, minimize=True)
                if len(S.generators) != gn:
                    continue

                node_count = len(military_algo(S).nodes)
                if A <= node_count <= B:
                    # len(cases) is read before the append, so numbering starts at 1.
                    cases.append(TestCase(f'tt-{A}-{B}-{gn}-{len(cases)+1}', S))

            per_count[gn] = cases

    return cases_by_range


def gen_group(size: int, generators: int):
    """Sample random groups until one has exactly ``size!`` elements.

    Candidates come from ``random_sample.gen_random_group``. A candidate is
    rejected when its generator count differs from ``generators`` or when the
    graph built by ``military_algo`` does not have ``factorial(size)`` nodes.
    Every node count seen is printed.

    Args:
        size: ``set_size`` passed to the sampler; must be a non-negative int.
        generators: Required number of generators.

    Returns:
        A ``TestCase`` named ``S{size}_{generators}`` wrapping the accepted
        sample.

    Raises:
        ValueError: If ``size`` is negative (from ``math.factorial``).
    """
    # math.factorial replaces a hand-rolled recursion that never reached its
    # base case for size < 1. It is also hoisted out of the loop because the
    # target does not change between attempts.
    expected_nodes = math.factorial(size)

    while True:
        S = random_sample.gen_random_group(
            set_size=size,
            generators_num=generators,
            minimize=True)
        if len(S.generators) != generators:
            continue
        g = military_algo(S)
        print(len(g.nodes))
        if len(g.nodes) != expected_nodes:
            continue

        return TestCase(f'S{size}_{generators}', S)

In [None]:
def load_tests(name):
    """Unpickle and return the object stored in ``../output/{name}.pickle``.

    NOTE(review): unpickling can execute arbitrary code; only load files that
    were produced by this notebook.
    """
    source = f'../output/{name}.pickle'
    with open(source, 'rb') as fh:
        return pickle.load(fh)


def save_tests(name, res: list[TestCase]):
    """Pickle ``res`` into ``../output/{name}.pickle`` (overwriting the file)."""
    target = f'../output/{name}.pickle'
    with open(target, 'wb') as fh:
        pickle.dump(res, fh)


def load_result(name):
    """Unpickle and return the object stored in ``../output/{name}_res.pickle``.

    NOTE(review): unpickling can execute arbitrary code; only load files that
    were produced by this notebook.
    """
    source = f'../output/{name}_res.pickle'
    with open(source, 'rb') as fh:
        return pickle.load(fh)

def save_result(name, res):
    """Pickle ``res`` into ``../output/{name}_res.pickle`` (overwriting the file)."""
    target = f'../output/{name}_res.pickle'
    with open(target, 'wb') as fh:
        pickle.dump(res, fh)

In [None]:
# f1 = gen_tests('F1', 500, 1000, 3)
# f2 = gen_tests('F2', 500, 1000, 4)
# f3 = gen_tests('F3', 200, 300, 2)
# f4 = gen_tests('F4', 70, 80, 3)

# save_tests('f1', f1)
# save_tests('f2', f2)
# save_tests('f3', f3)
# save_tests('f4', f4)

# f1 = load_tests('f1')
# f2 = load_tests('f2')
# f3 = load_tests('f3')
# f4 = load_tests('f4')

# si = [gen_group(i, 2) for i in range(4, 6)] + [gen_group(i, 3) for i in range(4, 6)]
# save_tests('Si', si)
# si = load_tests('Si')

from algebra.universe.transformations import Transformation


# Hand-built test cases, each a MonoidController with a single Transformation
# generator built from a list of length n (n = 500 or 1000):
#   * "acyclic": [1, 1, 2, ..., n-1]  (the value 1 occurs twice, n is absent)
#   * "cyclic":  [n, 1, 2, ..., n-1]  (every value 1..n occurs exactly once)
# NOTE(review): how Transformation interprets the list is not visible here — confirm.
# The doubled backslash stores a literal "\_" in each name, presumably so the
# names can be emitted into LaTeX unchanged.
cyc = [
    TestCase('acyclic\\_500', MonoidController([Transformation([1] + list(range(1, 500)))])),
    TestCase('acyclic\\_1000', MonoidController([Transformation([1] + list(range(1, 1000)))])),
    TestCase('cyclic\\_500', MonoidController([Transformation([500] + list(range(1, 500)))])),
    TestCase('cyclic\\_1000', MonoidController([Transformation([1000] + list(range(1, 1000)))])),
]

# tt = gen_tests2()
# save_tests('tt', tt)
# Load the previously generated cases; requires ../output/tt.pickle to exist
# (uncomment the two lines above to regenerate it).
tt = load_tests('tt')

### define configs


In [None]:
# Named AlgoConfig variants. None of them sets use_eco_algo, so that field
# keeps whatever default AlgoConfig defines.

# AlgoConfig with every field left at its default.
std_eco = NamedConfig("std_eco",
                      AlgoConfig(
                      ))

# Overrides only second_chain_mult_type.
monoid_mul = NamedConfig("monoid_mul",
                         AlgoConfig(
                             second_chain_mult_type=MultipleType.monoid_multiply
                         ))

# Same as monoid_mul, with both isomorphism-image caches switched off.
no_image = NamedConfig("no_image",
                       AlgoConfig(
                           cache_isom_images_set=False,
                           cache_isom_h_images_set=False,
                           second_chain_mult_type=MultipleType.monoid_multiply
                       ))

# Same as monoid_mul, with image-chain caching switched off.
no_chain = NamedConfig("no_chain",
                       AlgoConfig(
                           cache_image_chains=False,
                           second_chain_mult_type=MultipleType.monoid_multiply
                       ))

# All four variants above, in declaration order.
all_eco_configs = [
    std_eco, monoid_mul, no_image, no_chain,

]

# monoid_mul next to two configs with use_eco_algo=False, one per
# second_chain_mult_type value (monoid_multiply and graph_traverse).
eco_vs_naive = [
    monoid_mul,
    NamedConfig("naive_mul",
                AlgoConfig(
                    use_eco_algo=False,
                    second_chain_mult_type=MultipleType.monoid_multiply,
                )),
    NamedConfig("naive_cay",
                AlgoConfig(
                    use_eco_algo=False,
                    second_chain_mult_type=MultipleType.graph_traverse,
                )),
]

# Reduced comparison: monoid_mul against a separately constructed "naive_mul"
# config only (no graph_traverse variant).
eco_vs_naive2 = [
    monoid_mul,
   NamedConfig("naive_mul",
                AlgoConfig(
                    use_eco_algo=False,
                    second_chain_mult_type=MultipleType.monoid_multiply,
                )),
]

### run tests

In [None]:
def go(name: str, tests: list[TestCase], configs: list[NamedConfig]):
    """Run ``tests`` under ``configs`` and persist the outcome.

    Calls ``run_tests(tests, configs, fs, 1)``, stores the returned object via
    ``save_result(name, ...)`` (i.e. in ``../output/{name}_res.pickle``) and
    returns it.

    NOTE(review): the meaning of the literal ``1`` passed to ``run_tests`` is
    not visible in this notebook — presumably a repetition count; confirm.
    """
    outcome = run_tests(tests, configs, fs, 1)
    save_result(name, outcome)
    return outcome

In [None]:
# go('f1', f1, all_eco_configs)
# go('f2', f2, all_eco_configs)

In [None]:
# go('f3', f3, eco_vs_naive)
# go('f4', f4, eco_vs_naive)

In [None]:
# go('cyc', cyc, eco_vs_naive2)

In [None]:
# sizes = list(tt.keys())
# gns = list(range(2,5))

# df = pd.DataFrame(index=sizes, columns=gns)
# print(df)
# for s in sizes:
#     for gn in gns:
#         res = go('', tt[s][gn], [monoid_mul])
#         times = []
#         for v in res.values():
#             for v2 in v.values():
#                 times += v2['time']
#         print(times)
#         df.loc[str(s), gn] = sum(times)/len(times)
#         df.to_csv('res.csv')
#         print(df)
# df

### Generate result tables

#### functions

In [None]:
def results_to_table(res, tests, configs):
    """Aggregate results per config over all tests and print LaTeX table rows.

    For every config, the per-test sample lists of each statistic are
    concatenated across all tests in ``res`` and averaged. ``'time'`` is
    formatted with two decimals; every other statistic is truncated to an
    integer string.

    Args:
        res: Mapping ``test_name -> config_name -> stat_name -> list of samples``.
        tests: Unused; kept so existing callers keep working.
        configs: Iterable of objects with a ``name`` attribute; each name must
            be a key of every per-test mapping in ``res``.

    Returns:
        A DataFrame indexed by ``\\raw{<escaped config name>}`` with one column
        per statistic (values are strings). The body rows of its LaTeX
        rendering, separated by ``\\hline``, are printed as a side effect.
    """
    def merge_stats(dicts):
        # Concatenate the sample lists key by key; the key set is the first dict's.
        def add_dicts(d1, d2):
            return {k: d1[k] + d2[k] for k in d1}
        return reduce(add_dicts, dicts[1:], dicts[0])

    def mean(x):
        return {k: sum(v) / len(v) for k, v in x.items()}

    def fmt(stat, value):
        return "{:.2f}".format(value) if stat == 'time' else str(int(value))

    gbc = {}
    for config_name in [nc.name for nc in configs]:
        # The escape is computed outside the f-string: a backslash inside an
        # f-string expression is a SyntaxError before Python 3.12.
        escaped = config_name.replace("_", "\\_")
        averaged = mean(merge_stats(
            [per_test[config_name] for per_test in res.values()]))
        gbc[f'\\raw{{{escaped}}}'] = {
            stat: fmt(stat, value) for stat, value in averaged.items()
        }

    table = pd.DataFrame.from_dict(gbc).transpose()

    # [4:-3] keeps only the body rows of the LaTeX rendering (drops the
    # tabular header lines and the closing rule/end lines).
    print('\\hline\n'.join(table.to_latex().split('\n')[4:-3]) + '\\hline')
    return table

def results_to_table2(res, tests, configs):
    """Build one row per (test, config) pair and print them as LaTeX rows.

    Each statistic's sample list is averaged. ``'time'`` is formatted with two
    decimals; every other statistic is truncated to an integer string. The
    statistics are placed positionally into the fixed columns
    ``checks``, ``mults``, ``time``.

    Args:
        res: Mapping ``test_name -> config_name -> stat_name -> list of samples``.
        tests: Unused; kept so existing callers keep working.
        configs: Unused; kept so existing callers keep working.

    Returns:
        A DataFrame with columns ``test, config, checks, mults, time``. The
        body rows of its LaTeX rendering, separated by ``\\hline``, are printed
        as a side effect.

    Raises:
        ValueError: If a result does not have exactly three statistics.
    """
    columns = ['test', 'config', 'checks', 'mults', 'time']

    def fmt(stat, samples):
        avg = sum(samples) / len(samples)
        return "{:.2f}".format(avg) if stat == 'time' else str(int(avg))

    rows = []
    # The loop variable is named per_config so it no longer shadows the
    # `configs` parameter.
    for test_name, per_config in res.items():
        for config_name, result in per_config.items():
            row = [test_name, config_name]
            row.extend(fmt(stat, samples) for stat, samples in result.items())
            if len(row) != len(columns):
                raise ValueError(
                    f'result for {test_name!r}/{config_name!r} has '
                    f'{len(row) - 2} statistics, expected {len(columns) - 2}')
            rows.append(row)

    # Build the frame once instead of growing it one row at a time.
    table = pd.DataFrame(rows, columns=columns)

    # [4:-3] keeps only the body rows of the LaTeX rendering.
    print('\\hline\n'.join(table.to_latex(index=False).split('\n')[4:-3]) + '\\hline')
    return table

def results_to_table3(res, tests, configs):
    """Alias of ``results_to_table2``: same arguments, output and return value.

    This function used to be a line-for-line copy of ``results_to_table2``;
    it now delegates so the two cannot drift apart.
    """
    return results_to_table2(res, tests, configs)

In [None]:
def describe_tests(TestCases: list[TestCase], name):
    """Summarise test cases in a table and export it as LaTeX.

    For each test case the graph is built with ``military_algo`` and its
    H-classes are found with ``search_Hclasses``. One row is produced with:
    a family label ``$F_k$`` (``k = i // 5 + 1``, so every five consecutive
    cases share a label), the generators wrapped in ``\\makecell``, the number
    of graph nodes and the number of H-classes.

    The table is pretty-printed, rendered to LaTeX with ``\\hline `` appended
    to every line, printed, and written to ``../output/{name}_tests.tex``.

    Args:
        TestCases: Test cases exposing ``.S`` (with ``.generators``).
        name: File-name prefix of the generated ``.tex`` file.

    Returns:
        The summary DataFrame.
    """
    columns = ['Семейство', 'порождающие элементы', 'кол-во элементов', 'кол-во $\\gh$--классов']

    rows = []
    for i, test_case in enumerate(TestCases):
        g = military_algo(test_case.S)
        hclasses = search_Hclasses(g)

        # "[(..), (..)]" -> "(..)$, $(..)": each generator gets its own math span.
        generators_tex = (str(test_case.S.generators)
                          .replace('), ', ')$, $')
                          .replace('[', '')
                          .replace(']', ''))
        rows.append([
            f"$F_{i//5+1}$",
            '\\makecell{$' + generators_tex + '$}',
            len(g.nodes),
            len(hclasses),
        ])

    table = pd.DataFrame(rows, columns=columns)

    # table = table.sort_values("кол-во элементов")
    pp(table)
    res = table.to_latex(index=False)
    res = res.replace('\n', '\\hline \n')
    print(res)
    # The header contains Cyrillic text, so the encoding is fixed explicitly
    # instead of depending on the platform's locale default.
    with open(f'../output/{name}_tests.tex', 'w', encoding='utf-8') as f:
        f.write(res)

    return table

#### use

In [None]:
# Render the saved 'cyc' run as per-test/per-config LaTeX rows; requires
# ../output/cyc_res.pickle (written by go('cyc', ...), commented out above).
results_to_table2(load_result('cyc'), cyc, eco_vs_naive2)

In [None]:
# Describe the saved 'f3' test family; requires ../output/f3.pickle and writes
# ../output/f3_tests.tex.
describe_tests(load_tests('f3'), 'f3')