In [1]:
# import json
import pandas as pd
from datetime import datetime, timedelta
from dateutil.parser import parse # to convert string date to pyton Date type so that we can compare dates with each other
# from pathlib import Path
import glob # to get the names and paths of the files in a directory/folder 

# 1. Class which Convert records csv to into alarms

In [2]:
# format csvs

class CSV2Alarms:
    """
        Summary:
        Record mean 1 row of the csv file. 
        This class converts records into alarms. 
    """
    def __init__(self,config): #done
        self.config = config
        print(f'>>Input file: {self.config["dir"]+self.config["in_fname"]}')
         
        self.df = None
        # reading raw csv fiile
        try:
            self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], delimiter= ";" , usecols=self.config["cols"], encoding = "ISO-8859-1")
        except Exception as e:
            # print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion 1  {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")
            a = 1
        
        try:
            self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], delimiter= "," , usecols=self.config["cols"], encoding = "ISO-8859-1")
        except Exception as e:
            a =1
            # self.df = pd.read_csv(self.config["dir"]+self.config["in_fname"], encoding = "ISO-8859-1")
            # print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion 2 {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(">>File is read.")

    # removing the extra chars and returning the python Date   
    def __changeDate(self,d): # done
        d = d.replace(".000000000","") # removing extra zeros from the string date
        d = d.replace("/","-") 
        return parse(d) # converting string to a python Date type e.g., float('5')

    # Returning the alarm message type
    def __getMessageType(self,message): # done
        if message.find("Recover") != -1:  # finding the deactivation message
            return "Recover"
        elif message.find("NR") != -1: 
            return "NR"
        else:
            return "Activation" 
    
    def __getAlarmsFromDFs(self,df_start, df_end): # done
        alarms = []
        start_records = [v for v in sorted(df_start.to_dict(orient="records"), key=lambda arg: arg["EventTime"], reverse=False)] # sortting the dataframes rows in ascending order and storing them in a list
        end_records = [v for v in sorted(df_end.to_dict(orient="records"), key=lambda arg: arg["EventTime"], reverse=False)]

        i = 0
        j = 0
        # print("End len",len(end_records), "Start len", len(start_records))

        while j < len(end_records): # ingore the row/entry if the end time is lesser to start time
            # print(i,j)
            if len(start_records)>0 and end_records[j]["EventTime"] < start_records[i]["EventTime"]:
                j += 1
            else:
                break

        while i < len(start_records): # find the one to one correspondence between startingtime and its coressponding most recent end time.
            
            if j <len(end_records) and start_records[i]["EventTime"] <= end_records[j]["EventTime"]:
                if i+1 < len(start_records) and start_records[i+1]["EventTime"] < end_records[j]["EventTime"]: # check for the next record
                    i += 1
                    continue
                alarm = {k: v for k, v in start_records[i].items()} # an alarm dictionary
                alarm["StartTime"] = alarm["EventTime"]
                alarm["EndTime"] = end_records[j]["EventTime"]
                alarm["EndMessage"] = end_records[j]["Message"]
                del alarm["EventTime"]
                alarms.append(alarm)
                j += 1
            elif j <len(end_records) and start_records[i]["EventTime"] > end_records[j]["EventTime"]:
                j +=1
                continue   
                
            i += 1

        return alarms
    
    def __convertRecordsToAlarmsV1(self,df_source): # done 
        """ Convert records from the same source to proper alarms with start and end time.   

            The record which contains "Recover" or "NR" in the Message column shows the deactivations. 

        Parameters
        ----------
        records : list of dict
            Each dict represent either activation of an alarm or deactivation of an alarm.  

        Returns
        -------
        alarms : list of dict
            Each dict in the list is an alarm with the StartTime and EndTime of an alarm. 
        """
        alarms = []
        for condition in df_source["Condition"].unique():
            df_condition = df_source.loc[df_source['Condition'].isin([condition])]
            df_start = df_condition.loc[df_condition['MessageType'].isin([
                                                                        "Activation"])]
            end_types = [t for t in df_condition["MessageType"].unique() if t !=
                    "Activation"] # NR, Recover, NR+Reocver
            # print(types)
            df_end = df_condition.loc[df_condition['MessageType'].isin(end_types)]
            alarms += self.__getAlarmsFromDFs(df_start, df_end)
        return alarms
    
    
    def formatCSV(self): # done
        print(">>Column  Types: ", end="")
        for col in self.df.columns:
            print(col, type(self.df[col][0]), end=", ")
            if isinstance(self.df[col][0],str):
                try:
                    self.df[col] = self.df[col].apply(lambda s: " ".join(s.split())) # removing the extra spaces in a column 
                except Exception as e:
                    print(f"\n\n !!!!!!!!!!!!!!!!!! Excetpion {e} !!!!!!!!!!!!!!!!!!!!!!!!!!")

        print(type(self.df["EventTime"][0]))
        self.df["EventTime"] = self.df["EventTime"].apply(self.__changeDate)
        self.df["MessageType"] = self.df["Message"].apply(self.__getMessageType)
        # self.df["Month"] = self.df["EventTime"].apply(lambda arg: arg.month)
        print(f">>Before Filtering ACKS: {self.df.shape}")
        self.df = self.df.loc[self.df['Message'].map(lambda arg: arg.find(self.config["ack-filter"])) == -1]  # removing the Acknowledgements
        print(f">>After Filtering ACKS: {self.df.shape}") 
        

        fpath = self.config["dir"] + self.config["formated_fname"]
        self.df.to_csv(fpath, index=False) # storing pandas dataframe into a csv file
        return self.df
    
    def readFormattedCSV(self): # done
        fpath = self.config["dir"] + self.config["formated_fname"]
        self.df = pd.read_csv(fpath, low_memory=False, parse_dates=["EventTime"])
        print(f">> Formating is complete. Outfile: {fpath}")
        return self.df

    def convertRecords2Alarms(self,df): # done
        # df = pd.read_csv(p, low_memory=False, usecols=cols,parse_dates=["EventTime"])

        assert len(df["MachineName"].unique()) == 1 # all the alarms should be related to the same unit
        
        alarms = []
        sources_ranks_dict = df['SourceName'].value_counts()
        id = 0 # for debugging
        for sname in sources_ranks_dict.keys(): # for each source find the activation and deactivation times and covert them to alarms
            id += 1
            # TIC4544
            df_sname = df.loc[df['SourceName'].isin([sname])] # source DF
            # types_rank_dict = df_sname["MessageType"].value_counts() # source ranks
            # total = 0
            # for key in types_rank_dict.keys():
            #     total += types_rank_dict[key]
            # assert(total== sources_ranks_dict[sname]) # sum is equal to count 
            print(f"Test:{sname}")
            source_alarms = self.__convertRecordsToAlarmsV1(df_sname) # getting alarms related to one source (sname)
            print("end")
            alarms += source_alarms
        
            print(f"[{id}]Source:{sname}, Conditions:{df_sname['Condition'].unique()}, Total Alarms:{len(source_alarms)}")

        df_out = pd.DataFrame(alarms)
        df_out["TimeDelta"] = df_out[["StartTime", "EndTime"]].apply(lambda arg: timedelta.total_seconds(arg[1]-arg[0]) , axis=1)
        df_out["Year-Month"] =df_out["StartTime"].apply(lambda arg: (arg.year,arg.month))

        file_path = self.config["dir"]+self.config["alarm_out_fname"] 
        df_out.to_csv(file_path, index = False)
        print(f">>Conversion from records to alarms is complete. Outputfile : {file_path}, Info : {df_out.info()}")
        return df_out


