This code establishes a connection to a compressor and reads data from it outside of OCS, for easy testing.

In [9]:
#Input the correct IP address. Running this cell establishes connection to the compressor

import sys, os
import binascii
import time
import struct
import socket
import signal
import errno
from contextlib import contextmanager
from ocs import site_config, ocs_agent
from ocs.ocs_twisted import TimeoutLock

#Input information
ip_address = "10.10.10.197"
port = 502
timeout=10

#Establish connection
comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Set the timeout before connecting so that the connection attempt itself is
#bounded by it (an unreachable address would otherwise block for the OS default).
#The same timeout then applies to every later send/recv on this socket.
comm.settimeout(timeout)
comm.connect((ip_address, port)) #connects to the PTC

#Function definitions
def buildRegistersQuery():
    """
    Assemble the fixed 12-byte request asking the compressor for its input registers.

    Every multi-byte field is packed big-endian. Returns the request as a bytes
    object, ready to be written to the socket.
    """
    return struct.pack(
        ">HHHBBHH",
        0x0999,  # Message ID
        0x0000,  # Unused
        0x0006,  # Message size in bytes
        0x01,    # Slave Address
        0x04,    # Function Code  3= Read HOLDING registers, 4 read INPUT registers
        0x0001,  # The starting Register Number
        0x0035,  # How many to read
    )
    
def breakdownReplyData(rawdata):
    """
    Take in raw ptc data, and return a (data_flag, data) tuple.

    Parameters:
        rawdata: indexable sequence of byte values (for example the bytes object
            returned by socket.recv) holding the compressor reply. The highest
            offset read is 60, so at least 61 bytes are required.

    Returns:
        data_flag (bool): False when every field was decoded, True when the
            reply could not be decoded.
        data (dict): the keys are the data labels, the values are the data as
            ints (states, warnings, alarms) or floats (all other fields).
            Empty when data_flag is True.
    """

    #Associations between keys and their location in rawData. Each list gives the
    #offsets in the order the bytes are concatenated before decoding.
    #These fields are decoded as big-endian unsigned integers.
    int_fields = {
        "Operating State": [9, 10],
        "Pump State": [11, 12],
        "Warnings": [15, 16, 13, 14],
        "Alarms": [19, 20, 17, 18],
    }
    #These fields are decoded as 4-byte floats.
    float_fields = {
        "Coolant In": [22, 21, 24, 23],
        "Coolant Out": [26, 25, 28, 27],
        "Oil": [30, 29, 32, 31],
        "Helium": [34, 33, 36, 35],
        "Low Pressure": [38, 37, 40, 39],
        "Low Pressure Average": [42, 41, 44, 43],
        "High Pressure": [46, 45, 48, 47],
        "High Pressure Average": [50, 49, 52, 51],
        "Delta Pressure": [54, 53, 56, 55],
        "Motor Current": [58, 57, 60, 59],
    }

    #Decode every field. If the reply is too short or otherwise malformed, print the
    #error, return an empty dictionary, and flag the data as bad
    data = {}
    try:
        for key, locs in int_fields.items():
            data[key] = int.from_bytes(bytes(rawdata[loc] for loc in locs), byteorder='big')

        for key, locs in float_fields.items():
            #'<f' pins little-endian decoding so the result does not depend on the
            #byte order of the machine running this code (native 'f' would)
            data[key] = struct.unpack('<f', bytes(rawdata[loc] for loc in locs))[0]

    except (LookupError, TypeError, ValueError, struct.error):
        #Only decoding failures are handled here; KeyboardInterrupt and similar
        #are deliberately allowed to propagate
        print(f"Compressor output could not be converted to numbers. Skipping this data block. Bad output string is {rawdata}")
        #Discard any partially decoded fields so callers never see a half-filled block
        return True, {}

    return False, data

In [26]:
#Request a single block of registers from the compressor and decode the reply

comm.sendall(buildRegistersQuery())
data = comm.recv(1024)  #read up to 1024 bytes of the reply
(data_flag, brd) = breakdownReplyData(data)
#data_flag is True for a bad block, so invert it to report validity
print(f"Valid output string {not data_flag}")
print(brd)

Valid output string True
