In [11]:
import json
from pathlib import Path
import pandas as pd
import os
import pprint
from Error import Error

class Detectors:
    '''
    - For loading in and running tests for assets in the assets directory.
    - File locations default to the paths this notebook has always used
      (./config.json, ./assets, ../mocker) and can be overridden through __init__.
    '''

    def __init__(self, config_path='./config.json', assets_dir='./assets', data_dir='../mocker'):
        '''
        # DESC: Stores the file locations, then loads the config file and the asset files
                and sets up the (placeholder) carbon connection.
        # PARAMS: config_path:STRING, assets_dir:STRING, data_dir:STRING
        # RETURNS: None
        '''
        self.config_path = config_path
        self.assets_dir = assets_dir
        self.data_dir = data_dir
        self.missions = []
        self.sites = []
        self.assets = []
        self.load_config()
        self.load_assets()
        self.connect_to_carbon()

    def load_config(self):
        '''
        # DESC: Loads in the config json file (./config.json unless overridden in __init__).
                This file contains missions and sites for associating with id's from failed tests.
                Loads this information into the class' "missions" and "sites" objects.
        # PARAMS: None
        # RETURNS: None
        # RAISES: Exception if the file cannot be opened or read.
        '''
        try:
            with open(self.config_path) as f:
                data = json.load(f)
        except OSError as exc:
            raise Exception("Could not read config.json file. Please make sure that it exists in the current directory.") from exc

        self.missions = data["missions"]
        self.sites = data["sites"]

    def load_assets(self):
        '''
        # DESC: Loads in all json asset files from the assets directory and appends them to
                "assets". A file that cannot be read or parsed is reported and skipped.
        # PARAMS: NONE
        # RETURNS: NONE
        '''
        # Sorted so that the order of assets (and of reported errors) does not depend
        # on the order in which the file system happens to list the directory.
        for child in sorted(Path(self.assets_dir).iterdir()):
            if child.suffix != ".json":
                continue
            try:
                with open(child) as f:
                    self.assets.append(json.load(f))
            except Exception:
                # Best effort: one bad file must not stop the others from loading.
                # "except Exception" (not a bare except) so Ctrl-C still interrupts.
                print("There was a problem processing one of the asset files. Please check that it is formatted correctlty. File: " + str(child))

    def connect_to_carbon(self):
        '''
        # DESC: Placeholder for connecting to the remote carbon API.
                Currently only sets "carbon" to an empty dict; no connection is made.
        # PARAMS: None
        # RETURNS: None
        '''
        self.carbon = {}

    def within(self, val, v_range):
        '''
        # DESC: Helper function to see if value is in a given range (both ends inclusive).
        # PARAMS: val:INT, v_range:INT[2]
        # RETURNS: is_within_range:BOOLEAN
        '''
        return v_range[0] <= val <= v_range[1]

    def getLatestData(self, id):
        '''
        # DESC: Gets the last row of the data set of the given machine id.
                The data is read from [data_dir]/[id].csv.
        # PARAMS: id:INT
        # RETURNS: data_row:PANDAS_DATAFRAME_ROW
        # RAISES: Exception if the csv cannot be read or has no rows; the underlying
                  error is attached as the cause.
        '''
        csv_path = Path(self.data_dir) / (str(id) + ".csv")
        try:
            df = pd.read_csv(csv_path)
            return df.iloc[-1]
        except Exception as exc:
            raise Exception("Error: Could not find data file for machine id " + str(id)) from exc

    def runTests(self):
        '''
        # DESC: Runs all the loaded tests against the last row of data for each asset.
                A test's "then_conditions" are only checked when every condition in its
                "if" list holds; a missing or empty "if" list means the test always runs.
        # PARAMS: None
        # RETURNS: errors:Error[]
        '''
        errors = []

        for asset in self.assets:
            data = self.getLatestData(asset["id"])

            # TODO: Implement test presets. An asset may name a "tests_preset"; the idea is to
            #       load ./assets/presets/[tests_preset].json and run its "tests" together with
            #       the asset's own. Check with '"tests_preset" in asset' (hasattr does not
            #       work on a dict).

            for test in asset["tests"]:
                preconditions = test.get("if") or []
                conditions_met = all(
                    self.within(data[condition["field"]], condition["value_range"])
                    for condition in preconditions
                )
                if not conditions_met:
                    # Preconditions not satisfied: the test does not apply to this row.
                    continue

                for condition in test["then_conditions"]:
                    observed = data[condition["field"]]
                    if not self.within(observed, condition["value_range"]):
                        # Test failed.
                        errors.append(Error(asset["id"], condition["field"], condition["value_range"], observed, test["test_classification"], data["datetime"]))
        return errors

    def print(self):
        '''
        # DESC: Pretty-prints the loaded missions, sites and assets.
        # PARAMS: None
        # RETURNS: None
        '''
        pp = pprint.PrettyPrinter(indent=4)
        print("\n--- Missions ---")
        pp.pprint(self.missions)
        print("\n--- Sites ---")
        pp.pprint(self.sites)
        print("\n--- Assets ---")
        pp.pprint(self.assets)
        

