In [2]:
from datetime import datetime
from collections import namedtuple
import os
import pandas as pd
import io
import warnings
import logging
from IPython.display import display

## export csv
### parce_filename:

In [None]:
"""
file format
SYSTEMCPUSTATISTICSCORELEVEL-2022-04-22-10-10-00.csv
"""
filename_tuple = namedtuple('filename', ['table', 'date', 'extension'])

def parce_filename(name: str) -> filename_tuple:
    """
    Split a stat file name into table name, timestamp and extension.

    Expected format: ``SYSTEMCPUSTATISTICSCORELEVEL-2022-04-22-10-10-00.csv``

    name: bare file name (no directory part)
    returns: filename_tuple(table, date, extension).
        For anything that is not a ``.csv`` file the table is "" and the date
        is the current time; callers are expected to check ``extension``.
    raises: ValueError when a ``.csv`` name does not carry a
        ``%Y-%m-%d-%H-%M-%S`` timestamp after the first "-".
    """
    # Split on the LAST dot only, so names without an extension ("README")
    # or with several dots ("a.b.csv") do not blow up on tuple unpacking.
    fname, dot, extension = name.rpartition(".")
    if not dot:
        fname, extension = name, ""
    if extension != "csv":
        return filename_tuple("", datetime.now(), extension)
    # Everything before the first "-" is the table, the rest is the timestamp.
    table, _, stamp = fname.partition("-")
    date = datetime.strptime(stamp, "%Y-%m-%d-%H-%M-%S")
    return filename_tuple(table, date, extension)

# parce_filename("SYSTEMCPUSTATISTICSCORELEVEL-2022-04-22-10-10-00.csv")
# parce_filename("SYSTEMCPUSTATISTICSCORELEVEL-2022-04-22-10-10-00.csvq")

### selectFiles: selecting files based on dir location and time period

In [None]:
def selectFiles(dir: str, dateStart: datetime, dateStop: datetime) -> list:
    """
    List the stat files of one directory that fall inside a time period.

    dir: directory to scan (sub-directories are ignored, not descended into)
    dateStart, dateStop: exclusive bounds for the timestamp in the file name
    returns: list of bare file names, in os.listdir order
    """
    selected = []
    for entry in os.listdir(dir):
        full_path = os.path.join(dir, entry)
        if os.path.isdir(full_path):
            continue

        parsed = parce_filename(entry)
        if parsed.extension != "csv":
            print("not stat file", entry)
            continue

        # strict comparison on both sides: the bounds themselves are excluded
        if dateStart < parsed.date < dateStop:
            selected.append(entry)
    return selected

In [None]:
def selectSortFiles(dir: str, dateStart: datetime, dateStop: datetime) -> dict:
    """
    Like selectFiles, but group the selected file names by table name.

    dir: directory to scan (sub-directories are ignored)
    dateStart, dateStop: exclusive bounds for the timestamp in the file name
    returns: dict {table name: [file names]}; the elapsed time is printed
    """
    started = datetime.now()
    by_table = {}
    for entry in os.listdir(dir):
        if os.path.isdir(os.path.join(dir, entry)):
            continue

        parsed = parce_filename(entry)
        if parsed.extension != "csv":
            print("not stat file", entry)
            continue

        in_period = dateStart < parsed.date < dateStop
        if in_period:
            # first file of a table creates its list, later ones extend it
            by_table.setdefault(parsed.table, []).append(entry)
    print("selectSortFiles time=", datetime.now()-started)
    return by_table



In [None]:
# wrapper on pandas for alarm parsing
def pd_read_csv(filename, dtype='unicode'):
    """
    Read an alarm CSV into a DataFrame indexed by its 7th column.

    filename: path or file-like object accepted by pd.read_csv
    dtype: dtype passed through to pd.read_csv
    returns: DataFrame whose 'Event Time' data (column or index) is parsed
        with the format ``%m/%d/%Y %H:%M:%S:%f``
    raises: ValueError if there is no 'Event Time' column, or a value does
        not match the format.

    Malformed rows are skipped with a warning (``on_bad_lines="warn"`` is the
    replacement for the removed ``error_bad_lines=False``).
    """
    time_col = 'Event Time'
    time_format = "%m/%d/%Y %H:%M:%S:%f"
    frame = pd.read_csv(filename, on_bad_lines="warn",
                        sep=',', header=0, index_col=6,
                        dtype=dtype)
    # Parse the timestamp after reading instead of through the deprecated
    # `date_parser=` hook of read_csv.
    if frame.index.name == time_col:
        frame.index = pd.to_datetime(frame.index, format=time_format)
    elif time_col in frame.columns:
        frame[time_col] = pd.to_datetime(frame[time_col], format=time_format)
    else:
        raise ValueError(f"Missing column provided to 'parse_dates': '{time_col}'")
    return frame


