In [1]:
import csv
import os
import random
from datetime import datetime, timedelta, time
from random import randint

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [2]:
# Suffixes appended to a per-phase series prefix (note the leading space).
phase_list = [" L1", " L2", " L3"]

# Prefix of the per-phase voltage series.
voltage = "Spänning"

# Consumption-side series ids (power ids are combined with a phase suffix).
active_power_consumption, reactive_power_consumption = "P14", "Q12"
active_energy_consumption, reactive_energy_consumption = "A14", "R12"

# Production-side series ids (power ids are combined with a phase suffix).
active_power_production, reactive_power_production = "P23", "Q34"
active_energy_production, reactive_energy_production = "A23", "R34"

In [3]:
# Column names of a value record, grouped by role.
fieldnames = (
    ["series_id", "location_id", "meter_id"]  # identifiers
    + ["time", "value", "value_status"]  # the reading itself
    + ["insert_time", "extract_time"]  # bookkeeping timestamps
)

In [4]:
def generate_lid_mid(isRandom, size):
    """Create ``size`` random ``(location id, meter id)`` pairs.

    Args:
        isRandom: when truthy, each id is a string of random decimal digits
            (18 digits for the location id, 16 for the meter id). Otherwise
            each id is an ``int`` drawn from a fixed range
            (location: 8e17..9e17, meter: 6e15..7e15, both inclusive).
        size: number of pairs to generate.

    Returns:
        A list of ``(location_id, meter_id)`` tuples.
    """

    def digit_string(length):
        # One random decimal digit per position, concatenated.
        return "".join(str(random.randint(0, 9)) for _ in range(length))

    if isRandom:

        def make_pair():
            return (digit_string(18), digit_string(16))

    else:

        def make_pair():
            return (
                random.randint(800000000000000000, 900000000000000000),
                random.randint(6000000000000000, 7000000000000000),
            )

    pairs = []
    for _ in range(size):
        pairs.append(make_pair())
    return pairs

