Notes
- Do not blindly hit run all
- Restart kernel and clear outputs before re-running code
- Laser off command at the bottom of the page
- When conducting a test, ensure that the correct cells are being run

In [None]:
import numpy as np
import pathlib
import pyvisa
import time
import matplotlib.pyplot as plt
import math
from datetime import datetime
import threading

# Record when this session was started; the date is formatted as DD.MM.YYYY.
TODAY_STR = datetime.now().strftime("%d.%m.%Y")
print(f"\x1b[1;3;4;96mDate of running the code:\x1b[0m \x1b[1;3;4;92m{TODAY_STR}\x1b[0m")
# The inner format string uses single quotes: reusing the outer quote character
# inside an f-string replacement field is a SyntaxError before Python 3.12.
print(f"Time of running the code: {datetime.now().strftime('%H:%M:%S')}")
print("\x1b[1;31mWait for it...\x1b[0m")

# Device communication - must always be run
rm = pyvisa.ResourceManager()
rm.list_resources()   # NOTE: the returned resource list is discarded here
print("PyVisa running nominally")

In [None]:
# # Laser connection. Laser is TSL
# Keep retrying until the laser answers an identification query.
is_connect_success = False
while not is_connect_success:
    pending_session = None
    try:
        pending_session = rm.open_resource("TCPIP::169.254.82.30::5000::SOCKET", read_termination="\r")
        print(pending_session.query("*IDN?"))
    except pyvisa.VisaIOError:
        # The session may have opened even though the *IDN? query failed.
        # Close it so repeated retries do not accumulate open connections.
        if pending_session is not None:
            try:
                pending_session.close()
            except pyvisa.VisaIOError:
                pass  # Best-effort cleanup only; the retry below is what matters
        print("Retrying...")
        time.sleep(0.5)
    else:
        TSL = pending_session
        print("\x1b[0;92mTSL Connection established\x1b[0m")
        is_connect_success = True

# Strip the reply before comparing so stray whitespace around "+0" cannot
# make a switched-off laser look switched on.
if TSL.query(":POWer:STATe?").strip() == "+0":
    TSL.write(":POWer:STATe 1")
    print("Laser was off. Laser turning on/warming up. 5 minute sleep has begun. \n If already on, restart kernel and re-run.")
    time.sleep(300) # 5 minute laser warm up, if necessary
    print("5 minutes has passed; laser warmed up.")

else:
    print("Laser on and ready.")

# Setting the laser power in dBm
set_laser_power = 1
TSL.write(f":POW {set_laser_power}")
# Read the power back from the instrument; later cells reuse power_in_dbm.
power_in_dbm = TSL.query(":POW?")
print(f"Laser power set to {round(float(power_in_dbm), 2)} dBm")

In [None]:
# # oscilloscope connection
# Connect to the oscilloscope via its IP address
# (RIGOL DHO1204 in A*STAR).
oscilloscope = rm.open_resource('TCPIP::169.254.112.67::INSTR', open_timeout=5000)

# Sanity check for successful communication: ask the instrument to identify itself
response = oscilloscope.query('*IDN?')
print(response, end='')
print("\x1b[0;92mOscilloscope Connection established.\x1b[0m")

# # Oscilloscope configure settings
# The commands below are sent one by one, in the order listed.
_scope_setup_commands = (
    ':TIMebase:ROLL 0',             # Turn off roll
    ':ACQuire:MDEPth 1M',           # Check again if this command is working! If not manually set MemDepth
    ':TIMebase:MAIN:SCALe 0.2',     # Time base is 200ms
    ':TIMebase:MAIN:OFFSet 0',
    ':CHANnel1:SCALe 3',            # Ensure the channel 1 measurement is 3V
    ':CHANnel1:OFFSet 0',
    ':CHANnel2:SCALe 0.02',         # Ensure the channel 2 measurement is 20mV
    ':CHANnel2:OFFSet 0',
    ':CHANnel1:DISPlay 1',          # Turn on display of channel 1
    ':CHANnel2:DISPlay 1',          # Turn on display of channel 2
)
for _scope_command in _scope_setup_commands:
    oscilloscope.write(_scope_command)

print("Oscilloscope settings configured")