In [None]:
# wrapper on pandas
def pd_ems(filename, dtype=None):
    """
    Read an EMS CSV into a DataFrame with timestamps in 'Europe/Kiev' time.

    filename: path or file-like object accepted by pd.read_csv
    dtype: dtype (or {column: dtype}) passed to pd.read_csv; None lets pandas
        infer the types
    returns: DataFrame indexed by the first column; '#timeofcollection'
        (epoch milliseconds) is converted to tz-aware timestamps
    raises: ValueError if '#timeofcollection' is missing or not numeric.

    Malformed rows are skipped with a warning (``on_bad_lines="warn"`` is the
    replacement for the removed ``error_bad_lines=False``).
    """
    time_col = '#timeofcollection'
    frame = pd.read_csv(filename, on_bad_lines="warn",
                        sep=',', header=0, index_col=0,
#                        for autoconverting types
                        dtype=dtype, na_values=''
                       )

    def to_utc(values):
        # Convert via numbers: read_csv's `date_parser=` hook is deprecated and
        # parsing digit *strings* together with `unit=` is deprecated as well.
        return pd.to_datetime(pd.to_numeric(values), unit='ms', utc=True)

    if frame.index.name == time_col:
        frame.index = to_utc(frame.index).tz_convert('Europe/Kiev')
    elif time_col in frame.columns:
        frame[time_col] = to_utc(frame[time_col]).dt.tz_convert('Europe/Kiev')
    else:
        raise ValueError(f"Missing column provided to 'parse_dates': '{time_col}'")
    return frame

def read_ems_csv(filename, dtype=None, tz: str = "UTC"):
    """
    Read a CSV file from EMS into a DataFrame.

    filename: path or file-like object accepted by pd.read_csv
    dtype: dtype (or {column: dtype}) passed to pd.read_csv; None lets pandas
        infer the types
    tz: time zone the timestamps are converted to
    returns: DataFrame indexed by the first column; '#timeofcollection'
        (epoch milliseconds, taken as UTC) is converted to tz-aware timestamps
    raises: ValueError if '#timeofcollection' is missing or not numeric.

    Malformed rows are skipped silently (``on_bad_lines="skip"``).
    """
    time_col = '#timeofcollection'
    frame = pd.read_csv(filename, on_bad_lines="skip",
                        sep=',', header=0, index_col=0,
#                        for autoconverting types
                        dtype=dtype, na_values=''
                       )

    def to_utc(values):
        # Convert via numbers: read_csv's `date_parser=` hook is deprecated and
        # parsing digit *strings* together with `unit=` is deprecated as well.
        return pd.to_datetime(pd.to_numeric(values), unit='ms', utc=True)

    if frame.index.name == time_col:
        frame.index = to_utc(frame.index).tz_convert(tz)
    elif time_col in frame.columns:
        frame[time_col] = to_utc(frame[time_col]).dt.tz_convert(tz)
    else:
        raise ValueError(f"Missing column provided to 'parse_dates': '{time_col}'")
    return frame


### categories:

