# Visualizing breakpoint transformation output 

## Usage:
Run all cells to visualize both the breakpoint (source) data and the processed data, and to run sanity checks on the processed data.

## Troubleshooting
Sometimes Spark gets stuck or simply fails to read the Delta Lake tables. The remedy is to restart the kernel or the whole JupyterLab instance. If restarting doesn't help, the Delta Lake tables have most likely been corrupted, and you need to remove the source data, the processed data, or both. Before restarting the writer and the processor, remove the checkpoints as well.

* Source data in container file structure: */home/developer/shared_with_host/delta-lake-storage/delta-table-\<deltalake_id\>*
* Processed data in container file structure: */home/developer/shared_with_host/delta-lake-storage/delta-table-processed-\<deltalake_id\>*
* Checkpoints in container file structure: */tmp/checkpoint-\<deltalake_id\>*


In [None]:
from datetime import datetime, timedelta
import itertools

from bokeh.models import ColumnDataSource, DatetimeTickFormatter, HoverTool, BoxSelectTool, WheelZoomTool, ResetTool
from bokeh.plotting import figure, output_file, show
import matplotlib.pyplot as plt

from pyspark.sql.functions import isnan

def double_items(items):
    """Return a new list in which every element of *items* appears twice in a row.

    Example: ``[a, b, c]`` -> ``[a, a, b, b, c, c]``. The notebook uses this to
    build the x/y coordinates of the processed-data step curve.
    """
    return [element for element in items for _ in range(2)]


# Suffix of the Delta tables to read: delta-table-<id> and delta-table-processed-<id>.
deltalake_id = "1"
# Processed window starts are shifted by 0.5 / window_size seconds when plotted.
# NOTE(review): presumably the number of windows per second — confirm with the processor.
window_size = 1_000
# Value of the `id` column selecting the time series to plot.
timeseries_id = "A"

In [None]:
# Load the breakpoint (source) and processed Delta tables, then plot the
# selected time series with Bokeh (interactive HTML) and matplotlib (inline).

#
# Breakpoint
#

df_breakpoint_deltalake = spark.read.format("delta").load(f"/home/developer/shared_with_host/delta-lake-storage/delta-table-{deltalake_id}")
# Only the selected series is pulled into pandas, ordered by event time for plotting.
df_breakpoint = df_breakpoint_deltalake.filter(df_breakpoint_deltalake.id == timeseries_id).toPandas().sort_values(by='eventTime')
# show() prints the table itself and returns None, so it is not wrapped in print().
df_breakpoint_deltalake.show()


#
# Equally sampled
#

df_processed_deltalake = spark.read.format("delta").load(f"/home/developer/shared_with_host/delta-lake-storage/delta-table-processed-{deltalake_id}")
df_processed = df_processed_deltalake.filter(df_processed_deltalake.id == timeseries_id).toPandas().sort_values(by='windowStart')
df_processed_deltalake.show()


#
# Plotting
#

# Processed values are drawn shifted by 0.5 / window_size seconds from their
# windowStart. The shifted timestamps are computed once and shared by the
# Bokeh source and the matplotlib plot below.
# NOTE(review): presumably this centres each point within a window that is
# 1 / window_size seconds long — confirm against the processor.
half_window = timedelta(seconds=0.5 / window_size)
processed_window_centers = [item + half_window for item in df_processed["windowStart"]]

breakpoint_source = ColumnDataSource(
    data={
        "breakpoint_time": df_breakpoint['eventTime'],
        "breakpoint_data": df_breakpoint['data'],
    }
)

# NOTE(review): this source is not attached to any glyph in this cell.
processed_source = ColumnDataSource(
    data={
        "processed_data_window_start": processed_window_centers,
        "processed_data": df_processed['value'],
    }
)

processed_point_source = ColumnDataSource(
    data={
        "processed_data_window_start": df_processed["windowStart"],
        "processed_data": df_processed['value'],
    }
)

# Step curve: duplicating both series and dropping the first x / last y gives
# x = [w0, w1, w1, w2, ...] and y = [v0, v0, v1, v1, ...], so each value is
# held from its own windowStart until the next one (the last value is a point).
processed_step_curve = ColumnDataSource(
    data = {
        "processed_data_window_start" : double_items(df_processed["windowStart"])[1:],
        "processed_data": double_items(df_processed['value'])[:-1]
    }
)


p = figure(
    title="Breakpoint data to equally sampled data",
    x_axis_label='Time',
    y_axis_label='value',
    x_axis_type = "datetime",
)

p.plus(x='breakpoint_time', y='breakpoint_data', legend_label='Breakpoint', color='orange', source=breakpoint_source)
p.square(x='processed_data_window_start', y='processed_data', legend_label='Processed point', color='red', source=processed_point_source)
p.line(x='processed_data_window_start', y='processed_data', source=processed_step_curve)

output_file("breakpoint_to_equally_sampled.html")
#p.plot_width=2400
#p.plot_height=1200

# Same millisecond-resolution timestamp format at every zoom level.
formatter = ["%Y-%m-%d %H:%M:%S.%3Ns"]
p.xaxis.formatter=DatetimeTickFormatter(
        seconds=formatter,
        minutes=formatter,
        hours=formatter,
        days=formatter,
        months=formatter,
        years=formatter,
    )

show(p)

plt.figure(figsize=(50, 10))
# Each series carries its own label, so the legend cannot be mismatched with
# the plotting order (the processed squares are drawn first).
plt.plot(processed_window_centers, df_processed['value'], 's', label='Equally sampled')
plt.plot(df_breakpoint['eventTime'], df_breakpoint['data'], 'x', label='Breakpoint')
plt.title('Breakpoint to equally sampled')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()

## Sanity checks

In [None]:
# Sanity checks over the whole processed table (every time series, not only
# `timeseries_id`). NaN and null values are checked separately.
nan_count = df_processed_deltalake.filter(isnan(df_processed_deltalake.value)).count()
assert nan_count == 0, f"{nan_count} processed rows have a NaN value"

null_count = df_processed_deltalake.filter(df_processed_deltalake.value.isNull()).count()
assert null_count == 0, f"{null_count} processed rows have a null value"

# No duplicates: each (id, windowStart) pair may occur at most once. Grouping
# by id as well covers every series in the table instead of hard-coded ids.
duplicate_windows = (
    df_processed_deltalake
    .groupBy("id", "windowStart")
    .count()
    .filter("count > 1")
    .count()
)
assert duplicate_windows == 0, f"{duplicate_windows} (id, windowStart) pairs occur more than once"