In [1]:
# https://gist.github.com/tomjemmett/c167376e5b6464ec1c00975be2d7864e

import numpy as np
from collections import namedtuple

def seven_point_one_side_mean(relative_to_mean):
    # pad the vector with 6 zero's at the beginning
    vp = np.insert(relative_to_mean, 0, [0] * 6)
    
    return [
      np.all(vp[i + 6] == vp[i:(i + 6)])# and (vp[i + 6] != 0)
      for i in range(len(relative_to_mean))
    ]

def seven_point_trend(values):
    # edge case: len(values) < 7
    if len(values) < 7:
      return np.zeros_like(values).tolist()

    diff = ([0] * 6) + np.diff(values).tolist()

    all_same = lambda x: np.sign(x) if abs(x) == 6 else 0

    return [
        all_same(sum(diff[i:(i + 6)]))
        for i, v in enumerate(values)
    ]

def part_of_seven_trend(values):
    # pad the vector with 6 zero's at the end
    vp = np.insert(values, len(values), [0] * 6)

    return [
      np.any(np.abs(vp[i:(i + 7)]) == 1)
      for i in range(len(values))
    ]

def two_in_three(close_to_limits, relative_to_mean):
  if len(close_to_limits) == 0:
    return []
  # pad the vectors with two 0 at start, two 0 at end
  close_to_limits_pad = np.pad(close_to_limits, 2, "constant", constant_values=False)
  relative_to_mean_pad = np.pad(relative_to_mean, 2, "constant", constant_values=0) # relative to mean

  return [
      np.any([
          sum(close_to_limits_pad[j:(j+3)]) >= 2 and abs(sum(relative_to_mean_pad[j:(j+3)])) == 3
          for j in range(i, i+3)
      ])
      for i in range(len(close_to_limits))
  ]

def part_of_two_in_three(two_in_three, close_to_limits):
  return [
    i and j
    for i, j in zip(close_to_limits, two_in_three)
  ]

def special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean):
    return (
        outside_limits |
        part_of_seven_trend(seven_point_one_side_mean(relative_to_mean)) |
        part_of_seven_trend(seven_point_trend(values)) |
        part_of_two_in_three(two_in_three(close_to_limits, relative_to_mean), close_to_limits)
    )

def spc_x_calc(values, fix_after_n_points = None):
    fix_values = values[:fix_after_n_points]
    # constant
    limit = 2.66

    mean = np.mean(fix_values)
    mr = np.abs(np.diff(fix_values))
    amr = np.mean(mr)

    # screen for outliers
    mr = mr[mr < 3.267 * amr]
    amr = np.mean(mr)

    lpl = mean - (limit * amr)
    upl = mean + (limit * amr)
    
    # identify near lower/upper process limits
    nlpl = mean - (limit * 2 / 3 * amr)
    nupl = mean + (limit * 2 / 3 * amr)

    # identify any points which are outside the upper or lower process limits
    outside_limits = (values < lpl) | (values > upl)
    # identify whether a point is above or below the mean
    relative_to_mean = np.sign(values - mean)

    # identify if a point is between the near process limits and process limits
    close_to_limits = ~outside_limits & ((values < nlpl) | (values > nupl))

    spc_return_type = namedtuple("spc_x", [
        "values",
        "mean",
        "lpl",
        "upl",
        "outside_limits",
        "relative_to_mean",
        "close_to_limits",
        "special_cause_flag"
    ])

    return spc_return_type(
        values,
        mean,
        lpl,
        upl,
        outside_limits,
        relative_to_mean,
        close_to_limits,
        special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean)
    )

In [2]:
spc_x_calc([1,2,3,3,2,4,5,8])

spc_x(values=[1, 2, 3, 3, 2, 4, 5, 8], mean=3.5, lpl=0.07999999999999963, upl=6.92, outside_limits=array([False, False, False, False, False, False, False,  True]), relative_to_mean=array([-1., -1., -1., -1., -1.,  1.,  1.,  1.]), close_to_limits=array([ True, False, False, False, False, False, False, False]), special_cause_flag=array([False,  True,  True,  True,  True,  True,  True,  True]))

In [17]:
import pandas as pd
import numpy as np

# Create a pandas dataframe with a date column and a data column
# The data column will have random integers between 0 and 100

date_rng = pd.date_range(start='1/1/2020', end='1/10/2020', freq='D')
df = pd.DataFrame(date_rng, columns=['date'])
df['data'] = np.random.randint(0,100,size=(len(date_rng)))

# Add outliers
df.loc[9,'data'] = 850
df.loc[6,'data'] = 530

print(df)

        date  data
0 2020-01-01    39
1 2020-01-02    32
2 2020-01-03    55
3 2020-01-04    90
4 2020-01-05    34
5 2020-01-06    62
6 2020-01-07   530
7 2020-01-08    94
8 2020-01-09    79
9 2020-01-10   850


In [28]:
import pandas as pd
import numpy as np

def spc_x_calc_pandas(df, values_col, fix_after_n_points = None):
    values = df[values_col].values
    fix_values = values[:fix_after_n_points]
    # constant
    limit = 2.66

    mean = np.mean(fix_values)
    mr = np.abs(np.diff(fix_values))
    amr = np.mean(mr)

    # screen for outliers
    mr = mr[mr < 3.267 * amr]
    amr = np.mean(mr)

    lpl = mean - (limit * amr)
    upl = mean + (limit * amr)
    
    # identify near lower/upper process limits
    nlpl = mean - (limit * 2 / 3 * amr)
    nupl = mean + (limit * 2 / 3 * amr)

    # identify any points which are outside the upper or lower process limits
    outside_limits = (values < lpl) | (values > upl)
    # identify whether a point is above or below the mean
    relative_to_mean = np.sign(values - mean)

    # identify if a point is between the near process limits and process limits
    close_to_limits = ~outside_limits & ((values < nlpl) | (values > nupl))

    # create output pandas dataframe from numpy calculations
    output_df = df
    output_df['outside_limits'] = outside_limits
    output_df['relative_to_mean'] = relative_to_mean
    output_df['close_to_limits'] = close_to_limits
    output_df['special_cause_flag'] = special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean)

    # create named tuple of mean, upper, and lower limits
    spc_return_type = namedtuple("spc_x", [
        "mean",
        "lpl",
        "upl"
    ])

    return output_df, spc_return_type(
        mean,
        lpl,
        upl
    )

spc = spc_x_calc_pandas(df, 'data')
spc[0]

Unnamed: 0,date,data,outside_limits,relative_to_mean,close_to_limits,special_cause_flag
0,2020-01-01,39,False,-1.0,False,False
1,2020-01-02,32,False,-1.0,False,False
2,2020-01-03,55,False,-1.0,False,False
3,2020-01-04,90,False,-1.0,False,False
4,2020-01-05,34,False,-1.0,False,False
5,2020-01-06,62,False,-1.0,False,False
6,2020-01-07,530,False,1.0,True,False
7,2020-01-08,94,False,-1.0,False,False
8,2020-01-09,79,False,-1.0,False,False
9,2020-01-10,850,True,1.0,False,True


In [26]:
mean = spc[1][0]
print(mean)

186.5