In [3]:

# def getPlantShutDownPeriods(filepath):
#     df_feed = pd.read_excel(filepath)
    
#     feed_records = [v for v in sorted(df_feed.to_dict(orient="records"), key=lambda arg: arg["TimeStamp"], reverse=False)]
#     plant_shut_down_periods = []

#     start_time = None
#     start_flag = True

#     for i in range(len(feed_records)):
#         if feed_records[i]["47FIC011.PV"] <=50 and start_flag ==True:
#             start_time = feed_records[i]["TimeStamp"]
#             start_flag = False
#             # print(f"{start_time},{feed_records[i]['47FIC011.PV']}  ", end=" ")
#         elif feed_records[i]["47FIC011.PV"] >50 and i>0 and feed_records[i-1]["47FIC011.PV"]<=50:
#             plant_shut_down_periods.append((start_time,feed_records[i]["TimeStamp"])) 
#             start_flag =True
#             # print(f'{feed_records[i]["TimeStamp"]},{feed_records[i-1]["47FIC011.PV"]},{feed_records[i]["47FIC011.PV"]}')


#     return plant_shut_down_periods



# def monthlyAlarms2SingleFile(config):
#     alarms_dir, out_file_path ,cols = config["dir"], config["out_file"], config['cols']
#     fps = [f for f in glob.glob(alarms_dir+ "*.csv")]
#     print(f">> Files to process {fps}")
#     dfs_list = []
#     for f in fps:
#         print(f">> === File: {f.split('/')[-1]}")
#         df = pd.read_csv(f, usecols = cols ,parse_dates = ["StartTime","EndTime"])
#         dfs_list.append(df)

