In [50]:
def get_change_points(series, threshold, min_observations_below, min_distance_change_streaks):
    """Gets a list of change points from a series of observations based on a threshold.

    A streak is a run of consecutive observations whose values are less than or equal to
    'threshold'. A streak qualifies once it reaches 'min_observations_below' observations;
    its first observation is then the change point candidate.

    The candidate of the first qualifying streak is always reported. Every later candidate is
    only reported if at least 'min_distance_change_streaks' observations lie between the end of
    the previous qualifying streak and the start of the current one. The previous qualifying
    streak counts for this distance check even if its own candidate was not reported.

    Distances and streak lengths are counted in observations (positions), not in index labels.

    Args:
        series: Pandas series with observations.
        threshold: Observations with a value <= threshold count as "below the threshold".
        min_observations_below: Number of consecutive observations below the threshold that
            are required before a streak yields a change point candidate.
        min_distance_change_streaks: Minimum number of observations between the end of the
            previous qualifying streak and the start of the current streak.

    Returns:
        List of change points, given as index labels of 'series', in order of occurrence.
    """
    # for each row, get whether its value is of threshold or lower
    series_below_threshold = series <= threshold

    # label each run of identical consecutive flags with its own id
    run_ids = (series_below_threshold != series_below_threshold.shift()).cumsum()

    # do an accumulative count of how many values below the threshold have been observed
    # restarts at 0 as soon as one value > threshold is observed
    position_in_run = series_below_threshold.groupby(run_ids).cumcount() + 1
    below_threshold_counts = series_below_threshold * position_in_run

    # keep the original labels, then switch to positions 0...n so that the arithmetic below
    # works on positions regardless of what the original index looks like
    true_indices_series = below_threshold_counts.index
    below_threshold_counts = below_threshold_counts.reset_index(drop=True)

    # store change points as index labels
    change_points = []

    # position at which the last qualifying streak ended (None until one was seen)
    last_change_streak_ended = None

    # Series.items() replaces Series.iteritems(), which was removed in pandas 2.0
    for position, streak_count in below_threshold_counts.items():
        if streak_count < min_observations_below:
            continue

        # the streak has just reached the minimum length: evaluate its start as a candidate
        if streak_count == min_observations_below:
            integer_index_candidate = position - min_observations_below + 1
            change_point_candidate = true_indices_series[integer_index_candidate]

            if last_change_streak_ended is None:
                # definitely enter the change point if this is the first streak that was seen
                change_points.append(change_point_candidate)
            else:
                # number of observations strictly between the previous streak and this one
                distance_to_last_streak = integer_index_candidate - last_change_streak_ended - 1

                if distance_to_last_streak >= min_distance_change_streaks:
                    change_points.append(change_point_candidate)

        # every observation reaching this line belongs to a qualifying streak, so it is the
        # latest known end of that streak (must happen after the distance check above)
        last_change_streak_ended = position

    return change_points

In [51]:
import pandas as pd
import numpy as np

In [52]:
series_list = [0, 0, 1, 0, 0, 1, 1] # with the parameters used below: 2 cps, at index 0.0 (=iloc 0) and index 100.0 (=iloc 3)

series = pd.Series(series_list, index=np.linspace(0, 200, len(series_list)))
display(series)

get_change_points(series, threshold=0.05, min_observations_below=1, min_distance_change_streaks=1)

0.000000      0
33.333333     0
66.666667     1
100.000000    0
133.333333    0
166.666667    1
200.000000    1
dtype: int64

[0.0, 100.0]

0     0
1     0
2     0
3     1
4     2
5     3
6     0
7     1
8     0
9     1
10    2
11    0
12    0
13    0
14    1
15    2
16    3
17    0
18    1
19    0
20    1
21    2
dtype: int64

[28.57142857142857]