In [14]:
# Column names that are loaded as pandas 'category' dtype. Maintained manually;
# every name is listed once (a set ignores repeats anyway).
# TODO: generate this set from file mccPerfObjects.csv or like PORT_STATS.xml
cat_all = {
    'name', 'operatingSystems_grid', 'resourceid', 'zone_name',
    'protocols_grid', 'category', 'applications_grid',
    'diameterInterfaceType_intfType', 'diameterInterfaceType_nodeName', 'diameterInterfaceType_peerName',
    'serviceDataContainerUsageStatsNew_ratingGroupName', 'serviceDataContainerUsageStatsNew_serviceIdentifier',
    'epdgApn_name', 'apn_name', 'wagApn_name', 'sgwApn_name',
    'gtpMessage_gwName', 'gtpMessage_messageType',
    'gw_name', 'qci', 'arp', 'gw_type',
    'peer_chassis', 'peer_name', 'buffer_chassis', 'buffer_slot',
    'bearerUsageStatsNew_gatewayType', 'bearerUsageStatsNew_gatewayName', 'bearerUsageStatsNew_qci',
    'networkContext_name', 'uePoolStatistics_name', 'networkContextStatistics_name',
    'ueIpSubPoolStatistics_name', 'interfaceStatistics_name',
    'interfaceUsageStatsNew_gatewayType', 'interfaceUsageStatsNew_gatewayName',
    'interfaceUsageStatsNew_interfaceType', 'interfaceUsageStatsNew_interfaceName',
    'chassis_id', 'card_id', 'portstats_name',
    'summary_chassis', 'summary_slot', 'connection_chassis', 'connection_slot',
    'module-stats_chassis', 'module-stats_slot',
    'systemCpuStats_chassis', 'systemCpuStats_slot', 'systemCpuStats_cpu', 'systemCpuStats_core',
    'systemMemoryStats_chassis', 'systemMemoryStats_slot', 'systemMemoryStats_cpu',
    'diskPerfStatus_diskname',
    'resultCodeStatistics_intfType', 'resultCodeStatistics_nodeName',
    'resultCodeStatistics_peerName', 'resultCodeStatistics_resultCode',
    'pgw_name', 'stats_slot', 'stats_cpuNum',
    'segment_chassis', 'session_chassis', 'detail_chassis',
    'contentCategorization_chassis', 'contentCategorization_slot',
    'failure_chassis', 'latencyStats_chassis', 'overloadControl_chassis',
    'tcpOptz_chassis', 'tcpOptz_slot', 'bytes_chassis', 'optzsavings_chassis',
    'perf-stats_chassis', 'perf-stats_slot',
    'natModuleStats_chassis', 'all_chassis', 'pluginEntry_chassis',
    'advanced_chassis', 'basic_chassis',
    'systemLoadStats_chassis', 'systemLoadStats_slot',
    'systemPfmStats_chassis', 'systemPfmStats_slot',
    'server-side_chassis', 'tac_chassis',
    'tcpReOrderPerfStats_chassis', 'tcpReOrderPerfStats_slot',
    'rateLimiting_chassis', 'dns_chassis', 'quic_chassis',
    'perfStats_chassis', 'perfStats_slot',
    'compression_grid', 'premptDns_grid', 'detail_grid', 'summary_grid',
    'wapgw_grid', 'general_grid', 'tethering_grid', 'HttpTransactions_grid',
    'uptime_id', 'uptime_nodeNumber', 'uptime_cpuNumber',
    'statistics_cpuNumber', 'dns_name',
    'edrBearerAggrStats_networkContext', 'edrBearerAggrStats_name',
    'edrFlowAggrStats_networkContext', 'edrFlowAggrStats_name',
    'edrHttpAggrStats_networkContext', 'edrHttpAggrStats_name',
    'edrPilotAggrStats_networkContext', 'edrPilotAggrStats_name',
    'edrRTTAggrStats_networkContext', 'edrRTTAggrStats_name',
    'edrServerAggrStats2_networkContext', 'edrServerAggrStats2_name',
    'edrSessionAggrStats_networkContext', 'edrSessionAggrStats_name',
    'applicationAttrs_apn', 'applicationAttrs_groupName',
    'applicationAttrs_applicationName', 'applicationAttrs_attributeName',
    'flow-attrs_apn', 'flow-attrs_name',
    'protocolAttrs_apn', 'protocolAttrs_groupName',
    'protocolAttrs_protocolName', 'protocolAttrs_attributeName',
    'tdci-stats_apn', 'tdci-stats_method',
    'geoGroup_name',
    'pcscfAddressInUse_apnName', 'pcscfAddressInUse_addressName',
    'saegwPgw_name', 'pluginEntry_name',
    'serviceDataContainerUsageStatsNew_gatewayType', 'serviceDataContainerUsageStatsNew_gatewayName',
    'statistics_taskname', 'uplaneDns_name',
}

# dtypes generator: {column name: 'category'}, usable as the read_csv dtype mapping
categories_all = dict.fromkeys(cat_all, 'category')