# Load config and assets, then check the newest data row of every asset.
a = Detectors()
errors = a.runTests()

# Report either the overall success or each individual failure.
if not errors:
    print("All tests passed")
else:
    for e in errors:
        print(e)

Test failled for asset id 0. Expected value for field data1 was in range [200, 240], received value 191.873171142148 at 2016-01-01 06:00:00


In [14]:
class CriticalityTool:
    '''
    - For checking or changing the criticality of an asset.
    '''
    def __init__(self, assets_dir='./assets'):
        '''
        # DESC: Remembers which directory holds the json asset files. Nothing is read here.
        # PARAMS: assets_dir:STRING
        # RETURNS: None
        '''
        self.assets_dir = assets_dir

    def _load_assets(self):
        '''
        # DESC: Helper generator that walks the asset directory and yields every json asset
                file together with its parsed content. Files are visited in sorted order and
                are read lazily, one per iteration step.
        # PARAMS: None
        # RETURNS: generator of (path:PATH, asset_id, asset:OBJECT)
        # RAISES: Exception if a json file cannot be read, parsed, or has no "id".
        '''
        for child in sorted(Path(self.assets_dir).iterdir()):
            if child.suffix != ".json":
                continue
            try:
                # The read handle is closed before the asset is handed to the caller,
                # so the caller is free to rewrite the same file.
                with open(child) as f:
                    asset = json.load(f)
                asset_id = asset["id"]
            except Exception as exc:
                raise Exception("Problem loading in one of the asset json files.") from exc
            yield child, asset_id, asset

    def getCriticality(self, id):
        '''
        # DESC: Get's the criticality and other basic information of an asset.
        # PARAMS: id:INT
        # RETURNS: asset:OBJECT{name:STRING, site:STRING, missions:INT[], criticalities:INT[]}
                   or None when no asset file has the given id.
        # RAISES: Exception if an asset file cannot be loaded or lacks an expected field.
        '''
        # Could possibly optimize this if we just name the files [id].json
        for _, asset_id, asset in self._load_assets():
            if asset_id != id:
                continue
            try:
                return {"name": asset["name"], "site": asset["site_id"], "missions": asset["mission_ids"], "criticalities": asset["mission_criticalities"]}
            except Exception as exc:
                raise Exception("Problem loading in one of the asset json files.") from exc
        return None

    def changeCriticality(self, id, mission_id, newCrit):
        '''
        # DESC: Changes the criticality for a specific mission for an asset file.
                Every asset file with the given id is updated; a file is only rewritten
                when it actually lists the mission.
        # PARAMS: id:INT, mission_id:INT, newCrit:INT
        # RETURNS: None
        # RAISES: Exception("Problem loading in one of the asset json files.") if an asset
                  file cannot be loaded or is malformed;
                  Exception("Problem writing new json asset file.") if the updated asset
                  cannot be serialized or written.
        '''
        for child, asset_id, asset in self._load_assets():
            if asset_id != id:
                continue

            try:
                mission_ids = asset["mission_ids"]
                criticalities = asset["mission_criticalities"]
                matches = [i for i in range(len(mission_ids)) if mission_ids[i] == mission_id]
                for i in matches:
                    criticalities[i] = newCrit
            except Exception as exc:
                raise Exception("Problem loading in one of the asset json files.") from exc

            if not matches:
                continue

            try:
                # Serialize BEFORE opening the file for writing: opening with 'w' truncates
                # it, so a value json cannot encode would otherwise destroy the asset file.
                payload = json.dumps(asset)
                with open(child, 'w') as outfile:
                    outfile.write(payload)
            except Exception as exc:
                raise Exception("Problem writing new json asset file.") from exc

# Set the criticality of mission 0 on asset 0 to 4, then display the asset's summary.
dc = CriticalityTool()
dc.changeCriticality(id=0, mission_id=0, newCrit=4)
dc.getCriticality(id=0)

{'name': 'Raytheon XM100 Air Threat Radar',
 'site': 0,
 'missions': [0],
 'criticalities': [4]}