In [None]:
# filename: output.ipynb
# purpose: generate output data

# OHT noise, normal and outlier data generation

### Outlier detection method
- Outlier data will be detected and generated based on Moving Average and Moving Standard Deviation 

### Processing flow
- Read the raw table (ohtraw) from the DuckDB file database (filedb), which was created in the previous step, the parse notebook
- Create a work table (work) in an in-memory DuckDB database with the additional columns MVAVG_, MVSTD_, MVSIG_ and FLAG
- Calculate the Moving Average and Moving Standard Deviation
- Calculate the Sigma value of the column data based on the Moving Average and Standard Deviation
- Calculate the FLAG column value based on the Sigma value
- Fetch the work table into work dataframe
- Split noise and normal dataframe from work dataframe based on FLAG value
- Create outlier dataframe based on normal dataframe
- Update outlier dataframe by applying outlier pattern
- Calculate Moving Average, Standard Deviation, Sigma value for outlier data
- Prepare mix dataset with normal and outlier for ML 
- Save the noise, normal, outlier and mix dataframes to DuckDB tables for later graphing.
- Save the noise, normal, outlier and mix dataframes as CSV files
- Check csvfile size 

In [None]:
# packages
import time
import pathlib
import textwrap
import pandas as pd

import humanfriendly as human
import duckdb

import ohtconf as conf
import ohtcomm as comm

## Main

In [None]:
# wall-clock start of the notebook run; the last cell subtracts it to report the total elapsed time
mainstart = time.time()

### Prepare in-memory work table based on in-file raw table 

In [None]:
# in-memory DuckDB connection used by all cells below until it is closed

con = duckdb.connect(":memory:")

In [None]:
# (re)create the in-memory work table:
#   raw columns, then moving avg / moving std / sigma for each graph column, then the flag column

con.execute("DROP TABLE IF EXISTS work")

# raw columns take their names and DB types from the configuration
raw_defs = [f"{name} {dtype}" for name, dtype in zip(conf.COLUMN_NAMES, conf.COLUMN_DBTYPES)]
coldef = ", ".join(raw_defs)

# three derived columns per graph column, always in the order avg, std, sigma
derived_kinds = ((conf.MVAVG, "FLOAT"), (conf.MVSTD, "FLOAT"), (conf.MVSIG, "INTEGER"))
for col in conf.COLUMN_GRAPH:
    for prefix, dbtype in derived_kinds:
        coldef += f", {prefix}{col} {dbtype}"

coldef += f", {conf.COLUMN_FLAG}  INTEGER"

query = f"CREATE TABLE work ( {coldef} )"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated DDL

con.execute(query)

In [None]:
# attach the file database read-only under the alias filedb; its raw table comes from the parse step

attach_sql = f"ATTACH DATABASE '{conf.DBFILE}' AS filedb (READ_ONLY)"
con.execute(attach_sql)

In [None]:
# fill the work table from the raw table in the file database:
# raw columns as-is, moving average and moving standard deviation per graph column,
# sigma columns and the flag initialised to 0

order_col = conf.COLUMN_NAMES[0]
# trailing window: the configured number of preceding rows plus the current row
window = f"OVER (ORDER BY {order_col} ROWS BETWEEN {conf.POINTS['MOVING']} PRECEDING AND CURRENT ROW)"

select_items = list(conf.COLUMN_NAMES)
for col in conf.COLUMN_GRAPH:
    select_items.append(f"AVG({col}) {window} AS {conf.MVAVG}{col}")
    select_items.append(f"STDDEV({col}) {window} AS {conf.MVSTD}{col}")
    select_items.append("0")  # mvsig_
select_items.append("0")  # flag

coldef = ", ".join(select_items)
query = f"INSERT INTO work SELECT {coldef} FROM filedb.{conf.TABNAME_RAW} ORDER BY {order_col}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# detach filedb
# the raw rows were copied into the in-memory work table by the previous cell,
# so the file database alias is released here

con.execute("DETACH DATABASE filedb")

In [None]:
# replace the NULLs left by the window functions: read the work table in time order
# and back-fill every column from the next non-null row
dfwork = con.execute(f"SELECT * FROM work ORDER BY {conf.COLUMN_NAMES[0]}").df().bfill()

