<a href="https://colab.research.google.com/github/yinwongtrains/Door-Opening-Closing/blob/main/A23_Door_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
from bokeh.plotting import figure, show
from bokeh.io import output_notebook
from bokeh.layouts import column


In [2]:
# read the csv file and clean it up.
df = (pd
      .read_csv('A23_door_opening (1).csv')
      .pivot(index=['SNAPSHOT_DATETIME'], columns=['SIGNAL_NAME'], values='SIGNAL_VALUE_N')
      .reset_index()
      .sort_values(by='SNAPSHOT_DATETIME')
)

df['SNAPSHOT_DATETIME'] = pd.to_datetime(df['SNAPSHOT_DATETIME'])

display(df)
print(f'\nCheck number of signals >> {len(df.columns)}')

FileNotFoundError: [Errno 2] No such file or directory: 'A23_door_opening (1).csv'

In [None]:
# USER INPUTS

# define the signal (door and opening/closing)
signal = 'RT4DCRDCT6'
renamed_signal = 'Door Close'

# define start and end times
t1 = '2022-07-01'
t2 = '2022-07-18'

# define the rolling window
rolling_window = 30
frequency = '6H'

t1 = pd.Timestamp(t1)
t2 = pd.Timestamp(t2)

In [None]:

# CALCULATION


df_working = (df[['SNAPSHOT_DATETIME', signal]]
              .rename(columns={signal: renamed_signal})
              .loc[(df['SNAPSHOT_DATETIME'] >= t1) & (df['SNAPSHOT_DATETIME'] <= t2)]
              .loc[(df[signal] < 60)] # eliminate outliers
              .set_index('SNAPSHOT_DATETIME')
)

df_working['SNAPSHOT_DATETIME'] = df_working.index

df_working['Rolling Mean'] = df_working[renamed_signal].rolling(frequency).mean()
df_working['Rolling Median'] = df_working[renamed_signal].rolling(frequency).median()
df_working['Rolling Std'] = df_working[renamed_signal].rolling(frequency).std()

display(df_working)


output_notebook()

# Create a figure object
p = figure(title=f'{renamed_signal}', x_axis_type="datetime", width=1000)

# Add a scatter plot
p.scatter(df_working['SNAPSHOT_DATETIME'], df_working[renamed_signal], color='blue', alpha=0.5, legend_label=f'{renamed_signal}')
p.scatter(df_working['SNAPSHOT_DATETIME'], df_working['Rolling Mean'], color='red', alpha=0.5, legend_label=f'Rolling Mean {frequency}')
p.scatter(df_working['SNAPSHOT_DATETIME'], df_working['Rolling Median'], color='green', alpha=0.5, legend_label=f'Rolling Median {frequency}')
# p.scatter(df_working['SNAPSHOT_DATETIME'], df_working['Rolling Std'], color='orange')

p.legend.label_text_font_size = "10pt"
p.legend.location = "top_left"

#######################

p2 = figure(title=f'{renamed_signal}', x_axis_type="datetime", width=1000)

p2.scatter(df_working['SNAPSHOT_DATETIME'], df_working['Rolling Std'], color='orange', alpha=0.5, legend_label=f'Rolling Std {frequency}')

p2.legend.label_text_font_size = "10pt"
p2.legend.location = "top_left"

########################

# Show the plot
show(column([p, p2]))



**Removing adjacent duplicatess**

In [None]:
df_reduce = (df[['SNAPSHOT_DATETIME', signal]]
              .rename(columns={signal: renamed_signal})
              .loc[(df['SNAPSHOT_DATETIME'] >= t1) & (df['SNAPSHOT_DATETIME'] <= t2)]
              # .loc[(df[signal] < 60)] # eliminate outliers
              .set_index('SNAPSHOT_DATETIME')
)

df_reduce['SNAPSHOT_DATETIME'] = df_reduce.index

# calculate the difference between each value and the previous to identify if it is the same
df_reduce['diff'] = df_reduce[renamed_signal].diff()
df_reduce = df_reduce[df_reduce['diff']!=0]

df_reduce['Rolling Mean'] = df_reduce[renamed_signal].rolling(frequency).mean()
df_reduce['Rolling Median'] = df_reduce[renamed_signal].rolling(frequency).median()
df_reduce['Rolling Std'] = df_reduce[renamed_signal].rolling(frequency).std()

display(df_reduce)

output_notebook()

# Create a figure object
p = figure(title=f'{renamed_signal}', x_axis_type="datetime", width=1000)