In [None]:
def merge_csv(filenames: list):
    """
    Concatenate several CSV files into one in-memory text file.

    The header line is kept from the first non-empty file only; it is
    dropped from every following file.

    filenames: paths of the CSV files, in the order they should be merged
    returns: io.StringIO positioned at offset 0 (the caller closes it)
    """
    merged = io.StringIO(newline='\n')
    header_written = False
    for filename in filenames:
        with open(filename, "r") as source:
            if header_written:
                source.readline()  # skip this file's header line
            content = source.read()
        if not content:
            continue
        merged.write(content)
        # Lines already carry their own "\n"; only a missing final newline has
        # to be added, otherwise the next file's first row would be glued onto
        # this file's last row.
        if not content.endswith("\n"):
            merged.write("\n")
        header_written = True
    merged.seek(0)  # move cursor to position 0, the beginning of the file
    return merged
            


In [None]:
def stat_frames_old(stat_dir: str, period_start: str, period_stop: str, tz: str = "UTC") -> dict:
    """
    Create a dictionary of stat frames by reading every file into its own
    DataFrame and concatenating the frames per table.

    stat_dir: directory with stat files
    period_start: start time in format 2022-04-22T10:00
    period_stop: stop time, same format
    tz: time zone the timestamps are converted to
    returns: dict {table name: DataFrame}; the elapsed time is printed
    """
    parse_period_start = datetime.strptime(period_start, "%Y-%m-%dT%H:%M")
    parse_period_stop = datetime.strptime(period_stop, "%Y-%m-%dT%H:%M")
    selected_files = selectSortFiles(stat_dir, parse_period_start, parse_period_stop)

    stat = {}
    timeStart = datetime.now()
    for table, names in selected_files.items():
        # Paths are built from stat_dir (the directory that was actually
        # scanned), not from a notebook-level global.
        frames = [read_ems_csv(os.path.join(stat_dir, name), dtype=categories_all, tz=tz)
                  for name in names]
        stat[table] = pd.concat(frames)
    print("concat time=", datetime.now()-timeStart)
    return stat

### remove_unnamed: removing columns like `Unnamed: 10` from all df

In [None]:
def remove_unnamed(stat: dict):
    """
    Drop, in place, every column whose name contains ``Unnamed:``
    (for example ``Unnamed: 10``) from all DataFrames in ``stat``.

    stat: dict {table name: DataFrame}; the frames themselves are modified
    returns: None
    """
    for frame in stat.values():
        # collect first, then drop, so the column list is not changing under us
        unnamed = [label for label in frame.columns if label.find('Unnamed:') >= 0]
        for label in unnamed:
            frame.drop(columns=label, inplace=True)


### stat_frames: main func to parse CSVs

In [None]:
def stat_frames(stat_dir: str, period_start: str, period_stop: str, tz: str = "UTC") -> dict:
    """
    Create a dictionary of stat frames, one DataFrame per table.

    The files of a table are first merged into one virtual file (merge_csv)
    and parsed once; this works quicker than concatenating DataFrames.

    stat_dir: directory with stat files
    period_start: start time in format 2022-04-22T10:00
    period_stop: stop time, same format
    tz: time zone the timestamps are converted to
    returns: dict {table name: DataFrame} with the ``Unnamed:`` columns removed
    """
    period_format = "%Y-%m-%dT%H:%M"
    begin = datetime.strptime(period_start, period_format)
    end = datetime.strptime(period_stop, period_format)
    files_by_table = selectSortFiles(stat_dir, begin, end)

    frames = {}
    started = datetime.now()
    total = len(files_by_table)
    for done, (table, names) in enumerate(files_by_table.items(), start=1):
        paths = [os.path.join(stat_dir, name) for name in names]
        merged = merge_csv(paths)
        frames[table] = read_ems_csv(merged, dtype=categories_all, tz=tz)
        merged.close()
        # "\r" keeps the counter on one line, acting as a progress bar
        print(f'done: {done}/{total}', end='\r', flush=True)
    print("put in dfs time=", datetime.now()-started)

    remove_unnamed(frames)
    return frames

## Table manipulation
### find_cat_counters:

