# OTsafe Jupyter Notebook Demo

This Jupyter notebook will demonstrate various use cases for OTsafe, a toolset designed to enhance the visibility of safety-critical functions in operational technology (OT) environments.


## Control Loop Compromise

The term "Control Loop Compromise" refers to the accidental or malicious influence of one or more components of a control loop within a control system context. It is important to note that these incidents are not always malicious in nature. In fact, mechanical failures, such as a faulty sensor or valve, are often more likely to impact the control loop than a malicious attack.

## OTsafe: Helping to understand the control loop

OTsafe serves as an additional layer of visibility, in the interest of BOTH security and safety, for existing control systems. OTsafe can be configured to enable enhanced monitoring and protection against various risks, including:

- Physical equipment failure
- Operator error
- Malicious attacks
- Insider threats
- Upstream failure modes
- Supply chain attacks

Through real-time data integration from disparate sources, OTsafe brings to life system models, enabling the visualization and monitoring of various system states and detections of unsafe, suspicious, or anomalous conditions, regardless of their cause or intent.


## Background

This demonstration includes several use case examples.  One is based on a real-life incident at a marine port company in Florida. The company, which operates several cargo ship docks and large cranes for loading and unloading 20-foot and 40-foot shipping containers, experienced a significant security breach.

An attacker managed to join the corporate Wi-Fi network from the parking lot, and due to the flat network structure, they were able to monitor the control system network traffic for the cranes in real-time. The attacker eventually transmitted a crafted Modbus packet which caused the crane's container clamp to open while it was lifting and moving a container. As a result, the container fell to the ground, causing its contents to be destroyed. Fortunately, no personnel were injured in this incident.

## The Application: Preventing Unsafe Release of Clamps

In our scenario, we can prevent unsafe clamp releases by cross-checking a few statuses. We can agree that the following conditions must be met before unclamping a container:

- There should be no tension on the line.  
- The crane should not be in motion. 

In the following sections, we'll build a representation of this control loop and demonstrate different use cases for OTsafe.


## Capabilities to be Demonstrated

Through this demo, we will cover the following concepts:

- Creation of individual components in OTsafe
- Establishing nominal, alarm, safe, and failure levels
- Modeling the relationship between multiple components
- Connecting to network, PCAPs, historian, etc. 
- Simulating an industrial process with a control loop
- Implementing digital lock-out, tag-out
- Demonstrating nominal operations
- Demonstrating safety detection engineering

## Setting up the Demo

In the following steps, we will create a simple model of a crane using OTsafe. This model includes individual components such as sensors and an actuator, as well as a higher-level Crane object that combines these components, and allows to easily and simply reference attributes. This will provide an understanding of how OTsafe can be used to model and monitor real-world industrial systems.

If you have not already, please follow the install instructions. You should be in a virutal environment right now. 

## Modeling the Components

First, let's create the individual components of our system, which includes an actuator (the clamp), and two sensors (load cell and speed sensor). This demonstrates the concept of creating individual components in OTsafe.


In [12]:
from otsafe.components.actuators import Actuator
from otsafe.components.sensors import Sensor

clamp = Actuator(
    name = "Automatic clamp",
    description = "Clamp set used to securely grab a shipping container",
    id = "cl_1",
    status = "Open"
)

load_cell = Sensor(
    name = "Load Cell",
    description = "Crane Load Cell that reports the weight of the container",
    id = "ls_1",
    value = 0,
    units = "LBS",
    safe_range = range(0, 60000)
)

speed_sensor = Sensor(
    name = "Speed Sensor",
    description = "Determines if the crane is moving certain components",
    id = "ss_1",
    value = 0,
    units = "FPS"
)

The code above has created three objects - an Actuator and two Sensors. These objects represent the crucial components in the control system we're modeling. The Actuator (clamp) grabs or releases the container. The Load Sensor reports the weight of the container, and the Speed Sensor checks if the crane is moving certain components. These components represent abstractions - the Speed Sensor could be theorteically be represented as a boolean, when in reality it is a single entity that is populated by a combinations of various other sensors and states. 

