# Digital Twin of 5G network - Bachelor Thesis
## Dataset Creation - David Truhlar, 2025

In [79]:
import pandas as pd
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta
import os
import re
import pytz
from pathlib import Path
from metrics_config import metrics

In [80]:
# Notebook-wide configuration

# Port of the local Prometheus server that is queried for metrics
PROMETHEUS_PORT = 9090

# Resolution step passed to Prometheus range queries
STEP = "1s"
# Length, in seconds, of the query window that ends at the current time
TIMEDELTA_SECONDS = 10
# Time zone that metric timestamps are converted into
LOCAL_TZ = pytz.timezone("Europe/Bratislava")

# Directory scanned for *.log files
LOG_DIR = "../log/"
# Log line layout: "MM/DD HH:MM:SS.mmm: [<word>] <word>: <message>"
LOG_PATTERN = re.compile(r"(\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{3}):\s+\[(\w+)\]\s+(\w+):\s*(.+)")
# Encoding used when reading log files
ENCODING = "utf-8"

In [81]:
# Create the Prometheus client and verify that the server is reachable.
# Building the client object is not expected to contact the server by itself,
# so the connection is probed explicitly; otherwise a missing server would
# only show up later as one error per queried metric.
try:
    prom = PrometheusConnect(url=f"http://localhost:{PROMETHEUS_PORT}", disable_ssl=True)
    if not prom.check_prometheus_connection():
        print("Error connecting to Prometheus: server did not answer successfully")
except Exception as e:
    print(f"Error connecting to Prometheus: {e}")

In [82]:
# Query window: the last TIMEDELTA_SECONDS seconds, ending at the current local time
window_length = timedelta(seconds=TIMEDELTA_SECONDS)
end_time = datetime.now()
start_time = end_time - window_length

In [83]:
# from datetime import datetime

# # Input range (UTC)
# start_str = "2025-04-11 10:22:00"
# end_str = "2025-04-11 10:32:59"

# # Parse as UTC datetime
# start_utc = datetime.fromisoformat(start_str)
# end_utc = datetime.fromisoformat(end_str)

# # Convert to local time, dropping microseconds
# start_local = start_utc.astimezone().replace(microsecond=0)
# end_local = end_utc.astimezone().replace(microsecond=0)

# print("Start time:", start_utc)
# print("End time:", end_utc)

# end_time = end_utc
# start_time = start_utc

### Fetch and process metrics from Prometheus

In [84]:
# Collect one long-format DataFrame (timestamp, value, metric_name, group)
# for every time series returned by Prometheus.
df_list = []

# `metrics` maps a group name to the list of queries belonging to that group
for group, metric_list in metrics.items():
    for metric in metric_list:
        try:
            response = prom.custom_query_range(
                metric, start_time=start_time, end_time=end_time, step=STEP
            )

            if not response:
                print(f"⚠️ No data: {group}/{metric}")
                continue

            for entry in response:
                # Fall back to the query string when the series has no __name__ label
                base_metric_name = entry["metric"].get("__name__", metric)

                if "values" in entry and isinstance(entry["values"], list):
                    extracted_values = [
                        (
                            # Build an aware UTC datetime directly
                            # (datetime.utcfromtimestamp is deprecated since
                            # Python 3.12), then express it in LOCAL_TZ.
                            datetime.fromtimestamp(int(ts), tz=pytz.utc).astimezone(LOCAL_TZ),
                            float(val),
                        )
                        for ts, val in entry["values"]
                    ]

                    metric_df = pd.DataFrame(extracted_values, columns=["timestamp", "value"])
                    metric_df["metric_name"] = base_metric_name
                    metric_df["group"] = group

                    df_list.append(metric_df)
                else:
                    print(f"⚠️ No valid values found: {group}/{metric}")

        except Exception as e:
            # Best effort: report the failing metric and carry on with the next one
            print(f"❌ Error fetching {group}/{metric}: {e}")

# Combine all metrics into one DataFrame
if df_list:
    final_df = pd.concat(df_list, ignore_index=True)
    # Keep timestamps as naive LOCAL_TZ wall-clock times. Dropping the zone
    # through the datetime accessor works for any UTC offset, unlike stripping
    # a "+HH:MM" suffix from the string representation.
    final_df["timestamp"] = (
        pd.to_datetime(final_df["timestamp"], utc=True)
        .dt.tz_convert(LOCAL_TZ)
        .dt.tz_localize(None)
    )
else:
    # NOTE: final_df is not (re)defined in this case
    print("❌ No data collected for any metric.")


#### Extract and process logs from Open5GS network functions

In [85]:
# Rows parsed from the log files; turned into a DataFrame at the end
log_data = []

# Only lines whose bracketed tag (case-insensitive) is listed here are kept
allowed_applications = {"amf", "upf", "smf", "udm", "gmm"}

for log_path in Path(LOG_DIR).glob("*.log"):
    try:
        with open(log_path, "r", encoding=ENCODING, errors="ignore") as f:
            for line in f:
                match = LOG_PATTERN.match(line)
                if not match:
                    continue

                timestamp_str, application, log_level, log_message = match.groups()

                # Filter by allowed applications
                if application.lower() not in allowed_applications:
                    continue

                # Log lines carry no year, so the year of the query window is
                # parsed together with the timestamp. Parsing "%m/%d" alone
                # defaults to the year 1900, which rejects "02/29" and would
                # abort processing of the whole file.
                log_timestamp = datetime.strptime(
                    f"{start_time.year}/{timestamp_str}", "%Y/%m/%d %H:%M:%S.%f"
                ).replace(microsecond=0)  # align with the 1-second metric timestamps

                # Keep only logs strictly inside the query window
                if start_time < log_timestamp < end_time:
                    log_data.append({
                        "timestamp": log_timestamp,
                        "application": application,
                        "log_level": log_level,
                        "log_message": log_message,
                    })
    except Exception as e:
        # Best effort: report the file and continue with the remaining ones
        print(f"❌ Failed to process {log_path.name}: {e}")

