 ## <b>Connecting VXI-11-compatible devices via Ethernet</b>

In [None]:
import vxi11
import time

In [None]:
# Address handed to vxi11.Instrument() when the function generator is reached over Ethernet.
keysight_ip = '192.168.0.8'

In [None]:
# Open a VXI-11 session to the instrument at keysight_ip and print its reply to the
# *IDN? identification query as a quick connectivity check.
Keysight = vxi11.Instrument(keysight_ip)
print(Keysight.ask("*IDN?"))

## <b> Connecting via USB </b>

In [None]:
!pip install pyvisa

In [None]:
# Shell escape: list the USB devices visible to the host before opening a VISA resource.
!lsusb

In [None]:
import pyvisa

# Resource manager used to discover and open VISA instruments.
rm = pyvisa.ResourceManager()

# Enumerate once and reuse the result, so the list that is printed is the list that is indexed.
resources = rm.list_resources()
print(resources)

# Fail with an actionable message instead of a bare IndexError when nothing is attached.
if not resources:
    raise RuntimeError("No VISA resources found - check that the instrument is connected and powered on.")

# NOTE(review): the first listed resource is assumed to be the function generator;
# pick another entry of `resources` if several instruments are attached.
Keysight = rm.open_resource(resources[0])
print(Keysight.query("*IDN?"))

## <b> Python Wrapper for Keysight 33500B - Modified from Carlin's code</b>

In [None]:
class keysight():
    """Small SCPI command wrapper for the Keysight 33500B function generator.

    The wrapper works with either connection type used in this notebook:

    * USB:      ``keysight()`` uses the notebook-level ``Keysight`` handle, or pass
                the object returned by ``rm.open_resource(...)`` explicitly.
    * Ethernet: ``keysight(vxi11.Instrument(keysight_ip))``.

    Only the methods ``write`` and ``query`` (pyvisa) or ``ask`` (vxi11) of the
    handle are used.
    """

    def __init__(self, instrument=None):
        """Bind to an open instrument handle and print its ``*IDN?`` reply.

        Args:
            instrument: Open instrument handle. When omitted, the notebook-level
                ``Keysight`` variable is used, so ``keysight()`` needs one of the
                connection cells to have been run first.
        """
        self.Keysight = Keysight if instrument is None else instrument
        print(self._query("*IDN?"))

    def _query(self, command):
        """Send *command* and return the reply via ``query`` or, failing that, ``ask``."""
        send = getattr(self.Keysight, "query", None) or getattr(self.Keysight, "ask")
        return send(command)

    def enable(self):
        """Switch the generator output on."""
        self.Keysight.write("OUTPut ON")

    def disable(self):
        """Switch the generator output off."""
        self.Keysight.write("OUTPut OFF")

    def enable_high_impedance(self):
        """Set the expected output load to infinite (high-impedance) with ``OUTPut:LOAD INF``."""
        self.Keysight.write("OUTPut:LOAD INF")

    def sine(self, frequency=500, amplitude=0.01, offset=0):
        """Configure a sine wave; the output state is left unchanged.

        Args:
            frequency: Value sent with ``FREQuency`` (no unit suffix is sent).
            amplitude: Amplitude in Vrms (the unit is set to VRMS before the value is sent).
            offset: Value sent with ``VOLTage:OFFSet`` (no unit suffix is sent).
        """
        self.Keysight.write(f"VOLTage:OFFSet {offset}")
        self.Keysight.write("VOLTage:UNIT VRMS")
        self.Keysight.write(f"VOLTage {amplitude}")
        self.Keysight.write("FUNCtion SINusoid")
        self.Keysight.write(f"FREQuency {frequency}")

    def enable_sweep(self, mode="linear", runTime=10, startFreq=500, endFreq=3000):
        """Configure a sine frequency sweep and switch the output on.

        Args:
            mode: Sweep spacing keyword sent with ``SWEep:SPAcing`` (e.g. "linear",
                "logarithmic").
            runTime: Value sent with ``SWEep:TIME`` (no unit suffix is sent).
            startFreq: Sweep start frequency sent with ``FREQuency:STARt``.
            endFreq: Sweep stop frequency sent with ``FREQuency:STOP``.
        """
        self.Keysight.write("FUNCtion SINusoid")
        self.Keysight.write("SWEep:STATe ON")
        self.Keysight.write(f"SWEep:SPAcing {mode}")
        self.Keysight.write(f"SWEep:TIME {runTime}")
        self.Keysight.write(f"FREQuency:STARt {startFreq}")
        self.Keysight.write(f"FREQuency:STOP {endFreq}")
        self.Keysight.write("OUTPut ON")

    def disable_sweep(self):
        """Leave sweep mode and switch the output off."""
        self.Keysight.write("SWEep:STATe OFF")
        self.Keysight.write("OUTPut OFF")