#     df = pd.concat(dfs_list, ignore_index=True)
#     df.to_csv(out_file_path, index=False)
#     return df

# 2. Configuration of input dirs and output dirs and file names

In [10]:
ACK_FILTER = "ACK"
# PLANT = "plant_33"
in_config = {
    "dir": "../data/",
    "csv_names" : [f.split('/')[-1] for f in glob.glob(f"../../data/processed/alarms/"+ "*.csv") if f.find("f-pre-1") !=-1], # list comprehension
    "in_fname": None, # input file name
    "formated_fname":  None, 
    "alarm_out_fname": None, 
    "ack-filter":ACK_FILTER,
    'cols':["MachineName","SourceName","EventTime", "Message","Condition"]

}

print(in_config)

out_config = {
    "dir": "../../data/processed/alarms/", 
    "out_file": "../../data/processed/final/all-months-alarms-with-feed.csv",
    "cols": ["SourceName", "Condition","StartTime", "EndTime","TimeDelta","Year-Month"]
}


{'dir': '../data/', 'csv_names': ['f-pre-1-2019_10.csv', 'f-pre-1-2019_6.csv', 'f-pre-1-2019_4.csv', 'f-pre-1-2019_8.csv', 'f-pre-1-2019_12.csv', 'f-pre-1-2020_03.csv', 'f-pre-1-2019_5.csv', 'f-pre-1-2020_01.csv', 'f-pre-1-2019_11.csv', 'f-pre-1-2020_02.csv', 'f-pre-1-2019_3.csv', 'f-pre-1-2019_7.csv', 'f-pre-1-2019_9.csv'], 'in_fname': None, 'formated_fname': None, 'alarm_out_fname': None, 'ack-filter': 'ACK', 'cols': ['MachineName', 'SourceName', 'EventTime', 'Message', 'Condition']}


In [None]:
alarm = None
temp_df = None
obj = None
# for f in in_config["csv_names"]:

#     in_config['in_fname'] =  "raw/"+f
#     in_config['formated_fname'] = "raw/formatted_"+f
#     in_config["alarm_out_fname"] = "alarms/alarms_"+f
#     obj  = CSV2Alarms(in_config)
#     temp_df = obj.formatCSV()
    

for f in in_config["csv_names"]: # reading csv files and first its formatting the csv files (i.e., removing extra zeros etc) and finally convertng the rows/records to alarms and storing back into a csv file in the processed folder

    in_config['in_fname'] =  "raw/"+f
    in_config['formated_fname'] = "raw/formatted_"+f
    in_config["alarm_out_fname"] = "alarms/alarms_"+f
    obj = CSV2Alarms(in_config)
    # temp_df = alarm.formatCSV()
    temp_df = obj.readFormattedCSV()
    df_alarms = obj.convertRecords2Alarms(temp_df)


print(">> Complete")

In [11]:

# plant_shut_down_periods = getPlantShutDownPeriods("../../data/feed.xlsx")
# plant_shut_down_periods
# # df_f_month = pd.read_csv()