In [None]:
# # Laser wavelength settings: Set parameters for wavelength sweep
WAV_START = 1535
WAV_END = 1540
speed = 20                    # Sweep speed (units of nm/s)
step_size = "10pm"             # step size of sweep. Read TSL manual pg 42/113 for minimum step size based on speed
delay_between_sweeps = 0.3

# Commands are written in this exact order, with a 0.1 s pause after each one.
_sweep_commands = (
    ":WAVelength:SWEep 0",                                  # Engineering reset
    f":WAVelength:SWEep:STARt {WAV_START}E-9",              # Set start wavelength
    f":WAVelength:SWEep:STOP {WAV_END}E-9",                 # Set stop wavelength
    f":WAVelength:SWEep:SPEed {speed}",
    ":WAVelength:SWEep:MODe 1",                             # Continuous and one-way sweep, for finest wavelength change per unit time.
    f"TRIG:OUTP:STEP {step_size}",
    f"WAVelength:SWEep:DELay {delay_between_sweeps}",       # Reduce the delay between sweeps
    ":WAVelength:SWEep 1",                                  # Begin sweeping
)
for _sweep_command in _sweep_commands:
    TSL.write(_sweep_command)
    time.sleep(0.1)
# Optional read-back while debugging (each followed by a 0.1 s pause):
#   print(TSL.query("TRIG:OUTP:STEP?"))
#   print(TSL.query("WAVelength:SWEep:DELay?"))

# Summary of the sweep that was just started. Each detail line is preceded by
# a newline, six spaces, another newline and one space, as in the original layout.
_detail_break = "\n      \n "
print()
print(
    "Sweeping initiated: "
    + _detail_break + f"{WAV_START}nm to {WAV_END}nm at {round(float(power_in_dbm), 2)}dBm"
    + _detail_break + f"Step size of {step_size} and speed {speed}nm/s, "
    + _detail_break + f"Delay of {delay_between_sweeps}s between sweeps."
)

In [None]:
# # Data collection of optical absorption

def setup_trigger(trigger_level, trigger_source='CHAN1'):
    """Configure the oscilloscope edge trigger and report the settings.

    Writes, in order, the edge-trigger source, the trigger level, a positive
    (rising-edge) slope and the SINGle sweep mode to the module-level
    ``oscilloscope``, then prints a confirmation line.

    Args:
        trigger_level: Trigger level sent to the scope (printed with unit V).
        trigger_source: Trigger source name, e.g. ``'CHAN1'``.
    """
    trigger_commands = (
        f':TRIGger:EDGE:SOURce {trigger_source}',   # which input the trigger watches
        f':TRIGger:EDGE:LEVel {trigger_level}',     # threshold level
        ':TRIGger:EDGE:SLOPe POSitive',             # fire on a rising edge
        ':TRIGger:SWEep SINGle',                    # trigger sweep mode
    )
    for scpi_command in trigger_commands:
        oscilloscope.write(scpi_command)
    print(f"Trigger set to {trigger_level}V on {trigger_source}.")


