In [None]:
%run shared.ipynb

In [None]:
# Pre-computed filtered views of `log`; later cells pick one of them as `cell_log`.

# Remove the noisy bookkeeping *events* but keep the cases they belong to.
# `level="event"` is required here: pm4py's default is level="case", which together
# with retain=False would discard every case containing at least one of these
# activities instead of just trimming the events out of the traces.
filter_noisy_events_log = pm4py.filter_event_attribute_values(
    log,
    "concept:name",
    {"subscribed", "unsubscribed", "referenced", "pinned", "unpinned"},
    level="event",
    retain=False,
)
# Built on top of the de-noised log, restricted by the end activities
# "completed" / "not_planned".
filter_endpoints_and_noisy_events_log = filter_end_activities(
    filter_noisy_events_log, {"completed", "not_planned"}
)
# Event-level filter on the raw log for the three listed activities.
filter_event_attributes_log = filter_event_attribute_values(
    log, "concept:name", {"created", "labeled", "closed"}, level="event"
)
# filter_trace_attributes_log = filter_trace_attribute_values(log, 'case:Label', {'React Core Team'})
# Filter on the directly-follows pair "closed" -> "commented" (retain=True).
filter_directly_follows_log = filter_directly_follows_relation(
    log, [("closed", "commented")], retain=True
)
# Top-10 variant filters: once on the raw log, once on the endpoint/noise-filtered log.
filter_top_variants_log = filter_variants_top_k(log, 10)
filter_endpoints_and_noisy_events_and_top_variants_log = filter_variants_top_k(
    filter_endpoints_and_noisy_events_log, 10
)
# Time-window filter using pm4py's "traces_contained" mode.
filter_time_log = pm4py.filter_time_range(
    log, "2013-01-01 00:00:00", "2025-01-31 00:00:00", mode="traces_contained"
)

In [None]:
# Timing statistics for the log selected for this cell.
# The divisions below assume the helpers report seconds — TODO confirm in shared.ipynb.
cell_log = log

# Case durations: all cases, the median, and the case of the first row in the log.
case_durations = get_all_case_durations(cell_log)
median_case_duration = get_median_case_duration(cell_log)
median_case_duration_days = median_case_duration // 60 // 60 // 24
print(f"Median case duration: {median_case_duration_days} days")
first_case_id = cell_log["case:concept:name"].iloc[0]
single_case_duration = get_case_duration(cell_log, first_case_id)

# Spacing between consecutive case arrivals, floored to whole hours.
case_arrival_average = get_case_arrival_average(cell_log)
case_arrival_average_hours = case_arrival_average // 60 // 60
print(
    "Average distance between the arrival of two consecutive cases: "
    f"{case_arrival_average_hours} hours"
)

# Spacing between consecutive case completions, floored to whole hours.
case_dispersion_ratio = get_case_dispersion_avg(cell_log)
case_dispersion_hours = case_dispersion_ratio // 60 // 60
print(
    "Average distance between the finishing of two consecutive cases: "
    f"{case_dispersion_hours} hours"
)

# TODO: Plot distribution to show how many events it takes to complete?
activity_position_summary = get_activity_position_summary(cell_log, "completed")

In [None]:
# Event distribution, events-per-time and case-duration graphs for the selected log.
cell_log = log

# On-screen rendering: one distribution graph per distribution type, then the
# events-per-time and case-duration graphs.
for distr_type in ("days_week", "hours"):
    view_events_distribution_graph(cell_log, distr_type=distr_type)
view_events_per_time_graph(cell_log)
view_case_duration_graph(cell_log)

# Optional export of the same four graphs, in the same order.
if SAVE_VIS:
    distribution_file_paths = {
        "days_week": "events_over_days_of_week.png",
        "hours": "events_over_hour_of_day.png",
    }
    for distr_type, file_path in distribution_file_paths.items():
        save_vis_events_distribution_graph(
            cell_log, distr_type=distr_type, file_path=file_path
        )
    save_vis_events_per_time_graph(cell_log, file_path="events_over_time.png")
    save_vis_case_duration_graph(cell_log, file_path="case_duration.png")

In [None]:
%matplotlib inline
# Plot (1) the number of simultaneously active cases over time and
# (2) a histogram of event timestamps, both for the log selected as `cell_log`.
cell_log = log

# The first and last event timestamp of each case delimit the span in which it is active.
case_durations = (
    cell_log.groupby("case:concept:name")["time:timestamp"]
    .agg(["min", "max"])
    .rename(columns={"min": "start_time", "max": "end_time"})
    .sort_values(by="start_time")
)

