In [9]:
from collections import defaultdict

import streamlit as st
import pickle as pkl

import torch
from sklearn.model_selection import train_test_split

from denoisers.ConditionalUnetDenoiser import ConditionalUnetDenoiser
from denoisers.ConditionalUnetMatrixDenoiser import ConditionalUnetMatrixDenoiser
from utils.graph_utils import get_process_model_reachability_graph_transition_matrix
from utils.pm_utils import discover_dk_process, remove_duplicates_dataset, pad_to_multiple_of_n
from utils.Config import Config
import plotly.express as px
import plotly.graph_objects as go
from dataset.dataset import SaladsDataset
from ddpm.ddpm_multinomial import Diffusion
from utils.initialization import initialize
import os
import argparse
import json
from streamlit.components.v1 import html
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.visualization.bpmn import visualizer as bpmn_visualizer
from pm4py import convert_to_bpmn
from torch.utils.data import DataLoader
from tqdm import tqdm

In [10]:
def load_experiment_config(target_dir):
    """Read ``cfg.json`` from an experiment directory and build a ``Config`` from it.

    Args:
        target_dir: Directory expected to contain ``cfg.json``.

    Returns:
        A ``Config`` constructed from the JSON object's key/value pairs, or
        ``None`` (after emitting a Streamlit warning) when the file is absent.
    """
    cfg_file = os.path.join(target_dir, "cfg.json")

    # Guard clause: without the file there is nothing to parse.
    if not os.path.exists(cfg_file):
        st.warning("Configuration file not found.")
        return None

    with open(cfg_file, "r") as handle:
        raw_settings = json.load(handle)
    # The JSON keys are forwarded as keyword arguments to Config.
    return Config(**raw_settings)


def load_experiment_data_and_model(target_dir, cfg):
    """Rebuild the datasets, process model, diffuser and trained denoiser of an experiment.

    Args:
        target_dir: Experiment directory. ``best.ckpt`` is always read from it;
            ``final_results.json`` is read from it when present.
        cfg: Experiment configuration. The attributes read directly here are
            ``data_path``, ``train_percent``, ``seed``, ``num_timesteps``,
            ``device``, ``enable_matrix`` and ``num_classes``; the object is also
            handed to ``discover_dk_process``.

    Returns:
        A 9-tuple ``(train_dataset, test_dataset, dk_process_model,
        dk_init_marking, dk_final_marking, rg_transition_matrix, diffuser,
        denoiser, final_res)``. ``final_res`` is ``None`` when
        ``final_results.json`` does not exist.
    """
    # NOTE(security): unpickling can execute arbitrary code; only point
    # cfg.data_path at files that come from a trusted source.
    with open(cfg.data_path, "rb") as f:
        base_dataset = pkl.load(f)
    dataset = SaladsDataset(base_dataset['target'], base_dataset['stochastic'])
    # The split is seeded with cfg.seed so repeated calls give the same partition.
    train_dataset, test_dataset = train_test_split(dataset, train_size=cfg.train_percent, shuffle=True,
                                                   random_state=cfg.seed)
    dk_process_model, dk_init_marking, dk_final_marking = discover_dk_process(train_dataset, cfg,
                                                                              preprocess=remove_duplicates_dataset)
    diffuser = Diffusion(noise_steps=cfg.num_timesteps, device=cfg.device)
    if cfg.enable_matrix:
        # Matrix-conditioned variant: the transition matrix of the process model's
        # reachability graph is padded and its last dimension sizes the denoiser.
        rg_nx, rg_transition_matrix = get_process_model_reachability_graph_transition_matrix(dk_process_model,
                                                                                             dk_init_marking)
        rg_transition_matrix = torch.tensor(rg_transition_matrix, device=cfg.device).float()
        rg_transition_matrix = pad_to_multiple_of_n(rg_transition_matrix)
        denoiser = ConditionalUnetMatrixDenoiser(in_ch=cfg.num_classes, out_ch=cfg.num_classes,
                                                 max_input_dim=dataset.sequence_length,
                                                 transition_dim=rg_transition_matrix.shape[-1],
                                                 device=cfg.device).to(cfg.device).float()
    else:
        # No reachability graph is computed; a random (num_classes, 2, 2) tensor is
        # returned in its place (presumably a placeholder for callers - confirm).
        rg_transition_matrix = torch.randn((cfg.num_classes, 2, 2)).to(cfg.device)
        denoiser = ConditionalUnetDenoiser(in_ch=cfg.num_classes, out_ch=cfg.num_classes,
                                           max_input_dim=dataset.sequence_length,
                                           device=cfg.device).to(cfg.device).float()
    ckpt_path = os.path.join(target_dir, "best.ckpt")
    # map_location lets a checkpoint saved on another device load onto cfg.device.
    denoiser.load_state_dict(torch.load(ckpt_path, map_location=cfg.device)['model_state'])
    final_res_path = os.path.join(target_dir, "final_results.json")
    if os.path.exists(final_res_path):
        with open(final_res_path, "r") as f:
            final_res = json.load(f)
    else:
        st.warning("Final results not found.")
        # Bind the name explicitly: previously it stayed unassigned on this path
        # and the return statement below raised UnboundLocalError.
        final_res = None

    return (train_dataset, test_dataset, dk_process_model, dk_init_marking, dk_final_marking, rg_transition_matrix,
            diffuser, denoiser, final_res)