## <b> Connecting to RPI </b>

In [None]:
# Connecting to the RPi over Wi-Fi
import datetime
import getpass
import os

import paramiko
from paramiko import SSHClient
from scp import SCPClient

#RPi = '192.168.0.7' # This is the IP address when RPi is connected to SHL network, note that this changes depending on different networks unless static IP is set
RPi = '172.20.10.10' # This is the IP address when RPi is connected to RickyiPhone hotspot

ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts any unknown host key without verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.load_system_host_keys()

# Keep the password out of the notebook: take it from the RPI_PASSWORD environment
# variable when set, otherwise prompt for it without echoing.
rpi_password = os.environ.get('RPI_PASSWORD') or getpass.getpass(f'Password for pi@{RPi}: ')
ssh.connect(RPi, username='pi', password=rpi_password)

In [None]:
# SCP client built on the transport of the SSH session opened above; the test methods
# use it to copy recordings from the RPi.
scp = SCPClient(ssh.get_transport())

In [None]:
# Local working directory for the .wav test recordings
import os

audio_dir = "C:\\Users\\ricky\\Documents\\VM3000-Microphones\\Experimental Testing"

# Make it the current directory so relative file names resolve there
os.chdir(audio_dir)

# Echo the resulting directory as a sanity check
print(os.getcwd())

In [None]:
# List the files in the SSH login directory of the RPi using `ls -l`

# exec_command returns the stdin, stdout and stderr streams of the remote command.
stdin, stdout, stderr = ssh.exec_command('ls -l')
for x in stdout: # stdout is file-like, so iterate over it to print the listing line by line
    print(x)

## <b> Class for Test Functions </b>

