In [15]:
import socket
import time
import os
import sys
import platform
from pathlib import Path
import datetime as datetime
import subprocess
import re

def run_command(command):
    """Run ``command`` through the shell and return its captured stdout.

    When ``command`` is None, "ls" is run instead. The outcome is printed:
    stderr when the exit code is non-zero, stdout otherwise. The captured
    stdout text is returned in both cases.
    """
    shell_line = "ls" if command is None else command

    # Collect both output streams as text once the command has finished
    completed = subprocess.run(
        shell_line,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
    )

    if completed.returncode == 0:
        print(f"Command output:\n{completed.stdout}")
    else:
        print(f"An error occurred while executing the command {shell_line}:\n{completed.stderr}")

    return str(completed.stdout)

In [16]:

# =======================START OF CONFIGURATION SECTION (USER MUST MODIFY TO MATCH THEIR SETTINGS)================
# See the manual "Automation Server Protocol.pdf" for more details.
TCP_IP = '192.168.147.1'    # The IP address of the interface that the automation server is running on
TCP_PORT = 22901        # The default port to talk with the automation server

# Maximum number of bytes requested from the socket by each recv() call in this notebook
MAX_TO_READ = 1000

# This will capture Bluetooth LE (bredr, spectrum and wifi are switched off in the string below).
# This is described in the section "IOParameters – Sodera and X500" of the Automation Server Protocol manual.
# IMPORTANT: This is ignored for the X240. For configurations that include one or more X240 devices, the capture
# technology should be configured by using the appropriate datasource prior to starting capture via automation.
# Configuration of capture technology and initiating firmware update are not supported via the automation interface.
capture_technology = "capturetechnology=bredr-off|le-on|2m-on|spectrum-off|wifi-off"

# Only one personality_key should be uncommented, and it should be the device that is connected and powered up
# before running the script. Additional personality keys to support other equipment and configurations
# can be found under the section "Start FTS" in the Automation Server Protocol manual.
#
# Only uncomment one of the personality_key statements below:
personality_key = "SODERA"
# personality_key = "X240"
# personality_key = "X500"

# Set up the path to the Wireless Protocol Suite installation. Change this to your directory.
# The notebook expects Executables\Core\FTSAutoServer.exe to exist beneath this directory.
wps_path=r'C:\Program Files (x86)\Teledyne LeCroy Wireless\Wireless Protocol Suite 2.60'


# This is the directory where the capture (.cfa) and HTML export (.html) files are written.
# Make sure this path exists.
data_path=r'C:\Users\Public\Documents\share\input'

# Base name (without extension) shared by the saved capture file and the HTML export.
# The timestamp is taken when this cell runs.
capture_name = 'le_capture_' + time.strftime("%Y%m%d_%H%M%S")

# ===================================END OF CONFIGURATION SECTION=====================================

In [17]:
# Trim any surrounding whitespace from the configured key
personality_key = personality_key.strip()

# Output files live directly under data_path and share the capture name.
# Backslashes are used on purpose: these paths are consumed on the Windows side.
capture_absolute_filename = "\\".join((data_path, capture_name + ".cfa"))
html_absolute_filename = "\\".join((data_path, capture_name + ".html"))

# Location of the automation server executable inside the installation
wps_server_path = "\\".join((wps_path, 'Executables\\Core'))
wps_server_full_filename = "\\".join((wps_server_path, 'FTSAutoServer.exe'))
wps_server_full_filename_str = str(wps_server_full_filename)


# Assume that the Windows platform is where the device is running
if platform.system() == 'Windows':
    # Each user-specified location is paired with the message shown when it is missing
    required_paths = (
        (data_path,
         f"data_path {data_path} was not found. You may need to create it. Exiting."),
        (wps_path,
         f"wps_path {wps_path} was not found. Is this the path where the software is installed? Exiting."),
        (wps_server_full_filename_str,
         f"wps_server_full_filename {wps_server_full_filename_str} was not found. Check to make sure it exists. Exiting."),
    )
    for required_path, missing_message in required_paths:
        if not os.path.exists(required_path):
            print(missing_message)
            sys.exit(1)

    # Everything is in place: launch the automation server
    automation_server_process = subprocess.Popen(wps_server_full_filename_str)


In [18]:
# Connect to the automation server.
# The server executable may have been launched only moments ago (previous cell),
# so a failed connection attempt is retried for a limited time before giving up.
CONNECT_RETRY_WINDOW_S = 30   # total time to keep retrying, in seconds
CONNECT_RETRY_DELAY_S = 1     # pause between attempts, in seconds

