In [None]:
# Uncomment the line below to install ruptures if it is not already installed

# !python -m pip install ruptures

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import ruptures as rpt
from scipy.stats import zscore, linregress, iqr, skew, kurtosis
import numpy as np
import xlsxwriter
import os
from datetime import datetime

# Please enter the details below

In [None]:
# Run configuration: edit these values before executing the rest of the notebook.
penalty = 1 # passed as `pen` to the change-point prediction; original note: higher means fewer cells (presumably fewer cells with a detected change - TODO confirm)
dim = 'Cell Name' # dimension you are interested in; used as the row index when the cell data is pivoted
dim_net ='Whole Network' # not referenced by any cell visible in this notebook
date = 'Date'     # date variable; both data sets are grouped/pivoted on this column
img_dir = "C:/Users/BronyahJ/images/" # path to store images (not referenced by any cell visible in this notebook)
source_file_path = "C:\\Users\\BronyahJ\\Downloads\\" # where your source data is located (keep the trailing path separator)
file_name = "2G_SSV_Acceptance_v1_Query_Result_20230208161517190.xlsx" # source filename; its extension selects the Excel or CSV reader
Tech = '2G' # prefix of the result CSV written by the last cell
cells_sheet = 0 # Excel sheet that is loaded into df
network_sheet = 1 # not referenced by any cell visible in this notebook
network_sheet_cells = 2 # Excel sheet that is loaded into df01
scale_data = True # when True, the numeric columns of df02 are rescaled before the outlier statistics are computed

# Specify columns you want to delete in the to_delete list

In [None]:
# True: only rows that are entirely NaN are dropped during cleaning.
# False: numeric gaps are filled with the column median, then forward/backward filled.
REMOVE_NA = True
# Columns removed from both data sets when they are present.
to_delete = ['LocalCell Id','Integrity', 'Cell ID', 'Cell CI', 'CellIndex','NR Cell ID','Cell FDD TDD Indication']
# Text placeholders that the cleaning step replaces with np.nan.
KNOWN_NA_VALS = ['NIL', 'NILL', 'NULL', 'NA', '#NA', '#N/A', 'N/A','#VALUE!','#REF!','#DIV/0!','#NUM!','#NAME?','#NULL!','NAN','nan','NaN','/0']

In [None]:
# Load the source data into two frames:
#   df   - sheet `cells_sheet`          (the cells being assessed)
#   df01 - sheet `network_sheet_cells`  (the data the network statistics are built from)
imp_st = datetime.now()

# Build the path once; os.path.join also copes with a folder given without a trailing
# separator. The extension is compared case-insensitively so "REPORT.XLSX" is accepted.
source_path = os.path.join(source_file_path, file_name)
extension = os.path.splitext(str(file_name))[1].lower()

if extension in ('.xlsx', '.xls'):
    # Open the workbook a single time and parse both sheets from the same handle
    # instead of reading the whole file from disk twice.
    with pd.ExcelFile(source_path) as workbook:
        df = pd.read_excel(workbook, sheet_name=cells_sheet)
        df01 = pd.read_excel(workbook, sheet_name=network_sheet_cells)

elif extension == '.csv':
    df = pd.read_csv(source_path, skiprows=6)
    # A CSV carries a single table, so there is no separate network sheet. Previously df01
    # was left undefined here and the next cell failed with a NameError; fall back to the
    # same table and say so, because the network statistics then come from the cells themselves.
    df01 = df.copy()
    print("CSV source has no separate network sheet: network statistics will be computed from the same table")

else:
    # Fail here with a clear message rather than with a NameError on `df` further down.
    raise ValueError("Unsupported source file type '{}': expected .xlsx, .xls or .csv".format(extension))

imp_end= datetime.now()
print("It took {} time to import data".format(imp_end-imp_st))

In [None]:
# Remove the columns listed in to_delete from both frames; names that a frame
# does not contain are simply ignored.
if to_delete:
    df = df.drop(columns=to_delete, errors='ignore')
    df01 = df01.drop(columns=to_delete, errors='ignore')

# Data cleaning and processing

In [None]:
############################################################################################
# Cleaning routine shared by df and df01.
# The same steps used to be written out twice (once per frame); they now live in a single
# function so the two frames are always cleaned identically.
############################################################################################
sp_endings = ['%', '$', '#', '£', 'QAR', 'GBP', 'qar', 'gbp', 'usd', 'USD' ,'eur', 'EUR']


