<a href="https://colab.research.google.com/github/pojo-25/AdvancedTensorflow/blob/main/motion_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A Simple Exercise on Unsupervised ML / Anomaly Detection
*Confidentiality Notice*: Distribution of this material to anyone without explicit written consent of Synnada, Inc. is not allowed.

Copyright Ⓒ Synnada, Inc., All Rights Reserved.

## Context

In this exercise, you will get to develop your own algorithm(s), or apply any unsupervised ML and/or anomaly detection technique(s) of your choice, to crack a (very) low-dimensional toy problem that will give you a small taste of the bigger problems we will be dealing with at Synnada.

## The Problem

We will be dealing with several scenarios where we observe objects whose "usual" motions exhibit some regularity. In every scenario we study, these objects will temporarily deviate from their usual trajectories. For example, they may go off-course or change their speed and/or direction.

The task is to create an algorithm that takes in simple observations (i.e. triples of the form (timestamp, x-coordinate measurement, y-coordinate measurement)) **one by one**, updates its state, and outputs a score value that correlates with how "surprising" the observation is. You will use these scores to announce alerts when the score is above (or below) a threshold of your choice. Your algorithm should have *at least* the following two hyper-parameters:
1. The alerting threshold.
2. The warm-up duration; i.e. the time interval during which your algorithm does not announce *any* alerts. During this time, the algorithm should just take in observations to update its state and learn.