# One row per case start (+1) and one per case end (-1); the running sum of
# `change` in time order is the number of active cases.
event_points = pandas.DataFrame(
    {
        "timestamp": pandas.concat(
            [case_durations["start_time"], case_durations["end_time"]]
        ),
        "change": [1] * len(case_durations)
        + [-1] * len(case_durations),  # +1 for start, -1 for end
    }
)

# Sort by timestamp and, for equal timestamps, put starts (+1) before ends (-1).
# A plain single-column sort leaves ties in arbitrary order, which can make the
# running count dip below its true value (even negative) when a case starts and
# ends at the same instant.
event_points = event_points.sort_values(
    by=["timestamp", "change"], ascending=[True, False]
)
event_points["active_cases"] = event_points["change"].cumsum()

fig_active, ax_active = plt.subplots(figsize=(10, 5))
ax_active.plot(
    event_points["timestamp"],
    event_points["active_cases"],
    marker="o",
    linestyle="-",
    color="blue",
)
ax_active.set_xlabel("Time")
ax_active.set_ylabel("Active Cases")
ax_active.set_title("Active Cases Over Time")
ax_active.grid()
# Honour the notebook-wide SAVE_VIS switch like the other visualisation cells do.
if SAVE_VIS:
    fig_active.savefig("active_cases_over_time.png")
plt.show()

fig_hist, ax_hist = plt.subplots(figsize=(10, 5))
# Use `cell_log` (not the global `log`) so changing the cell's log affects both plots.
ax_hist.hist(
    cell_log["time:timestamp"], bins=200, color="blue", alpha=0.7, edgecolor="black"
)
ax_hist.set_xlabel("Time")
ax_hist.set_ylabel("Number of Events")
ax_hist.set_title("Histogram of Events Over Time")
# Draw the grid before saving so the saved image matches what is shown.
ax_hist.grid()
if SAVE_VIS:
    fig_hist.savefig("histogram_events_over_time.png")

# Show the plot
plt.show()

In [None]:
# Dotted chart of the selected log, shown on screen and optionally exported.
cell_log = log

# Shared keyword options so the displayed chart and the saved file use the same settings.
dotted_chart_options = {"show_legend": False}

view_dotted_chart(cell_log, **dotted_chart_options)

if SAVE_VIS:
    save_vis_dotted_chart(
        cell_log, file_path="dotted_line_chart.png", **dotted_chart_options
    )

In [None]:
# Discover a Petri net from the filtered log, check it, and render plain,
# frequency-decorated and performance-decorated views of it.
cell_log = filter_endpoints_and_noisy_events_and_top_variants_log
noise_threshold = 0

petri_net, initial_marking, final_marking = discover_petri_net_inductive(
    cell_log, noise_threshold=noise_threshold
)
# Keep the (net, initial marking, final marking) triple together for the calls below.
petri_net_model = (petri_net, initial_marking, final_marking)
view_petri_net(*petri_net_model)

# NOTE(review): fitness is replayed against the global `log`, not `cell_log`.
# Presumably intentional (model from the filtered log, checked on the full log) — confirm.
fitness = fitness_token_based_replay(log, *petri_net_model)
print("Fitness check: ")
print(json.dumps(fitness, indent=4))
# The soundness result is stored but not displayed by this cell.
soundness = check_soundness(*petri_net_model)

# Frequency decoration, computed from `cell_log`.
gviz_frequency = petri_net_visualizer.apply(
    *petri_net_model,
    variant=petri_net_visualizer.Variants.FREQUENCY,
    log=cell_log,
)
petri_net_visualizer.view(gviz_frequency)

# NOTE(review): the performance decoration uses the global `log`, whereas the
# frequency decoration above uses `cell_log` — confirm the difference is wanted.
gviz_performance = petri_net_visualizer.apply(
    *petri_net_model,
    variant=petri_net_visualizer.Variants.PERFORMANCE,
    log=log,
)
petri_net_visualizer.view(gviz_performance)


if SAVE_VIS:
    save_vis_petri_net(*petri_net_model, file_path="petri_net.png")
    decorated_outputs = (
        (gviz_frequency, "petri_net_frequency.png"),
        (gviz_performance, "petri_net_performance.png"),
    )
    for gviz, output_file_path in decorated_outputs:
        petri_net_visualizer.save(gviz, output_file_path=output_file_path)

In [None]:
# Discover a BPMN diagram from the filtered log via `discover_bpmn_inductive`,
# display it, and export it when SAVE_VIS is set.
cell_log = filter_endpoints_and_noisy_events_and_top_variants_log
noise_threshold = 0

