# Process Mining on OPC UA Network Traffic 

This notebook presents a detailed exploration of process mining techniques applied to OPC UA (Open Platform Communications Unified Architecture) network traffic. OPC UA is a machine-to-machine communication protocol for industrial automation, and this analysis aims to uncover hidden patterns, and process flows within the network traffic data.


## 1. Dependencies and Imports

Before we begin the analysis, we need to install and import necessary libraries and modules. The following cells handle this setup.


In [None]:
!pip install pm4py
!pip install graphviz
!pip install pydot

### Python Libraries

In [None]:
# Import necessary libraries
import json
import datetime
import psutil
import time
import csv
import matplotlib.pyplot as plt
import pm4py
import pandas as pd
import re
import networkx as nx
import numpy as np
from numpy.polynomial.polynomial import Polynomial
from pm4py.util import constants
from IPython.display import display, Image
from pm4py.algo.filtering.dfg import dfg_filtering
from pm4py.visualization.dfg import visualizer as dfg_visualizer
from pm4py.algo.evaluation.generalization import algorithm as generalization_evaluator
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator
from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
from pm4py.visualization.decisiontree import visualizer as dt_vis
from pm4py.algo.transformation.log_to_features import algorithm as log_to_features
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments
from pm4py.algo.conformance import alignments as conformance_alignments
from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
from pm4py.algo.organizational_mining.sna import util
from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay
from pm4py.statistics.traces.generic.log import case_arrival
from pm4py.algo.organizational_mining.local_diagnostics import algorithm as local_diagnostics
from pm4py.objects.conversion.process_tree import converter as pt_converter
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery
from pm4py.algo.conformance.temporal_profile import algorithm as temporal_profile_conformance
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from pm4py.statistics.traces.generic.log import case_statistics
from pm4py.visualization.graphs import visualizer as graphs_visualizer

# Define the input file path
input_file = './opcua.json'


## 2. Understanding the Network Traffic Data

In [None]:
# Load the provided JSON data
with open(input_file, "r") as f:
    opcua_json = json.load(f)

# Extract source and destination IP addresses and ports
comm_pairs = []

for entry in data:
    layers = entry.get('_source', {}).get('layers', {})
    
    # Extract IP and port information
    src_ip = layers.get('ip', {}).get('ip.src')
    dst_ip = layers.get('ip', {}).get('ip.dst')
    src_port = layers.get('tcp', {}).get('tcp.srcport')
    dst_port = layers.get('tcp', {}).get('tcp.dstport')
    
    if src_ip and dst_ip and src_port and dst_port:
        comm_pairs.append((src_ip + ':' + src_port, dst_ip + ':' + dst_port))

# Create a new graph for the modified hierarchy
G_modified = nx.Graph()

# Add edges from port (IP:port) to its associated IP
for src, dst in comm_pairs:
    src_ip, src_port = src.split(':')
    dst_ip, dst_port = dst.split(':')
    
    # Add edge from port (IP:port) to its IP
    G_modified.add_edge(src, src_ip)
    G_modified.add_edge(dst, dst_ip)
    
    # Add edge between source port and destination port
    G_modified.add_edge(src, dst)

    
# Assign colors to nodes
modified_node_colors = []
for node in G_modified.nodes():
    if ':' in node:  # This means it's a port (IP:port combination)
        modified_node_colors.append('#b2b2b2')  # Color for ports
    else:
        modified_node_colors.append('#44AA99')  # Color for IP addresses

modified_node_sizes = [2500 if ':' in node else 8000 for node in G_modified.nodes()]

# Define labels for the nodes
labels = {}
for node in G_modified.nodes():
    if ':' in node:  # This means it's a port (IP:port combination)
        labels[node] = node.split(':')[1]  # Only keep the port number
    else:
        labels[node] = node  # IP address

# Convert the graph to a directed graph
G_directed = nx.DiGraph(G_modified)

# Calculate the frequency of communications between ports
edge_freq = {}
for src, dst in comm_pairs:
    if (src, dst) in edge_freq:
        edge_freq[(src, dst)] += 1
    else:
        edge_freq[(src, dst)] = 1