In [11]:
# NOTE(review): machine-specific absolute path; adjust it when running elsewhere.
target_dir = r"D:\Projects\trace-denoise\pretty_results\50_salads_matrix_no_dropout_new"
cfg = load_experiment_config(target_dir)
if cfg is None:
    # load_experiment_config only warns when cfg.json is missing; fail here with a
    # clear message instead of an AttributeError on the assignment below.
    raise FileNotFoundError(f"cfg.json not found in {target_dir}")

# Use the first GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
cfg.device = "cuda:0" if torch.cuda.is_available() else "cpu"

(
    train_dataset,
    test_dataset,
    dk_process_model,
    dk_init_marking,
    dk_final_marking,
    rg_transition_matrix,
    diffuser,
    denoiser,
    final_res,
) = load_experiment_data_and_model(target_dir, cfg)

# Each pickle holds three items, unpacked here as (x, y, x_hat).
# NOTE(security): unpickling can execute arbitrary code; load trusted files only.
with open(os.path.join(target_dir, "trace_kinds_train.pkl"), "rb") as f:
    x_train, y_train, x_hat_train = pkl.load(f)
with open(os.path.join(target_dir, "trace_kinds_test.pkl"), "rb") as f:
    x_test, y_test, x_hat_test = pkl.load(f)

In [13]:
# Keep only item 1 of every test sample, moved to the CPU for plotting
# (presumably the stochastic trace, given SaladsDataset(target, stochastic) - confirm).
y_test_sto = [sample[1].cpu() for sample in test_dataset]

In [18]:
import numpy as np


def visualize_stochastic_trace(y_argmax, y_sto, x, x_hat, seq_len=None, num_classes=None):
    """Plot a class-probability heatmap with three line traces drawn on top.

    Args:
        y_argmax: Sequence drawn as the solid blue "Argmax" line.
        y_sto: 2-D array/tensor of probabilities; it is transposed before plotting
            so that its second axis runs along the heatmap's y-axis.
        x: Sequence drawn as the solid red "Original" line.
        x_hat: Sequence drawn as the dotted green "Reconstructed" line.
        seq_len: Number of x-axis coordinates of the heatmap. Defaults to the
            notebook-level ``denoiser.max_input_dim``.
        num_classes: Number of y-axis coordinates of the heatmap. Defaults to the
            notebook-level ``cfg.num_classes``.

    Returns:
        The assembled ``plotly.graph_objects.Figure``.
    """
    # Fall back to the notebook globals only when the caller gives no sizes, so the
    # function can also be used without `denoiser` / `cfg` in scope.
    if seq_len is None:
        seq_len = denoiser.max_input_dim
    if num_classes is None:
        num_classes = cfg.num_classes

    fig = go.Figure()
    fig.add_trace(go.Heatmap(
        z=y_sto.T,  # Transpose to align categories on y-axis
        x=np.arange(seq_len),
        y=np.arange(0, num_classes),
        colorscale='Blues',  # Adjust color scale for probability representation
        colorbar=dict(title="Probability"),
        name='Distribution'
    ))

    # (legend name, values, line style) for each overlay, in drawing order.
    overlays = (
        ('Argmax', y_argmax, dict(color='blue', dash='solid')),
        ('Original', x, dict(color='red', dash='solid')),
        ('Reconstructed', x_hat, dict(color='green', dash='dot')),
    )
    for label, values, line_style in overlays:
        fig.add_trace(go.Scatter(x=list(range(len(values))), y=values, mode='lines', name=label,
                                 line=line_style))

    # Label the axes so the figure can be read on its own.
    fig.update_layout(xaxis_title="Sequence position", yaxis_title="Class index")
    return fig

# Show the first test trace: probability heatmap plus the three overlaid lines.
visualize_stochastic_trace(
    y_argmax=y_test[0].cpu(),
    y_sto=y_test_sto[0],
    x=x_test[0].cpu(),
    x_hat=x_hat_test[0].cpu(),
).show()

In [19]:
# Display the raw tensor that was passed as `y_sto` for the first test trace above.
y_test_sto[0]

tensor([[0.0014, 0.0022, 0.0021,  ..., 0.9590, 0.0014, 0.0000],
        [0.0014, 0.0021, 0.0020,  ..., 0.9620, 0.0013, 0.0000],
        [0.0012, 0.0019, 0.0018,  ..., 0.9663, 0.0012, 0.0000],
        ...,
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 1.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 1.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 1.0000]])