def _text_columns(frame):
    """Return the columns of ``frame`` held as object, bytes, unicode or void data.

    The dtype kind is compared case-sensitively: lower-casing it would fold the
    unsigned-integer kind 'u' into the unicode kind 'U'.
    """
    return [col for col in frame.columns if frame[col].dtype.kind in ('O', 'S', 'U', 'V')]


def _strip_affix_to_numeric(series, affix, at_end):
    """Try to turn a text column whose values all carry ``affix`` into numbers.

    Parameters
    ----------
    series : pd.Series
        Candidate column.
    affix : str
        Literal marker such as '%' or 'USD'.
    at_end : bool
        True to look for the marker at the end of each value, False at the start.

    Returns
    -------
    pd.Series or None
        The numeric column (missing entries stay NaN), or None when some non-missing
        value lacks the marker or the remainder is not numeric.
    """
    present = series.notna()
    if not present.any():
        return None  # nothing to convert; fully empty columns are dropped later anyway

    text = series.astype(str).str.strip()
    has_affix = text.str.endswith(affix) if at_end else text.str.startswith(affix)
    if not (has_affix | ~present).all():
        return None

    # Slice the marker off literally. str.replace(affix, '') treats '$' as a regular
    # expression on older pandas (so it never matched) and removes every occurrence.
    stripped = text.str[:-len(affix)] if at_end else text.str[len(affix):]
    stripped = stripped.str.strip().where(present.to_numpy())  # restore NaN for missing entries
    try:
        return pd.to_numeric(stripped, errors='raise')
    except (ValueError, TypeError):
        return None


def clean_frame(frame, na_values, affixes, remove_na):
    """Clean one raw data frame and return ``(cleaned_frame, dropped_columns)``.

    Steps, in order:
      1. Replace known NA placeholders in text columns with np.nan and convert the
         column to numeric when that succeeds.
      2. Strip a common special ending, then a common special beginning, from text
         columns and convert to numeric when that succeeds.
      3. ``remove_na`` True: drop rows that are entirely NaN. Otherwise fill numeric
         columns with their median and forward/backward fill what is left.
      4. Drop columns that are entirely NaN (mandatory for the later steps).

    The input frame is not modified; a cleaned copy is returned.
    """
    frame = frame.copy()

    # 1) Known NA substitutions -> np.nan
    for col in _text_columns(frame):
        na_mask = frame[col].isin(na_values)
        if not na_mask.any():
            continue
        frame.loc[na_mask, col] = np.nan
        print("Found some known NA substitutions in {}. Will replace and try to force as numeric".format(col))
        try:
            frame[col] = pd.to_numeric(frame[col], errors='raise')
        except (ValueError, TypeError):
            pass  # column holds genuine text (or mixed objects); leave it unchanged

    # 2) Special endings first, then special beginnings, as in the original cells
    for at_end, label in ((True, 'endings'), (False, 'startings')):
        for col in _text_columns(frame):
            for affix in affixes:
                converted = _strip_affix_to_numeric(frame[col], affix, at_end)
                if converted is None:
                    continue
                frame[col] = converted
                print("Modified column {} for special {} {} and changed to numeric".format(
                        col, label, affix))
                break  # the column is numeric now; no need to try the other markers

    # 3) Remaining NA values
    if remove_na:
        frame = frame.dropna(axis=0, how='all')
    else:
        for col in frame.columns:
            if frame[col].dtype.kind in ('f', 'c', 'i', 'u'):
                # Assign the result back: fillna(inplace=True) on frame[col] acts on a
                # temporary and does not reach the frame under copy-on-write.
                frame[col] = frame[col].fillna(frame[col].median())
        frame = frame.ffill().bfill()

    # 4) WARNING - DONT REMOVE BELOW WITHOUT UNDERSTANDING OF THE CODE BIT
    # This step is mandatory. We will delete any column if it is Completely np.nan
    full_na_cols = list(frame.columns[frame.isna().all(axis=0)])
    frame = frame.drop(columns=full_na_cols)

    return frame, full_na_cols


df, DF_FULL_NA_COLS = clean_frame(df, KNOWN_NA_VALS, sp_endings, REMOVE_NA)
# FULL_NA_COLS keeps its original meaning: the all-NaN columns dropped from df01.
df01, FULL_NA_COLS = clean_frame(df01, KNOWN_NA_VALS, sp_endings, REMOVE_NA)

In [None]:
# Give both frames a fresh 0..n-1 index after the row drops performed during cleaning.
df = df.reset_index(drop=True)
df01 = df01.reset_index(drop=True)