connect_deadline = time.monotonic() + CONNECT_RETRY_WINDOW_S
while True:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((TCP_IP, TCP_PORT))
        break
    except OSError as connect_error:
        s.close()  # do not leak the socket of a failed attempt
        if time.monotonic() >= connect_deadline:
            raise  # same exception the single connect() attempt would have raised
        print(f"Connection to {TCP_IP}:{TCP_PORT} failed ({connect_error}). Retrying...")
        time.sleep(CONNECT_RETRY_DELAY_S)

# Read and show the first message the server sends on the new connection
data = s.recv(MAX_TO_READ)
print("Trying connection. Receiving: " + str(data))

Trying connection. Receiving: b'TCP CONNECT;SUCCEEDED;Timestamp=6/25/2023 6:12:49 PM;Reason=C:\\Program Files (x86)\\Teledyne LeCroy Wireless\\Wireless Protocol Suite 2.60\\Executables\\Core\\\r\n'


In [19]:
# Start Wireless Protocol Suite
FTE_CMD = "Start FTS" + ";" + str(wps_server_path) + ";" + personality_key
print(FTE_CMD)
send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
print("s1 Sending: " + str(send_data))
# sendall() keeps writing until the whole command has been sent;
# send() is allowed to transmit only part of the buffer.
s.sendall(send_data)

# Wait to hear the start succeeded.
EXPECTED_COMMAND = "START FTS"
EXPECTED_STATUS = "SUCCEEDED"
POLL_INTERVAL_S = 0.5  # pause between "Is Initialized" polls, in seconds

is_done_waiting = False
while not is_done_waiting:
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        # recv() returns an empty bytes object once the peer has closed the connection
        raise ConnectionError(f"The automation server closed the connection while waiting for {EXPECTED_COMMAND}.")
    result_str = rcv_data.decode()
    print("s1 received:" + result_str)
    # A single recv() may deliver more than one status line, so check each line separately
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        result_parse = [field.strip() for field in status_line.split(";")]
        # Slicing tolerates replies that carry fewer than two fields
        if result_parse[:2] == [EXPECTED_COMMAND, EXPECTED_STATUS]:
            is_done_waiting = True
        else:
            print(f"Received data parsed to {result_parse}, which indicates startup is not complete. Still waiting for the command {EXPECTED_COMMAND} with a status of {EXPECTED_STATUS}.")

# Wait for FTS to be ready
# NOTE(review): any "IS INITIALIZED;SUCCEEDED" reply is accepted regardless of its Reason field,
# as in the original code — confirm against the protocol manual that this is sufficient.
is_done_waiting = False
while not is_done_waiting:
    FTE_CMD = "Is Initialized"
    send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
    print("s2 Sending: " + str(send_data))
    s.sendall(send_data)
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        raise ConnectionError("The automation server closed the connection while waiting for IS INITIALIZED.")
    result_str = rcv_data.decode()
    print("s2 received:" + result_str)
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        result_parse = [field.strip() for field in status_line.split(";")]
        if result_parse[:2] == ["IS INITIALIZED", "SUCCEEDED"]:
            is_done_waiting = True
        else:
            print(f"Parse of received: {result_parse}. Not the desired result so still waiting..")
    if not is_done_waiting:
        # Give the server a moment instead of polling in a tight loop
        time.sleep(POLL_INTERVAL_S)

Start FTS;C:\Program Files (x86)\Teledyne LeCroy Wireless\Wireless Protocol Suite 2.60\Executables\Core;SODERA
s1 Sending: b'Start FTS;C:\\Program Files (x86)\\Teledyne LeCroy Wireless\\Wireless Protocol Suite 2.60\\Executables\\Core;SODERA'
s1 received:IS INITIALIZED;SUCCEEDED;Timestamp=6/25/2023 6:13:09 PM;Reason=yes

Received data parsed to ['IS INITIALIZED', 'SUCCEEDED', 'Timestamp=6/25/2023 6:13:09 PM', 'Reason=yes\r\n'], which indicates startup is not complete. Still waiting for the command START FTS with a status of SUCCEEDED.
s1 received:IS INITIALIZED;SUCCEEDED;Timestamp=6/25/2023 6:13:12 PM;Reason=yes

Received data parsed to ['IS INITIALIZED', 'SUCCEEDED', 'Timestamp=6/25/2023 6:13:12 PM', 'Reason=yes\r\n'], which indicates startup is not complete. Still waiting for the command START FTS with a status of SUCCEEDED.
s1 received:START FTS;SUCCEEDED;Timestamp=6/25/2023 6:13:24 PM