# Remove directed edges between port (IP:port) and its IP address
for src, dst in comm_pairs:
    src_ip, src_port = src.split(':')
    dst_ip, dst_port = dst.split(':')
    
    # Remove directed edge from port (IP:port) to its IP
    if G_directed.has_edge(src, src_ip):
        G_directed.remove_edge(src, src_ip)
    if G_directed.has_edge(dst, dst_ip):
        G_directed.remove_edge(dst, dst_ip)

# Draw the modified directed network graph with frequency edge labels
plt.figure(figsize=(19, 8))
pos_modified = nx.spring_layout(G_modified, k=0.5, iterations=1000)  # Positioning of nodes
nx.draw(
    G_directed,
    pos_modified,
    labels=labels,
    node_size=modified_node_sizes,
    node_color=modified_node_colors,
    font_size=10,
    width=0.5,
    edge_color="black",
    font_color='whitesmoke',
    arrowsize=20,
    connectionstyle="arc3,rad=0.1"
)
nx.draw_networkx_edge_labels(G_directed, pos_modified, edge_labels=edge_freq, font_size=9)

plt.savefig("network_graph.pdf", bbox_inches="tight")
plt.show()

In [None]:
# Calculate key metrics
num_nodes = G_directed.number_of_nodes()
num_edges = G_directed.number_of_edges()
density = nx.density(G_directed)
degree_centrality = nx.degree_centrality(G_directed)

# Identify the top 5 nodes with the highest degree centrality
top_degree_centrality = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)[:5]

# Display the calculated metrics
num_nodes, num_edges, density, top_degree_centrality



## 3. OPCUA Packet Analyzer Definition

The `OPCUAPacketAnalyzer` class is responsible for analyzing OPC UA packets. It extracts relevant information from the packets and prepares the data for further processing.


