In [219]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import os
import sys

In [220]:
class ExpLogHandler:
    def __init__(self, log_names):
        self.log_names = log_names
        
    def getData(self):
        self.data = pd.read_csv(self.log_names[0], error_bad_lines=False)
        for i in range(1, len(self.log_names)):
            data_next   = pd.read_csv(self.log_names[i], error_bad_lines=False)
            data_frames = [self.data, data_next]
            self.data   = pd.concat(data_frames, ignore_index=True)
        self.data.drop(self.data.loc[(self.data["Timestamp"] == "Timestamp")].index.tolist())
        self.dataLength = self.data.shape[0]

        wrchckbrdLocs  = self.data.loc[(self.data["Board"] == "M") & (self.data["Mikroe_socket"] == "A") & (self.data["Status"] == "WRCHCKBRD")].index.tolist()
        wrchckbrdIndex = [(i, "WRCHCKBRD") for i in wrchckbrdLocs]
        initLocs       = self.data.loc[(self.data["Board"] == "M") & (self.data["Mikroe_socket"] == "A") & (self.data["Status"] == "INIT")].index.tolist()
        initIndex      = [(i, "INIT") for i in initLocs]
        runIndex       = wrchckbrdIndex + initIndex
        runIndex       = sorted(runIndex)
        runIndex.append((self.dataLength, "END"))
        
        runId  = 1
        runCol = [''] * self.dataLength
        for h in range(len(runIndex) - 1):
            if runIndex[h][1] == "WRCHCKBRD":
                start              = runIndex[h][0]
                end                = runIndex[h+1][0]
                runCol[start:end]  = [runId]*(end-start)
                runId              += 1

        self.data.insert(loc=0, column="Run_ID", value=runCol)


        return self.data

In [221]:
class BeamLogHandler:
    def __init__(self, firstTime, lastTime, targetDir_fullpathList):     
        self.beamLogFnameFormat     = "%Y-%m-%d"
        self.firstTime              = firstTime
        self.lastTime               = lastTime
        self.targetDir_fullpathList = targetDir_fullpathList

    def getLogs(self):
        self.targetDir_fullpathList[-4:] = ["neutrons"]
        firstBeamTime   = datetime.strftime(self.firstTime, self.beamLogFnameFormat)
        print(firstBeamTime)
        lastBeamTime    = datetime.strftime(self.lastTime, self.beamLogFnameFormat)
        print(lastBeamTime)
        beamlog_day_dif = int(datetime.strftime(self.lastTime, "%d")) - int(datetime.strftime(self.firstTime, "%d"))
        self.beamlog    = pd.read_csv(f"../../../neutrons/countlog-{firstBeamTime}.txt", delim_whitespace=True, header=None, skiprows=1)
        self.nextTime   = self.firstTime
        for i in range(0, beamlog_day_dif+1):
            self.nextTime      = self.nextTime + timedelta(days=1)
            self.nextBeamTime  = datetime.strftime(self.nextTime, self.beamLogFnameFormat)
            try:
                nextBeamlog    = pd.read_csv(f"../../../neutrons/countlog-{self.nextBeamTime}.txt", delim_whitespace=True, header=None, skiprows=1)
            except FileNotFoundError:
                pass
            beamlogFrames      = [self.beamlog, nextBeamlog]
            self.beamlog       = pd.concat(beamlogFrames, ignore_index=True)
            
        self.beamlog.columns = ["Date", "HMS_time", "Millisecs", "Count1", "Count2", "Count3", "Count4", "protonCharge", "Beam_current"]
        return self.beamlog