# rebuild the work table from the back-filled dataframe (DuckDB reads dfwork by its variable name)
con.execute("DROP TABLE IF EXISTS work")
con.execute("CREATE TABLE work AS SELECT * FROM dfwork")

In [None]:
# calculate Sigma value
# Sigma is the smallest k (1..5) for which the column value lies within k moving standard
# deviations of the moving average; values outside 5 deviations (or with a NULL deviation) get 6.
#
# The band is centred on the moving-average column (MVAVG_). Centring it on the value itself
# (col - k*std <= col <= col + k*std) holds for any non-NULL deviation, so every row would be
# labelled sigma 1 and no row could ever be flagged as noise.

set_clauses = []
for col in conf.COLUMN_GRAPH:
    mvavg = f"{conf.MVAVG}{col}"
    mvstd = f"{conf.MVSTD}{col}"
    when_lines = []
    for k in range(1, 6):
        when_lines.append(
            f"WHEN {col} >= ({mvavg} - {k} * {mvstd}) AND {col} <= ({mvavg} + {k} * {mvstd}) THEN {k}"
        )
    case_body = "\n    ".join(when_lines)
    set_clauses.append(f"{conf.MVSIG}{col} = CASE\n    {case_body}\n    ELSE 6\nEND")

coldef = ", ".join(set_clauses)
query = f"UPDATE work SET {coldef}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# set the flag to 1 on every row where at least one graph column reaches the configured noise sigma

noise_tests = [f"{conf.MVSIG}{col} >= {conf.SIGMA_NOISE}" for col in conf.COLUMN_GRAPH]
coldef = " OR ".join(noise_tests)

query = f"UPDATE work SET {conf.COLUMN_FLAG}=1 WHERE {coldef}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

### Prepare noise, normal dataframe

In [None]:
# split the work table by flag: flagged rows are noise, unflagged rows are normal

order_col = conf.COLUMN_NAMES[0]
dfnoise = con.execute(f"SELECT * FROM work WHERE {conf.COLUMN_FLAG}!=0 ORDER BY {order_col}").df()
dfnorm = con.execute(f"SELECT * FROM work WHERE {conf.COLUMN_FLAG}=0 ORDER BY {order_col}").df()

# round float type value
dfnoise = dfnoise.round(1)
dfnorm = dfnorm.round(1)

# an empty work table would otherwise raise ZeroDivisionError in the ratio
total_rows = len(dfnoise) + len(dfnorm)
noise_ratio = len(dfnoise) / total_rows if total_rows else 0.0
print(f"row count, noise={len(dfnoise)}, normal={len(dfnorm)}, noise ratio={noise_ratio}")

In [None]:
# set float display format
# pandas renders floats with one decimal place from here on (display only; stored values are unchanged)
pd.set_option("display.float_format", "{:.1f}".format)

In [None]:
# preview the first rows of the noise dataframe
dfnoise.head()

In [None]:
# preview the first rows of the normal dataframe
dfnorm.head()

### Prepare outlier dataframe from normal dataframe

In [None]:
# pick the outlier candidates out of dfnorm

datetm = conf.COLUMN_NAMES[0]
pattern = conf.POINTS["PATTERN"]

# overwrite datetm with a gap-free, duplicate-free 100 ms series, so that normal and outlier
# rows keep unique timestamps and can be merged again later for the moving average
dfnorm[datetm] = pd.date_range(start="2024-01-01", periods=len(dfnorm), freq="100ms")

# cut dfnorm into consecutive chunks of PATTERN rows and take every 4th chunk as outlier
# candidates, because the moving average differs from one datetm range to the next
chunks = [dfnorm.iloc[begin : begin + pattern] for begin in range(0, len(dfnorm), pattern)]
dfoutl = pd.concat(chunks[::4]).sort_values(by=datetm).reset_index(drop=True)

# the normal set keeps only the rows whose datetm was not taken as an outlier candidate
taken = dfnorm[datetm].isin(dfoutl[datetm])
dfnorm = dfnorm[~taken].sort_values(by=datetm).reset_index(drop=True)

print(f"row count, outlier={len(dfoutl)}, normal={len(dfnorm)}, outlier ratio={len(dfoutl)/(len(dfoutl)+len(dfnorm))}")