In [None]:
class OPCUAPacketAnalyzer:
    def __init__(self, input_file, num_packets):
        """
        Initialize the OPCUAPacketAnalyzer with the input file and the number of packets to analyze.

        Args:
            input_file (str): The path to the input JSON file.
            num_packets (int): The number of packets to analyze.
        """
        self.input_file = input_file
        self.opc_packets = self.load_data()[:num_packets]
        
    def load_data(self):
        """
        Load data from the JSON file.

        Returns:
            list: List of JSON data records.
        """
        with open(self.input_file, 'r') as file:
            return json.load(file)

    def extract_tcp_data(self, packet):
        """
        Extract TCP data from a packet.

        Args:
            packet (dict): A packet dictionary.

        Returns:
            list: List of extracted data.
        """
        return self.extract_data(packet, 'tcp', ['tcp.srcport', 'tcp.dstport'])

    def extract_ip_data(self, packet):
        """
        Extract IP data from a packet.

        Args:
            packet (dict): A packet dictionary.

        Returns:
            list: List of extracted data.
        """
        return self.extract_data(packet, 'ip', ['ip.src', 'ip.dst'])

    def extract_eth_data(self, packet):
        """
        Extract Ethernet data from a packet.

        Args:
            packet (dict): A packet dictionary.

        Returns:
            list: List of extracted data.
        """
        return self.extract_data(packet, 'eth', ['eth.src', 'eth.dst'])
            
    def extract_data(self, packet, layer, fields):
        """
        Extract data from a packet for a specified layer and fields.

        Args:
            packet (dict): A packet dictionary.
            layer (str): The layer to extract data from.
            fields (list): List of field names to extract.

        Returns:
            list: List of extracted data.
        """
        timestamp_str = packet["_source"]["layers"]["frame"]["frame.time"]
        timestamp = datetime.datetime.strptime(timestamp_str, "%b %d, %Y %H:%M:%S.%f000 %Z")
        return [timestamp] + [packet["_source"]["layers"][layer].get(field, '') for field in fields]

    def write_csv(self, filename, rows):
        """
        Write data to a CSV file.

        Args:
            filename (str): The name of the CSV file to write.
            rows (list): List of rows to write to the CSV file.
        """
        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerows(rows)

    def analyze_packets(self):
        """
        Analyze OPC UA packets and write the results to CSV files.
        """
        matching_ip_ids = self.match_request_handles()
        transport_layer_log, network_layer_log, eth_layer_log, app_layer_log = [], [], [], []
        transport_layer_log.append(["time:timestamp", "case:concept:name", "concept:name"])
        network_layer_log.append(["time:timestamp", "case:concept:name", "concept:name"])
        eth_layer_log.append(["time:timestamp", "case:concept:name", "concept:name"])
        app_layer_log.append(["time:timestamp", "case:concept:name", "org:resource", "concept:name"])

        for packet in self.opc_packets:
            eth_layer_log.append(self.extract_eth_data(packet))
            network_layer_log.append(self.extract_ip_data(packet))
            transport_layer_log.append(self.extract_tcp_data(packet))
        
        matched_array = self.match_request_handles()
        tuples = self.add_case_id(matched_array)
        app_layer_log = self.transform_tuples(app_layer_log, tuples)
        
        self.write_csv('link_layer.csv', eth_layer_log)
        self.write_csv('network_layer.csv', network_layer_log)
        self.write_csv('transport_layer.csv', transport_layer_log)
        self.write_csv('app_layer.csv', app_layer_log)

    def match_request_handles(self):
        """
        Match request handles in OPC UA packets and create matched arrays.

        Returns:
            list: List of matched arrays.
        """
        request_array, response_array, matched_array = [], [], []

        for packet in self.opc_packets:
            if packet["_source"]["layers"]["opcua"]["opcua.transport.type"] == "MSG":
                encodable_object = packet["_source"]["layers"]["opcua"]["OpcUa Service : Encodeable Object"]
                connection_type, header = self.find_connection_type(encodable_object)
                if header:
                    request_handle = packet["_source"]["layers"]["opcua"]["OpcUa Service : Encodeable Object"][connection_type][f'{header}: {header}']["opcua.RequestHandle"]

                    # Extracting additional details
                    ip_src = packet["_source"]["layers"]["ip"]["ip.src"]
                    ip_dst = packet["_source"]["layers"]["ip"]["ip.dst"]
                    tcp_srcport = packet["_source"]["layers"]["tcp"]["tcp.srcport"]
                    tcp_dstport = packet["_source"]["layers"]["tcp"]["tcp.dstport"]
                    timestamp_str = packet["_source"]["layers"]["frame"]["frame.time"]
                    timestamp = datetime.datetime.strptime(timestamp_str, "%b %d, %Y %H:%M:%S.%f000 %Z")
                    nodes = self.get_nodes(encodable_object, connection_type)
                    
                    (request_array if header == "RequestHeader" else response_array).append([request_handle, ip_src, tcp_srcport, ip_dst, tcp_dstport, timestamp, connection_type, nodes])

        matched_array = [(x[5], f"{x[1]}:{x[2]}", f"{x[3]}:{x[4]}", x[6], [{k: v for k, v in zip(x[7], y[7])}]) for x in request_array for y in response_array if x[0] == y[0]]
        matched_array = sorted(matched_array, key=lambda x: x[0])
        return matched_array
    
    def find_connection_type(self, encodable_object):
        """
        Find the connection type and header in the encodable object.

        Args:
            encodable_object (dict): The encodable object dictionary.

        Returns:
            tuple: A tuple containing the connection type and header.
        """
        key_list = ["ReadRequest", "ReadResponse", "WriteRequest", "WriteResponse", "SubscribeRequest", "SubscribeResponse", "PublishRequest", "PublishResponse"]
        for key in key_list:
            if key in encodable_object.keys():
                header = "RequestHeader" if 'Request' in key else "ResponseHeader"
                return key, header
        return None, None
    
    def get_nodes(self, encodable_object, connection_type):
        """
        Extract node information based on the connection type.

        Args:
            encodable_object (dict): The encodable object dictionary.
            connection_type (str): The connection type.

        Returns:
            list: List of extracted node information.
        """
        nodes = []
        if connection_type == "ReadRequest" and "NodesToRead: Array of ReadValueId" in encodable_object[connection_type]:
            array_size = int(encodable_object[connection_type]["NodesToRead: Array of ReadValueId"]["opcua.variant.ArraySize"])
            for i in range(array_size):
                obj = encodable_object[connection_type]["NodesToRead: Array of ReadValueId"][f"[{i}]: ReadValueId"]["NodeId: NodeId"]
                node = obj["opcua.nodeid.string"] if "opcua.nodeid.string" in obj else obj["opcua.nodeid.numeric"]
                node = ".".join(node.split(".")[-2:]) if "." in node else node
                nodes.append(node)
        elif connection_type == "ReadResponse" and "Results: Array of DataValue" in encodable_object[connection_type]:
            array_size = int(encodable_object[connection_type]["Results: Array of DataValue"]["opcua.variant.ArraySize"])
            for i in range(array_size):
                obj = encodable_object[connection_type]["Results: Array of DataValue"][f"[{i}]: DataValue"]
                key = obj["Value: Variant"][list(obj["Value: Variant"].keys())[1]] if "Value: Variant" in obj else None
                nodes.append(key)
        elif connection_type == "WriteRequest" and "NodesToWrite: Array of WriteValue" in encodable_object[connection_type]:
            array_size = int(encodable_object[connection_type]["NodesToWrite: Array of WriteValue"]["opcua.variant.ArraySize"])
            for i in range(array_size):
                obj = encodable_object[connection_type]["NodesToWrite: Array of WriteValue"][f"[{i}]: WriteValue"]["NodeId: NodeId"]
                node = obj["opcua.nodeid.string"] if "opcua.nodeid.string" in obj else obj["opcua.nodeid.numeric"]
                node = ".".join(node.split(".")[-2:]) if "." in node else node
                nodes.append(node)
        elif connection_type == "WriteResponse" and "Results: Array of StatusCode" in encodable_object[connection_type]:
            array_size = int(encodable_object[connection_type]["Results: Array of StatusCode"]["opcua.variant.ArraySize"])
            for _ in range(array_size):
                nodes.append(encodable_object[connection_type]["Results: Array of StatusCode"]["opcua.Results"])
        return nodes

    def add_case_id(self, matched_array):
        """
        Add case IDs to the matched array.

        Args:
            matched_array (list): List of matched arrays.

        Returns:
            list: List of matched arrays with added case IDs.
        """
        ip_to_case_id = {}
        new_matched_array = []

        for tup in matched_array:
            ip = tup[2].split(":")[0]
            dictionaries = tup[4]

            # Check if any dictionary contains a key with "CAN_PRODUCE_ITEM_ID"
            case_id = None
            for dictionary in dictionaries:
                if "CanProduce.CAN_PRODUCE_ITEM_ID" in dictionary.keys():
                    case_id = dictionary.get("CanProduce.CAN_PRODUCE_ITEM_ID")
                    ip_to_case_id[ip] = case_id  # Store the case id for this IP
                    break

            # If we didn't find a "CAN_PRODUCE_ITEM_ID", use the previously stored case id for this IP
            if case_id is None:
                case_id = ip_to_case_id.get(ip, None)

            # Create a new tuple with the added case id
            new_tup = (*tup, case_id)
            new_matched_array.append(new_tup)

        return new_matched_array

    def transform_tuples(self, app_layer_log, tuples_list):
        """
        Transform tuples and add them to the app layer log.

        Args:
            app_layer_log (list): List of app layer log entries.
            tuples_list (list): List of tuples to transform and add.

        Returns:
            list: Updated app layer log.
        """
        for tup in tuples_list:
            # Extract relevant information
            timestamp = tup[0]
            case_id = tup[5]
            ip_address = tup[2].split(":")[0]
            request_type = tup[3]
            request_keys = [str(k) for k in tup[4][0].keys()]

            # Form the new list with the extracted information
            new_list = [timestamp, case_id, ip_address, request_type + " (" + ", \n".join(request_keys) + ")"]
            app_layer_log.append(new_list)

        return app_layer_log

    def check_format(self, string, pattern):
        """
        Check if a string matches a specified pattern.

        Args:
            string (str): The string to check.
            pattern (str): The pattern to match.

        Returns:
            bool: True if the string matches the pattern, False otherwise.
        """
        return re.match(pattern, string) is not None

    def run(self):
        """
        Entry point to run the packet analysis.
        """
        start_time = time.time()  # Start the timer
        
        self.analyze_packets()
        
        end_time = time.time()  # End the timer
        cpu_usage = psutil.cpu_percent()
        ram_usage = psutil.virtual_memory().percent
        print(f"Time taken for packet analysis with {len(self.opc_packets)} packets: {end_time - start_time:.2f} seconds")
        print(f"CPU usage: {cpu_usage}%")
        print(f"RAM usage: {ram_usage}%")