In [222]:
class Run:
    def __init__(self, name, data):
        self.name   = name
        self.valid  = -1
        self.hot    = -1
        self.delay  = -1
        self.num    = [int(s) for s in name.split('_') if s.isdigit()][0]
        self.df     = data.loc[(data['Run_ID'] == self.num)]
        self.errors = -1

    def ctor(self):
        try:
            self.before_delay_i = self.df.loc[(self.df['Mikroe_socket'] == 'D') & (self.df['Status'] == 'STORE_OK')].index[0]
            self.before_delay_t = self.df.loc[self.before_delay_i]['Timestamp']
            self.after_delay_i  = self.df.loc[(self.df['Mikroe_socket'] == 'A') & (self.df['Status'] == 'VERIF')].index[0] + 1
            self.after_delay_t  = self.df.loc[self.after_delay_i]['Timestamp']
            self.before_delay_dt = datetime.strptime(self.before_delay_t, exp_tformat)
            self.after_delay_dt  = datetime.strptime(self.after_delay_t, exp_tformat)
        except IndexError:
            self.valid = 0
        else:
            self.valid = 1

        if self.valid:    
            self.dif = self.after_delay_dt - self.before_delay_dt
            if timedelta(seconds = 0.1) < self.dif < timedelta(seconds = 0.5):
                self.delay = 0.1
            if timedelta(seconds = 0.5) < self.dif < timedelta(seconds = 5):
                self.delay = 1
            if timedelta(seconds = 5) < self.dif < timedelta(seconds = 50):
                self.delay = 10
            if timedelta(seconds = 50) < self.dif < timedelta(seconds = 500):
                self.delay = 100
            if timedelta(seconds = 500) < self.dif < timedelta(seconds = 5000):
                self.delay = 1000
        else:
            return 0

        if self.valid:
            if beamOn(self.before_delay_t, self.after_delay_t):
                self.hot = 1
            else:
                self.hot = 0
        else:
            self.hot = 0

        if self.valid:
            self.error_num = self.df['Status'].value_counts()['SDC']
        else:
            return None 