s2 Sending: b'Is Initialized'
s2 received:IS INITIALIZED;SUCCEEDED;Timestamp=6/25/2023 6:13:24 PM;

In [20]:
# Configure the I/O parameters: the analyze options are always sent; the capture
# technology is appended for every device except the X240.
# NOTE(review): "lecrcerrors=on" uses "=" where every other option uses "-"; the string is
# kept exactly as it was — confirm the intended spelling against the protocol manual.
ANALYZE_OPTIONS = "analyze=inquiryprocess-off|pagingnoconn-off|nullsandpolls-off|emptyle-on|anonymousadv-on|meshadv-off|lecrcerrors=on;"

FTE_CMD = "Config Settings;IOParameters;" + personality_key + ";" + ANALYZE_OPTIONS
if "X240" in personality_key:
    print("The X240 requires that the capture technology is setup before running this script.")
else:
    FTE_CMD += capture_technology

print(FTE_CMD)
send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
print("Sending: " + str(send_data))
# sendall() keeps writing until the whole command has been sent
s.sendall(send_data)

# Wait to hear that the configuration succeeded
is_done_waiting = False
while not is_done_waiting:
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        # recv() returns an empty bytes object once the peer has closed the connection
        raise ConnectionError("The automation server closed the connection while waiting for CONFIG SETTINGS.")
    result_str = rcv_data.decode()
    print("received:" + result_str)
    # A single recv() may deliver more than one status line, so check each line separately
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        result_parse = [field.strip() for field in status_line.split(";")]
        # Slicing tolerates replies that carry fewer than two fields
        if result_parse[:2] == ["CONFIG SETTINGS", "SUCCEEDED"]:
            is_done_waiting = True
        else:
            print(f"Parse of received: {result_parse}. Not the desired result so still waiting.")

Config Settings;IOParameters;SODERA;analyze=inquiryprocess-off|pagingnoconn-off|nullsandpolls-off|emptyle-on|anonymousadv-on|meshadv-off|lecrcerrors=on;capturetechnology=bredr-off|le-on|2m-on|spectrum-off|wifi-off
Sending: b'Config Settings;IOParameters;SODERA;analyze=inquiryprocess-off|pagingnoconn-off|nullsandpolls-off|emptyle-on|anonymousadv-on|meshadv-off|lecrcerrors=on;capturetechnology=bredr-off|le-on|2m-on|spectrum-off|wifi-off'
received:CONFIG SETTINGS;SUCCEEDED;Timestamp=6/25/2023 6:13:24 PM



In [21]:
# Begin recording: send the command, then show the server's reply
FTE_CMD = "Start Record"
send_data = FTE_CMD.encode('UTF-8', 'strict')
print(f"Sending: {send_data}")
s.send(send_data)
data = s.recv(MAX_TO_READ)
print(f"Receiving: {data}")


Sending: b'Start Record'
Receiving: b'START RECORD;SUCCEEDED;Timestamp=6/25/2023 6:13:26 PM\r\n'


In [22]:
# Ask the server to start analysis and report when the reply is not a success
FTE_CMD = "Start Analyze"
send_data = FTE_CMD.encode('UTF-8', 'strict')
print(f"Sending: {send_data}")
s.send(send_data)

rcv_data = s.recv(MAX_TO_READ)
result_str = rcv_data.decode()
print(f"received:{result_str}")

# The reply is a ';'-separated record: command name first, status second
result_parse = result_str.split(";")
if result_parse[0] != "START ANALYZE" or result_parse[1] != "SUCCEEDED":
    print(f"ERROR failed to start analysis with a parsed value of: {result_parse}")

Sending: b'Start Analyze'
received:START ANALYZE;SUCCEEDED;Timestamp=6/25/2023 6:13:46 PM



In [23]:
# Stop Record
# Steps that follow stopping the recording (this cell performs the first; later cells do the rest):
# • Is Analyze Complete – Repeat until status indicating completion has been received (see Is Analyze Complete).
# • Stop Analyze
# • Query State – Repeat until successful status with CAPTURE ACTIVE NO DATA or CAPTURE STOPPED reason.
# • Is Processing Complete – Repeat until status indicating completion has been received (see Is Processing Complete).
# • Save Capture – Wait until status has been reported.
FTE_CMD = "Stop Record"
send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
print("Sending: " + str(send_data))
# sendall() keeps writing until the whole command has been sent
s.sendall(send_data)
data = s.recv(MAX_TO_READ)
print(data)