## 4. Running the Packet Analyzer

With the `OPCUAPacketAnalyzer` class defined, we can now instantiate it and run the analysis on our input data.


In [None]:
# Create an instance of OPCUAPacketAnalyzer
analyzer = OPCUAPacketAnalyzer(input_file, len(json.load(open(input_file))))

# Run the packet analysis
analyzer.run()



## 5. Data Transformation and Process Visualization

Once the packets are analyzed, we transform the data into a format suitable for process mining. We then visualize the directly-follows graph for the event log.


In [None]:
# Read data from the CSV file into a DataFrame
dataframe = pd.read_csv("app_layer.csv", sep=",")

# Format the DataFrame for PM4Py
dataframe = pm4py.format_dataframe(dataframe, case_id="case:concept:name", activity_key="concept:name",
                                   timestamp_key="time:timestamp")

# Convert the formatted DataFrame to an event log
event_log = pm4py.convert_to_event_log(dataframe)


### Directly-Follows-Graph

In [None]:
# Discover the Directly Follows Graph from the event log
_df, _sa, _ea = pm4py.discover_directly_follows_graph(event_log)

# Save the Directly Follows Graph as an SVG file
pm4py.save_vis_dfg(_df, _sa, _ea, 'perf_dfg.svg')

# View the Directly Follows Graph as a PNG image
pm4py.view_dfg(_df, _sa, _ea, format="png")