In [15]:
def find_cat_counters(table: dict, unique=50):
    """
    Look for integer columns that behave like categories rather than counters.

    An integer column is a candidate when it has fewer than ``unique``
    distinct values and a positive sum. Every candidate, and every
    object-dtype column, is printed per table.

    table: dict {table name: DataFrame}
    unique: upper bound (exclusive) on the number of distinct values
    returns: tuple (possible_col, possible_col_less)
        possible_col: candidates whose lower-cased name contains 'chassis',
            'slot', '_grid', '_nodenumber' or '_cpunumber'
        possible_col_less: candidates whose lower-cased name neither starts
            with 'num' nor contains '_min', '_max' or '_avg'
    """
    id_markers = ('chassis', 'slot', '_grid', '_nodenumber', '_cpunumber')
    aggregate_markers = ('_min', '_max', '_avg')

    strong, weak = [], []
    for name, df in table.items():
        obj_columns = [col for col in df.columns if pd.api.types.is_object_dtype(df[col])]
        int_columns = [col for col in df.columns if pd.api.types.is_integer_dtype(df[col])]
        print(f'----- table= {name} -------')
        if obj_columns:
            print(f'contains objects: {obj_columns}')

        for col in int_columns:
            values = df[col]
            if not (len(values.unique()) < unique and values.sum() > 0):
                continue
            print(f'possible cat= {col}')
            lowered = col.lower()
            if any(marker in lowered for marker in id_markers):
                strong.append(col)
            looks_aggregated = lowered.startswith('num') or any(
                marker in lowered for marker in aggregate_markers)
            if not looks_aggregated:
                weak.append(col)
    return (strong, weak)
    

## DF manipulation
### scrab: removing some columns from view:
- columns whose sum is not positive (e.g. all values are `0`)
- categorical columns with fewer than two distinct values

