In [2]:
import numpy as np

In [3]:
#Optwin
from river.base import DriftDetector
#from scipy.stats import norm
from scipy.stats import t as t_stat
from scipy.optimize import fsolve
#import scipy.stats
#import math

import warnings
warnings.filterwarnings('ignore', 'The iteration is not making good progress')

In [4]:
class Optwin_river(DriftDetector):
    #pre-compute optimal cut for all possible window sizes
    def pre_compute_cuts(self, opt_cut, opt_phi, t_stats, t_stats_warning):
        if len(opt_cut) != 0 and len(opt_phi) != 0 and len(t_stats) != 0 and len(t_stats_warning) != 0:
            return opt_cut, opt_phi, t_stats, t_stats_warning
        self.W = []
        for i in range(self.w_lenght_max+1):
            if i < self.w_lenght_min:
                opt_cut.append(0)
                opt_phi.append(0)
            else:
                optimal_cut = fsolve(self.t_test, (len(self.W)-30)/len(self.W))
                optimal_cut = math.floor(optimal_cut[0]*len(self.W)) #parse to integer
                #phi_opt = scipy.stats.f.ppf(q=self.confidence_two_tailes, dfn=optimal_cut-1, dfd=len(self.W)-optimal_cut-1) 
                phi_opt = scipy.stats.f.ppf(q=self.confidence, dfn=optimal_cut-1, dfd=len(self.W)-optimal_cut-1) 
                opt_cut.append(optimal_cut)
                opt_phi.append(phi_opt)
            self.W.append(1)
        self.W = []
        
        for i in range(self.w_lenght_max+1):
            if i < self.w_lenght_min:
                t_stats.append(0.0)
                t_stats_warning.append(0.0)
            else:
                t_stats.append(self.t_score(i))
                t_stats_warning.append(self.t_score_warning(i))
        
        return opt_cut, opt_phi, t_stats, t_stats_warning
    
    def insert_to_W(self, x):
        self.W.append(x)
        
        #add new value to running stdev
        if self.running_average:
            self.stdev_new, self.summation_new, self.count_new, self.S_new = self.add_running_stdev(self.summation_new, self.count_new, self.S_new, [x])
        
        #check if window is too large
        if len(self.W) > self.w_lenght_max:
            pop = self.W.pop(0)
            #remove excedent value from running stdev
            if self.running_average:
                self.stdev_h, self.summation_h, self.count_h, self.S_h = self.pop_from_running_stdev(self.summation_h, self.count_h, self.S_h, [pop])
                #walk with sliding window
                self.stdev_new, self.summation_new, self.count_new, self.S_new = self.pop_from_running_stdev(self.summation_new, self.count_new, self.S_new, [self.W[self.last_opt_cut]])
                self.stdev_h, self.summation_h, self.count_h, self.S_h = self.add_running_stdev(self.summation_h, self.count_h, self.S_h, [self.W[self.last_opt_cut]])
        
        self.itt += 1
        return
    
    #https://stats.stackexchange.com/questions/24878/computation-of-new-standard-deviation-using-old-standard-deviation-after-change
    def add_running_stdev(self, summation, count, S, x):
        
        summation += sum(x)
        count += len(x)
        S += sum([i*i for i in x])
                
        if (count > 1 and S > 0):
            stdev = math.sqrt((count*S) - (summation*summation)) / count
            return stdev, summation, count, S
        else:
            return 0, summation, count, S
        
    def pop_from_running_stdev(self,summation, count, S, x):
        summation -= sum(x)
        count -= len(x)
        S -= sum([i*i for i in x])
        
        if (count > 1 and S > 0):
            stdev = math.sqrt((count*S) - (summation*summation)) / count
            return stdev, summation, count, S
        else:
            return 0, summation, count, S 
    
    #add new element to window
    def update(self, x):
        #add new element to window
        self.iteration += 1
        self.insert_to_W(x)
        self.delay = 0
        
        #check if window is too small
        if len(self.W) < self.w_lenght_min:
            self._drift_detected = False
            self.in_warning_zone = False
            return False
        
        #check optimal window cut and phi
        if self.pre_compute_optimal_cut:
            #get pre-calculated optimal window cut and phi
            optimal_cut = self.opt_cut[len(self.W)]
            phi_opt = self.opt_phi[len(self.W)]
        else:
            #calculate optimal window cut and phi in real-time
            if len(self.W) < len(self.opt_cut): #has already been calculated
                optimal_cut = self.opt_cut[len(self.W)]
                phi_opt = self.opt_phi[len(self.W)]
            else:
                optimal_cut = fsolve(self.t_test, (len(self.W)-30)/len(self.W))
                optimal_cut = math.floor(optimal_cut[0]*len(self.W)) #parse to integer
                phi_opt = scipy.stats.f.ppf(q=self.confidence, dfn=optimal_cut-1, dfd=len(self.W)-optimal_cut-1) 
                self.opt_cut.append(optimal_cut)
                self.opt_phi.append(phi_opt)
                     
        #update stdev and avg
        if self.running_average:
            #update running stdev and avg
            if optimal_cut > self.last_opt_cut: #remove elements from window_new and add them to window_h
                self.stdev_new, self.summation_new, self.count_new, self.S_new = self.pop_from_running_stdev(self.summation_new, self.count_new, self.S_new, self.W[self.last_opt_cut:optimal_cut])
                self.stdev_h, self.summation_h, self.count_h, self.S_h = self.add_running_stdev(self.summation_h, self.count_h, self.S_h, self.W[self.last_opt_cut:optimal_cut])
            elif optimal_cut < self.last_opt_cut: #remove elements from window_h and add them to window_new
                self.stdev_h, self.summation_h, self.count_h, self.S_h = self.pop_from_running_stdev(self.summation_h, self.count_h, self.S_h, self.W[optimal_cut:self.last_opt_cut])
                self.stdev_new, self.summation_new, self.count_new, self.S_new = self.add_running_stdev(self.summation_new, self.count_new, self.S_new, self.W[optimal_cut:self.last_opt_cut])
            avg_h = self.summation_h / self.count_h
            avg_new = self.summation_new / self.count_new
            stdev_h = self.stdev_h
            stdev_new = self.stdev_new
            self.last_opt_cut = optimal_cut
        else:
            #recalculate stdev and avg
            stdev_h = scipy.stats.tstd(self.W[:optimal_cut])
            stdev_new = scipy.stats.tstd(self.W[optimal_cut:])
            avg_h = sum(self.W[:optimal_cut]) / optimal_cut
            avg_new = sum(self.W[optimal_cut:]) / (len(self.W[optimal_cut:]))
            self.last_opt_cut = optimal_cut
        
        #add minimal noise to stdev
        stdev_h += self.minimum_noise
        stdev_new += self.minimum_noise
        
        #check only one side of f and t-test (if the loss decreases it means that the model is learning, not that a concept drift occurred
        #f-test
        
        if (stdev_new*stdev_new/(stdev_h*stdev_h)) > phi_opt:
            self.drift_reaction("f")
            return True
        
        '''if avg_h - avg_new < 0:
                self.drift_reaction("f")
                #self.insert_to_W(x)
                return True
            else:
                self.empty_window()
                self.insert_to_W(x)
                self._drift_detected = False
                self.in_warning_zone = False
                return False'''
        
        #check t-stat
        if self.pre_compute_optimal_cut:
            t_stat = self.t_stats[len(self.W)]
            t_stat_warning = self.t_stats_warning[len(self.W)]
        else:
            t_stat = self.t_score(len(self.W))
            t_stat_warning = self.t_score_warning(len(self.W))
                
        #t-test
        t_test_result = (avg_new-avg_h) / (math.sqrt((stdev_new/(len(self.W)-optimal_cut))+(stdev_h/optimal_cut)))
        if  t_test_result > t_stat:
            self.drift_reaction("t")
            #self.insert_to_W(x)
            return True
        elif t_test_result > t_stat_warning:
            self.in_warning_zone = True
            return False
        
        self._drift_detected = False
        self.in_warning_zone = False
        return False
    
    def drift_reaction(self, drift_type):            
        self.drift_type.append(drift_type)
        self.drifts.append(self.iteration)                
        self.drift_detected_last_it = True
        self._drift_detected = True
        self.empty_window()
        
        return True
        
    def reset(self):
        self.empty_window()
        self.drift_detected_last_it = False
        self._drift_detected = False
        self._reset()
    
    def empty_window(self):
        self.W = []
        self.stdev_new = 0
        self.summation_new = 0
        self.count_new = 0
        self.S_new = 0
        self.stdev_h = 0
        self.summation_h = 0
        self.count_h = 0
        self.S_h = 0
        self.last_opt_cut = 0
            
    def detected_change(self):
        return self.drift_detected_last_it
    
    def get_length_estimation(self):
        self.estimation = len(self.W)
        return len(self.W)
    
    def detected_warning_zone(self):
        return self.in_warning_zone
    
    def __init__(self, confidence_final = 0.999, rigor = 0.5, empty_w=True, w_lenght_max = 50000, w_lenght_min = 30, minimum_noise = 1e-6, opt_cut = [], opt_phi = [], t_stats = [], t_stats_warning = [], pre_compute_optimal_cut = True, running_average = True):
        #init variables
        super().__init__()
        self.confidence_final = confidence_final #confidence value chosen by user
        self.rigor = rigor #rigorousness of drift identification
        self.w_lenght_max = w_lenght_max #maximum window size
        self.w_lenght_min = w_lenght_min #minimum window size
        self.minimum_noise = minimum_noise #noise to be added to stdev in case it is 0
        self.pre_compute_optimal_cut = pre_compute_optimal_cut #pre_compute all possible window sizes?
        self.running_average = running_average #calculate stdev and avg with running sliding window?
        self.empty_w = empty_w #empty window when drift is detected
        
        
        self.W = [] #window
        self.opt_cut = opt_cut #pre-calculated optimal cut for all possible windows
        self.opt_phi = opt_phi #pre-calculated optimal phi for all possible windows
        self.t_stats = t_stats
        self.t_stats_warning = t_stats_warning
        self.last_opt_cut = 0
        self.drifts = [] #drifts identified 
        self.drift_type = [] #types of drifts identified
        self.iteration = 0 #current iteration step
        self.confidence = pow(self.confidence_final, 1/2) #confidence used on the t-test
        self.confidence_warning = 0.95
        #self.confidence_two_tailes = 1-(1-self.confidence)/2 #confidence used on the f-test
        self.t_score = lambda w_lenght : t_stat.ppf(self.confidence, df=w_lenght-2) #t_value to achieve desired confidence
        self.t_score_warning = lambda w_lenght : t_stat.ppf(self.confidence_warning, df=w_lenght-2) #t_value to achieve desired confidence
        #self.f_test = lambda n : scipy.stats.f.ppf(q=self.confidence_two_tailes, dfn=(n*len(self.W))-1, dfd=len(self.W)-(n*len(self.W))-1) #f-test formula
        self.f_test = lambda n : scipy.stats.f.ppf(q=self.confidence, dfn=(n*len(self.W))-1, dfd=len(self.W)-(n*len(self.W))-1) #f-test formula
        self.t_test = lambda n : self.rigor - (self.t_score(len(self.W)) * np.sqrt((1/(len(self.W)*n))+((1* self.f_test(n))/((1-n)*len(self.W))))) #t-test formula (stdev_h=1 because it is cancelled during solution)
    
        #Running stdev and avg
        self.stdev_new = 0
        self.summation_new = 0
        self.count_new = 0
        self.S_new = 0
        self.stdev_h = 0
        self.summation_h = 0
        self.count_h = 0
        self.S_h = 0
        
        self.itt = 0
    
        self.drift_detected_last_it = False
        self.in_concept_change = False
        self.in_warning_zone = False
        self.estimation = 0.0
        self.delay = 0.0
        self.sequence_drifts = 0
        self.sequence_no_drifts = 0
    
        #pre-compute optimal cut for all possible window sizes (if True)
        if self.pre_compute_optimal_cut:
            self.opt_cut, self.opt_phi, self.t_stats, self.t_stats_warning = self.pre_compute_cuts(self.opt_cut, self.opt_phi, self.t_stats, self.t_stats_warning)
            
        if len(self.opt_cut) == 0:
            self.opt_cut = [0 for i in range(w_lenght_min)]
            self.opt_phi = [0 for i in range(w_lenght_min)]
            self.t_stats = [0.0 for i in range(w_lenght_min)]
            self.t_stats_warning = [0.0 for i in range(w_lenght_min)]
            
        if len(self.opt_cut) >= w_lenght_max and len(self.opt_phi) >= w_lenght_max:
            self.pre_compute_optimal_cut = True
            