### Inductive Miner

In [None]:
# Discover a Petri net using the inductive miner algorithm with a noise threshold
im_net, im_im, im_fm = pm4py.discover_petri_net_inductive(event_log, noise_threshold=0.9)

# Create a visualization of the Petri net
gviz = pn_visualizer.apply(im_net, im_im, im_fm, log=event_log)

# View the Petri net visualization
pm4py.visualization.petri_net.visualizer.view(gviz)


### Heuristic Miner

In [None]:
# Discover a Petri net using the heuristics miner algorithm with a dependency threshold
hm_net, hm_im, hm_fm = pm4py.discover_petri_net_heuristics(event_log, dependency_threshold=0.9)

# Generate a visualization of the Petri net with frequency-based variant and PNG format
viz_variant = pm4py.visualization.petri_net.visualizer.Variants.FREQUENCY
viz_parameters = {"format": "png"}
gviz = pm4py.visualization.petri_net.visualizer.apply(hm_net, hm_im, hm_fm, variant=viz_variant, parameters=viz_parameters)

# View the Petri net visualization
pm4py.visualization.petri_net.visualizer.view(gviz)


### Alpha Miner

In [None]:
# Discover a Petri net using the alpha miner algorithm
am_net, am_im, am_fm = pm4py.discover_petri_net_alpha(event_log)

# Define visualization parameters
parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}

# Generate the visualization of the Petri net using the alpha miner variant and specified parameters
gviz = pn_visualizer.apply(am_net, am_im, am_fm, parameters=parameters, variant=pn_visualizer.Variants.FREQUENCY, log=event_log)

# Save the Petri net visualization as a PDF file
pn_visualizer.save(gviz, "alpha-miner.pdf")

# View the Petri net visualization
pm4py.visualization.petri_net.visualizer.view(gviz)


### BPMN 2.0

In [None]:
# Discover a BPMN model using the inductive miner algorithm with a noise threshold
bpmn_model = pm4py.discover_bpmn_inductive(event_log, noise_threshold=0.98)

# Write the BPMN model to a BPMN file
pm4py.write_bpmn(bpmn_model, "model.bpmn")

# View the BPMN model
pm4py.view_bpmn(bpmn_model)


### Role Mining

In [None]:
# Discover organizational roles from the event log
roles = pm4py.discover_organizational_roles(event_log)

# Print the discovered organizational roles
roles


 ## 6. Performance Evaluation
 
We first run the `OPCUAPacketAnalyzer` with varying packet loads to measure the performance in terms of elapsed time, CPU usage, and RAM usage. Then, we pPlot the performance results to visually represent how the system performs as the packet load increases.

### Run OPCUAPacketAnalyzer with different packet loads

In [None]:
# Initialize a list to store performance results
performance_results = []

# Get the total number of packets from the input file
total_packets = len(json.load(open(input_file)))