# Add a scatter plot
p.scatter(df_reduce['SNAPSHOT_DATETIME'], df_reduce[renamed_signal], color='blue', alpha=0.5, legend_label=f'{renamed_signal}')
p.scatter(df_reduce['SNAPSHOT_DATETIME'], df_reduce['Rolling Mean'], color='red', alpha=0.5, legend_label=f'Rolling Mean {frequency}')
p.scatter(df_reduce['SNAPSHOT_DATETIME'], df_reduce['Rolling Median'], color='green', alpha=0.5, legend_label=f'Rolling Median {frequency}')
# p.scatter(df_working['SNAPSHOT_DATETIME'], df_working['Rolling Std'], color='orange')

p.legend.label_text_font_size = "10pt"
p.legend.location = "top_left"

#######################

p2 = figure(title=f'{renamed_signal}', x_axis_type="datetime", width=1000)

p2.scatter(df_reduce['SNAPSHOT_DATETIME'], df_reduce['Rolling Std'], color='orange', alpha=0.5, legend_label=f'Rolling Std {frequency}')

p2.legend.label_text_font_size = "10pt"
p2.legend.location = "top_left"

########################

# Show the plot
show(column([p, p2]))

**Repeat on door closing time**

In [None]:

def run_everything(df_input, signal, t1, t2, frequency, calc_method, plot):

  # USER INPUTS


  if signal[8] == ['C']:
    renamed_signal = 'Door Close'
  elif signal[8] == ['O']:
    renamed_signal = 'Door Open'
  else:
    renamed_signal = 'Unknown Signal'

  t1 = pd.Timestamp(t1)
  t2 = pd.Timestamp(t2)

  df_working = reduce_to_rolling(df_input, renamed_signal)
  df_working = calc_stats(df_working, renamed_signal, calc_method)

  if plot == 1:
    plot_results(df_working, renamed_signal)

  return df_working

def reduce_to_rolling(df_input, renamed_signal):
  df_working = df_input.copy()

  df_working = (df[['SNAPSHOT_DATETIME', signal]]
                .rename(columns={signal: renamed_signal})
                .loc[(df['SNAPSHOT_DATETIME'] >= t1) & (df['SNAPSHOT_DATETIME'] <= t2)]
                # .loc[(df[signal] < 60)] # eliminate outliers
                .set_index('SNAPSHOT_DATETIME')
  )

  df_working['SNAPSHOT_DATETIME'] = df_working.index

  # calculate the difference between each value and the previous to identify if it is the same
  df_working['diff'] = df_working[renamed_signal].diff()
  df_working = df_working[df_working['diff']!=0]

  return df_working

def calc_stats(df_input, renamed_signal, calc_method):
  df_working = df_input.copy()

  if calc_method == 'rolling':
    df_working['Rolling Mean'] = df_working[renamed_signal].rolling(frequency).mean()
    df_working['Rolling Median'] = df_working[renamed_signal].rolling(frequency).median()
    df_working['Rolling Std'] = df_working[renamed_signal].rolling(frequency).std()

  if calc_method == 'grouping':
    df_working = (df_working
                  .groupby(pd.Grouper(freq=frequency)).agg({renamed_signal: ['mean', 'median', 'std']})
                  # .reset_index(drop=True)
    )
    # df_working.columns = ['Grouped Mean', 'Grouped Median', 'Grouped Std']

  return df_working

def plot_results(df_input, renamed_signal):
  output_notebook()

  # Create a figure object
  p = figure(title=f'{renamed_signal}', x_axis_type="datetime", height=300, width=1000)

  # Add a scatter plot
  p.scatter(df_input['SNAPSHOT_DATETIME'], df_input[renamed_signal], color='blue', alpha=0.5, legend_label=f'{renamed_signal}')
  p.scatter(df_input['SNAPSHOT_DATETIME'], df_input['Rolling Mean'], color='red', alpha=0.5, legend_label=f'Rolling Mean {frequency}')
  p.scatter(df_input['SNAPSHOT_DATETIME'], df_input['Rolling Median'], color='green', alpha=0.5, legend_label=f'Rolling Median {frequency}')

  p.legend.label_text_font_size = "10pt"
  p.legend.location = "top_left"

  #######################

  p2 = figure(title=f'{renamed_signal}', x_axis_type="datetime", height=300, width=1000)

  p2.scatter(df_input['SNAPSHOT_DATETIME'], df_input['Rolling Std'], color='orange', alpha=0.5, legend_label=f'Rolling Std {frequency}')

  p2.legend.label_text_font_size = "10pt"
  p2.legend.location = "top_left"

  ########################

  # Show the plot
  show(column([p, p2]))

In [None]:
# define start and end times
signal = 'RT4DCRDCT6'
t1 = '2022-07-01'
t2 = '2022-07-18'
frequency = '6H'
calc_method = 'grouping'
plot = 0

df_out = run_everything(df, signal, t1, t2, frequency, calc_method, plot)
display(df_out)