In [None]:
# turn the candidate rows into outliers and time the call
# NOTE(review): comm.gen_outlier lives in ohtcomm; per the processing-flow notes it applies the outlier pattern
_start = time.time()

dfoutl = comm.gen_outlier(dfoutl).round(1)  # float values rounded to one decimal

_elapsed = time.time() - _start
print(f"get_outlier elapsed time: {human.format_timespan(_elapsed)}")
# recorded timings:
# 8 min 17 sec, 2_031_674 rows, INPUT_MAXSIZE=400MB, include DATETM on csvfile
# 14 min 16 sec, 3_280_186 rows, INPUT_MAXSIZE=650MB, exclude DATETM on csvfile

In [None]:
# recreate work table based on dfoutl
# WHERE 1=0 selects no rows, so the new table only takes over dfoutl's column layout
con.execute("DROP TABLE IF EXISTS work")
con.execute("CREATE TABLE work AS SELECT * FROM dfoutl WHERE 1=0")

In [None]:
# insert into in-memory work table from dfoutl with recalculated Moving Avg, Std and the as-is Flag
# INSERT ... SELECT maps columns by position, so the select list follows the work-table layout:
# raw columns, then avg/std/sigma per graph column, then the flag.
# NOTE(review): assumes dfoutl still has that column order after comm.gen_outlier — confirm

order_col = conf.COLUMN_NAMES[0]
# trailing window: the configured number of preceding rows plus the current row
window = f"OVER (ORDER BY {order_col} ROWS BETWEEN {conf.POINTS['MOVING']} PRECEDING AND CURRENT ROW)"

select_items = list(conf.COLUMN_NAMES)
for col in conf.COLUMN_GRAPH:
    select_items.append(f"AVG({col}) {window} AS {conf.MVAVG}{col}")
    select_items.append(f"STDDEV({col}) {window} AS {conf.MVSTD}{col}")
    select_items.append("0")  # mvsig_, recalculated in a later cell
# keep the flag already stored in dfoutl; the column name comes from the configuration
# (as it does where the work table is first created) instead of a hard-coded "FLAG"
select_items.append(conf.COLUMN_FLAG)

coldef = ", ".join(select_items)
query = f"INSERT INTO work SELECT {coldef} FROM dfoutl ORDER BY {order_col}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# replace the NULLs left by the window functions: read the work table in time order
# and back-fill every column from the next non-null row
dfoutl = con.execute(f"SELECT * FROM work ORDER BY {conf.COLUMN_NAMES[0]}").df().bfill()

# rebuild the work table from the back-filled dataframe (DuckDB reads dfoutl by its variable name)
con.execute("DROP TABLE IF EXISTS work")
con.execute("CREATE TABLE work AS SELECT * FROM dfoutl")

In [None]:
# calculate Sigma value for the outlier rows
# Sigma is the smallest k (1..5) for which the column value lies within k moving standard
# deviations of the moving average; values outside 5 deviations (or with a NULL deviation) get 6.
#
# The band is centred on the moving-average column (MVAVG_). Centring it on the value itself
# (col - k*std <= col <= col + k*std) holds for any non-NULL deviation, so every row would be
# labelled sigma 1 regardless of how far it deviates.

set_clauses = []
for col in conf.COLUMN_GRAPH:
    mvavg = f"{conf.MVAVG}{col}"
    mvstd = f"{conf.MVSTD}{col}"
    when_lines = []
    for k in range(1, 6):
        when_lines.append(
            f"WHEN {col} >= ({mvavg} - {k} * {mvstd}) AND {col} <= ({mvavg} + {k} * {mvstd}) THEN {k}"
        )
    case_body = "\n    ".join(when_lines)
    set_clauses.append(f"{conf.MVSIG}{col} = CASE\n    {case_body}\n    ELSE 6\nEND")

coldef = ", ".join(set_clauses)
query = f"UPDATE work SET {coldef}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# reload the outlier dataframe from the work table, in time order, so it carries the updated sigma columns
dfoutl = con.execute(f"SELECT * FROM work ORDER BY {conf.COLUMN_NAMES[0]}").df()

