In [1]:
import json
import pandas as pd
from datetime import datetime, timedelta
from dateutil.parser import parse
from pathlib import Path
import glob

In [2]:
# format csvs

class CSV2Alarms:
    """
        Summary:
        Record mean 1 row of the csv file. 
        This class converts records into alarms. 
    """
    def __init__(self,config):
        self.config = config
        print(f'>>Input file: {self.config["dir"]+self.config["in_fname"]}')
         
        self.df = None
        try:
            self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], delimiter= ";" , usecols=self.config["cols"], encoding = "ISO-8859-1")
        except Exception as e:
            # print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion 1  {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")
            a = 1
        
        try:
            self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], delimiter= "," , usecols=self.config["cols"], encoding = "ISO-8859-1")
        except Exception as e:
            a =1
            # self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], encoding = "ISO-8859-1")
            # print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion 2 {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")
        
        cols =self.df.columns.values.tolist()
        print(">>File is read.")
        
    def __changeDate(self,d):
        d = d.replace(".000000000","")
        d = d.replace("/","-")
        return parse(d)

    def __getMessageType(self,message):
        if message.find("Recover") != -1:
            return "Recover"
        elif message.find("NR") != -1:
            return "NR"
        else:
            return "Activation"
    
    def __getAlarmsFromDFs(self,df_start, df_end):
        alarms = []
        start_records = [v for v in sorted(df_start.to_dict(
            orient="records"), key=lambda arg: arg["EventTime"], reverse=False)]
        end_records = [v for v in sorted(df_end.to_dict(
            orient="records"), key=lambda arg: arg["EventTime"], reverse=False)]
        i = 0
        j = 0
        # print("End len",len(end_records), "Start len", len(start_records))
        while j < len(end_records):
            # print(i,j)
            if len(start_records)>0 and end_records[j]["EventTime"] < start_records[i]["EventTime"]:
                j += 1
            else:
                break

        while i < len(start_records):
            
            if j <len(end_records) and start_records[i]["EventTime"] <= end_records[j]["EventTime"]:
                if i+1 < len(start_records) and start_records[i+1]["EventTime"] < end_records[j]["EventTime"]: # check for the next record
                    i += 1
                    continue
                alarm = {k: v for k, v in start_records[i].items()}
                alarm["StartTime"] = alarm["EventTime"]
                alarm["EndTime"] = end_records[j]["EventTime"]
                alarm["EndMessage"] = end_records[j]["Message"]
                del alarm["EventTime"]
                alarms.append(alarm)
                j += 1
            elif j <len(end_records) and start_records[i]["EventTime"] > end_records[j]["EventTime"]:
                j +=1
                continue   
                
            i += 1

        return alarms
    
    def __convertRecordsToAlarmsV1(self,df_source):
        alarms = []
        for condition in df_source["Condition"].unique():
            df_condition = df_source.loc[df_source['Condition'].isin([condition])]
            df_start = df_condition.loc[df_condition['MessageType'].isin([
                                                                        "Activation"])]
            end_types = [t for t in df_condition["MessageType"].unique() if t !=
                    "Activation"]
            # print(types)
            df_end = df_condition.loc[df_condition['MessageType'].isin(end_types)]
            alarms += self.__getAlarmsFromDFs(df_start, df_end)
        return alarms
    
    def __convertRecordsToAlarmsOld(self,records):
        """ Convert records from the same source to proper alarms with start and end time.   

            The record which contains "Recover" or "NR" in the Message column shows the deactivations. 

        Parameters
        ----------
        records : list of dict
            Each dict represent either activation of an alarm or deactivation of an alarm.  

        Returns
        -------
        alarms : list of dict
            Each dict in the list is an alarm with the StartTime and EndTime of an alarm. 
        """
        alarms = []  # conainsts alarms with start and end time.
        # for enqueue and deque of records., Needed dictionary because there can be multiple types of alarms from the same source.
        conditions_queues = {}
        alarm = None  # dictionary
        records = [v for v in sorted(
            records, key=lambda arg: arg["EventTime"], reverse=False)]
        for record in records:

            # initiazlize the queue
            if conditions_queues.get(record["Condition"]) == None:
                conditions_queues[record["Condition"]] = []

            # Enqueue the record
            if record["Message"].find("Recover") == -1 and record["Message"].find("NR") == -1:
                conditions_queues[record["Condition"]].append(record)
            else:
                if len(conditions_queues[record["Condition"]]) == 0:
                    continue

                alarm = conditions_queues[record["Condition"]].pop(
                    0)  # Dqueue the record
                alarm = {k: v for k, v in alarm.items()}
                alarm["StartTime"] = alarm["EventTime"]
                alarm["EndTime"] = record["EventTime"]
                alarm["EndMessage"] = record["Message"]
                del alarm["EventTime"]
                alarms.append(alarm)

        return alarms
    
    def formatCSV(self):
        print(">>Column  Types: ", end="")
        for col in self.df.columns:
            print(col, type(self.df[col][0]), end=", ")
            if isinstance(self.df[col][0],str):
                try:
                    self.df[col] = self.df[col].apply(lambda s: " ".join(s.split()))
                except Exception as e:
                    print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")

        print(type(self.df["EventTime"][0]))
        self.df["EventTime"] = self.df["EventTime"].apply(self.__changeDate)
        self.df["MessageType"] = self.df["Message"].apply(self.__getMessageType)
        # self.df["Month"] = self.df["EventTime"].apply(lambda arg: arg.month)
        print(f">>Before Filtering ACKS: {self.df.shape}")
        self.df = self.df.loc[self.df['Message'].map(lambda arg: arg.find(self.config["ack-filter"])) == -1] 
        print(f">>After Filtering ACKS: {self.df.shape}") 
        

        fpath = self.config["dir"] + self.config["formated_fname"]
        
        self.df.to_csv(fpath, index=False)
        self.df = pd.read_csv(fpath, low_memory=False, parse_dates=["EventTime"])
        print(f">> Formating is complete. Outfile: {fpath}")
        return self.df

    def convertRecords2Alarms(self,df):
        # df = pd.read_csv(p, low_memory=False, usecols=cols,parse_dates=["EventTime"])

        assert len(df["MachineName"].unique()) == 1 # all the alarms should be related to the same unit
        
        alarms = []
        differs = []
        sources_ranks_dict = df['SourceName'].value_counts()
        id = 0 # for debugging
        for sname in sources_ranks_dict.keys():
            id += 1
            df_sname = df.loc[df['SourceName'].isin([sname])] # source DF
            types_rank_dict = df_sname["MessageType"].value_counts() # source ranks
            total = 0
            for key in types_rank_dict.keys():
                total += types_rank_dict[key]
            assert(total== sources_ranks_dict[sname]) # sum is equal to count 
            
            source_alarms = self.__convertRecordsToAlarmsV1(df_sname)
            alarms += source_alarms
        
            print(f"[{id}]Source:{sname}, Conditions:{df_sname['Condition'].unique()}, Total Alarms:{len(source_alarms)}")

        ##comparing it with older algo
            # temp_alarms2 = self.__convertRecordsToAlarmsOld(df_sname.to_dict(orient="records"))
            # if len(source_alarms)-len(temp_alarms2) != 0:
            #     print(">>[{}]Source: {},Conditions:{}".format(id,sname, df_sname["Condition"].unique()), end="=>")
            #     print("ALARMS1:{},Alarms2:{},Diff(new-old):{}".format(len(source_alarms),len(temp_alarms2), len(source_alarms)-len(temp_alarms2)),end="")
            #     print("")

        
            # if (len(source_alarms) != len(temp_alarms2)):
            #     differs.append(sname)
        

        # print(">> Difference in 2 Algos",differs,len(differs))

        # writing to ouptut alarms
        df_out = pd.DataFrame(alarms)
        df_out["TimeDelta"] = df_out[["StartTime", "EndTime"]].apply(lambda arg: timedelta.total_seconds(arg[1]-arg[0]) , axis=1)
        df_out["Year-Month"] =df_out["StartTime"].apply(lambda arg: (arg.year,arg.month))

        file_path = self.config["dir"]+self.config["alarm_out_fname"] 
        df_out.to_csv(file_path, index = False)
        print(f">>Conversion from records to alarms is complete. Outputfile : {file_path}, Info : {df_out.info()}")
        return df_out