# Define the increment for the number of packets
increment = 1000

# Iterate over packet counts from 'increment' to 'total_packets' with steps of 'increment'
for i in range(increment, total_packets + increment, increment):
    # Create an instance of OPCUAPacketAnalyzer with a specific number of packets
    analyzer = OPCUAPacketAnalyzer(input_file, num_packets=i)
    
    # Start the timer
    start_time = time.time()
    
    # Run the packet analysis
    analyzer.run()
    
    # End the timer
    end_time = time.time()
    
    # Measure CPU and RAM usage
    cpu_usage = psutil.cpu_percent()
    ram_usage = psutil.virtual_memory().percent
    
    # Calculate elapsed time
    elapsed_time = end_time - start_time
    
    # Append performance metrics to the results list
    performance_results.append([i, elapsed_time, cpu_usage, ram_usage])


### Plot performance results

In [None]:
# Create a DataFrame from performance_results
df = pd.DataFrame(performance_results, columns=['Packets', 'Time', 'CPU Usage', 'RAM Usage'])

# Plotting Time
packets_array = df['Packets'].values
time_taken = df['Time'].values

# Fit the polynomial regression again
coeffs_time = np.polyfit(packets_array, time_taken, 2)
p_time = Polynomial(coeffs_time[::-1])

# Create a figure and axis
fig, ax1 = plt.figure(figsize=(16, 8)), plt.gca()

# Time vs Packets plot
ax1.scatter(df['Packets'], df['Time'], color='black', label='Actual Time Data', s=60)
ax1.plot(df['Packets'], p_time(packets_array), color='black', label='Polynomial Regression of Time', linewidth=2)
ax1.set_xlabel('Number of Packets', fontsize=14, weight='bold')
ax1.set_ylabel('Time Taken (seconds)', fontsize=14, weight='bold')
ax1.tick_params(axis='y', labelsize=12)
ax1.tick_params(axis='x', labelsize=12)
ax1.grid(True, which='both', linestyle='--', linewidth=0.5)

# Create a second y-axis for CPU and RAM Usage
ax2 = ax1.twinx()
ax2.plot(df['Packets'], df['CPU Usage'], 'g-.', marker='o', label='CPU Usage (%)', linewidth=2, markersize=8)
ax2.plot(df['Packets'], df['RAM Usage'], 'm-.', marker='x', label='RAM Usage (%)', color='purple', linewidth=2, markersize=8)
ax2.set_ylabel('Usage (%)', fontsize=14, weight='bold')
ax2.set_ylim(0, 100)
ax2.tick_params(axis='y', labelsize=14)

# Adjusting legend to fit all descriptions
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=4, frameon=False, fontsize=14)

# Save the figure as a PDF file
fig.savefig("opcua_performance_analysis.pdf", bbox_inches='tight')

# Display the plot
plt.tight_layout()
plt.show()


### Print the polynomial regression coefficients

In [None]:
# Refactor and pretty print polynomial coefficients
for i, coeff in enumerate(coeffs_time):
    print(f"coeffs_time[{i}]: {coeff}")


### Exemplify the duration for 1,000,000 OPC UA packets

In [None]:
def polynomial_regression_formula(x, coeffs):
    """
    Given x (number of packets), compute y (time) using the polynomial regression formula.

    Parameters:
        x (float): The number of packets.
        coeffs (tuple): Coefficients (a, b, c) of the polynomial regression model.

    Returns:
        float: Predicted time.
    """
    a, b, c = coeffs
    return a * x**2 + b * x + c

# Example usage: Predict the time required for 1,000,000 packets
predicted_time = polynomial_regression_formula(1000000, coeffs_time)
print(f"Predicted time for 1,000,000 packets: {predicted_time} seconds")


## 7. Quality Evaluation

Next, we define and use functions to evaluate and visualize the quality of a Petri net. The `evaluate_petri_net`function assesses the Petri net's fitness, precision, generalization, and simplicity using various mining techniques (inductive, heuristic, or alpha) and thresholds. Last, we visualize these results with line charts.

### Define quality functions for different process discovery methods

