In [1]:
import numpy as np
import scipy.stats as st

In [258]:
class frocc:
  """Outlier detector based on random one-dimensional projections.

  ``m`` random unit vectors are drawn in ``d`` dimensions. Training data is
  projected onto each vector and min-max normalised per vector. Every gap of
  at least ``eps`` between consecutive sorted normalised projections is
  stored as an "outlier interval". A new point is reported as an outlier if,
  on at least one projection, it lies outside the training range or strictly
  inside one of those intervals.

  Attributes:
    m: number of random projection vectors.
    eps: minimum gap (on the normalised [0, 1] scale) that forms an
      outlier interval.
    d: dimension of the data points.
    random_proj: array of shape (m, d); each row is a unit vector.
    min_values, max_values: per-projection extrema of the training
      projections (empty arrays until ``train`` is called).
    outlier_intervals: after ``train``, a list of ``m`` lists of
      ``[low, high]`` pairs on the normalised scale.
  """

  def __init__(self, m, eps, d, seed=None) -> None:
    """Draw the ``m`` random projection directions.

    Args:
      m: number of random projection vectors.
      eps: gap threshold used by ``train``.
      d: dimension of the data points.
      seed: if given, seeds the global NumPy random state before the
        projection vectors are drawn.
    """
    if seed is not None:
      np.random.seed(seed)
    self.m = m
    self.eps = eps
    self.d = d

    # Generate random vectors from d-dim unit sphere
    # by x/|x|, x ~ N(0,I_d).
    # rvs() squeezes singleton axes (m == 1 or d == 1), so force the
    # result back to an explicit (m, d) matrix before normalising rows.
    random_vectors = np.reshape(
      st.multivariate_normal.rvs(np.zeros(d), np.eye(d), m), (m, d)
    )
    self.random_proj = random_vectors / np.linalg.norm(
      random_vectors, axis=1, keepdims=True
    )
    self.outlier_intervals = np.array([])
    self.min_values = np.array([])
    self.max_values = np.array([])

  def _check_dimension(self, d_data):
    """Raise ValueError when the data dimension differs from ``self.d``."""
    if d_data != self.d:
      raise ValueError("Dimensions must match. d = " + str(self.d) + " != " + str(d_data) + " = d_data.")

  @staticmethod
  def _min_max_scale(projections, min_values, max_values):
    """Scale each row of ``projections`` (m, N) with that row's min/max.

    NOTE: a row whose max equals its min divides by zero here and yields
    nan/inf values; this case is not handled specially.
    """
    return (projections - min_values[:, None]) / (max_values - min_values)[:, None]

  def train(self, data):
    """Learn the projection ranges and outlier intervals from ``data``.

    Args:
      data: array of shape (N, d) holding the training points.

    Raises:
      ValueError: if the second dimension of ``data`` is not ``self.d``.
    """
    N, d_data = data.shape
    self._check_dimension(d_data)

    # projection onto unit vector = dot product; result has shape (m, N)
    dot_products = self.random_proj @ data.T
    min_values = np.min(dot_products, axis=1)
    max_values = np.max(dot_products, axis=1)

    # Min-max normalise and sort the projections so that gaps between
    # neighbouring values can be read off along each projection vector.
    sorted_scaled = np.sort(
      self._min_max_scale(dot_products, min_values, max_values), axis=1
    )
    gaps = np.diff(sorted_scaled, axis=1)

    self.min_values = min_values
    self.max_values = max_values
    # For every projection, each gap >= eps becomes one [low, high] interval.
    self.outlier_intervals = [
      [[row[k], row[k + 1]] for k in np.flatnonzero(gap_row >= self.eps)]
      for row, gap_row in zip(sorted_scaled, gaps)
    ]

  def test(self, data):
    """Flag the rows of ``data`` that are outliers.

    Args:
      data: array of shape (N, d) holding the points to classify.

    Returns:
      Boolean array of shape (N,); True where the point is an outlier on
      at least one projection.

    Raises:
      ValueError: if the second dimension of ``data`` is not ``self.d``,
        or if ``train`` has not been called yet.
    """
    N, d_data = data.shape
    self._check_dimension(d_data)
    if np.size(self.min_values) == 0:
      raise ValueError("train() must be called before test().")

    # Project the new data and rescale with the *training* min/max so the
    # values are comparable with the stored intervals. Shape: (m, N).
    scaled_new_data = self._min_max_scale(
      self.random_proj @ data.T, self.min_values, self.max_values
    )

    # Outliers are either more extreme than any training data ...
    is_outlier = (scaled_new_data > 1) | (scaled_new_data < 0)
    # ... or fall strictly inside one of the outlier intervals.
    for i, intervals in enumerate(self.outlier_intervals):
      for low, high in intervals:
        is_outlier[i] |= (scaled_new_data[i] > low) & (scaled_new_data[i] < high)

    # A point is an outlier if any single projection marks it as one.
    return is_outlier.any(axis=0)

In [313]:
# Experiment configuration
#   m     -- number of random projection vectors
#   d     -- data dimension
#   N     -- number of training data points
#   N_new -- number of testing data points
#   eps   -- gap threshold handed to the detector
m, d = 5, 20
N, N_new = 1000, 100
eps = 0.1

In [314]:
# Seed the global NumPy generator before drawing the two samples below
np.random.seed(1000)

# Training set: N draws from N(0, I_d)
data = st.multivariate_normal.rvs(mean=np.zeros(d), cov=np.eye(d), size=N)

# Testing set: N_new draws from N((3, ..., 3), I_d), i.e. shifted away from the training data
new_data = st.multivariate_normal.rvs(mean=np.full(d, 3.0), cov=np.eye(d), size=N_new)

In [315]:
# Fit the detector on the training set, then report the fraction of
# testing points it flags as outliers.
detector = frocc(m=m, eps=eps, d=d, seed=1234)
detector.train(data)
np.mean(detector.test(new_data))

0.92