In [3]:

ACK_FILTER = "ACK"
config = {
    "dir": "../.data/csvs/",
    # "in_fname": "raw/2019_5.csv",
    # "formated_fname": "raw/formatted_"+"2019_5.csv",
    # "alarm_out_fname": "alarms/alarms_"+"2019_5.csv",
    "ack-filter":ACK_FILTER,
    'cols':["MachineName","SourceName","EventTime", "Message","Condition"]

}

csv_names = [f.split('/')[-1] for f in glob.glob("../.data/csvs/raw/"+ "*.csv") if f.find("formatted")==-1]
alarm = None
temp_df = None
for f in csv_names:
    config['in_fname'] =  "raw/"+f
    config['formated_fname'] = "raw/formatted_"+f
    config["alarm_out_fname"] = "alarms/alarms_"+f
    alarm  = CSV2Alarms(config)
    temp_df = alarm.formatCSV()
    df_alarms = alarm.convertRecords2Alarms(temp_df)

print(">> Complete")

otal Alarms:4
[392]Source:47TI703A-2, Conditions:['HTRP' 'HHH'], Total Alarms:4
[393]Source:47TI2077, Conditions:['LL' 'LO'], Total Alarms:2
[394]Source:47PI807ABC, Conditions:['LTRP' 'LLL'], Total Alarms:4
[395]Source:47XL1553-ANN, Conditions:['ALM'], Total Alarms:3
[396]Source:47TI609, Conditions:['HI'], Total Alarms:3
[397]Source:47LAL701-ANN, Conditions:['ALM'], Total Alarms:3
[398]Source:47XL1554-ANN, Conditions:['ALM'], Total Alarms:3
[399]Source:47FI2002, Conditions:['LO'], Total Alarms:2
[400]Source:47XL1538B-ANN, Conditions:['ALM'], Total Alarms:3
[401]Source:47PALL707-ANN, Conditions:['ALM'], Total Alarms:3
[402]Source:48TI018, Conditions:['HI' 'LL' 'LO'], Total Alarms:2
[403]Source:47PAL708-ANN, Conditions:['ALM'], Total Alarms:3
[404]Source:47TI1765, Conditions:['HI' 'HH'], Total Alarms:3
[405]Source:47TI925B, Conditions:['HI' 'LO'], Total Alarms:2
[406]Source:48FI020, Conditions:['LO' 'LL'], Total Alarms:2
[407]Source:47C56SUM_DT, Conditions:['HI' 'HH'], Total Alarms:2
[40

In [5]:
# alarm2 = CSV2Alarms(config)
# # df_alarms = alarm2.convertRecords2Alarms(df2)
# # df_alarms

In [6]:
def monthlyAlarms2SingleFile(alarms_dir, out_file_path ,cols):
    fps = [f for f in glob.glob(alarms_dir+ "*.csv")]
    print(f">> Files to process {fps}")
    dfs_list = []
    for f in fps:
        print(f">> === File: {f.split('/')[-1]}")
        df = pd.read_csv(f, usecols = cols ,parse_dates = ["StartTime","EndTime"])
        dfs_list.append(df)

    df = pd.concat(dfs_list, ignore_index=True)
    df.to_csv(out_file_path, index=False)
    return df


alarms_dir = "../.data/csvs/alarms/"
output_file = "../.data/all-months-alarms.csv"
cols = ["SourceName", "Condition","StartTime", "EndTime","TimeDelta","Year-Month"]

monthlyAlarms2SingleFile(alarms_dir=alarms_dir, out_file_path=output_file, cols=cols)


>> Files to process ['../.data/csvs/alarms/alarms_2019_3.csv', '../.data/csvs/alarms/alarms_processed_2020_9.csv', '../.data/csvs/alarms/alarms_2019_6.csv', '../.data/csvs/alarms/alarms_processed_2020_10.csv', '../.data/csvs/alarms/alarms_processed_2020_7.csv', '../.data/csvs/alarms/alarms_2020_03.csv', '../.data/csvs/alarms/alarms_processed_2020_11.csv', '../.data/csvs/alarms/alarms_2019_10.csv', '../.data/csvs/alarms/alarms_2019_4.csv', '../.data/csvs/alarms/alarms_2020_02.csv', '../.data/csvs/alarms/alarms_2019_11.csv', '../.data/csvs/alarms/alarms_2019_7.csv', '../.data/csvs/alarms/alarms_2019_8.csv', '../.data/csvs/alarms/alarms_2019_12.csv', '../.data/csvs/alarms/alarms_processed_2018.csv', '../.data/csvs/alarms/alarms_processed_2020_12.csv', '../.data/csvs/alarms/alarms_processed_2020_4.csv', '../.data/csvs/alarms/alarms_2020_01.csv', '../.data/csvs/alarms/alarms_processed_2020_6.csv', '../.data/csvs/alarms/alarms_2019_9.csv', '../.data/csvs/alarms/alarms_processed_2020_5.csv', 

Unnamed: 0,SourceName,Condition,StartTime,EndTime,TimeDelta,Year-Month
0,47TI931A,IOP,2019-03-06 13:19:17,2019-03-06 13:19:33,16.0,"(2019, 3)"
1,47TI931A,IOP,2019-03-06 13:19:35,2019-03-06 13:20:22,47.0,"(2019, 3)"
2,47TI931A,IOP,2019-03-06 13:20:24,2019-03-06 13:20:28,4.0,"(2019, 3)"
3,47TI931A,IOP,2019-03-06 13:20:30,2019-03-06 13:20:49,19.0,"(2019, 3)"
4,47TI931A,IOP,2019-03-06 13:20:51,2019-03-06 13:21:03,12.0,"(2019, 3)"
...,...,...,...,...,...,...
23435875,47HCD-046H-ANN,ALM,2020-08-20 08:58:39,2020-08-20 09:02:07,208.0,"(2020, 8)"
23435876,47LI951,LO,2020-08-12 22:51:27,2020-08-13 15:46:05,60878.0,"(2020, 8)"
23435877,47XL1525A-ANN,ALM,2020-08-22 11:40:12,2020-08-22 11:41:22,70.0,"(2020, 8)"
23435878,47FIC2026,LO,2020-08-21 11:50:10,2020-08-21 11:50:38,28.0,"(2020, 8)"