bpmn_diagram = discover_bpmn_inductive(cell_log, noise_threshold=noise_threshold)
view_bpmn(bpmn_diagram)

if SAVE_VIS:
    bpmn_file_path = "bpmn.png"
    save_vis_bpmn(bpmn_diagram, file_path=bpmn_file_path)

In [None]:
# Build a frequency DFG pruned by activity/path percentage, then a performance DFG
# restricted to the same edges; show both and optionally export them.
cell_log = filter_endpoints_and_noisy_events_and_top_variants_log

# Discover the frequency DFG using activities and paths to filter
activities = get_event_attribute_values(cell_log, "concept:name")
frequency_dfg, start_activities, end_activities = discover_dfg(cell_log)
activities_perc = 0.99
paths_perc = 0.99
max_num_edges = 100
frequency_dfg, start_activities, end_activities, activities = (
    filter_dfg_on_activities_percentage(
        frequency_dfg, start_activities, end_activities, activities, activities_perc
    )
)
frequency_dfg, start_activities, end_activities, activities = (
    filter_dfg_on_paths_percentage(
        frequency_dfg, start_activities, end_activities, activities, paths_perc
    )
)
view_dfg(
    frequency_dfg,
    start_activities,
    end_activities,
    max_num_edges=max_num_edges,
    rankdir="LR",
)

# Discover the performance DFG (does not support activities and paths filtering).
# Its start/end activities are bound to their own names so they do not overwrite
# the *filtered* start/end activities of the frequency DFG, which are still needed
# when the frequency DFG is saved further down.
performance_dfg, performance_start_activities, performance_end_activities = (
    discover_performance_dfg(cell_log)
)

# Use the frequency DFG to filter the performance DFG: drop (in place) every edge
# that did not survive the frequency-based pruning. The edge list is materialised
# first because a dict must not change size while it is being iterated.
for edge in [edge for edge in performance_dfg if edge not in frequency_dfg]:
    del performance_dfg[edge]

for aggregation_measure in ("sum", "median"):
    view_performance_dfg(
        performance_dfg,
        performance_start_activities,
        performance_end_activities,
        aggregation_measure=aggregation_measure,
        rankdir="LR",
    )

# dfg_time = clean_dfg_time.apply(cell_log)
# gviz = timeline_gviz_generator.apply(frequency_dfg, dfg_time, parameters={'start_activities': start_activities, "end_activities": end_activities})
# dfg_visualizer.view(gviz)

# Saved images use a top-to-bottom layout ("TB"), unlike the left-to-right views above.
if SAVE_VIS:
    save_vis_dfg(
        frequency_dfg,
        start_activities,
        end_activities,
        max_num_edges=max_num_edges,
        rankdir="TB",
        file_path="frequency_dfg.png",
    )
    for aggregation_measure in ("sum", "median"):
        save_vis_performance_dfg(
            performance_dfg,
            performance_start_activities,
            performance_end_activities,
            aggregation_measure=aggregation_measure,
            rankdir="TB",
            file_path=f"performance_dfg_{aggregation_measure}.png",
        )

In [None]:
# Scatter plot of case duration (whole days) against the case start date.
cell_log = filter_endpoints_and_noisy_events_log

# Set up properties (this is similar to what your get_all_case_durations function does)
properties = {
    "business_hours": False,
    # include other properties if needed
}

# Get the cases description. This returns a dictionary where each key is a case ID and each value is a dictionary
# containing keys like 'startTime', 'endTime', 'caseDuration', etc.
cases_description = case_statistics.get_cases_description(
    cell_log, parameters=properties
)

# Prepare lists for plotting
start_times = []
durations = []

# Only the per-case descriptions are needed; the case IDs themselves are not plotted.
for desc in cases_description.values():
    # Make sure the description contains the needed fields
    if "startTime" in desc and "caseDuration" in desc:
        start_times.append(desc["startTime"])
        # caseDuration is typically in seconds; floor it to whole days
        durations.append(desc["caseDuration"] // 60 // 60 // 24)

# pm4py reports `startTime` as seconds since the Unix epoch, so plotting the raw
# values would label the "Case Start Time" axis with numbers around 1e9.
# Convert them to timestamps so the axis shows calendar dates.
# NOTE(review): assumes numeric epoch seconds — confirm for the installed pm4py version.
start_dates = pandas.to_datetime(start_times, unit="s")

# Plot the durations over time
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(start_dates, durations, alpha=0.6)
ax.set_xlabel("Case Start Time")
ax.set_ylabel("Case Duration (days)")
ax.set_title("Case Durations Over Time")
ax.tick_params(axis="x", labelrotation=45)
fig.tight_layout()
plt.show()