In [None]:
# outlier first pattern range data
# skips the first POINTS["MOVING"] rows, then shows the next POINTS["PATTERN"] rows
dfoutl.iloc[conf.POINTS["MOVING"] : conf.POINTS["MOVING"] + conf.POINTS["PATTERN"]]

In [None]:
# normal data for the first pattern range
# same row positions as the outlier preview: skip POINTS["MOVING"] rows, show POINTS["PATTERN"] rows
dfnorm.iloc[conf.POINTS["MOVING"] : conf.POINTS["MOVING"] + conf.POINTS["PATTERN"]]

### Prepare mix dataframe from normal and outlier dataframe

In [None]:
# prepare mix dataframe based on normal and outlier dataframe
# normal rows are labelled 0; outlier rows keep the flag they already carry.
# The flag column name comes from the configuration (as it does where the work table is
# created) instead of a hard-coded "FLAG".
dfnorm[conf.COLUMN_FLAG] = 0

# interleave both sets by timestamp and renumber the index
order_col = conf.COLUMN_NAMES[0]
dfmix = pd.concat([dfnorm, dfoutl]).sort_values(by=order_col).reset_index(drop=True)

In [None]:
# recreate work table based on dfmix
# WHERE 1=0 selects no rows, so the new table only takes over dfmix's column layout
con.execute("DROP TABLE IF EXISTS work")
con.execute("CREATE TABLE work AS SELECT * FROM dfmix WHERE 1=0")

In [None]:
# insert into in-memory work table from dfmix with recalculated Moving Avg, Std and the as-is Flag
# INSERT ... SELECT maps columns by position, so the select list follows the work-table layout:
# raw columns, then avg/std/sigma per graph column, then the flag.
# NOTE(review): assumes dfmix has that column order — confirm

order_col = conf.COLUMN_NAMES[0]
# trailing window: the configured number of preceding rows plus the current row
window = f"OVER (ORDER BY {order_col} ROWS BETWEEN {conf.POINTS['MOVING']} PRECEDING AND CURRENT ROW)"

select_items = list(conf.COLUMN_NAMES)
for col in conf.COLUMN_GRAPH:
    select_items.append(f"AVG({col}) {window} AS {conf.MVAVG}{col}")
    select_items.append(f"STDDEV({col}) {window} AS {conf.MVSTD}{col}")
    select_items.append("0")  # mvsig_, recalculated in a later cell
# keep the normal/outlier flag already stored in dfmix; the column name comes from the
# configuration (as it does where the work table is first created) instead of a hard-coded "FLAG"
select_items.append(conf.COLUMN_FLAG)

coldef = ", ".join(select_items)
query = f"INSERT INTO work SELECT {coldef} FROM dfmix ORDER BY {order_col}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# replace the NULLs left by the window functions: read the work table in time order
# and back-fill every column from the next non-null row
dfmix = con.execute(f"SELECT * FROM work ORDER BY {conf.COLUMN_NAMES[0]}").df().bfill()

# rebuild the work table from the back-filled dataframe (DuckDB reads dfmix by its variable name)
con.execute("DROP TABLE IF EXISTS work")
con.execute("CREATE TABLE work AS SELECT * FROM dfmix")

In [None]:
# calculate Sigma value for the mix rows
# Sigma is the smallest k (1..5) for which the column value lies within k moving standard
# deviations of the moving average; values outside 5 deviations (or with a NULL deviation) get 6.
#
# The band is centred on the moving-average column (MVAVG_). Centring it on the value itself
# (col - k*std <= col <= col + k*std) holds for any non-NULL deviation, so every row would be
# labelled sigma 1 regardless of how far it deviates.

set_clauses = []
for col in conf.COLUMN_GRAPH:
    mvavg = f"{conf.MVAVG}{col}"
    mvstd = f"{conf.MVSTD}{col}"
    when_lines = []
    for k in range(1, 6):
        when_lines.append(
            f"WHEN {col} >= ({mvavg} - {k} * {mvstd}) AND {col} <= ({mvavg} + {k} * {mvstd}) THEN {k}"
        )
    case_body = "\n    ".join(when_lines)
    set_clauses.append(f"{conf.MVSIG}{col} = CASE\n    {case_body}\n    ELSE 6\nEND")