In [5]:
class GenState:
    """Configuration and randomly drawn state for one synthetic meter-data run.

    The constructor stores the settings, validates them, and then draws the
    meter/location ids, the faulty meters, the iterations at which deviations
    start and an initial random subset of the faulty meters.

    Args (all keyword-only; defaults reproduce the original hard-coded setup):
        total_event: number of iterations (timestamps) to generate. At 4
            events per hour this is 96 per day, 672 for 7 days.
        deviation_start: first iteration at which a deviation may start.
        voltage_dev: how many voltage high/low deviations occur.
        voltage_alt: how many alternating-voltage deviations occur.
        voltage_phase_zero: how many single-phase-zero deviations occur.
        power_zero_dev: how many power-zero deviations occur (a higher value
            is expected to produce more alerts).
        deviation_duration_count: length of one deviation in iterations; keep
            it low, but at least 2 so two consecutive timestamps deviate.
        offset: gap in iterations kept between two deviation start slots so
            that deviations are not side by side.
        is_mid_lid_random: forwarded to ``generate_lid_mid``.
        size_mid_lid: number of (location id, meter id) pairs.
        faulty_meters_count: number of meters marked as faulty.
        file_cm_values: output path of the value file.
        file_cm_events: output path of the event file.
        day_start: hour at which the day window starts.
        day_end: hour at which the day window ends.

    Raises:
        ValueError: if the deviations cannot fit into the run, if there are
            fewer deviation start slots than deviations, or if more faulty
            meters than meters are requested.
    """

    def __init__(
        self,
        *,
        total_event=672,
        deviation_start=5,
        voltage_dev=0,
        voltage_alt=0,
        voltage_phase_zero=0,
        power_zero_dev=130,
        deviation_duration_count=3,
        offset=2,
        is_mid_lid_random=False,
        size_mid_lid=1000,
        faulty_meters_count=1000,
        file_cm_values="data/generated_cm_values",
        file_cm_events="data/generated_cm_events",
        day_start=8,
        day_end=17,
    ):
        self.total_event = total_event
        self.deviation_start = deviation_start

        # Number of occurrences per deviation pattern.
        self.voltage_dev = voltage_dev
        self.voltage_alt = voltage_alt
        self.voltage_phase_zero = voltage_phase_zero
        self.power_zero_dev = power_zero_dev

        self.deviation_duration_count = deviation_duration_count
        self.offset = offset

        # Total number of deviations that have to be placed in the run.
        self.deviation_time = (
            self.power_zero_dev
            + self.voltage_dev
            + self.voltage_alt
            + self.voltage_phase_zero
        )
        if (self.deviation_duration_count * self.deviation_time > self.total_event) or (
            self.deviation_duration_count > self.total_event
        ):
            raise ValueError("cannot have more deviation than total events")

        self.is_mid_lid_random = is_mid_lid_random
        self.size_mid_lid = size_mid_lid
        self.faulty_meters_count = faulty_meters_count

        self.file_cm_values = file_cm_values
        self.file_cm_events = file_cm_events

        if self.faulty_meters_count > self.size_mid_lid:
            raise ValueError("cannot have more faulty meters than total meters")

        self.lid_mid = generate_lid_mid(self.is_mid_lid_random, self.size_mid_lid)
        self.fault_meters = sorted(
            random.sample(self.lid_mid, self.faulty_meters_count)
        )

        self.day_start = day_start
        self.day_end = day_end

        # Candidate start iterations: spaced so that a full deviation plus the
        # offset fits between two neighbouring candidates.
        possible_values = list(
            range(
                self.deviation_start,
                self.total_event - self.deviation_duration_count,
                self.deviation_duration_count + self.offset,
            )
        )

        print(f"poss type {len(possible_values)}")
        print(f"dev type {self.deviation_time}")

        # The check above ignores deviation_start and offset, so it can pass
        # while there are still too few start slots. Fail with a clear message
        # instead of random.sample's generic one.
        if self.deviation_time > len(possible_values):
            raise ValueError(
                f"cannot place {self.deviation_time} deviations: only "
                f"{len(possible_values)} start slots are available"
            )

        # Iterations at which a deviation is triggered.
        self.deviation_points = sorted(
            random.sample(possible_values, self.deviation_time)
        )

        self.regen_random_faulty_meters_subset()

    def get_total_event(self):
        """Return the number of iterations to generate."""
        return self.total_event

    def get_deviation_start(self):
        """Return the first iteration at which a deviation may start."""
        return self.deviation_start

    def get_voltage_dev(self):
        """Return the number of voltage high/low deviations."""
        return self.voltage_dev

    def get_power_zero_dev(self):
        """Return the number of power-zero deviations."""
        return self.power_zero_dev

    def get_voltage_alt(self):
        """Return the number of alternating-voltage deviations."""
        return self.voltage_alt

    def get_voltage_phase_zero(self):
        """Return the number of single-phase-zero deviations."""
        return self.voltage_phase_zero

    def get_deviation_duration_count(self):
        """Return the length of one deviation in iterations."""
        return self.deviation_duration_count

    def get_deviation_time(self):
        """Return the total number of deviations."""
        return self.deviation_time

    def get_offset(self):
        """Return the gap kept between two deviation start slots."""
        return self.offset

    def get_is_mid_lid_random(self):
        """Return the flag passed to ``generate_lid_mid``."""
        return self.is_mid_lid_random

    def get_size_mid_lid(self):
        """Return the number of (location id, meter id) pairs."""
        return self.size_mid_lid

    def get_faulty_meters_count(self):
        """Return the number of faulty meters."""
        return self.faulty_meters_count

    def get_file_cm_values(self):
        """Return the output path of the value file."""
        return self.file_cm_values

    def get_file_cm_events(self):
        """Return the output path of the event file."""
        return self.file_cm_events

    def get_lid_mid(self):
        """Return all (location id, meter id) pairs."""
        return self.lid_mid

    def get_fault_meters(self):
        """Return the sorted (location id, meter id) pairs of faulty meters."""
        return self.fault_meters

    def get_day_start(self):
        """Return the hour at which the day window starts."""
        return self.day_start

    def get_day_end(self):
        """Return the hour at which the day window ends."""
        return self.day_end

    def get_deviation_points(self):
        """Return the sorted iterations at which a deviation is triggered."""
        return self.deviation_points

    def get_random_faulty_meters_subset(self):
        """Return the currently drawn random subset of faulty meters."""
        return self.random_faulty_meters_subset

    def regen_random_faulty_meters_subset(self):
        """Draw a new random subset of the faulty meters.

        The subset holds between 1 and ``faulty_meters_count // 2`` meters.
        With a single faulty meter the subset is that meter; with none it is
        empty (both cases previously crashed in ``random.randint``).
        """
        if not self.fault_meters:
            self.random_faulty_meters_subset = []
            return
        largest_subset = max(1, self.faulty_meters_count // 2)
        self.random_faulty_meters_subset = random.sample(
            self.fault_meters, random.randint(1, largest_subset)
        )

In [6]:
# Build the shared generator state, then echo its key settings.
app_state = GenState()

single_deviation_size = app_state.get_deviation_duration_count()
total_deviation_size = single_deviation_size * app_state.get_deviation_time()

print(f"total event to generate {app_state.get_total_event()}")
print(f"single deviation size in iteration {single_deviation_size}")
print(f"total deviation size in iteration {total_deviation_size}")
print(f"day start {app_state.get_day_start()}")
print(f"day end {app_state.get_day_end()}")
print(f"deviation starting points {app_state.get_deviation_points()}")
print(f"fault meters {app_state.get_fault_meters()}")
print(f"random subset of faulty meters {app_state.get_random_faulty_meters_subset()}")

poss type 133
dev type 130
total event to generate 672
single deviation size in iteration 3
total deviation size in iteration 390
day start 8
day end 17
deviation starting points [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155, 160, 165, 170, 175, 180, 185, 190, 195, 200, 205, 210, 215, 220, 225, 230, 235, 240, 245, 250, 255, 260, 265, 275, 280, 285, 295, 300, 305, 310, 315, 320, 325, 330, 335, 340, 345, 350, 355, 360, 365, 370, 375, 380, 385, 390, 395, 400, 405, 410, 415, 420, 425, 430, 435, 440, 445, 450, 455, 465, 470, 475, 480, 485, 490, 495, 500, 505, 510, 515, 520, 525, 530, 535, 540, 545, 550, 555, 560, 565, 570, 575, 580, 585, 590, 595, 600, 605, 610, 615, 620, 625, 630, 635, 640, 645, 650, 655, 660, 665]
fault meters [(800037627866899328, 6445019590877522), (800060513980099938, 6691365253383676), (800187471108761232, 6296563554298617), (800220835099366031, 6756556426069738), (800264477003388

In [7]:
# Series-id prefixes that share one set of value ranges.
_POWER_PREFIXES = ("P14", "P23", "Q12", "Q34")  # kW / kVAr series
_ENERGY_PREFIXES = ("A14", "A23", "R12", "R34")  # kWh / kVArh series

# Inclusive bounds passed to random.uniform, per family and level.
_VALUE_RANGES = {
    "power": {"high": (2, 3), "low": (0.01, 1), "normal": (1, 2)},
    "energy": {"high": (16, 19), "low": (5, 8), "normal": (14, 16)},
}

# val_type -> the single voltage series that is forced to zero by it.
_PHASE_ZERO_TARGETS = (
    ("l1", "Spänning L1"),
    ("l2", "Spänning L2"),
    ("l3", "Spänning L3"),
)


def get_value(sid, val_type) -> float:
    """Return a random reading for series ``sid`` under deviation ``val_type``.

    Args:
        sid: series id; it must start with ``"Spänning"`` (voltage), a power
            prefix (P14, P23, Q12, Q34) or an energy prefix (A14, A23, R12,
            R34).
        val_type: deviation to simulate. Recognised values are
            ``"<family>-high"``, ``"<family>-low"`` and ``"<family>-zero"``
            where ``<family>`` is ``voltage``, ``power`` or ``energy`` and
            matches the series; ``"p-zero"`` (zero for every series); and
            ``"l1"``/``"l2"``/``"l3"`` (zero for the matching voltage phase
            only). Any other value, including a deviation that belongs to a
            different family, yields a normal reading.

    Returns:
        The reading as a float: voltages rounded to whole numbers, all other
        series rounded to 3 decimals, and exactly ``0.0`` for zero deviations.

    Raises:
        ValueError: if ``sid`` matches no known series prefix. (Previously this
            printed a message and then failed with ``UnboundLocalError``.)
    """
    if sid.startswith("Spänning"):
        if val_type == "voltage-high":
            return round(random.uniform(241, 246), 0)
        if val_type == "voltage-low":
            return round(random.uniform(229, 234), 0)
        if val_type in ("voltage-zero", "p-zero"):
            return 0.0
        # A phase deviation zeroes only the phase it names.
        if (val_type, sid) in _PHASE_ZERO_TARGETS:
            return 0.0
        return round(random.uniform(235, 240), 0)

    if sid.startswith(_POWER_PREFIXES):
        family = "power"
    elif sid.startswith(_ENERGY_PREFIXES):
        family = "energy"
    else:
        raise ValueError(f"unknown series id: {sid!r}")

    if val_type in (f"{family}-zero", "p-zero"):
        return 0.0

    ranges = _VALUE_RANGES[family]
    if val_type == f"{family}-high":
        lower, upper = ranges["high"]
    elif val_type == f"{family}-low":
        lower, upper = ranges["low"]
    else:
        lower, upper = ranges["normal"]
    return round(random.uniform(lower, upper), 3)

In [8]:
def is_day_night(timestamp, app_state: GenState):
    """Tell whether ``timestamp`` lies inside the configured day window.

    The window runs from ``app_state.get_day_start()`` to
    ``app_state.get_day_end()`` (both used as whole hours) and includes both
    ends. Returns True for day, False otherwise.
    """
    day_opens = time(app_state.get_day_start())
    day_closes = time(app_state.get_day_end())

    if not (day_opens <= timestamp):
        return False
    return timestamp <= day_closes

In [9]:
def random_event_type_generator(isDay):
    """Return the set of series ids reported for one timestamp.

    The three voltage phases are always present. During the day the
    consumption series are added, otherwise the production series: active and
    reactive power per phase plus the two energy totals.
    """
    if isDay:
        per_phase_prefixes = (
            voltage,
            active_power_consumption,
            reactive_power_consumption,
        )
        totals = (active_energy_consumption, reactive_energy_consumption)
    else:
        per_phase_prefixes = (
            voltage,
            active_power_production,
            reactive_power_production,
        )
        totals = (active_energy_production, reactive_energy_production)

    # Same insertion order as adding the ids one by one.
    series_ids = set()
    for prefix in per_phase_prefixes:
        for phase_index in (0, 1, 2):
            series_ids.add(prefix + phase_list[phase_index])
    series_ids.update(totals)

    return series_ids

In [10]:
def generate_cm_event(time, dev_type, app_state: "GenState"):
    """Yield one event dict (possibly empty) per meter for a timestamp.

    For a meter in the current random faulty subset an event is produced only
    when ``dev_type`` is ``"p-zero"``. For every other meter an event is
    produced with probability 1/2. Meters without an event yield ``{}`` so
    that exactly one item is produced per (location id, meter id) pair.

    Args:
        time: value stored as ``event_tstamp``.
        dev_type: deviation type active for this timestamp.
        app_state: provides ``get_lid_mid()`` and
            ``get_random_faulty_meters_subset()``.

    Yields:
        ``{}`` or a dict with the keys ``location_id``, ``meter_id``,
        ``event_tstamp`` and ``event_name``.
    """
    phases = ("L1", "L2", "L3")
    # Snapshot the faulty meter ids once (on first iteration) instead of
    # rescanning the whole subset for every meter.
    faulty_meter_ids = {
        int(entry[1]) for entry in app_state.get_random_faulty_meters_subset()
    }

    for lid, mid in app_state.get_lid_mid():
        if int(mid) in faulty_meter_ids:
            emit = dev_type == "p-zero"
        else:
            emit = random.choice((True, False))

        if not emit:
            yield {}
            continue

        # The phase is picked outside the f-string: reusing the same quote
        # type inside an f-string is a syntax error before Python 3.12.
        phase = random.choice(phases)
        yield {
            "location_id": lid,
            "meter_id": mid,
            "event_tstamp": time,
            "event_name": f"Current limit exceeded {phase}",
        }

In [11]:
def generate_event(time, dev_type, is_day, app_state: "GenState"):
    """Yield one value record per (meter, series id) for a timestamp.

    Meters in the current random faulty subset are generated with
    ``dev_type``; every other meter gets normal (``"norm"``) values.

    Args:
        time: value stored in the ``time`` field.
        dev_type: deviation type applied to the faulty meters.
        is_day: selects the series ids via ``random_event_type_generator``.
        app_state: provides ``get_lid_mid()`` and
            ``get_random_faulty_meters_subset()``.

    Yields:
        Dicts with the keys ``series_id``, ``location_id``, ``meter_id``,
        ``time``, ``value`` and ``value_status`` (always ``"Correct"``).
    """
    # Snapshot the faulty meter ids once (on first iteration) instead of
    # rescanning the whole subset for every meter.
    faulty_meter_ids = {
        int(entry[1]) for entry in app_state.get_random_faulty_meters_subset()
    }
    # The series ids depend only on is_day, so build them once per timestamp.
    series_ids = random_event_type_generator(is_day)

    for lid, mid in app_state.get_lid_mid():
        dev_type_pass = dev_type if int(mid) in faulty_meter_ids else "norm"

        for series_id in series_ids:
            yield {
                "series_id": series_id,
                "location_id": lid,
                "meter_id": mid,
                "time": time,
                "value": get_value(series_id, dev_type_pass),
                "value_status": "Correct",
            }

In [12]:
def generate_readings(
    start_time,
    app_state: "GenState",
):
    """Write the synthetic value file and event file for one run.

    One iteration is generated per 15 minutes, starting at ``start_time``.
    At every deviation point the next deviation pattern with occurrences left
    is started (voltage high/low, then power-zero, then alternating voltage,
    then single-phase zero) and stays active for
    ``app_state.get_deviation_duration_count()`` iterations.

    Each record is written as one line of comma-separated ``key=value`` pairs.
    The parent directories of both output files are created if missing.

    The per-pattern counters are tracked locally, so ``app_state`` keeps its
    configured values and the function can be called again with the same
    state. The only change made to ``app_state`` is redrawing the random
    faulty-meter subset whenever a power-zero deviation starts.

    Args:
        start_time: ``datetime`` of the first iteration.
        app_state: run configuration and random state.
    """
    values_path = app_state.get_file_cm_values()
    events_path = app_state.get_file_cm_events()
    for output_path in (values_path, events_path):
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

    # Occurrences left per deviation pattern (local copies, see docstring).
    voltage_dev_left = app_state.get_voltage_dev()
    power_zero_left = app_state.get_power_zero_dev()
    voltage_alt_left = app_state.get_voltage_alt()
    phase_zero_left = app_state.get_voltage_phase_zero()

    # A set makes the per-iteration membership test constant time.
    deviation_points = set(app_state.get_deviation_points())
    deviation_duration = app_state.get_deviation_duration_count()

    current_time = start_time
    with open(
        values_path, mode="w", newline="", encoding="utf8"
    ) as file_for_cm_values, open(
        events_path, mode="w", newline="", encoding="utf8"
    ) as file_for_cm_events:
        dev_count = 0
        dev_type = "norm"
        main_dev_mode = "none"
        tmp_var = "none"
        for itr in range(app_state.get_total_event()):
            is_day = bool(is_day_night(current_time.time(), app_state))

            if itr in deviation_points:
                if voltage_dev_left != 0:
                    voltage_dev_left -= 1
                    if random.choice((True, False)):
                        dev_type = "voltage-high"
                    else:
                        dev_type = "voltage-low"
                elif power_zero_left != 0:
                    power_zero_left -= 1
                    main_dev_mode = "power-zero"
                    app_state.regen_random_faulty_meters_subset()
                elif voltage_alt_left != 0:
                    voltage_alt_left -= 1
                    main_dev_mode = "voltage-alt"
                elif phase_zero_left != 0:
                    phase_zero_left -= 1
                    # The phase follows the previously drawn random choice.
                    ch = random.choice(("l1", "l2", "l3"))
                    if tmp_var == "l1":
                        dev_type = "l2"
                    elif tmp_var == "l2":
                        dev_type = "l3"
                    else:
                        dev_type = "l1"
                    tmp_var = ch

            if main_dev_mode == "voltage-alt":
                # Alternate between zero and low voltage on every iteration.
                if dev_type == "voltage-zero":
                    dev_type = "voltage-low"
                else:
                    dev_type = "voltage-zero"
            elif main_dev_mode == "power-zero":
                dev_type = "p-zero"

            # Both generators capture the dev_type of this iteration; they are
            # consumed below, after the bookkeeping for the next iteration.
            event_gen = generate_event(current_time, dev_type, is_day, app_state)

            cm_event_gen = generate_cm_event(
                current_time - timedelta(minutes=random.randint(1, 14)),
                dev_type,
                app_state,
            )

            if dev_type != "norm":
                dev_count += 1
                if dev_count == deviation_duration:
                    # The deviation has run its full length; back to normal.
                    dev_type = "norm"
                    main_dev_mode = "none"
                    dev_count = 0

            # Drain the generators instead of counting on a fixed number of
            # series per meter.
            for event in event_gen:
                file_for_cm_values.write(
                    f"series_id={event['series_id']},location_id={event['location_id']},meter_id={event['meter_id']},time={event['time']},value={event['value']},value_status={event['value_status']}\n"
                )

            for cm_event in cm_event_gen:
                if cm_event:
                    file_for_cm_events.write(
                        f"location_id={cm_event['location_id']},meter_id={cm_event['meter_id']},event_tstamp={cm_event['event_tstamp']},event_name={cm_event['event_name']}\n"
                    )

            current_time += timedelta(minutes=15)

In [13]:
# Timestamp of the first generated reading; edit the string to move the window.
start_time_str = "2025-01-15 00:00:00"
start_time = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")

# Write the value and event files configured on app_state.
generate_readings(start_time=start_time, app_state=app_state)

In [14]:
def plot_single_axis(selected_series_id, selected_location_id, file_path=None):
    """Plot the values of one series at one location over time.

    The file is read in chunks of 10000 rows; rows whose ``series_id`` and
    ``location_id`` columns equal the selected ids are collected and plotted
    in file order.

    Args:
        selected_series_id: value matched against the ``series_id`` column.
        selected_location_id: value matched against the ``location_id`` column.
        file_path: CSV file to read. Defaults to
            ``app_state.get_file_cm_values()`` of the notebook-level state.

    NOTE(review): this expects a CSV with a header row naming ``series_id``,
    ``location_id``, ``time`` and ``value``. ``generate_readings`` writes
    headerless ``key=value`` lines, so its output presumably needs converting
    before it can be plotted here - confirm the intended input file.
    """
    if file_path is None:
        file_path = app_state.get_file_cm_values()

    chunk_size = 10000
    time_data, value_data = [], []

    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        filtered_chunk = chunk[
            (selected_series_id == chunk["series_id"])
            & (selected_location_id == chunk["location_id"])
        ]

        # The time column is plotted as read; it is not parsed into datetimes.
        time_data.extend(filtered_chunk["time"])
        value_data.extend(filtered_chunk["value"])

    # Plot the data
    plt.figure(figsize=(12, 6))
    plt.plot(
        time_data,
        value_data,
        marker="o",
        linestyle="-",
        markersize=3,
        color="b",
        alpha=0.7,
    )

    # Formatting the plot
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.title(
        f"Plot of 'Value' Over 'Time' (Series ID: {selected_series_id}, Location ID: {selected_location_id})"
    )
    plt.xticks(rotation=45)
    plt.grid()
    plt.show()

In [15]:
def plot_dual_axis(selected_series_ids, selected_location_id, file_path):
    """Plot several series of one location on two y-axes.

    The first three entries of ``selected_series_ids`` are drawn on the left
    axis (labelled "Value Voltages"), all remaining entries on the right axis
    (labelled "Value Power"). The file is read in chunks of 10000 rows and
    filtered on the ``location_id`` and ``series_id`` columns.

    Args:
        selected_series_ids: series ids to plot, left-axis ids first.
        selected_location_id: value matched against the ``location_id`` column.
        file_path: CSV file to read.
    """
    rows_per_chunk = 10000

    # One pair of growing lists per requested series.
    collected = {}
    for wanted_id in selected_series_ids:
        collected[wanted_id] = {"time": [], "value": []}

    # Read CSV in chunks
    for frame in pd.read_csv(file_path, chunksize=rows_per_chunk):
        at_location = frame[frame["location_id"] == selected_location_id]

        for wanted_id in selected_series_ids:
            matches = at_location[at_location["series_id"] == wanted_id]
            collected[wanted_id]["time"].extend(matches["time"])
            collected[wanted_id]["value"].extend(matches["value"])

    def draw_series(axis, series_ids, palette):
        # Cycle through the palette when there are more series than colours.
        for position, sid in enumerate(series_ids):
            axis.plot(
                collected[sid]["time"],
                collected[sid]["value"],
                color=palette[position % len(palette)],
                label=f"Series {sid}",
            )

    fig, left_axis = plt.subplots(figsize=(12, 6))

    # 1st axis
    left_palette = ["tab:red", "tab:blue", "tab:green"]
    left_axis.set_xlabel("Time")
    left_axis.set_ylabel("Value Voltages", color=left_palette[0])
    left_axis.tick_params(axis="y", labelcolor=left_palette[0])
    draw_series(left_axis, selected_series_ids[:3], left_palette)

    # 2nd axis
    right_palette = ["tab:orange", "tab:purple"]
    right_axis = left_axis.twinx()
    right_axis.set_ylabel("Value Power", color=right_palette[0])
    right_axis.tick_params(axis="y", labelcolor=right_palette[0])
    draw_series(right_axis, selected_series_ids[3:], right_palette)

    left_axis.legend()
    right_axis.legend()
    fig.tight_layout()
    plt.show()

In [16]:
# Series for the dual-axis example: three voltage phases, then one power series.
selected_series_ids = [
    "Spänning L1",
    "Spänning L2",
    "Spänning L3",
    "P14 L1",
]
# Location id of the first faulty meter; entries are (location id, meter id).
first_fault_meter = app_state.get_fault_meters()[0]
target_location_id = first_fault_meter[0]
# plot_dual_axis(selected_series_ids, target_location_id, "data/generated_cm_values.csv")