In [None]:
class signalTests():
    """Record audio on the RPi while driving the signal generator, then fetch the recording.

    Every test follows the same sequence: start ``arecord`` on the RPi over SSH,
    play a signal on the generator, wait until ``arecord`` has exited, and copy the
    ``.wav`` file from ``~/AudioFiles`` on the RPi with SCP.

    The methods use the notebook-level objects ``ssh`` (paramiko client), ``scp``
    (SCP client) and ``sg`` (``keysight`` wrapper). ``sweepTest`` also reads the
    notebook variable ``distance``; ``sineSweepBurstTest`` reads ``distance``,
    ``startFreq`` and ``endFreq``. Those variables only end up in the file name.

    Each test returns the name of the fetched file, so callers do not have to
    reconstruct the timestamp that is generated inside the method.
    """

    # Sample rate passed to arecord's -r option for every recording.
    SAMPLE_RATE = 96000

    @staticmethod
    def _timestamp():
        """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
        import datetime

        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def _start_recording(self, fileName, recordTime):
        """Start ``arecord`` on the RPi, writing *fileName* in ``~/AudioFiles`` with ``-d recordTime``."""
        cmd = (f'cd ~/AudioFiles; arecord -D plughw:1 -c2 -r {self.SAMPLE_RATE} -f S16_LE -t wav '
               f'-V stereo -v -d {recordTime} {fileName}')
        print(cmd)
        # The command keeps running on the RPi while the generator is driven below.
        ssh.exec_command(cmd)

    def _fetch_recording(self, fileName):
        """Wait for ``arecord`` to exit, copy *fileName* from the RPi and return the name."""
        import time

        # `ps -C arecord | wc -l` yields the header line plus one line per running
        # arecord process, so a count above 1 means the file is still being written.
        while int(ssh.exec_command('ps -C arecord | wc -l')[1].read()) > 1:
            time.sleep(1)

        # No local path is given, so the file keeps its name on the local side.
        scp.get(f'~/AudioFiles/{fileName}')
        return fileName

    def sweepTest(self, mode="logarithmic", recordTime=9, runTime=5, sleepTime=2, arrayAngle=0):
        """Record a frequency sweep.

        Args:
            mode: Sweep spacing forwarded to ``sg.enable_sweep``.
            recordTime: Recording duration passed to ``arecord -d``; should exceed
                ``runTime`` plus the pauses so the whole sweep is captured.
            runTime: Sweep time forwarded to ``sg.enable_sweep`` and time the output stays on.
            sleepTime: Pause before the output is enabled and after it is disabled.
            arrayAngle: Only used in the file name.

        Returns:
            Name of the fetched ``.wav`` file.
        """
        import time

        # Only used in the file name; they mirror the 500/3000 defaults of keysight.enable_sweep.
        startFreq = 500
        endFreq = 3000

        fileName = f'Pi4sweepTest_{self._timestamp()}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{runTime}.wav'
        self._start_recording(fileName, recordTime)

        time.sleep(sleepTime)  # Buffer time prior to enabling signal generator

        sg.enable_sweep(mode, runTime)
        sg.enable()
        time.sleep(runTime)
        sg.disable()
        time.sleep(sleepTime)
        sg.disable_sweep()

        print("Finished Sweep Test Output")

        return self._fetch_recording(fileName)

    def sineTest(self, frequency=2000, amplitude=0.01, offset=0, recordTime=15, runTime=10, sleepTime=5):
        """Record a continuous sine tone.

        Args:
            frequency, amplitude, offset: Forwarded to ``sg.sine``.
            recordTime: Recording duration passed to ``arecord -d``.
            runTime: Time the generator output stays on.
            sleepTime: Pause after the output has been disabled.

        Returns:
            Name of the fetched ``.wav`` file.
        """
        import time

        fileName = f'Pi4sineTest_{self._timestamp()}.wav'
        self._start_recording(fileName, recordTime)

        time.sleep(2)  # Fixed lead-in before the tone starts

        sg.sine(frequency, amplitude, offset)
        sg.enable()
        time.sleep(runTime)
        sg.disable()
        time.sleep(sleepTime)

        print("Finished Sine Wave Test Output")

        return self._fetch_recording(fileName)

    def sineSweepBurstTest(self, frequency=500, amplitude=0.01, offset=0, step=125, recordTime=12, runTime=8,
                           sleepTime=2, arrayAngle=0, bursts=21, burstTime=0.2):
        """Record a series of short sine bursts whose frequency rises by *step* per burst.

        Args:
            frequency: Frequency of the first burst.
            amplitude, offset: Forwarded to ``sg.sine`` for every burst.
            step: Frequency increment between consecutive bursts.
            recordTime: Recording duration passed to ``arecord -d``.
            runTime: Only used in the file name.
            sleepTime: Pause after the last burst.
            arrayAngle: Only used in the file name.
            bursts: Number of bursts to play.
            burstTime: Duration of each burst and of the silence that follows it.

        Returns:
            Name of the fetched ``.wav`` file.
        """
        import time

        fileName = (f'Pi4sweepBurstTest_{self._timestamp()}_{distance}_0{arrayAngle}_'
                    f'{startFreq}_{endFreq}_{step}_{runTime}.wav')
        self._start_recording(fileName, recordTime)

        time.sleep(2)  # Fixed lead-in before the first burst

        for burst in range(bursts):
            sg.sine(frequency + burst * step, amplitude, offset)
            sg.enable()
            time.sleep(burstTime)
            sg.disable()
            time.sleep(burstTime)

        time.sleep(sleepTime)

        print("Finished Sine Sweep Burst Test Output")

        return self._fetch_recording(fileName)

## <b> Data Collection </b>

In [None]:
from scipy.io import wavfile
from scipy.fftpack import fft,fftfreq

import IPython
import numpy as np
import soundfile
import wave

import IPython.display as ipd
import ffmpeg
import time
import datetime

In [None]:
# Create the test runner and the generator wrapper used by the cells below.
st = signalTests()
sg = keysight()
# Tell the generator it drives a high-impedance load (sends "OUTPut:LOAD INF").
sg.enable_high_impedance()

### Sine Wave Test

In [None]:
# Manual check of the generator: play one sine tone without recording anything.
frequency = 2000
amplitude = 0.03 # Vrms
offset = 0
sleep_time = 2 # seconds the output stays on, and pause after it is switched off

sg.sine(frequency=frequency, amplitude=amplitude, offset=offset)
sg.enable()
time.sleep(sleep_time)
sg.disable()
time.sleep(sleep_time)

print("Finished Sine Wave Output")

### Frequency Sweep Test

In [None]:
# Running Frequency Sweep Test & recording audio from Pi4
import glob
import re

mode = "linear"
runTime = 5
sleepTime = 2
recordTime = 9 # Always specify a recordTime greater than runTime to build in a buffer at the end of audio recording
distance = 2
arrayAngle = 340
startFreq = 500
endFreq = 3000

# Provisional timestamp, refined below once the recording has been fetched.
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

st.sweepTest(mode,recordTime,runTime,sleepTime,arrayAngle)

# sweepTest() stamps the file with its own clock reading, which can fall one second after
# the value taken above. Read the timestamp back from the fetched file so the trimming and
# playback cells open the recording that actually exists.
suffix = f'_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{runTime}.wav'
stamp = re.compile(r'Pi4sweepTest_(\d{8}_\d{6})' + re.escape(suffix))
fetched = [m.group(1) for m in map(stamp.fullmatch, glob.glob('Pi4sweepTest_*.wav')) if m]
# Ignore recordings from earlier runs: only stamps at or after the provisional one qualify.
now = max((s for s in fetched if s >= now), default=now)

In [None]:
# Cut the silent lead-in and tail off the sweep recording

sweep_wav = f'Pi4sweepTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{runTime}.wav'
trimmed_wav = f'Pi4sweepTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_5.2_trimmed.wav'

audio_input = ffmpeg.input(sweep_wav)
# Keep the span from 2 s to 7.2 s of the recording
audio_cut = audio_input.audio.filter('atrim', start=2, end=7.2)
audio_output = ffmpeg.output(audio_cut, trimmed_wav)
ffmpeg.run(audio_output)

In [None]:
# Play recorded audio
# The trimming cell writes its output with the "_5.2_trimmed" suffix, so read that file back.
data, samplerate = soundfile.read(f"Pi4sweepTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_5.2_trimmed.wav")

# Recordings are stored with the timestamp in their name; include it to load the untrimmed file.
ipd.Audio(f"Pi4sweepTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{runTime}.wav") # load recent local sweep test file

### Burst Frequency Sweep Test

In [None]:
# Running Burst Frequency Sweep Test & recording audio from Pi4
import glob
import re

frequency = 500
amplitude = 0.03 # Vrms
offset = 0
step = 125
runTime = 9
sleepTime = 2
recordTime = 13
arrayAngle = 340
distance = 2
startFreq = 500
endFreq = 3000

# Provisional timestamp, refined below once the recording has been fetched.
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

st.sineSweepBurstTest(frequency,amplitude,offset,step,recordTime,runTime,sleepTime,arrayAngle)

# sineSweepBurstTest() stamps the file with its own clock reading, which can fall one second
# after the value taken above. Read the timestamp back from the fetched file so the trimming
# and playback cells open the recording that actually exists.
suffix = f'_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{step}_{runTime}.wav'
stamp = re.compile(r'Pi4sweepBurstTest_(\d{8}_\d{6})' + re.escape(suffix))
fetched = [m.group(1) for m in map(stamp.fullmatch, glob.glob('Pi4sweepBurstTest_*.wav')) if m]
# Ignore recordings from earlier runs: only stamps at or after the provisional one qualify.
now = max((s for s in fetched if s >= now), default=now)

In [None]:
# Cut the silent lead-in and tail off the burst sweep recording

burst_wav = f'Pi4sweepBurstTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{step}_{runTime}.wav'
trimmed_burst_wav = f'Pi4sweepBurstTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_10.2_trimmed.wav'

audio_input = ffmpeg.input(burst_wav)
# Keep the span from 2 s to 12.2 s of the recording
audio_cut = audio_input.audio.filter('atrim', start=2, end=12.2)
audio_output = ffmpeg.output(audio_cut, trimmed_burst_wav)
ffmpeg.run(audio_output)

In [None]:
# Play recorded audio
# Burst recordings are stored as Pi4sweepBurstTest_<timestamp>_..._<step>_<runTime>.wav.
ipd.Audio(f"Pi4sweepBurstTest_{now}_{distance}_0{arrayAngle}_{startFreq}_{endFreq}_{step}_{runTime}.wav") # load recent local burst sweep test file