In [16]:
def scrab(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a reduced copy of ``df`` without uninformative columns.

    Dropped are
    - categorical columns with fewer than two distinct values,
    - non-categorical columns whose sum is not positive.
    The input frame is left untouched.
    """
    columns_before = len(df.columns)
    work = df.copy()

    categorical = [label for label in work.columns
                   if type(work[label].dtypes) is pd.CategoricalDtype]
    constant_categorical = [label for label in categorical
                            if len(work[label].unique()) < 2]
    work.drop(columns=constant_categorical, inplace=True)
    # TODO: get objects columns

    kept = []
    for label in work.columns:
        # `or` is lazy: sum() is never evaluated for categorical columns
        if label in categorical or (work[label].sum() > 0):
            kept.append(label)
    reduced = work.loc[:, kept]
    logging.debug(f"scrab result columns before: {columns_before}, after: {len(reduced.columns)}")
    return reduced


### find_columns: return list of columns with a search sub word in column name

In [None]:
def find_columns(df: pd.DataFrame, word: str) -> list:
    """
    Return the columns of ``df`` whose name contains ``word``.

    The match is case-insensitive on both sides. Column labels that are not
    strings (e.g. the tuples of a pivoted frame) are matched on their str() form.
    """
    needle = word.lower()  # the column name is lower-cased, so the word must be too
    return [col for col in df.columns if needle in str(col).lower()]

### info: mostly interesting params of df

In [3]:
df_info = namedtuple('df_info', ['unique', 'monotonic', 'start', 'stop', 'len'])
def info(df: pd.DataFrame, freq: str = '5min') -> df_info:
    """
    Collect the most interesting index parameters of a DataFrame (or Series).

    df: object with a ``.index`` and a ``.shape``
    freq: expected sampling step, used to look for gaps in the index
    returns: df_info(unique, monotonic, start, stop, len); start and stop are
        None for an empty input

    When the index is a unique, monotonically increasing DatetimeIndex, the
    timestamps expected between start and stop but absent from the index are
    logged as a warning.
    """
    unique, monotonic = df.index.is_unique, df.index.is_monotonic_increasing
    lenn = df.shape[0]
    if lenn == 0:
        # nothing to index into; df.index[0] would raise IndexError
        return df_info(unique, monotonic, None, None, lenn)

    start, stop = df.index[0], df.index[-1]
    if unique and monotonic and isinstance(df.index, pd.DatetimeIndex):
        # No explicit tz here: date_range takes the time zone from start/stop.
        # Forcing tz='UTC' raises for an index in any other time zone.
        all_dates = pd.date_range(start=start, end=stop, freq=freq)
        missing = all_dates.difference(df.index)
        if len(missing) > 0:
            logging.warning(f'missing dates= {missing}')
    return df_info(unique, monotonic, start, stop, lenn)

### top_corr: print top corr to some column

In [None]:
def top_corr(df: pd.DataFrame, column: str, top_num=20):
    """
    Print the columns most positively and most negatively correlated with
    ``column``.

    df: source DataFrame
    column: name of the reference column (must survive scrab and be numeric)
    top_num: number of rows printed for each direction
    """
    # removing columns with 0, to have smaller df; then keep only numeric/bool
    # columns, because corr() cannot handle the categorical ones scrab keeps
    _df = scrab(df).select_dtypes(include=['number', 'bool'])
    corr = _df.corr()
# unstack creates multiindex, so just select on first one
    column_corr = corr.unstack()[column]
    print(f'----- shape= {df.shape} -----')
    print(column_corr.sort_values(ascending=False).head(top_num))
    print('----- top negative ------')
    print(column_corr.sort_values(ascending=True).head(top_num))

### top_corrwith: print the top correlations with a series
Correlates the df with a series, optionally applying a lag.  
A `+` lag shifts the series forward - its first rows become NaN.  
A `-` lag shifts the series backward - its last rows become NaN.  

In [None]:
def top_corrwith(df: pd.DataFrame, series: pd.Series, lag=0, top_num=20):
    """
    Print the columns of ``df`` most positively and most negatively
    correlated with ``series``.

    df: source DataFrame
    series: reference series
    lag: number of rows ``series`` is shifted by before correlating
        (positive: forward, the first rows become NaN)
    top_num: number of rows printed for each direction
    """
    # drop empty columns first, then keep only numeric/bool columns, because
    # the categorical columns scrab keeps cannot be correlated
    _df = scrab(df).select_dtypes(include=['number', 'bool'])
    print(f'----- shape= {df.shape} -----')
    if lag != 0:
        print('------ lag="+" means first lines in serias is NaN -----')
    corr = _df.corrwith(series.shift(lag))  # computed once, sorted twice
    print(corr.sort_values(ascending=False).head(top_num))
    print('----- top negative ------')
    print(corr.sort_values(ascending=True).head(top_num))


### pivot: using the pivot method to create a table with a unique and monotonic index

In [None]:
def pivot(df: pd.DataFrame) -> pd.DataFrame:
    """
    Spread the categorical columns of ``df`` into column levels so that the
    index becomes unique.

    df: DataFrame without object-dtype columns (after scrab)
    returns: pivoted DataFrame, passed through scrab once more
    raises: AssertionError if object columns are present or the pivoted index
        is still not unique. A non-monotonic index only triggers a warning.
    """
    _df_t = scrab(df)  # scrab already works on its own copy
    cat_columns = [col for col in _df_t.columns
                   if isinstance(_df_t[col].dtype, pd.CategoricalDtype)]
    obj_columns = [col for col in _df_t.columns if pd.api.types.is_object_dtype(_df_t[col])]
    assert len(obj_columns) == 0, f'there is obj_columns= {obj_columns}'

    _df_new = _df_t.pivot(columns=cat_columns)
    assert _df_new.index.is_unique, 'index in new df not unique'
    # is_monotonic_increasing replaces Index.is_monotonic, which pandas 2.0 removed
    if not _df_new.index.is_monotonic_increasing:
        warnings.warn('df not monotonic')

    return scrab(_df_new)

### corrwith: 
Notes:
function dependency ==> correlation
but
correlation !=> function dependency

In [None]:
def corrwith(df: pd.DataFrame, series: pd.Series, diff=1, lag=0, 
                 method='pearson') -> tuple:
    """
    Correlate the differenced columns of ``df`` with a differenced series.

    df: source DataFrame; pivoted first when its index is not unique
    series: reference series with a unique index
    diff: period passed to ``.diff()`` for both df and series
    lag: number of rows the differenced series is shifted by
    method: correlation method passed to DataFrame.corrwith
    returns: tuple (corr, _df) - the correlations with NaN results dropped,
        and the cleaned (scrab / pivot) frame they were computed from
    raises: AssertionError when the index of ``series`` is not unique
    """
    if not df.index.is_unique:
        _df = pivot(df)
    else:
        _df = scrab(df.copy())
    # is_monotonic_increasing replaces Index.is_monotonic, which pandas 2.0 removed
    if not _df.index.is_monotonic_increasing:
        warnings.warn('df not monotonic')

    assert series.index.is_unique, 'reference series NOT unique'
    _sr = series.copy()

    # evaluate info() once per object and log the results themselves
    df_params, sr_params = info(_df), info(_sr)
    if df_params != sr_params:
        logging.warning('-- main params not same for df and sr --')
        logging.warning(f'{df_params}')
        logging.warning('----- series -----')
        logging.warning(f'{sr_params}')

    # if NaN present could happening situation when there is small number of points is present
    # as result correlation is high on this small number
# TODO:
# - fillna with random noise
# - remove from _df columns with na values bigger then in _sr
    _df0 = _df.fillna(0)
    _df0 = _df0.diff(diff)
    _sr = _sr.diff(diff)

    corr = _df0.corrwith(_sr.shift(lag), drop=True, method=method).dropna()

    return (corr, _df)

#### corrwith_old: 

In [17]:
def corrwith_old(df: pd.DataFrame, series: pd.Series, diff=1, lag=0, 
                  printt=True, method='pearson') -> tuple:
    """
    Older variant of corrwith: correlate the differenced columns of ``df``
    with a differenced series.

    df: source DataFrame; pivoted first when its index is not unique
    series: reference series with a unique index
    diff: period passed to ``.diff()`` for both df and series
    lag: number of rows the differenced series is shifted by
    printt: not used; kept so existing calls keep working
    method: correlation method passed to DataFrame.corrwith
    returns: tuple (corr, _df) - the correlations with NaN results dropped,
        and the cleaned (scrab / pivot) frame they were computed from
    raises: AssertionError when the index of ``series`` is not unique
    """
    if not df.index.is_unique:
        _df = pivot(df)
    else:
        _df = scrab(df.copy())
    # is_monotonic_increasing replaces Index.is_monotonic, which pandas 2.0 removed
    if not _df.index.is_monotonic_increasing:
        warnings.warn('df not monotonic')

    assert series.index.is_unique, 'reference series NOT unique'
    _sr = series.copy()

    # info() takes no `printt` keyword; call it plainly and print its results
    df_params, sr_params = info(_df), info(_sr)
    if df_params != sr_params:
        warnings.warn('main params not same for df and sr')
        print(df_params)
        print('----- series -----')
        print(sr_params)

    # if NaN present could happening situation when there is small number of points is present
    # as result correlation is high on this small number
# TODO:
# - fillna with random noise
# - remove from _df columns with na values bigger then in _sr
    _df0 = _df.fillna(0)
    _df0 = _df0.diff(diff)
    _sr = _sr.diff(diff)

    corr = _df0.corrwith(_sr.shift(lag), drop=True, method=method).dropna()

    return (corr, _df)

### plot_corr: helpers to create multiple plots for corr

In [None]:
def plot_corr2(x: str, stat_unique: dict = None,
               dates: pd.IndexSlice = pd.IndexSlice[:], axvlines=[]):
    """
    Plot one correlated column; meant to be applied row by row.

    x: one row of the correlation table. Despite the annotation it is read
        like a mapping with the keys 'table', 'index', 'ax' and 'ylabel'.
    stat_unique: dict {table name: DataFrame} the column is taken from
    dates: row selector used in ``.loc``
    axvlines: x positions where a red dashed vertical line is drawn
    """
    table_name, column, label = x['table'], x['index'], x['ylabel']
    selection = stat_unique[table_name].loc[dates, column]
    axis = selection.plot(legend=True, ax=x['ax'], ylabel=label)
    for position in axvlines:
        axis.axvline(position, color="red", linestyle="--")
    print(f'{label} {table_name} {column}')
    


In [18]:
def plot_corr(corr_all: pd.DataFrame, corr_filter: pd.Series,
              testdf: pd.DataFrame, stat_unique: dict,
              dates: pd.IndexSlice = pd.IndexSlice[:], axvlines=[]):
    """
    Plot the reference series and, below it, every correlated column that
    passes ``corr_filter``, ordered by absolute correlation.

    corr_all: correlation table with the columns 'index', 'corr', 'table'
    corr_filter: boolean selector applied to corr_all
    testdf: reference data, plotted on the first axes
    stat_unique: dict {table name: DataFrame} the correlated columns come from
    dates: row selector for the plotted period
    axvlines: x positions where a red dashed vertical line is drawn
    raises: AssertionError when the filter selects nothing

    NOTE(review): uses the names `np` and `plt`, which are not imported in the
    visible import cell - confirm they are defined before this cell runs.
    """
    width, height = 15, 3

    selected = corr_all[corr_filter].copy()
    assert selected.shape[0] > 0, 'no correlations with requested corr_filter, check filter'
    selected = selected.sort_values('corr', key=np.abs)

    rows = selected.shape[0]
    fig, axes = plt.subplots(rows+1, 1, figsize=(width, rows*height))

    # plotting ref series
    ref_axis = testdf[dates].plot(legend=True, ax=axes[0], ylabel='ref')
    for position in axvlines:
        ref_axis.axvline(position, color="red", linestyle="--")

    # plotting correlated series: every row carries its own axes and label
    selected['ax'] = axes[1:]
    selected.loc[:, 'ylabel'] = range(1, rows+1)
    selected.apply(plot_corr2, stat_unique=stat_unique, dates=dates,
                   axvlines=axvlines, axis='columns')

In [None]:
def plot_corr_old(corr_all: pd.DataFrame, corr_filter: pd.Series, 
              testdf: pd.DataFrame, stat_unique: dict,
              dates: pd.IndexSlice = pd.IndexSlice[:], axvlines=[]):
    """
    Loop-based variant of plot_corr: plot the reference series and, below it,
    every correlated column that passes ``corr_filter``, ordered by absolute
    correlation.

    corr_all: correlation table with the columns 'index', 'corr', 'table'
    corr_filter: boolean selector applied to corr_all
    testdf: reference data, plotted on the first axes
    stat_unique: dict {table name: DataFrame} the correlated columns come from
    dates: row selector for the plotted period
    axvlines: x positions where a red dashed vertical line is drawn on the
        correlated plots
    raises: AssertionError when the filter selects nothing

    NOTE(review): uses the names `np` and `plt`, which are not imported in the
    visible import cell - confirm they are defined before this cell runs.
    """
    width, height = 15, 3

    c = corr_all[corr_filter]
    assert c.shape[0] > 0, 'no correlations with requested corr_filter, check filter'
    # sort into a new frame: an in-place sort on a filtered slice of corr_all
    # triggers SettingWithCopyWarning and may not sort what is iterated below
    c = c.sort_values('corr', key=np.abs)

    rows = c.shape[0]
    fig, axes = plt.subplots(rows+1, 1, figsize=(width, rows*height))
    testdf[dates].plot(legend=True, ax=axes[0], ylabel='0')
    for i, (_, row) in enumerate(c.iterrows(), start=1):
        df = stat_unique[row['table']]
        ax = df.loc[dates, row['index']].plot(legend=True, ax=axes[i], ylabel=i)
        for line in axvlines:
            ax.axvline(line, color="red", linestyle="--")
        print(f'{i} {row["index"]} {row["table"]}')

### corrAll:
creating a correlation table for all tables  
stat: dict - all tables, before pivot  
sr: pd.Series - the test series to correlate with  
-> tuple - (pd.DataFrame, dict) - the df has the columns `index`, `corr`, `table`; the dict contains the stat_unique tables (after pivot)


In [None]:
def corrAll(stat: dict, sr: pd.Series) -> tuple:
    """
    Correlate every table in ``stat`` with the reference series.

    stat: dict {table name: DataFrame}, before pivot
    sr: reference series to correlate with
    returns: tuple (corr_all, stat_unique)
        corr_all: DataFrame with the columns 'index', 'corr', 'table'
        stat_unique: dict {table name: cleaned frame returned by corrwith}

    The first table that raises is logged and processing stops there; the
    results collected up to that point are returned.
    """
    stat_unique = {}
    parts = []  # one small frame per table, concatenated once at the end
    num_tabels = len(stat)
    count = 0
    for k, v in stat.items():
        try:
            corr, stat_unique[k] = corrwith(v, sr)
        except Exception as e:
            logging.error(f'error in k= {k}')
            logging.error(f'{e}')
            break

        dfcorr = pd.DataFrame({'index': corr.index.values, 'corr': corr.values})
        dfcorr['table'] = k
        parts.append(dfcorr)
        count += 1
        print(f'done: {count}/{num_tabels}, finished {k}', end='\r', flush=True)

    if parts:
        # A single concat avoids re-copying the growing frame on every table
        # and keeps 'corr' numeric (an empty object-dtype seed frame does not).
        corr_all = pd.concat(parts, ignore_index=True)
    else:
        corr_all = pd.DataFrame(columns=['index', 'corr', 'table'])
    print(corr_all.shape)
    return corr_all, stat_unique
    