coldef = ", ".join(set_clauses)
query = f"UPDATE work SET {coldef}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# calculate Sigma value for the mix rows
# NOTE(review): the preceding cell also updates the sigma columns of the same table; the values
# written here are the ones that remain, so one of the two cells is redundant.
#
# Sigma is the smallest k (1..5) for which the column value lies within k moving standard
# deviations of the moving average; values outside 5 deviations (or with a NULL deviation) get 6.
# The band is centred on the moving-average column (MVAVG_). Centring it on the value itself
# (col - k*std <= col <= col + k*std) holds for any non-NULL deviation, so every row would be
# labelled sigma 1 regardless of how far it deviates.

set_clauses = []
for col in conf.COLUMN_GRAPH:
    mvavg = f"{conf.MVAVG}{col}"
    mvstd = f"{conf.MVSTD}{col}"
    when_lines = []
    for k in range(1, 6):
        when_lines.append(
            f"WHEN {col} >= ({mvavg} - {k} * {mvstd}) AND {col} <= ({mvavg} + {k} * {mvstd}) THEN {k}"
        )
    case_body = "\n    ".join(when_lines)
    set_clauses.append(f"{conf.MVSIG}{col} = CASE\n    {case_body}\n    ELSE 6\nEND")

coldef = ", ".join(set_clauses)
query = f"UPDATE work SET {coldef}"
# print(textwrap.fill(query, width=120))  # uncomment to inspect the generated SQL

con.execute(query)

In [None]:
# read the finished mix table back in time order, with float values rounded to one decimal
dfmix = con.execute(f"SELECT * FROM work ORDER BY {conf.COLUMN_NAMES[0]}").df().round(1)

In [None]:
# check data: row counts per label and the share of outlier rows
flag_col = conf.COLUMN_FLAG  # configured flag column name instead of a hard-coded "FLAG"
n_mix = len(dfmix)
n_norm = int((dfmix[flag_col] == 0).sum())
n_outl = int((dfmix[flag_col] != 0).sum())
outl_ratio = n_outl / n_mix if n_mix else 0.0  # an empty frame would otherwise raise ZeroDivisionError
print(f"row count, mix={n_mix}, normal={n_norm}, outlier={n_outl}, outlier ratio={outl_ratio}")
dfmix.head()

### Save dataframe into duckdb table

In [None]:
# save noise
# times the call to comm.save_dftab (helper from ohtcomm) for the noise dataframe
# NOTE(review): presumably writes dfnoise to the DuckDB table named by conf.TABNAME_NOISE — confirm in ohtcomm
_start = time.time()

comm.save_dftab(dfnoise, conf.TABNAME_NOISE)

_elapsed = time.time() - _start
print(f"save db, noise elapsed time: {human.format_timespan(_elapsed)}")

In [None]:
# save normal
# times the call to comm.save_dftab (helper from ohtcomm) for the normal dataframe
# NOTE(review): presumably writes dfnorm to the DuckDB table named by conf.TABNAME_NORM — confirm in ohtcomm
_start = time.time()

comm.save_dftab(dfnorm, conf.TABNAME_NORM)

_elapsed = time.time() - _start
print(f"save db, norm elapsed time: {human.format_timespan(_elapsed)}")

In [None]:
# save outlier
# times the call to comm.save_dftab (helper from ohtcomm) for the outlier dataframe
# NOTE(review): presumably writes dfoutl to the DuckDB table named by conf.TABNAME_OUTL — confirm in ohtcomm
_start = time.time()

comm.save_dftab(dfoutl, conf.TABNAME_OUTL)

_elapsed = time.time() - _start
print(f"save db, outl elapsed time: {human.format_timespan(_elapsed)}")

In [None]:
# save mix
# times the call to comm.save_dftab (helper from ohtcomm) for the mix dataframe
# NOTE(review): presumably writes dfmix to the DuckDB table named by conf.TABNAME_MIX — confirm in ohtcomm
_start = time.time()

comm.save_dftab(dfmix, conf.TABNAME_MIX)

_elapsed = time.time() - _start
print(f"save db, mix elapsed time: {human.format_timespan(_elapsed)}")

In [None]:
# close in-memory db
# the cells below only use the dataframes, not the connection
con.close()

### Save dataframe into csvfile