Once you have your algorithm (or algorithms; if you work on multiple approaches) ready, apply your favorite hyper-parameter sweeping/optimization technique to explore the false positive/negative trade-off of your algorithm(s). We suggest you to treat the alerting threshold separately from the other hyper-parameters and compute a threshold-independent metric such as the [AUC](https://en.wikipedia.org/wiki/Receiver_operating_characteristic#:~:text=ROC%20curve%2C%20or%20%22-,AUC,-%22%20(%22area%20under%20curve) to guide your search.

Finally, do not forget to comment on the computational (time and memory) complexity and/or scalability of your approach(es).

## The Notebook

This notebook starts with various function definitions, which enable us to construct and visualize the scenarios we will study. We **strongly** suggest you to take a look at them for three reasons:
1. You will be able to use these functions to construct scenarios of your own in order to test your algorithms and push them to their limits.
2. You can use the animation code as a template to construct useful visualizations when debugging/analyzing your algorithms.
3. Studying our code should give you an idea on our style of software engineering here at Synnada.

Next, the notebook offers three example scenarios you can use to get going.

Finally, we have a section that contains the skeleton code you will use to develop your algorithm and run simulations.

## Warnings and Suggestions

Although we are confident that you are already aware of the usual mistakes to steer away from, it is probably still worth to make a few remarks/suggestions:
- Avoid overfitting to specific trajectory shapes. You can test whether this happens by constructing new scenarios.
- If you decide to do hyper-parameter optimization, and do not already have a favorite library for this purpose, we suggest you to check out [Optuna](https://optuna.org).
- Explore how your algorithm(s) behave(s) as you increase the warm-up duration. Tweak the scenarios so that the object spends more time in its "usual" motion and vary your warm-up duration. You should be able to improve your false positive/negative characteristics by lengthening your warm-up duration. If that is not the case, you may be doing something wrong.
- Explore the noise tolerance of your techniques. How do things change with increasing/decreasing noise? If what you see during your analysis is surprising, you may be doing something wrong.
- Explore how your algorithm(s) react as you change the decision boundary (i.e. the alerting threshold). If what you see surprises you, there may be a bug somewhere.

In [None]:
from typing import Tuple, Dict, List, Callable, Any, Optional, Union

import numpy as np

# Type definition for functions that operate on NumPy arrays:
UFunc = Callable[[np.ndarray], np.ndarray]

def generate_sampling_times(duration: float,                            
                            *,
                            avg_sampling_interv: float = 1.0,
                            include_first: bool = True,
                            include_last: bool = False) -> np.ndarray:
  # This function returns an array of timestamps with Poisson arrivals.

  # The argument "duration" specifies the total simulation time.
  # The argument "avg_sampling_interv" specifies the average sampling interval.
  # Arguments "include_first" and "include_last" specify whether we include
  # initial and final timestamps in the result.

  # Validate arguments:
  if duration <= 0:
    raise ValueError('Argument "duration" has to be positive!')  
  if avg_sampling_interv <= 0:
    raise ValueError('Argument "avg_sampling_interv" has to be positive!')

  # We can simulate Poisson arrivals by accumulating exponential intervals.
  ts, times = 0.0, []
  if include_first:
    times.append(0.0)
  # Generate samples until we hit the finish timestamp:
  while ts < duration:
    ts += np.random.exponential(avg_sampling_interv)
    if ts <= duration:
      times.append(ts)
  # If applicable, add the final timestamp:
  if include_last and not (times and (times[-1] == duration)):
    times.append(duration)
  return np.array(times)

def add_noise(trace: np.ndarray,
              noise_magnitude: float,
              *,
              inplace: bool = False) -> np.ndarray:
  # This function superimposes additive white Gaussian noise on a given array
  # of measurements (i.e. [(ts, x, y), ...]). Both coordinates couple with the
  # noise independently.

  # The argument "trace" denotes the array of measurements.
  # The argument "noise_magnitude" specifies the noise standard deviation.
  # The argument "inplace" denotes whether the addition occurs in-place.

  # Validate arguments:
  if noise_magnitude < 0:
    raise ValueError('Argument "noise_magnitude" has to be non-negative!')  

  # Operate on a copy if necessary:
  if not inplace:
    trace = trace.copy()
  if noise_magnitude > 0:
    rows, cols = trace.shape
    view_shape = (rows, cols - 1)
    # Add noise to all columns except the first, which contains timestamps:
    trace[:, 1: ] += np.random.normal(0.0, noise_magnitude, view_shape)
  return trace

def create_template_trace(duration: float,
                          position_gen: UFunc,
                          *,
                          start_ts: float = 0.0,
                          noise_magnitude: float = 0.0,
                          **kwargs) -> np.ndarray:
  # This function generates an array of triples (i.e. [(ts, x, y), ...]) where
  # entries denote positional measurements of an object moving on some curve.

  # The argument "position_gen" is a function that takes in an array of times
  # (starting with zero) and generates an array of noise-free observations on
  # the curve.
  # The argument "start_ts" denotes the initial timestamp to use as an offset.  
  # The argument "noise_magnitude" specifies the magnitude of additive white
  # Gaussian noise in our observations.
  # See the function "generate_sampling_times" for keyword-arguments.

  # Generate timestamps:
  times = generate_sampling_times(duration, **kwargs)
  # Generate the trace without measurement noise:    
  trace = position_gen(times)
  # Offset timestamps appropriately:
  trace[:, 0] += start_ts
  # Add measurement noise if applicable:
  return add_noise(trace, noise_magnitude, inplace = True)

def create_circular_trace(duration: float,
                          *,
                          init_pos: Tuple[float, float] = (1.0, 0.0),
                          speed: float = 1.0,                          
                          **kwargs) -> np.ndarray:
  # This function computes an array of triples (i.e. [(ts, x, y), ...]) where
  # entries denote positional measurements of an object in uniform circular
  # motion around the origin.

  # The argument "init_pos" specifies the starting position.
  # The argument "speed" specifies the speed of the object. Negative (positive)
  # values imply (counter-)clockwise motion.
  # See the function "create_template_trace" for keyword-arguments.

  # Calculate initial radius and angle:
  init_x, init_y = init_pos
  radius = np.sqrt(init_x ** 2 + init_y ** 2)
  angle = np.arctan2(init_y, init_x)
  omega = speed / radius
  # Verify that we sample frequently enough:
  if kwargs.get('avg_sampling_interv', 1.0) > abs(np.pi / omega):
    raise ValueError('Argument "arg_sampling_interv" implies sub-Nyquist sampling!')

  # Define the function that generates the curve:
  def circle(times: np.ndarray) -> np.ndarray:
    x = radius * np.cos(angle + omega * times)
    y = radius * np.sin(angle + omega * times)
    return np.vstack((times, x, y)).T

  return create_template_trace(duration, circle, **kwargs)

def create_linear_trace(duration: float,
                        *,
                        init_pos: Tuple[float, float] = (0.0, 0.0),
                        velocity: Tuple[float, float] = (1.0, 0.0),
                        **kwargs) -> np.ndarray:
  # This function computes an array of triples (i.e. [(ts, x, y), ...]) where
  # entries denote positional measurements of an object in uniform linear
  # motion.

  # The argument "init_pos" specifies the starting position.
  # The argument "velocity" specifies the (vector) velocity of the object. 
  # See the function "create_template_trace" for keyword-arguments.

  # Define the function that generates the curve:
  def line(times: np.ndarray) -> np.ndarray:
    x = init_pos[0] + velocity[0] * times
    y = init_pos[1] + velocity[1] * times
    return np.vstack((times, x, y)).T

  return create_template_trace(duration, line, **kwargs)

def concat_traces(trace_configs: Tuple[str, Dict[str, Any]],
                  **kwargs) -> np.ndarray:
  # This function creates a composite trace by concatenating multiple simple
  # traces. As before, it outputs an array of triples (i.e. [(ts, x, y), ...]).
  # Each constituent simple trace begins where its predecessor ends.

  # The argument "trace_configs" contains the arguments to use when generating
  # constituent trace/segments.
  # See the function "create_template_trace" for keyword-arguments. These
  # arguments apply to all constituent traces as default values in case the
  # argument "trace_configs" does not specify things with fine granularity.

  # Example usage:
  #
  # concat_traces(trace_configs = [
  #   ('circular', {'duration': 6.0 * np.pi, 'init_pos': (1.0, 0.0)}),
  #   ('circular', {'duration': 6.0 * np.pi, 'speed': anomaly_speed}),
  #   ('circular', {'duration': 3.0 * np.pi})
  #  ],
  #  avg_sampling_interv = 0.1,
  #  noise_magnitude = 0.02
  # )

  def use_trace(*, data: np.ndarray, start_ts: float, **kwargs) -> np.ndarray:
    # This function enables us to use a given trace as a component of the
    # composite trace we are building.
    data = data.copy()
    data[:, 0] += start_ts
    return data

  # We support the following elementary traces:
  trace_dict = {
    'circular': create_circular_trace,
    'linear': create_linear_trace,
    'immediate': use_trace
  }
  segments, current_ts, current_pos = [], 0.0, (0.0, 0.0)
  # Use given trace configurations to generate elementary segments:  
  for idx, (shape, args) in enumerate(trace_configs):
    # Construct the trace configuration for the current segment:
    spec = {**kwargs, **args}
    spec.setdefault('start_ts', current_ts)
    spec.setdefault('init_pos', current_pos)
    spec.setdefault('include_first', idx == 0)
    spec.setdefault('include_last', True)
    # Save the applicable noise magnitude for the current segment:
    noise_mag = spec.pop('noise_magnitude', 0.0)
    # Note that we generate the current segment without noise first. We have to
    # do this; because concatenating noisy segments introduces drifts. Once we
    # compute a "noiseless" endpoint for the segment, we can superimpose noise.
    segment = trace_dict[shape](**spec)
    # Save the endpoint of this segment, it will be the next initial point:
    current_ts, *current_pos = segment[-1, :]
    # Superimpose noise on the current segment:
    segment = add_noise(segment, noise_mag, inplace = True)
    segments.append(segment)

  return np.concatenate(segments)

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
from matplotlib import animation
plt.rcParams["animation.html"] = "jshtml"

def create_animation(trace: np.ndarray,
                     min_x: float, max_x: float,
                     min_y: float, max_y: float):
  # This function creates an animation of the given trace, which is a list of
  # triples of the form [(ts, x, y), ...] where every entry corresponds to a
  # positional observation of an object moving on some curve.

  # Setup the baseline figure, the axis and the limits:
  fig = plt.figure()
  ax = plt.gca()
  ax.set_xlim((min_x, max_x))
  ax.set_ylim((min_y, max_y))
  ax.set_aspect('equal', 'box')
  txt_title = fig.suptitle('')
  # Initialize an empty scatter plot:
  sc_history = plt.scatter([], [], c = 'red')
  sc_current = plt.scatter([], [], c = 'black')

  def draw_frame(frame_idx: int):
    # This function runs every frame. Using ever-growing array slices, we
    # progressively show more samples every frame.
    sc_history.set_offsets(trace[: frame_idx, 1: ])
    sc_current.set_offsets(trace[frame_idx, 1: ])
    txt_title.set_text('Observation #{0:4d}'.format(frame_idx))
    return (sc_history, sc_current)

  # Create the actual animation object:
  result = animation.FuncAnimation(fig,
                                   draw_frame,
                                   frames = trace.shape[0],
                                   interval = 100,
                                   blit = True)
  # The plotting library spuriously plots the baseline figure, inhibit that:
  plt.close()
  return result

## Scenarios

Below, we define functions that generate observational datasets for three example scenarios (i.e. functions `scenario_XXX`). These functions return two arrays:
- The observations,
- A mask indicating which observations are anomalous.

**Do not** interpret/use these masks as supervision signals (i.e. labels). We are generating this information so that you can quantify how well your algorithms do -- not for training.

In [None]:
def create_square(**kwargs) -> np.ndarray:
  # This function creates the trace of an object moving on the unit square with
  # unit speed.
  # See the functions "generate_sampling_times" and "create_template_trace" for
  # keyword-arguments.
  return concat_traces(trace_configs = [
      ('linear', {'duration': 1.0, 'init_pos': (1.0, 0.0), 'velocity': (0.0, 1.0)}),
      ('linear', {'duration': 2.0, 'velocity': (-1.0, 0.0)}),
      ('linear', {'duration': 2.0, 'velocity': (0.0, -1.0)}),
      ('linear', {'duration': 2.0, 'velocity': (1.0, 0.0)}),
      ('linear', {'duration': 1.0, 'velocity': (0.0, 1.0)})
    ],
    **kwargs
  )

ScenarioResult = Tuple[np.ndarray, np.ndarray]

def compute_labels(times: np.ndarray,
                   regular_duration: float,
                   anomaly_duration: float) -> ScenarioResult:
  # This function returns a boolean array indicating whether observations in
  # the given synthetic dataset are anomalous or not.
  # The argument "times" denotes the array of observation timestamps.
  # Arguments "regular_duration" and "anomaly_duration" specify the time
  # interval for which the object temporarily deviates from its regular course.
  return np.logical_and(times < regular_duration + anomaly_duration,
                        times > regular_duration)

def scenario_change_speed(regular_cycles: int,
                          new_speed: float,
                          **kwargs) -> ScenarioResult:
  # This function constructs a scenario where the object first makes several
  # tours on the unit circle, then changes its speed for a while, and finally
  # reverts back to its normal motion.

  # The argument "regular_cycles" specifies how many cycles the object goes
  # through before the anomalous section of the trace.
  # The argument "new_speed" specifies the speed in the anomalous section of
  # the trace.
  # See the functions "generate_sampling_times" and "create_template_trace" for
  # keyword-arguments.
  anomaly_duration = np.pi
  regular_duration = 2.0 * np.pi * regular_cycles
  trace = concat_traces(trace_configs = [
      ('circular', {'duration': regular_duration, 'init_pos': (1.0, 0.0)}),
      ('circular', {'duration': anomaly_duration, 'speed': new_speed}),
      ('circular', {'duration': 2.0 * np.pi})
    ],
    **kwargs
  )
  return trace, compute_labels(trace[:, 0], regular_duration, anomaly_duration)
  
def scenario_off_circle(regular_cycles: int, **kwargs) -> ScenarioResult:
  # This function constructs a scenario where the object first makes several
  # tours on the unit circle, then leaves the unit circle and moves on the unit
  # square for a while, and finally reverts back to its normal motion.
  
  # The argument "regular_cycles" specifies how many cycles the object goes
  # through before the anomalous section of the trace.
  # See the functions "generate_sampling_times" and "create_template_trace" for
  # keyword-arguments.
  anomaly_duration = 8.0
  regular_duration = 2.0 * np.pi * regular_cycles
  trace = concat_traces(trace_configs = [
      ('circular', {'duration': regular_duration, 'init_pos': (1.0, 0.0)}),
      ('immediate', {'data': create_square(**{**kwargs, 'noise_magnitude': 0.0})}),
      ('circular', {'duration': 2.0 * np.pi})
    ],
    **kwargs
  )
  return trace, compute_labels(trace[:, 0], regular_duration, anomaly_duration)
  
def scenario_off_square(regular_cycles: int, **kwargs) -> ScenarioResult:
  # This function constructs a scenario where the object first makes several
  # tours on the unit square, then leaves the unit square and moves on the unit
  # circle for a while, and finally reverts back to its normal motion.

  # The argument "regular_cycles" specifies how many cycles the object goes
  # through before the anomalous section of the trace.
  # See the functions "generate_sampling_times" and "create_template_trace" for
  # keyword-arguments.
  anomaly_duration = 2.0 * np.pi
  regular_duration = 8.0 * regular_cycles  
  make_square = lambda: create_square(**{**kwargs, 'noise_magnitude': 0.0})  
  trace = concat_traces(trace_configs = [
      *[('immediate', {'data': make_square()}) for _ in range(regular_cycles)],
      ('circular', {'duration': anomaly_duration}),
      ('immediate', {'data': make_square()})
    ],
    **kwargs
  )
  return trace, compute_labels(trace[:, 0], regular_duration, anomaly_duration)

In [None]:
# Create three example scenarios:
trace_rev, anomalies_rev = scenario_change_speed(regular_cycles = 5,
                                                 new_speed = -1.0,
                                                 avg_sampling_interv = 0.1,
                                                 noise_magnitude = 0.02)
trace_cs, anomalies_cs = scenario_off_circle(regular_cycles = 5,
                                             avg_sampling_interv = 0.1,
                                             noise_magnitude = 0.02)
trace_sc, anomalies_sc = scenario_off_square(regular_cycles = 5,
                                             avg_sampling_interv = 0.1,
                                             noise_magnitude = 0.02)

In [None]:
# You can use this function to visualize a scenario:
create_animation(trace_sc,
                 min_x = -1.1,
                 max_x = 1.1,
                 min_y = -1.1,
                 max_y = 1.1)

## Skeleton Code

Below, we define the base class you will derive from when formulating your algorithms. There are also a few utility functions to help get things going.

Here is some food for thought, just in case it helps:
- Both parametric and non-parametric approaches are applicable here, each having its advantages and drawbacks. Consider experimenting with both.
- For parametric approaches, figuring out the appropriate update rule for your parameters (in the `update` call) constitutes the core challenge.
- For non-parametric approaches, you will probably store the observations you have seen so far in a data structure, and query this data structure to compute scores. The choice, implementation and/or the usage of this data structure constitutes the core challenge.
- Do not forget that you need not reinvent the wheel; feel free to use any Python library of your choice to make your life easy.

We are purposefully refraining from providing an entire simulation/development environment because we want to see **your genuine approach in structuring your work**. You will probably need to:
- Create workflows to evaluate/compare candidate algorithms without choosing threshold values (for example, via a metric such as AUC).
- Create an evaluation framework to choose the best hyper-parameters for a given algorithm. We suggest designing this framework so that you can use multiple datasets when evaluating algorithms.

In [None]:
import abc

# You will extend this class to implement algorithm(s).
class BaseAlgorithm(abc.ABC):
  def __init__(self, warmup_duration: float):
    self.last_ts = -float('inf')
    self.warmup_ts = warmup_duration

  def process(self, ts: float, *values: float) -> Optional[float]:
    # This function processes every incoming observation. The function returns
    # NaN scores until the algorithm warms up.

    # Verify arguments:
    if ts < self.last_ts:
      raise ValueError('Non-monotonic timestamp, check the data!')
    # Save the timestamp when warm-up will complete:
    if self.last_ts == -float('inf'):
      self.warmup_ts += ts
    score = self.update(ts, *values)
    self.last_ts = ts    
    if ts >= self.warmup_ts:
      return score
    return float('nan')

  @abc.abstractmethod
  def update(self, ts: float, *values: float) -> float:
    # Override this function to process every incoming observation.
    raise NotImplementedError()

# Below, we give a memoryless algorithm that overfits to the unit circle.
# NOTE: This is an example of what NOT to do! We are just providing this to
#       demonstrate things.
class StupidAlgorithm(BaseAlgorithm):
  def update(self, ts: float, *values: float) -> float:
    radius = np.sqrt(np.dot(values, values))
    return abs(radius - 1.0)

def get_scores(trace: np.ndarray, algo: BaseAlgorithm) -> np.ndarray:
  # This function runs the given trace (the argument "trace") through the given
  # algorithm (the argument "algo") and outputs an array of scores.
  return np.asarray([algo.process(*observation) for observation in trace])

def get_alerts(scores: np.ndarray, threshold: float) -> np.ndarray:
  # This function compares the given score array (the argument "scores") with
  # the given threshold value (the argument "threshold") to decide alerts.
  return scores >= threshold

def compute_fpnr(trace: np.ndarray,
                 anomalies: np.ndarray,
                 algo: BaseAlgorithm,
                 threshold: float) -> Tuple[float, float]:
  # This function computes the false positive rate and the false negative rate
  # of the given algorithm (the argument "algo") for the given dataset (the
  # argument "trace") under a threshold value (the argument "threshold").
  # The argument "anomalies" give the ground truth we use in our calculations.
  scores = get_scores(trace, algo)
  alerts = get_alerts(scores, threshold)
  regulars = np.logical_not(anomalies)
  false_positives = np.logical_and(alerts, regulars)
  false_negatives = np.logical_and(np.logical_not(alerts), anomalies)
  fpr = np.sum(false_positives) / np.sum(regulars)
  fnr = np.sum(false_negatives) / np.sum(anomalies)
  return fpr, fnr

# See that the example algorithm works OK for the scenario we overfitted it to:
print(compute_fpnr(trace_cs, anomalies_cs, StupidAlgorithm(warmup_duration = 1.0), 0.025))
# But it fails miserably in one of the other scenarios:
print(compute_fpnr(trace_sc, anomalies_sc, StupidAlgorithm(warmup_duration = 1.0), 0.025))

(0.16414141414141414, 0.2127659574468085)
(0.7728155339805826, 0.6964285714285714)


## Bonus

We did not consider expiring observations (i.e. aging observations we want to neglect or de-emphasize) so far. To account for this, we would need to add another hyper-parameter (e.g. `memory_duration` or `half_life`) to our algorithm(s).

A simple yet typical approach is to maintain a queue of active (i.e. recent enough with respect to this new hyper-parameter) observations and update our algorithms' state whenever an observation expires. One can do this in the `update` function by consulting this queue and doing the appropriate calculations to "unlearn" observations we remove from the queue.

IRL, expiring aging observations is important for two reasons:
1. You can not implement non-parametric techniques in practice unless you limit the memory.
2. Drifts and/or slow structural changes in systems/datasets require us to expire aging observations to make certain techniques adaptable.

For example, imagine a scenario where:
- The object slowly drifts towards some direction during its regular motion,
- After some time, it suddenly reverts to its original course.

In order to recognize such behaviors as anomalous with elementary techniques, expiring aging observations using the above approach can come in handy.

If you had fun working on the above challenge, feel free to construct such scenarios and explore how you can account for expiring observations.