[(Timestamp('2018-01-12 20:00:00'), Timestamp('2018-01-31 19:00:00')),
 (Timestamp('2018-01-31 20:00:00'), Timestamp('2018-02-02 03:00:00')),
 (Timestamp('2018-04-11 18:00:00'), Timestamp('2018-04-17 14:00:00')),
 (Timestamp('2018-04-17 19:00:00'), Timestamp('2018-04-18 13:00:00')),
 (Timestamp('2018-05-05 15:00:00'), Timestamp('2018-05-08 15:00:00')),
 (Timestamp('2018-05-08 16:00:00'), Timestamp('2018-05-09 14:00:00')),
 (Timestamp('2018-05-09 15:00:00'), Timestamp('2018-05-09 17:00:00')),
 (Timestamp('2018-05-09 18:00:00'), Timestamp('2018-05-10 14:00:00')),
 (Timestamp('2018-05-10 17:00:00'), Timestamp('2018-05-11 17:00:00')),
 (Timestamp('2018-05-11 18:00:00'), Timestamp('2018-05-12 13:00:00')),
 (Timestamp('2018-05-12 14:00:00'), Timestamp('2018-05-12 16:00:00')),
 (Timestamp('2018-05-12 17:00:00'), Timestamp('2018-05-13 09:00:00')),
 (Timestamp('2018-05-13 17:00:00'), Timestamp('2018-05-14 09:00:00')),
 (Timestamp('2018-05-14 16:00:00'), Timestamp('2018-05-15 09:00:00')),
 (Time

In [12]:
df_formatted = pd.read_csv("../../data/processed/alarms/f-pre-1-2019_5.csv")
df_formatted

Unnamed: 0,MachineName,ServerProgID,ServerNodeName,SubscriptionName,SourceName,EventTime,EventTimeMS,Severity,Message,Quality,...,ActiveTime,ActiveTimeMS,Cookie,ActorID,Attributes,Area,Status,Shelving,Priority,MessageType
0,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-01 17:21:14.000,569043200,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -5.3 C VEL+...,192,...,2019-05-01 17:21:12.000,549043200,10591992,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-5.3238401412...",FCS0105,0,0,,Recover
1,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-01 17:21:15.000,579043200,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -4.9 C VEL-...,192,...,2019-05-01 17:21:14.000,569043200,10591992,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-4.9354796409...",FCS0105,0,0,,Recover
2,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-01 17:21:17.000,599043200,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -5.0 C VEL+...,192,...,2019-05-01 17:21:15.000,579043200,10591992,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-4.9814286231...",FCS0105,0,0,,Recover
3,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,47TI931A,2019-05-01 17:21:22.000,649043200,500,47TI931A C5 1.YTK GIRDISCAP SICAK PV = 691 C I...,192,...,2019-05-01 17:21:01.000,439043200,10591992,,"[4354,""FCS0102"",""47TI931A"",""PV"",690.9027,0,"""",...",FCS0102,1,0,,Recover
4,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-01 17:21:24.000,669043200,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -4.9 C VEL-...,192,...,2019-05-01 17:21:23.000,659043200,10591992,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-4.8875885009...",FCS0105,0,0,,Recover
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1766666,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-30 23:59:15.000,-1978330240,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -5.2 C VEL+...,192,...,2019-05-30 23:59:13.000,-1998330240,4497144,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-5.1514225006...",FCS0105,1,0,,Recover
1766667,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,47TI931A,2019-05-30 23:58:54.000,2106637056,500,47TI931A C5 1.YTK GIRDISCAP SICAK PV = 691 C I...,192,...,2019-05-30 23:58:52.000,2086637056,4497144,,"[4354,""FCS0102"",""47TI931A"",""PV"",690.9027,0,"""",...",FCS0102,1,0,,Recover
1766668,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-30 23:58:58.000,2146637056,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -5.0 C VEL+...,192,...,2019-05-30 23:58:56.000,2126637056,4497144,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-5.0422401428...",FCS0105,1,0,,Recover
1766669,PHD47B,Yokogawa.ExaopcAECS1.1,STN0830,Yokogawa.ExaopcAECS1.1,48TIC2026,2019-05-30 23:57:02.000,986637056,500,48TIC2026 E208 CKS TEMIZ MDEA PV = -5.2 C VEL+...,192,...,2019-05-30 23:57:01.000,976637056,4497144,,"[4354,""FCS0105"",""48TIC2026"",""PV"",-5.1540508270...",FCS0105,1,0,,Recover


In [None]:
# df = df_formatted 
# print(type(df["EventTime"][0]))
# # df = 
# for t in plant_shut_down_periods:
#     start_remove, end_remove = t[0],t[1]
#     df = df.loc[(df.EventTime < start_remove) | (df.EventTime > end_remove)]







<class 'pandas.core.series.Series'>


TypeError: '<' not supported between instances of 'str' and 'Timestamp'