In [1]:
from datetime import datetime
import polars as pl
import numpy as np
from pyarrow.dataset import write_dataset

In [2]:
# Fixed seed so that Restart Kernel -> Run All regenerates the exact same
# synthetic dataset; change SEED to obtain a different (but still repeatable) one.
SEED = 42
rng = np.random.default_rng(SEED)

In [3]:
# Read only the three columns this notebook uses from the external id listing.
df = pl.read_csv(
    "external_ids.csv",
    columns=["MaximumPowerValue", "Operating", "EnergyProduction"],
)
n_rows = df.height

# Constant key column; the generation loop joins on it to pair every row
# with every timestamp.
df["dummy"] = pl.repeat(value=True, n=n_rows, eager=True)

# Starting energy-production value: a uniform random fraction of the row's
# MaximumPowerValue. This draw must happen before the operating draw below so
# the random stream is consumed in the same order.
start_fraction = pl.Series(rng.uniform(size=n_rows))
df["init_ep_value"] = df["MaximumPowerValue"] * start_fraction

# Boolean operating flag: True whenever the uniform draw exceeds 0.01.
is_operating = rng.uniform(size=n_rows) > 0.01
df["operating_value"] = pl.Series(is_operating)

df

MaximumPowerValue,Operating,EnergyProduction,dummy,init_ep_value,operating_value
i64,str,str,bool,f64,bool
10000000,"""oper1""","""ep1""",True,4845100.0,True
15000000,"""oper2""","""ep2""",True,12399000.0,True
5000000,"""oper3""","""ep3""",True,4741500.0,True
10000000,"""oper4""","""ep4""",True,4734800.0,True
15000000,"""oper5""","""ep5""",True,8186500.0,True
5000000,"""oper6""","""ep6""",True,2269600.0,True
10000000,"""oper7""","""ep7""",True,7030900.0,True
15000000,"""oper8""","""ep8""",True,2546100.0,True


In [4]:
def add_timestamp_partition_columns(df):
    """Add "year", "month" and "day" columns derived from df["timestamp"].

    The frame is modified in place (columns are assigned onto ``df``) and
    nothing is returned. Columns are added in the order year, month, day.

    Parameters
    ----------
    df : frame-like object supporting ``df[name]`` lookup and assignment,
        whose "timestamp" column exposes a ``.dt`` accessor with ``year()``,
        ``month()`` and ``day()`` methods.
    """
    for part in ("year", "month", "day"):
        # Resolve the accessor method by name, e.g. df["timestamp"].dt.year().
        extract = getattr(df["timestamp"].dt, part)
        df[part] = extract()

In [5]:
# Column order shared by the row sort and the on-disk partitioning of the
# datasets written by the generation loop.
by = [
    "year",
    "month",
    "day",
    "id",
]