In [None]:
# save noise
# only the raw columns (conf.COLUMN_NAMES) are passed on; the derived avg/std/sigma/flag columns are left out
# NOTE(review): comm.save_csvfile is defined in ohtcomm; the size check at the end of this notebook
# looks for *.csv files in a sub-directory of conf.DIROUT named after the file stem — confirm
_start = time.time()

comm.save_csvfile(dfnoise[conf.COLUMN_NAMES], conf.FILENAME_NOISE, conf.DIROUT)  # only noise

_elapsed = time.time() - _start
print(f"save csvfile, noise elapsed time: {human.format_timespan(_elapsed)}")  # no files for conf.INPUT_MAXSIZE=400 MB

In [None]:
# save normal
# only the raw columns (conf.COLUMN_NAMES) are passed on; the derived avg/std/sigma/flag columns are left out
_start = time.time()

comm.save_csvfile(dfnorm[conf.COLUMN_NAMES], conf.FILENAME_NORM, conf.DIROUT)

_elapsed = time.time() - _start
print(f"save csvfile, norm elapsed time: {human.format_timespan(_elapsed)}")
# recorded timings:
#  8 min. 52 sec, 170 files, 374 MB for conf.INPUT_MAXSIZE=400 MB, exclude DATETM
# 11 min. 49 sec, 274 files, 380 MB for conf.INPUT_MAXSIZE=650 MB, include DATETM

In [None]:
# save outlier
# only the raw columns (conf.COLUMN_NAMES) are passed on; the derived avg/std/sigma/flag columns are left out
_start = time.time()

comm.save_csvfile(dfoutl[conf.COLUMN_NAMES], conf.FILENAME_OUTL, conf.DIROUT)

_elapsed = time.time() - _start
print(f"save csvfile, outl elapsed time: {human.format_timespan(_elapsed)}")
# recorded timings:
# 2 min. 57 sec, 57 files 126 MB for conf.INPUT_MAXSIZE=400 MB
# 3 min. 58 sec. 72 files 128 MB for conf.INPUT_MAXSIZE=650 MB

In [None]:
# save mix
_start = time.time()

# raw columns plus the flag column, so every row keeps its normal/outlier label.
# The flag name comes from the configuration instead of a hard-coded "FLAG", and the
# unpacking works whether conf.COLUMN_NAMES is a list or a tuple.
mix_columns = [*conf.COLUMN_NAMES, conf.COLUMN_FLAG]
comm.save_csvfile(dfmix[mix_columns], conf.FILENAME_MIX, conf.DIROUT)  # with FLAG column

_elapsed = time.time() - _start
print(f"save csvfile, mix elapsed time: {human.format_timespan(_elapsed)}")

### Check row count & file size

In [None]:
# display row count for check
# one line with the number of rows in each of the four dataframes
print(f"row count noise={len(dfnoise)}, norm={len(dfnorm)}, outl={len(dfoutl)}, mix={len(dfmix)}")

In [None]:
# display file size for check
# every output name maps to a sub-directory of conf.DIROUT named after the file stem;
# count the csv files found there and add up their sizes

output_names = (conf.FILENAME_NOISE, conf.FILENAME_NORM, conf.FILENAME_OUTL, conf.FILENAME_MIX)

fileinfo = {}  # basename -> [file count, total size]
for afile in output_names:
    basename = pathlib.Path(afile).stem
    files = comm.get_multifiles_indir(str(pathlib.Path(conf.DIROUT) / basename), "*.csv")
    fileinfo[basename] = [len(files), sum(comm.get_multifiles_size(files))]

# per-output line, accumulating the grand totals on the way
total_count, total_size = 0, 0
for basename, (file_count, file_size) in fileinfo.items():
    print(f"output {basename} files={file_count}, size={human.format_size(file_size)}")
    total_count += file_count
    total_size += file_size

print(f"total files={total_count}, size={human.format_size(total_size)}")

In [None]:
# total run time of the notebook, measured from mainstart set in the first Main cell
_elapsed = time.time() - mainstart
print(f"main elapsed time: {human.format_timespan(_elapsed)}")
# recorded timing:
# 3 min.  for conf.INPUT_MAXSIZE=400MB, when include DATETM

## eof