# • Is Analyze Complete – Repeat until status indicating completion has been received (see Is Analyze Complete).
POLL_INTERVAL_S = 0.5  # pause between polls so the server is not queried in a tight loop

is_done_waiting = False
while not is_done_waiting:
    FTE_CMD = "IS ANALYZE COMPLETE"
    send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
    print("Sending: " + str(send_data))
    s.sendall(send_data)
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        # recv() returns an empty bytes object once the peer has closed the connection
        raise ConnectionError("The automation server closed the connection while waiting for IS ANALYZE COMPLETE.")
    result_str = rcv_data.decode()
    print(result_str)
    # A single recv() may deliver more than one status line, so check each line separately
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        # Fields are stripped so the match does not depend on the trailing "\r\n"
        result_parse = [field.strip() for field in status_line.split(";")]
        if len(result_parse) >= 4 and \
           result_parse[0] == "IS ANALYZE COMPLETE" and \
           result_parse[1] == "SUCCEEDED" and \
           result_parse[3] == "Reason=analyze_complete=yes":
            is_done_waiting = True
        else:
            print(f"Parse of received: {result_parse}. Not the desired result so still waiting.")
    if not is_done_waiting:
        time.sleep(POLL_INTERVAL_S)

Sending: b'Stop Record'
b'STOP RECORD;SUCCEEDED;Timestamp=6/25/2023 7:08:05 PM\r\n'
Sending: b'IS ANALYZE COMPLETE'
IS ANALYZE COMPLETE;SUCCEEDED;Timestamp=6/25/2023 7:08:05 PM;Reason=analyze_complete=no

Parse of received: ['IS ANALYZE COMPLETE', 'SUCCEEDED', 'Timestamp=6/25/2023 7:08:05 PM', 'Reason=analyze_complete=no\r\n']. Not the desired result so still waiting.
Sending: b'IS ANALYZE COMPLETE'
IS ANALYZE COMPLETE;SUCCEEDED;Timestamp=6/25/2023 7:08:05 PM;Reason=analyze_complete=no

Parse of received: ['IS ANALYZE COMPLETE', 'SUCCEEDED', 'Timestamp=6/25/2023 7:08:05 PM', 'Reason=analyze_complete=no\r\n']. Not the desired result so still waiting.
Sending: b'IS ANALYZE COMPLETE'
IS ANALYZE COMPLETE;SUCCEEDED;Timestamp=6/25/2023 7:08:05 PM;Reason=analyze_complete=no

Parse of received: ['IS ANALYZE COMPLETE', 'SUCCEEDED', 'Timestamp=6/25/2023 7:08:05 PM', 'Reason=analyze_complete=no\r\n']. Not the desired result so still waiting.
Sending: b'IS ANALYZE COMPLETE'
IS ANALYZE COMPLETE;SUC

In [24]:
# • Stop Analyze
FTE_CMD = "Stop Analyze"
send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
print("Sending: " + str(send_data))
# sendall() keeps writing until the whole command has been sent
s.sendall(send_data)
data = s.recv(MAX_TO_READ)
print(data)

POLL_INTERVAL_S = 0.5  # pause between polls so the server is not queried in a tight loop

# • Query State – Repeat until successful status with CAPTURE ACTIVE NO DATA or CAPTURE STOPPED reason.
# Accepted values of the fourth field (compared after stripping the trailing "\r\n")
QUERY_STATE_DONE_REASONS = (
    "Reason=CAPTURE STOPPED|CurrentState=CAPTURE STOPPED",
    "Reason=CAPTURE STOPPED|CurrentState=CAPTURE ACTIVE NO DATA",
)

is_done_waiting = False
while not is_done_waiting:
    FTE_CMD = "Query State"
    send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
    print("Sending: " + str(send_data))
    s.sendall(send_data)
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        # recv() returns an empty bytes object once the peer has closed the connection
        raise ConnectionError("The automation server closed the connection while waiting for QUERY STATE.")
    result_str = rcv_data.decode()
    print(result_str)
    # A single recv() may deliver more than one status line, so check each line separately
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        result_parse = [field.strip() for field in status_line.split(";")]
        if len(result_parse) >= 4 and \
           result_parse[0] == "QUERY STATE" and \
           result_parse[1] == "SUCCEEDED" and \
           result_parse[3] in QUERY_STATE_DONE_REASONS:
            is_done_waiting = True
        else:
            print(f"Parse of received: {result_parse}. Not the desired result so still waiting.")
    if not is_done_waiting:
        time.sleep(POLL_INTERVAL_S)