In [6]:
# Generate synthetic time series for every row of `df` and write them as
# partitioned parquet datasets:
#   * "timeseries_double"  - energy-production values (a per-id random walk)
#   * "timeseries_boolean" - operating flags
# Data is produced hour by hour and written once per (month, day).
#
# Running counters used to build a distinct "<id>_<n>" datapoint_id per row;
# they keep increasing across hours and days.
ep_datapoint_counter = 0
op_datapoint_counter = 0
# NOTE(review): the date list jumps from 30 August to 1 September, so 31 August
# is not generated - confirm this gap is intentional.
for (month, day) in [(8, 29), (8, 30), (9, 1)]:
    # Per-day accumulators, one frame per hour.
    out_eps = []
    out_ops = []
    for hour in range(0, 24):
        print(month, day, hour)
        # Timestamps at 10-second spacing within this hour (year fixed to 2022).
        timestamps = pl.DataFrame({"timestamp": pl.date_range(low=datetime(2022, month, day, hour, 0, 0),
                                                              high=datetime(2022, month, day, hour, 59, 59),
                                                              interval="10s")})
        # Same constant key as df["dummy"], so joining on it pairs every df row
        # with every timestamp.
        timestamps["dummy"] = pl.repeat(value=True, n=timestamps.height, eager=True)

        # One normally distributed step (mean 0, std 10000) per (row, timestamp) pair.
        energy_production_deltas = rng.normal(0, 1_0000, size=df.height * timestamps.height)

        # Energy-production frame: join on the constant key, then expose the
        # EnergyProduction identifier as "id".
        ep = df[["init_ep_value", "EnergyProduction", "operating_value", "dummy"]].join(timestamps, left_on=["dummy"],
                                                                                        right_on=["dummy"]).drop(
            "dummy").rename({"EnergyProduction": "id"})

        ep["value"] = energy_production_deltas
        # Turn the steps into a running total within each id.
        # NOTE(review): assigning the grouped result back by position assumes the
        # join output keeps each id's rows contiguous and in df order - confirm.
        ep["value"] = ep.groupby(["id"], maintain_order=True).apply(lambda x: pl.DataFrame({"value": x["value"].cumsum()}))["value"]
        # Offset the walk by the value each id ended the previous hour with.
        ep["value"] = ep["value"] + ep["init_ep_value"]
        # Carry the last value of each id forward as the next hour's starting
        # point. This runs before the operating filter below, so the walk
        # advances for non-operating rows as well.
        df["init_ep_value"] = ep.groupby(["id"], maintain_order=True).apply(lambda x: pl.DataFrame({"init_ep_value": x["value"].tail(1)}))[
            "init_ep_value"]

        ep = ep.drop("init_ep_value")
        add_timestamp_partition_columns(ep)

        # Keep energy-production datapoints only for rows flagged as operating.
        ep = ep.filter(pl.col("operating_value")).drop("operating_value")
        ep = ep.sort(by)
        # datapoint_id = "<id>_<running counter>", numbered in sorted row order.
        ep["datapoint_id"] = ep.get_column("id") + "_" + pl.Series("counter", range(ep_datapoint_counter, ep_datapoint_counter + ep.height)).cast(str)
        ep_datapoint_counter += ep.height

        # Operating frame: the same join, with the Operating identifier as "id"
        # and the boolean flag as "value" (the flag is constant over time per row).
        op = df[["Operating", "operating_value", "dummy"]].join(timestamps, left_on=["dummy"], right_on=["dummy"]).drop(
            "dummy").rename({"Operating": "id", "operating_value": "value"})
        add_timestamp_partition_columns(op)
        op = op.sort(by)
        op["datapoint_id"] = op.get_column("id") + "_" + pl.Series("counter", range(op_datapoint_counter, op_datapoint_counter + op.height)).cast(str)
        op_datapoint_counter += op.height

        out_ops.append(op)
        out_eps.append(ep)

    # Write the whole day at once, partitioned by the columns in `by`.
    # NOTE(review): this call does not pass max_open_files, while the
    # energy-production write below passes max_open_files=10000 - confirm
    # whether the two calls are meant to use the same options.
    op_arrow = pl.concat(out_ops, rechunk=True).to_arrow()
    write_dataset(op_arrow, format="parquet", base_dir="timeseries_boolean", partitioning=by, use_threads=False,
                  min_rows_per_group=1073741824, max_partitions=10000, max_rows_per_file=1073741824,
                  existing_data_behavior="overwrite_or_ignore", max_rows_per_group=1073741824)
    op_arrow = None  # drop the reference, presumably so the Arrow table can be garbage-collected before the next write

    ep_arrow = pl.concat(out_eps, rechunk=True).to_arrow()
    write_dataset(ep_arrow, format="parquet", base_dir="timeseries_double", partitioning=by, use_threads=False,
                  existing_data_behavior="overwrite_or_ignore", min_rows_per_group=1073741824, max_rows_per_file=1073741824,
                  max_rows_per_group=1073741824, max_partitions=10000, max_open_files=10000)

8 29 0
8 29 1
8 29 2
8 29 3
8 29 4
8 29 5
8 29 6
8 29 7
8 29 8
8 29 9
8 29 10
8 29 11
8 29 12
8 29 13
8 29 14
8 29 15
8 29 16
8 29 17
8 29 18
8 29 19
8 29 20
8 29 21
8 29 22
8 29 23
8 30 0
8 30 1
8 30 2
8 30 3
8 30 4
8 30 5
8 30 6
8 30 7
8 30 8
8 30 9
8 30 10
8 30 11
8 30 12
8 30 13
8 30 14
8 30 15
8 30 16
8 30 17
8 30 18
8 30 19
8 30 20
8 30 21
8 30 22
8 30 23
9 1 0
9 1 1
9 1 2
9 1 3
9 1 4
9 1 5
9 1 6
9 1 7
9 1 8
9 1 9
9 1 10
9 1 11
9 1 12
9 1 13
9 1 14
9 1 15
9 1 16
9 1 17
9 1 18
9 1 19
9 1 20
9 1 21
9 1 22
9 1 23