In [223]:
class Analyser:
    def __init__(self, targetDir_fullpath):
        self.targetDir_fullpath  = targetDir_fullpath
        self.expLogTstampFormat  = "%Y-%m-%d_%H-%M-%S-%f"
        self.beamLogTstampFormat = "%d/%m/%Y %H:%M:%S"
 
    def setup(self):
        self.targetDir_fullpathList = self.targetDir_fullpath.split("/")
        targetDir_trunc             = self.targetDir_fullpathList[-3:]
        self.chip                   = targetDir_trunc[0]
        self.variant                = targetDir_trunc[1]
        self.size                   = targetDir_trunc[2]

        # Get data from logs
        self.log_names     = [self.targetDir_fullpath+i for i in os.listdir(self.targetDir_fullpath) if ".csv" in i]
        self.log_names.sort(key=os.path.getmtime)
        self.ExpLogHandler = ExpLogHandler(self.log_names)
        self.data          = self.ExpLogHandler.getData()

        # Get beam status from logs
        firstTimestamp           = self.data[0:1]['Timestamp'][0]
        lastTimestamp            = self.data[self.data.shape[0]-2:self.data.shape[0]-1]['Timestamp'][self.data.shape[0]-2]
        self.firstTime           = datetime.strptime(firstTimestamp, self.expLogTstampFormat)
        self.lastTime            = datetime.strptime(lastTimestamp, self.expLogTstampFormat)
        self.BeamLogHandler      = BeamLogHandler(self.firstTime, self.lastTime, self.targetDir_fullpathList)
        self.beamlog             = self.BeamLogHandler.getLogs()

        # Create datetime objects from beamlog timestamps
        beamlogTstamps    = (self.beamlog['Date'] + self.beamlog['HMS_time'] + self.beamlog['Millisecs'].apply(str)).tolist()
        self.beamlogTimes = []
        for i in beamlogTstamps:
            if len(i) > 23:
                i = i[0:26]
            self.beamlogTimes.append(i)
        self.beamlogTimes = [datetime.strptime(i, '%d/%m/%Y%H:%M:%S0.%f') for i in self.beamlogTimes]

    def beamOn(self, firstTstamp, lastTstamp):
        #bOfirstTime           = datetime.strptime(firstTstamp, self.expLogTstampFormat)
        #bOlastTime            = datetime.strptime(lastTstamp, self.expLogTstampFormat)
        bOfirstTime = firstTstamp
        bOlastTime = lastTstamp
        self.beamTimeNearFirstTime = min([i for i in self.beamlogTimes if i <= bOfirstTime], key=lambda x: abs(x - bOfirstTime))
        self.beamTimeNearLastTime  = min([i for i in self.beamlogTimes if i >= bOlastTime], key=lambda x: abs(bOlastTime - x))
        self.firstRow              = self.beamlog.loc[(self.beamlog['Date'] == datetime.strftime(self.beamTimeNearFirstTime, '%d/%m/%Y')) & (self.beamlog['HMS_time'] == datetime.strftime(self.beamTimeNearFirstTime, '%H:%M:%S'))]
        self.lastRow               = self.beamlog.loc[(self.beamlog['Date'] == datetime.strftime(self.beamTimeNearLastTime, '%d/%m/%Y')) & (self.beamlog['HMS_time'] == datetime.strftime(self.beamTimeNearLastTime, '%H:%M:%S'))]
        count4Dif             = self.lastRow.iloc[0]['Count4'] - self.firstRow.iloc[0]['Count4']
        numRows               = self.lastRow.index.astype(int)[0] - self.firstRow.index.astype(int)[0]
        cps                   = count4Dif / numRows
        if cps > 1:
            return 1
        return 0
 
    def createRuns(self):
        runNames  = []
        self.runs = {}
        for i in range(1, self.data[self.data['Run_ID'].last_valid_index():self.data['Run_ID'].last_valid_index()+1]['Run_ID'][self.data['Run_ID'].last_valid_index()]+1):
            runNames.append(f"run_{i}")
        for i in runNames:
            self.runs[i] = Run(i, self.data)

    def processRuns(self): 
        for value in self.runs.values():
            try:
                before_delay_i = value.df.loc[(value.df['Mikroe_socket'] == 'D') & (value.df['Status'] == 'STORE_OK')].index[0]
                before_delay_t = value.df.loc[before_delay_i]['Timestamp']
                after_delay_i  = value.df.loc[(value.df['Mikroe_socket'] == 'A') & (value.df['Status'] == 'VERIF')].index[0] + 1
                after_delay_t  = value.df.loc[after_delay_i]['Timestamp']
                before_delay_dt = datetime.strptime(before_delay_t, self.expLogTstampFormat)
                after_delay_dt  = datetime.strptime(after_delay_t, self.expLogTstampFormat)   
                dif = after_delay_dt - before_delay_dt
                if timedelta(seconds = 0.1) < dif < timedelta(seconds = 0.5):
                    value.delay = 0.1
                if timedelta(seconds = 0.5) < dif < timedelta(seconds = 5):
                    value.delay = 1
                if timedelta(seconds = 5) < dif < timedelta(seconds = 50):
                    value.delay = 10
                if timedelta(seconds = 50) < dif < timedelta(seconds = 500):
                    value.delay = 100
                if timedelta(seconds = 500) < dif < timedelta(seconds = 5000):
                    value.delay = 1000
                if self.beamOn(before_delay_dt, after_delay_dt):
                    value.hot = 1
                else:
                    value.hot = 0
                try:
                    value.errors = value.df['Status'].value_counts()['SDC']
                except KeyError:
                    value.errors = 0
            except IndexError:
                value.valid = 0
            else:
                value.valid = 1

In [224]:
Analyser = Analyser("C:/Users/Sujit/Documents/STFC/Code/Mikroe/nvRAM_experiment/results/live/SRAM3/1/test/")

In [225]:
Analyser.setup()

2021-05-21
2021-05-22


In [226]:
Analyser.createRuns()

In [227]:
Analyser.processRuns()

In [228]:
test1 = Analyser.runs["run_8"]
bef = datetime.strptime(test1.df.loc[test1.df.loc[(test1.df['Mikroe_socket'] == 'D') & (test1.df['Status'] == 'STORE_OK')].index[0]]['Timestamp'], "%Y-%m-%d_%H-%M-%S-%f")
aft = datetime.strptime(test1.df.loc[test1.df.loc[(test1.df['Mikroe_socket'] == 'A') & (test1.df['Status'] == 'VERIF')].index[0] + 1]['Timestamp'], "%Y-%m-%d_%H-%M-%S-%f")
Analyser.beamOn(bef, aft)


0

In [254]:
Analyser.runs["run_100"].hot

1