In [None]:
# Preview the first rows of df01 (the frame loaded from the `network_sheet_cells` sheet).
df01.head()

In [None]:
# Per-date mean of every numeric column of df01.
# - Only numeric columns are passed as `values`: with text columns left in, recent pandas
#   raises instead of silently dropping them.
# - The aggregation is named ('mean') rather than passed as np.nanmean: pandas currently
#   substitutes its own mean for that callable and has deprecated the substitution.
network_mean_cols = [col for col in df01.columns
                     if col != date and pd.api.types.is_numeric_dtype(df01[col])]
df_test = df01.pivot_table(index=date, values=network_mean_cols, aggfunc='mean')

In [None]:
# Discard the date index so the rows of df_test are addressed by position (0 = first date).
df_test = df_test.reset_index(drop=True)

In [None]:
# Display the per-date means held in df_test.
df_test

In [None]:
# Per-date standard deviation of every numeric column of df01.
# - Only numeric columns are passed as `values`: with text columns left in, recent pandas
#   raises instead of silently dropping them.
# - The aggregation is named ('std', the sample standard deviation with ddof=1). That is what
#   pandas currently substitutes for np.nanstd; once the deprecated substitution is removed,
#   np.nanstd (ddof=0) would be used and every threshold derived from df_test1 would shift.
network_std_cols = [col for col in df01.columns
                    if col != date and pd.api.types.is_numeric_dtype(df01[col])]
df_test1 = df01.pivot_table(index=date, values=network_std_cols, aggfunc='std')

In [None]:
# Discard the date index so the rows of df_test1 are addressed by position (0 = first date).
df_test1 = df_test1.reset_index(drop=True)

In [None]:
# Display the per-date standard deviations held in df_test1.
df_test1

In [None]:
# Keep only the rows of df01 without any missing value, then renumber them from 0.
df01 = df01.dropna(axis=0, how='any').reset_index(drop=True)

In [None]:
# Number of df01 columns that still contain at least one null value.
# 0 is expected here, because rows with any null were dropped in the previous cell.
df01.isnull().any().sum()

In [None]:
# (rows, columns) left in df01 after the rows with nulls were dropped.
df01.shape

In [None]:
# Work on an independent (deep) copy so the scaling below never touches df01.
df02 = df01.copy()

In [None]:
# Per-KPI distribution summary of the network data (df02):
#   high / low - number of samples beyond the Tukey fences (Q3 + 1.5*IQR / Q1 - 1.5*IQR)
#   skew, kurt - skewness and kurtosis of the column
# The result, kpis_df, has one row per KPI with columns kpi, high, low, skew, kurt.
kpi_rows = []

# NOTE: .kind.lower() also folds the unsigned-integer kind 'u' into the unicode kind 'U',
# so unsigned columns are skipped here too. Left unchanged so that this cell and the
# per-cell scoring cell keep selecting exactly the same KPIs.
for x in [x for x in df02.columns if df02[x].dtype.kind.lower() not in ('o', 's', 'u', 'v','m')]:

    if scale_data:
        # Min-max scale the column to [0, 1] using the minimum and maximum of the ORIGINAL
        # values. The previous element-by-element loop recomputed min()/max() after every
        # in-place update, so later elements were scaled against already-modified data
        # (a constant column came out as [0, 1, 1, ...]); it also relied on chained
        # .iloc assignment and was quadratic in the number of rows.
        column = df02[x].astype(float)
        col_min = column.min()
        span = column.max() - col_min
        if span > 0:
            df02[x] = (column - col_min) / span
        else:
            df02[x] = 0.0  # constant column: every value maps to 0

    values = df02[x].values

    spread = iqr(values, nan_policy='omit')
    q1 = np.nanpercentile(values, 25)
    q3 = np.nanpercentile(values, 75)
    low_limit = q1 - 1.5*spread
    high_limit = q3 + 1.5*spread
    sk = skew(values, nan_policy='omit')
    kurt = kurtosis(values, nan_policy='omit')

    if q3 == q1:
        # Zero IQR: both fences collapse onto the quartile, so only values strictly
        # beyond it are counted.
        high_count = int(np.sum(values > high_limit))
        low_count = int(np.sum(values < low_limit))
    elif low_limit != high_limit:
        # Regular case: values on a fence count as outliers as well.
        high_count = int(np.sum(values >= high_limit))
        low_count = int(np.sum(values <= low_limit))
    else:
        high_count = 0
        low_count = 0

    kpi_rows.append((x, high_count, low_count, sk, kurt))