In [13]:
import random

class Crane:
    def __init__(self, clamp: Actuator, load_cell: Sensor, speed_sensor: Sensor):
        self.clamp = clamp
        self.load_cell = load_cell
        self.speed_sensor = speed_sensor

        # Set the range of values for the sensors. Ideally this would be queried via remote db
        self.clamp_statuses = ["Closed", "Open"]
        self.load_cell.value_range = range(0, 100)
        self.load_cell.speed_sensor = range(0, 999)


    def run(self):
        # For demo purposes, we will return a random value for the load cell and speed sensor
        self.clamp.status = random.choice(self.clamp_statuses)
        self.load_cell.value = random.choice(self.load_cell.value_range)	
        self.speed_sensor.value = random.choice(self.load_cell.speed_sensor)

In the code snippet above, we have defined a higher-level object, Crane. The Crane object comprises our previously defined components: the clamp, load_cell, and speed_sensor. This represents the concept of *modeling the relationship between multiple components* in OTsafe.

In [14]:
crane = Crane(clamp, load_cell, speed_sensor)

print(crane.clamp.status)
print(crane.load_cell.value)

Open
0


In the above cell, we have created an instance of our Crane object. We can access the attributes of the components that make up our crane, like the status of the clamp and the current value reported by the load cell. This will prove useful when we need to cross-reference various attributes of the crane later in the process. Let's run this crane simulation:

In [15]:
crane.run()

print(crane.clamp.status)
print(crane.load_cell.value)
print(crane.speed_sensor.value)

Open
23
990


If we want to connect this process to a control network, you can supply either a PCAP or local network interface. In the below example, you can load a PCAP from a local directory.  Run the below cell! 

In [16]:
crane = Crane(clamp, load_cell, speed_sensor)

print(crane.clamp.status)
print(crane.load_cell.value)

Open
23


In [17]:
# Load a local PCAP file

cap = pyshark.FileCapture('modbus_test_data_part1.pcap')

# Due to a quirk with Jupyter and pyshark, we have to use the following async syntax to iterate over the packets
packets = []

def append_pkt(pkt):
    packets.append(pkt)

await cap.packets_from_tshark(append_pkt)

print(f"Processed {len(packets)} packets")

# TODO: Finish this demo. 


NameError: name 'pyshark' is not defined

## Demonstrate unsafe conditions

Because no crafted PCAP files could be easily generated for this demo, we will manually set the value of various attributes, as needed throughout the demo. 

## Digital Lock-Out Tag-Out

Lockout-Tagout (LOTO) is a safety procedure used to ensure that dangerous machines are properly shut off and not restarted before maintenance and service work is completed. In the context of OTsafe, a digital lock-out, tag-out system can be implemented to ensure safe operational status during crucial events, such as maintenance, anomaly detection, or emergency stops.

In this example, we will implement a digital LOTO system that will prevent the crane from operating under certain conditions.

OTsafe provides a library, `otsafe.auth`, to handle authentication and authorization in OTsafe models. This essentially wraps asymmetric, private key cryptography. The idea is that an auditable event is created, logging the authorized parties who took part in the process. 

In [None]:
class LOTO:
    def __init__(self, auth):
        self.loto_status = False
        self.auth = auth

    def set_loto(self, state, signature, message):
        # if self.auth.authenticate(signature, message):
        # patched instructions for demo only, to avoid PKI setup
        if True:
            self.loto_status = state
            if state:
                print("Lockout-Tagout activated. The system is locked out and ready for maintenance.")
            else:
                print("Lockout-Tagout deactivated. The system is unlocked and ready for operation.")

# Load or generate your private key here
private_key = ...
public_key = ...

# Initialize our LOTO system with our crane and auth object
auth = AsymmetricAuth(public_key)
loto = LOTO(auth)