def plot_and_save_waveform(channels: list[int], output_dir="C:/Users/groov/VSCode/CSV_Waveforms"):
    """Read waveforms from the oscilloscope, save each as CSV and plot them.

    For every channel the waveform is queried in ASCII format, stored in the
    module-level ``waveforms`` dict under ``waveform_ch<N>`` and
    ``time_axis_ch<N>``, and written to a two-column CSV file (time,
    amplitude). All requested channels are then drawn on one figure as a
    sanity check.

    Args:
        channels: Oscilloscope channel numbers to read, e.g. ``[1, 2]``.
        output_dir: Directory for the CSV files; created if it does not exist.
    """
    save_dir = pathlib.Path(output_dir)
    save_dir.mkdir(parents=True, exist_ok=True)

    # One timestamp per call, so every channel file of the same capture
    # shares the same prefix even if the reads straddle a second boundary.
    timestamp = datetime.now().strftime("%d.%m.%Y_%H.%M.%S")

    # File-name labels for the channel roles; other channels get a generic label
    # instead of reusing (or lacking) a file name.
    channel_labels = {1: "triggers", 2: "optical"}

    for ch in channels:
        # Select the channel
        oscilloscope.write(f':WAV:SOUR CHAN{ch}')

        # Set the waveform format to ASCII (can also use 'BYTE' or 'WORD' for binary)
        oscilloscope.write(':WAV:FORM ASCII')

        # Get the waveform data
        print(f"Querying waveform data for channel {ch}")
        data = oscilloscope.query(':WAV:DATA?')

        # Parse the comma-separated reply; empty tokens (e.g. after a trailing
        # comma) are skipped instead of crashing float().
        waveform_data = np.array([float(token) for token in data.split(',') if token.strip()])
        waveforms[f"waveform_ch{ch}"] = waveform_data

        # Time between consecutive samples and time of the first sample
        x_increment = float(oscilloscope.query(':WAV:XINC?'))
        x_origin = float(oscilloscope.query(':WAV:XOR?'))

        # Sample i lies at x_origin + i * x_increment. (np.linspace up to
        # x_origin + x_increment * N would stretch the spacing by N / (N - 1).)
        time_axis = x_origin + x_increment * np.arange(len(waveform_data))
        waveforms[f"time_axis_ch{ch}"] = time_axis

        # Save the waveform data to a CSV file
        label = channel_labels.get(ch, "waveform")
        csv_path = save_dir / f"{timestamp}_{label}_ch{ch}.csv"
        np.savetxt(csv_path, np.column_stack((time_axis, waveform_data)), delimiter=",", header="Time, Amplitude")

    # Plot the waveform for visualization and sanity check
    plt.figure(figsize=(10, 6))
    for ch in channels:
        plt.plot(waveforms[f"time_axis_ch{ch}"], waveforms[f"waveform_ch{ch}"], label=f"channel {ch}")
    plt.title("Waveform from Channels " + ", ".join(str(ch) for ch in channels))
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude (V)")
    plt.grid(True)
    plt.legend()
    plt.show()


waveforms = {} # Filled by plot_and_save_waveform with the latest data per channel

global_start_time = time.time()

# # Data acquisition pseudo-infinite loop
# # Stops based on auto timeout (24 hours)
MAX_RUN_SECONDS = 86400       # Overall run limit, checked before each burst
CAPTURES_PER_BURST = 10       # Back-to-back captures before the long pause
STATUS_POLL_SECONDS = 0.1     # Pause between trigger-status queries
sleep_time = 30*60            # Time in seconds to wait before going to next capture

while (time.time() - global_start_time) < MAX_RUN_SECONDS:

    for capture_index in range(CAPTURES_PER_BURST):
        oscilloscope.write(':STOP') # Quickly "engineering reset" the oscilloscope
        time.sleep(0.1) # Small delay to let oscilloscope settle before running and arming trigger

        oscilloscope.write(':RUN') # Need oscilloscope to start running before it can capture anything
        # The trigger will automatically start capturing once the trigger is detected

        setup_trigger(trigger_level=0.5)  # Arm the trigger, unit of trigger_level is V
        time.sleep(0.1) # Small delay to confirm trigger preparation

        # Wait for the trigger. The status is queried once per iteration with
        # a short pause in between, instead of twice in a tight loop.
        seen_armed = False
        while True:
            status = oscilloscope.query(':TRIGger:STATus?').strip()
            if status == 'TD':
                print("Triggered! Starting capture; it will stop by itself.")
                break
            if status != 'STOP':
                # Scope has left the stopped state, so it is armed/running
                seen_armed = True
            elif seen_armed:
                # Armed earlier and now stopped: the 'TD' state was missed
                # between polls. Continue instead of waiting forever.
                print("Trigger state was not observed, but the acquisition has stopped.")
                break
            time.sleep(STATUS_POLL_SECONDS)

        # Wait until the single acquisition has finished, then read it out
        while oscilloscope.query(':TRIGger:STATus?').strip() != 'STOP':
            time.sleep(STATUS_POLL_SECONDS)  # Small delay to prevent busy-waiting
        plot_and_save_waveform(channels=[1,2])       # TODO: Write in the list of channel numbers that we wish to plot and save
        print("Frozen waveforms captured and saved as CSV")

    print(f"Waiting for {sleep_time/60} minutes before next capture")
    time.sleep(sleep_time)

In [None]:
# # Laser turn off
# Stop the sweep, set the wavelength to 1550nm, then switch the laser power
# state to off, pausing 0.1 s after each command.
for _shutdown_command in (
    ":WAVelength:SWEep 0",
    ":WAVelength 1550nm",
    ":POWer:STATe +0",
):
    TSL.write(_shutdown_command)
    time.sleep(0.1)
print("Ensure the 'active' light is not green. If it is, press it")