kpis_df = pd.DataFrame(kpi_rows, columns=['kpi', 'high', 'low', 'skew', 'kurt'])

In [None]:
# Score every (cell, KPI) pair against the network statistics.
#   1. Run PELT change-point detection (rbf cost) on the cell's daily KPI series.
#   2. With a change point, average the last detected segment; otherwise average the
#      observations within network mean +/- 3 std.
#   3. Compare that average with network mean +/- 2 std on the side that the KPI's outlier
#      counts, skew and kurtosis point to, and record 'passed' or 'failed'.
# The result, ssv_table, has one row per pair with columns cells, kpi, value, net_avg, status.
ssv_rows = []

for x in [x for x in df.columns if df[x].dtype.kind.lower() not in ('o', 's', 'u', 'v','m')]:

    # A KPI that exists only in the cell sheet has nothing to be compared with; say so and
    # move on instead of stopping the whole run with a KeyError.
    kpi_stats = kpis_df[kpis_df['kpi'] == x]
    if x not in df_test.columns or x not in df_test1.columns or kpi_stats.empty:
        print("Skipping KPI {}: it has no network-level statistics to compare against".format(x))
        continue

    # Everything below depends on the KPI only, so compute it once rather than once per cell.
    # NOTE(review): only row 0 of df_test / df_test1 (the first date) is used - confirm intended.
    net_mean = df_test[x].iloc[0]
    net_std = df_test1[x].iloc[0]
    high = net_mean + (3 * net_std)
    low = net_mean - (3 * net_std)
    thd_high = net_mean + (2 * net_std)
    thd_low = net_mean - (2 * net_std)

    kpi_df_high = kpi_stats['high'].iloc[0]
    kpi_df_low = kpi_stats['low'].iloc[0]
    kpi_df_skew = kpi_stats['skew'].iloc[0]
    kpi_df_kurt = kpi_stats['kurt'].iloc[0]

    # "Upper side is the bad side" rules, unchanged from the original branches.
    upper_side = ((kpi_df_high > kpi_df_low and kpi_df_skew >= 0)
                  or (kpi_df_high == 0 and kpi_df_low == 0 and kpi_df_skew >= 0))
    # NOTE(review): this extra rule was only ever applied when a change point exists.
    upper_side_flat = (kpi_df_high > kpi_df_low and kpi_df_skew < 0 and kpi_df_kurt <= 0)

    df_cell = df.pivot_table(index=dim, columns=date, values=x, aggfunc='mean')

    # The row label is the cell name. It used to be parsed out of str(Series.head(0)),
    # which produced " Name: <cell>" and broke on names containing a comma.
    for cell_name, row in df_cell.iterrows():

        # Days without data are NaN in the pivot; they are removed before segmentation
        # so the cost function is not fed NaN.
        observed = row.dropna()

        if len(observed) >= 2:
            algo = rpt.Pelt(model="rbf").fit(observed.to_numpy(dtype=float))
            result = algo.predict(pen=penalty)
        else:
            result = [len(observed)]  # too few points to segment: treat as "no change point"

        if len(result) > 1:
            # result ends with the series length, so the last two entries bound the
            # most recent segment.
            segment = observed.iloc[result[-2]:result[-1]]
            check_upper = upper_side or upper_side_flat
        else:
            # One combined mask; the original chained two masks, the second of which was
            # built on the unfiltered row.
            segment = observed[observed.between(low, high)]
            check_upper = upper_side

        value_avg = np.nanmean(segment.to_numpy(dtype=float)) if len(segment) else np.nan

        # NOTE(review): a NaN average compares False either way and is reported as
        # 'passed', exactly as before.
        if check_upper:
            stat = 'failed' if value_avg > thd_high else 'passed'
        else:
            stat = 'failed' if value_avg < thd_low else 'passed'

        ssv_rows.append((cell_name, x, value_avg, net_mean, stat))


ssv_table = pd.DataFrame(ssv_rows, columns=['cells', 'kpi', 'value', 'net_avg', 'status'])

In [None]:
# Display the per-cell, per-KPI result table (columns: cells, kpi, value, net_avg, status).
ssv_table

In [None]:
# Write the result table to "<Tech>_ssv_result.csv". The path is relative, so the file lands
# in the current working directory; to_csv also writes the row index as the first column.
ssv_table.to_csv(Tech+'_ssv_result.csv')