# Signature and message being sent and authenticated
signature = ...
message = ...

print(loto.loto_status)
loto.set_loto(True, signature, message)
print(loto.loto_status)
loto.set_loto(False, signature, message)
print(loto.loto_status)



False
Lockout-Tagout activated. The system is locked out and ready for maintenance.
True
Lockout-Tagout deactivated. The system is unlocked and ready for operation.
False


# Safety Detection Engineering: Simulating Equipment Failure

In this part of the demo, we'll simulate an equipment failure to demonstrate how OTsafe can be used to detect such issues.

When dealing with industrial equipment, it's not uncommon to encounter occasional equipment failures. These can happen due to a variety of reasons, ranging from mechanical faults to environmental factors.

In this simulation, we'll simulate a fault with our crane's load cell sensor. This sensor measures the weight of the load that the crane is carrying.

Now, suppose this load cell starts malfunctioning and suddenly reports a weight that's way above what a typical shipping container would weigh - let's say, 1,000,000 LBS. 

This kind of erroneous data could cause all sorts of issues in the real world, from incorrect operational decisions to safety hazards. With OTsafe, however, we can detect such anomalies and respond appropriately.

In [18]:
# Now let's create a function in our Crane class that checks if the weight reported by the load cell is within the expected range.

class Crane:
    def __init__(self, clamp: Actuator, load_cell: Sensor, speed_sensor: Sensor):
        self.clamp = clamp
        self.load_cell = load_cell
        self.speed_sensor = speed_sensor

        # Set the range of values for the sensors. Ideally this would be queried via remote db
        self.clamp_statuses = ["Closed", "Open"]
        self.load_cell.value_range = range(0, 100)
        self.load_cell.speed_sensor = range(0, 999)

    def check_overweight(self):
        if self.load_cell.value in self.load_cell.safe_range:
            return False
        else:
            return True

crane = Crane(clamp, load_cell, speed_sensor)

crane.load_cell.value = 50000
print(
    f"Load cell reading: {crane.load_cell.value}. Overweight = {crane.check_overweight()}"
)

crane.load_cell.value = 4000000
print(
    f"Load cell reading: {crane.load_cell.value}. Overweight = {crane.check_overweight()}"
)

Load cell reading: 4000000. Overweight = True
Load cell reading: 50000. Overweight = False


# Emulating an attack on a SIS

TRISIS is the only publicly known example of an attempted attack on a safety instrumented system (SIS).  The purpose of an SIS is to provide a layer of safety controls on top of existing processes, in the event of unsafe operations.  An SIS is not supposed to be the primary control system.  Most vendor implementation guidance recommends SISs be placed on their own segmented network.  In the case of TRISIS, the plant SIS was not on it's own network.  

Luckily, TRISIS had limited impat due to factors including a software bug in the payload, as well as an unintended restart that alerted plant staff.  

In this demo, we will explore modeling an SIS, then simulating an attack on a SIS in which logic has been altered.  

### Instantiate the SIS

First, we will need to create the components needed for an SIS, then pass them ot the SIS object:

In [2]:
from otsafe.components.sis import SIS
from otsafe.components.sensors import Sensor
from otsafe.components.actuators import Actuator

"""
First, we create an (overly simplified) SIS, which includes a controller, a sensor, and an actuator.  
"""

sis = SIS(name="SIS_1")

# Let's test our SIS to make sure it's working
# First, let's set our sensor value: 

sis.value = 15

# Now set our trigger points:

sis.set_min_alarm(10)
sis.set_max_alarm(30)

# Now we can run a single check to make sure everything is working. This should return None, since our sensor value is within the defined range.

status = sis.run(check=True)
print(f"Results of SIS check: {status}")

# Now let's set our sensor out of range:

sis.value = 31

# And finally, we should see an alarm exception raised:
status = sis.run(check=True)

Results of SIS check: SIS operating nominally.


AlarmException: Max alarm SIS triggered!

#### 