In [None]:
def evaluate_petri_net(miner, log, threshold):
    """
    Evaluate the quality criteria of a Petri net using specified mining algorithm and threshold.

    Parameters:
        miner (str): Mining algorithm, one of "inductive", "heuristic", or "alpha".
        log (event_log): Event log to be used for evaluation.
        threshold (float): Threshold for mining algorithm (dependency_threshold for "inductive",
                           noise_threshold for "heuristic").

    Returns:
        None
    """
    if miner == "inductive":
        net, im, fm = pm4py.discover_petri_net_heuristics(log, dependency_threshold=threshold)
    elif miner == "heuristic":
        net, im, fm = pm4py.discover_petri_net_inductive(log, noise_threshold=threshold)
    else:
        net, im, fm = pm4py.discover_petri_net_alpha(log)

    # Calculate fitness using token-based replay
    tbr_fitness = pm4py.fitness_token_based_replay(log, net, im, fm)
    fitness = round(tbr_fitness['perc_fit_traces'], 4)

    # Calculate precision using token-based replay
    etc_prec = pm4py.precision_token_based_replay(log, net, im, fm)
    precision = round(etc_prec * 100, 4)

    # Calculate generalization
    gen = generalization_evaluator.apply(log, net, im, fm)
    generalization = round(gen * 100, 4)

    # Calculate simplicity
    simp = simplicity_evaluator.apply(net)
    simplicity = round(simp * 100, 4)

    # Print the evaluation results
    print(f"Fitness: {fitness}%")
    print(f"Precision: {precision}%")
    print(f"Generalization: {generalization}%")
    print(f"Simplicity: {simplicity}%")

    # Store the evaluation results in the data list
    data.append((miner, threshold, fitness, precision, generalization, simplicity))


### Run the quality assessment for the inductive and heuristic miners with different thresholds

In [None]:
data = []
thresholds = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
miners = ["inductive", "heuristic"]

# Evaluate Petri nets for different mining algorithms and thresholds
for miner in miners:
    for threshold in thresholds:
        print(f"{miner}: {threshold}")
        evaluate_petri_net(miner, event_log, threshold)

# Append evaluation results for the "alpha" miner with fixed values
for threshold in thresholds:
    data.append(("alpha", threshold, 0.0, 19.438, 87.0583, 77.777))


### Visualize the results for each miner

In [None]:
def pm_quality_plot(data, name):
    """
    Create a quality plot for Petri nets based on evaluation metrics.

    Parameters:
        data (list): List of tuples containing evaluation data for different thresholds.
        name (str): Name of the output plot file (e.g., "quality_plot").

    Returns:
        None
    """
    # Extracting metrics for plotting
    thresholds = [entry[0] for entry in data]
    fitness = [entry[1] for entry in data]
    precision = [entry[2] for entry in data]
    generalization = [entry[3] for entry in data]
    simplicity = [entry[4] for entry in data]

    # Plotting
    fig, ax = plt.subplots(figsize=(8, 8))

    # Miner plot
    ax.set_ylim(0, 100)
    ax.plot(thresholds, fitness, marker='o', label='Fitness', color='#44AA99', linewidth=5)
    ax.plot(thresholds, precision, marker='o', label='Precision', color='#DDCC77', linewidth=5)
    ax.plot(thresholds, generalization, marker='o', label='Generalization', color='#88CCEE', linewidth=5)
    ax.plot(thresholds, simplicity, marker='o', label='Simplicity', color='#CC6677', linewidth=5)
    ax.set_xlabel('Dependency Threshold', fontsize=17, weight='bold')
    ax.set_ylabel('Evaluation Metric (%)', fontsize=17, weight='bold')
    ax.tick_params(axis='y', labelsize=15)
    ax.tick_params(axis='x', labelsize=15)
    ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=2, frameon=False, fontsize=17)
    ax.grid(True)

    plt.tight_layout()
    plt.savefig(f"{name}.pdf", bbox_inches="tight")
    plt.show()


In [None]:
# Separate data into different categories based on mining algorithms
inductive_data = [entry[1:] for entry in data if entry[0] == 'inductive']
heuristic_data = [entry[1:] for entry in data if entry[0] == 'heuristic']
alpha_data = [entry[1:] for entry in data if entry[0] == 'alpha']

# Generate quality plots for different mining algorithms
pm_quality_plot(inductive_data, "inductive")
pm_quality_plot(heuristic_data, "heuristic")
pm_quality_plot(alpha_data, "alpha")