# • Is Processing Complete – Repeat until status indicating completion has been received (see Is Processing Complete).
is_done_waiting = False
while not is_done_waiting:
    FTE_CMD = "Is Processing Complete"
    send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
    print("Sending: " + str(send_data))
    s.sendall(send_data)
    rcv_data = s.recv(MAX_TO_READ)
    if not rcv_data:
        raise ConnectionError("The automation server closed the connection while waiting for IS PROCESSING COMPLETE.")
    result_str = rcv_data.decode()
    print(result_str)
    for status_line in result_str.splitlines():
        if not status_line.strip():
            continue
        result_parse = [field.strip() for field in status_line.split(";")]
        if len(result_parse) >= 4 and \
           result_parse[0] == "IS PROCESSING COMPLETE" and \
           result_parse[1] == "SUCCEEDED" and \
           result_parse[3] == "Reason=TRUE":
            is_done_waiting = True
        else:
            print(f"Parse of received: {result_parse}. Not the desired result so still waiting.")
    if not is_done_waiting:
        time.sleep(POLL_INTERVAL_S)

Sending: b'Stop Analyze'
b'STOP ANALYZE;SUCCEEDED;Timestamp=6/25/2023 7:08:30 PM\r\n'
Sending: b'Query State'
QUERY STATE;SUCCEEDED;Timestamp=6/25/2023 7:08:30 PM;Reason=CAPTURE STOPPED|CurrentState=CAPTURE ACTIVE NO DATA

Sending: b'Is Processing Complete'
IS PROCESSING COMPLETE;SUCCEEDED;Timestamp=6/25/2023 7:08:30 PM;Reason=TRUE



In [25]:
# Attach a text bookmark to one frame of the capture and show the server's reply
bookmark_text = 'le collect'
bookmark_frame = 1

FTE_CMD = "Add Bookmark;string=" + bookmark_text + ";frame=" + str(bookmark_frame)
send_data = FTE_CMD.encode('UTF-8', 'strict')
print(f"Sending: {send_data}")
s.send(send_data)
data = s.recv(MAX_TO_READ)
print(data)

Sending: b'Add Bookmark;string=le collect;frame=1'
b'ADD BOOKMARK;SUCCEEDED;FRAME=1;Timestamp=6/25/2023 7:08:33 PM\r\n'


In [26]:
# • Save Capture – request the save to the .cfa path and print the status the server reports.
FTE_CMD = f"Save Capture;{capture_absolute_filename}"
send_data = FTE_CMD.encode('UTF-8', 'strict')
print(f"Sending: {send_data}")
s.send(send_data)
data = s.recv(MAX_TO_READ)
print(data)

Sending: b'Save Capture;C:\\Users\\Public\\Documents\\share\\input\\le_capture_20230625_181228.cfa'
b'SAVE CAPTURE;SUCCEEDED;Timestamp=6/25/2023 7:08:38 PM\r\n'


In [27]:
# • HTML Export – export all frames to the .html path and print the status the server reports.
FTE_CMD = f"HTML Export;summary=0;databytes=1;decode=1;frames=all;file={html_absolute_filename}"
send_data = FTE_CMD.encode('UTF-8', 'strict')
print(f"Sending: {send_data}")
s.send(send_data)
data = s.recv(MAX_TO_READ)
print(data)

Sending: b'HTML Export;summary=0;databytes=1;decode=1;frames=all;file=C:\\Users\\Public\\Documents\\share\\input\\le_capture_20230625_181228.html'
b'HTML EXPORT;SUCCEEDED;Timestamp=6/25/2023 7:09:41 PM\r\n'


In [28]:
# Stop Wireless Protocol Suite, then close the control connection
FTE_CMD = "Stop FTS"
send_data = FTE_CMD.encode(encoding='UTF-8', errors='strict')
print("Sending: " + str(send_data))
# sendall() keeps writing until the whole command has been sent
s.sendall(send_data)
data = s.recv(MAX_TO_READ)
print(data)

s.close()

# Assume that the Windows platform is where the device is running
if platform.system() == 'Windows':
    try:
        if not os.path.exists(capture_absolute_filename):
            print(f"Your capture file {capture_absolute_filename} was not found. Check to make sure there were no errors in the output during the recording. Exiting.")
            sys.exit(1)
        print(f"Your capture file is {capture_absolute_filename}")
    finally:
        # Runs even when sys.exit() is raised above, so the automation server
        # started by this notebook is never left running after a failed capture.
        print("Shutting down the automation server")
        automation_server_process.kill()

Sending: b'Stop FTS'
b'STOP FTS;SUCCEEDED;Timestamp=6/25/2023 7:10:05 PM\r\n'