# An empty list yields a DataFrame without any columns
log_data = pd.DataFrame(log_data)

#### Aggregate and transform metrics data


In [86]:
# Average the samples that share a (timestamp, metric_name) pair, keeping the
# grouping keys as ordinary columns
data_agg = final_df.groupby(["timestamp", "metric_name"], as_index=False)["value"].mean()

# Wide layout: one row per timestamp, one column per metric
data_pivot = data_agg.pivot(index="timestamp", columns="metric_name", values="value")

# Plain "<metric>_value" column labels
data_pivot.columns = [f"{name}_value" for name in data_pivot.columns]

# Turn the timestamp index back into a regular column
data_pivot = data_pivot.reset_index()

In [87]:
# csv_file = "test_merged_data3.csv"

# # Determine whether to write header
# write_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0

# # Try filtering only new records if file exists and is not empty
# if not write_header:
#     try:
#         last_timestamp = pd.read_csv(csv_file, usecols=["timestamp"])["timestamp"].max()
#         data_pivot = data_pivot[data_pivot["timestamp"] > last_timestamp]
#     except Exception as e:
#         print(f"⚠️ Issue reading existing CSV: {e}. Proceeding without filtering.")

# # Append new data
# if not data_pivot.empty:
#     data_pivot.to_csv(csv_file, mode="a", index=False, header=write_header)

In [88]:
# Label -> case-insensitive regex. Order matters: the first entry whose pattern
# occurs in a message decides that message's label.
patterns = {
    label: re.compile(expression, re.IGNORECASE)
    for label, expression in (
        ("remove", r"\b(Removed|Deregister|De-register|Implicit De-registered)\b"),
        ("refused", r"\b(refused|connection refused)\b"),
        ("number_of_sessions_or_ues", r"\b(Number of (gNBs|AMF-UEs|AMF-Sessions|gNB-UEs))\b"),
        ("registration", r"\b(Registration request|InitialUEMessage|Added|Unknown UE by SUCI|SUCI)\b"),
        ("error", r"\b(ERROR)\b"),
        ("warning", r"\b(WARNING)\b"),
    )
}


def classify_log_message(message):
    """Return the label of the first entry in ``patterns`` found in *message*.

    Returns "nothing" when *message* is not a string or no pattern matches.
    """
    if isinstance(message, str):
        matching_labels = (
            label for label, pattern in patterns.items() if pattern.search(message)
        )
        return next(matching_labels, "nothing")
    return "nothing"

# Label every log row; skipped when no log lines were collected, because
# log_data then has no log_message column
if "log_message" in log_data.columns:
    log_data["log_type"] = log_data["log_message"].map(classify_log_message)

In [89]:
# Keep only the columns needed for merging with the metrics. The explicit copy
# makes logs_short independent of log_data, so assigning to its columns
# afterwards does not raise pandas' SettingWithCopyWarning.
log_columns = ["timestamp", "application", "log_type"]
if log_data.empty:
    # No logs collected: provide the expected columns with zero rows
    logs_short = pd.DataFrame(columns=log_columns)
else:
    logs_short = log_data[log_columns].copy()

In [90]:
# Work on a copy so the column assignment below never targets a slice of
# another DataFrame
logs_short = logs_short.copy()

# Normalise log timestamps to naive datetimes; unparseable values become NaT
logs_short["timestamp"] = pd.to_datetime(logs_short["timestamp"], errors="coerce")

# Keep logs at or after the window start. Both sides are compared as naive
# wall-clock values, which gives the same result as tagging both with UTC and
# stripping the zone again afterwards. Rows with NaT never satisfy the
# comparison and are dropped.
logs_short = logs_short[logs_short["timestamp"] >= start_time.replace(tzinfo=None)].copy()

In [91]:
# Outer-join metrics and logs on timestamp so that rows present in only one of
# the two sources are kept
merged_data = data_pivot.merge(logs_short, how="outer", on="timestamp")

In [92]:
# Rows without a matching log entry have no 'application' / 'log_type'.
# Fill those gaps with 0 and store both columns as strings, so a missing
# value ends up as the string "0".
for column in ("application", "log_type"):
    merged_data[column] = merged_data[column].fillna(0).astype(str)


In [93]:
# Load the current_uc label from its text file, without surrounding whitespace
current_uc = None

with open("../current_uc.txt") as uc_file:  # text mode "r" is the default
    current_uc = uc_file.read().strip()

# Tag every row of merged_data with that label (skipped when there are no rows)
if not merged_data.empty:
    merged_data["current_uc"] = current_uc

In [94]:
csv_file = "running_data.csv"

# A header is only written when the file is missing or still empty
write_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0

# When the file already holds data, append only rows newer than its latest timestamp
if not write_header:
    try:
        stored_timestamps = pd.read_csv(csv_file, usecols=["timestamp"])["timestamp"]
        # Parse to datetimes so the maximum is chronological rather than a
        # lexicographic maximum over strings
        last_timestamp = pd.to_datetime(stored_timestamps, errors="coerce").max()
        # NaT means the file holds no usable timestamp; comparing against it
        # would discard every new row, so filtering is skipped in that case
        if pd.notna(last_timestamp):
            merged_data = merged_data[merged_data["timestamp"] > last_timestamp]
    except Exception as e:
        print(f"⚠️ Issue reading existing CSV: {e}. Proceeding without filtering.")

# Append new data
if not merged_data.empty:
    merged_data.to_csv(csv_file, mode="a", index=False, header=write_header)