In [1]:
import numpy as np
import serial
import datetime as dt
import os
from os.path import join, exists
import time
import matplotlib.pyplot as plt
import matplotlib
from multiprocessing import Process, Queue, Event
import glob
import pandas as pd
import warnings
import sys

import subprocess
from tqdm.notebook import tqdm
import datetime

#if you want to display images as you record
import cv2
import matplotlib.pyplot as plt

from pyk4a import *

# imports from this module
from top_bottom_triggered.fast_animate import *
from top_bottom_triggered.commutator_utils import *
from top_bottom_triggered.video_io import *
from top_bottom_triggered.multicam_utils import *


In [2]:
def interrupt_sync_device(sync_device_port=None, sync_device=None):
    """Send the interrupt command (``b"i"``) to the sync device and return its reply.

    Exactly one of the two arguments must be supplied.

    Parameters
    ----------
    sync_device_port : str, optional
        Name of the serial port. It is opened at 9600 baud with a 0.1 s
        timeout for the duration of the call and closed afterwards.
    sync_device : optional
        An already-open serial-like object exposing ``write`` and
        ``readline``. It is left open.

    Returns
    -------
    str
        The line read back from the device, decoded as UTF-8.

    Raises
    ------
    ValueError
        If both arguments, or neither of them, are given.
    """
    # Exactly one source must be supplied. Previously, passing neither fell
    # through both branches and failed with an UnboundLocalError on `response`.
    if (sync_device_port is None) == (sync_device is None):
        raise ValueError('pass either port or device object')

    if sync_device is not None:
        sync_device.write(b"i")
        return sync_device.readline().decode("utf-8")

    with serial.Serial(sync_device_port, baudrate=9600, timeout=0.1) as device:
        device.write(b"i")
        return device.readline().decode("utf-8")


def unfreeze_azures(sync_device_port=None, sync_device=None, n_pulses=5):
    """Ask the sync device for a short burst of triggers to the azures.

    Exactly one of ``sync_device_port`` / ``sync_device`` must be supplied.

    Parameters
    ----------
    sync_device_port : str, optional
        Name of the serial port. It is opened at 9600 baud with a 0.1 s
        timeout for the duration of the call and closed afterwards.
    sync_device : optional
        An already-open serial-like object exposing ``write`` and
        ``readline``. It is left open.
    n_pulses : int, optional
        Number sent to the device, encoded with ``packIntAsLong``
        (default 5, the value that used to be hard-coded).

    Returns
    -------
    str
        The line read back from the device, decoded as UTF-8.

    Raises
    ------
    ValueError
        If both arguments, or neither of them, are given.
    """
    # Validate before doing any work. Previously, passing neither argument
    # failed with an UnboundLocalError on `response`.
    if (sync_device_port is None) == (sync_device is None):
        raise ValueError('pass either port or device object')

    num = b"".join([packIntAsLong(int(n_pulses))])

    if sync_device is not None:
        sync_device.write(num)
        return sync_device.readline().decode("utf-8")

    with serial.Serial(sync_device_port, baudrate=9600, timeout=0.1) as device:
        device.write(num)
        return device.readline().decode("utf-8")


class ThermistorAnimator():
    """Buffer thermistor samples and stream them to a live animator.

    Samples are written into a fixed-length buffer whose write head wraps
    around, oscilloscope style. Every ``q_downsample``-th sample the buffer
    is pushed onto ``self.queue`` together with the led column values and
    the opto value (0 when ``show_opto`` is off).
    """

    def __init__(
        self, 
        n_samples, 
        fs, 
        show_opto=False,
        q_downsample=15,
    ):
        """
        Parameters
        ----------
        n_samples : int
            Length of the display buffer.
        fs : number
            Sampling rate of the incoming data; ``int(fs / q_downsample)`` is
            handed to ``main_from_ipynb``.
        show_opto : bool, optional
            Also extract the opto column and forward it to the animator.
        q_downsample : int, optional
            Push the buffer to the queue once every ``q_downsample`` samples.
        """
        self.nsamp = n_samples
        self.show_opto = show_opto
        
        self.current_val = 0
        self.data = np.zeros((n_samples,), dtype='float')
        self.data_head_idx = 0  # to trace out data like an o-scope
        
        self.queue = Queue()
        self.animator_exit_event = Event()
        # NOTE(review): main_from_ipynb comes from a project star import; the
        # object it returns is used below via is_alive() and start().
        self.animate_process = main_from_ipynb(self.queue, n_samples, int(fs/q_downsample), self.animator_exit_event)
        self.q_downsample_counter = 0
        # Honour the constructor argument (this was hard-coded to 15).
        self.q_downsample = q_downsample
        
    def extract_indices_from_header(self, header):
        """Locate the thermistor, opto (if enabled) and led columns in a csv header.

        Raises IndexError if a required column is missing.
        """
        columns = header.split(',')
        self.therm_idx = [i for i, val in enumerate(columns) if val in ('therm', 'thermistor')][0]
        if self.show_opto:
            # NOTE(review): opto_header_name is not defined in this notebook;
            # presumably it is provided by one of the star imports -- confirm.
            self.opto_idx = [i for i, val in enumerate(columns) if val == opto_header_name][0]
        self.led_idxs = [i for i, val in enumerate(columns) if 'led' in val]
                 
    def start(self):
        """Start the animator process unless it is already running."""
        if not self.animate_process.is_alive():
            self.animate_process.start()
            
    def update(self, line):
        """Consume one csv line of data and, every ``q_downsample`` lines, notify the animator.

        ``extract_indices_from_header`` must have been called first.
        """
        fields = line.split(',')

        # Extract data from correct index in line and store it at the write head
        self.current_val = np.array(fields[self.therm_idx], dtype='float')
        self.data[self.data_head_idx] = self.current_val

        # Extract opto val if using. Read the instance flag rather than the
        # notebook-level global of the same name.
        opto_val = 0
        if self.show_opto:
            opto_val = np.array(fields[self.opto_idx], dtype='float')

        # TODO: show inhales and exhales in the trace.

        # Blank the sample 100 positions ahead of the head to make it o-scope-like
        self.data[(self.data_head_idx + 100) % self.nsamp] = np.nan
        self.data_head_idx = (self.data_head_idx + 1) % self.nsamp

        # Advance the downsample counter; it wraps to 0 every q_downsample samples
        self.q_downsample_counter = (self.q_downsample_counter + 1) % self.q_downsample
    
        # Decide if sending to animator
        if self.q_downsample_counter == 0:
            sync_tup = tuple(fields[idx] for idx in self.led_idxs)
            self.queue.put((self.data, sync_tup, opto_val))
    
    def close(self):
        """Set the animator exit event and shut the queue down."""
        self.animator_exit_event.set()
        self.queue.close()
        self.queue.cancel_join_thread()


# Experiment setup

## Get mouse order

In [3]:
# Shuffle the cohort into a run order that stays fixed for the whole calendar
# day: the RNG is seeded from the timestamp of today's local midnight, so
# re-running this cell later on the same date gives the same order.

mice_to_run = ['gmou77', 'gmou78', 'gmou81', 'gmou83']

today = dt.date.today()
date_hash = int(dt.datetime.combine(today, dt.time.min).timestamp())
np.random.seed(date_hash)
np.random.shuffle(mice_to_run)
print(mice_to_run)

['gmou78', 'gmou83', 'gmou81', 'gmou77']


## File name + dir, expt length

In [4]:
# Who is being recorded, and when
subject = 'test'
file_suffix = ''  # disambiguate between two sessions on the same day
date = f'{dt.datetime.now():%Y%m%d}'

# Recording length in minutes: go slightly longer than mkv to ensure complete overlap
time_in_minutes = 10

# Root directory under which the session folder is created
base_path = R'D:\Jonah\trigger_testing'
# base_path = R'E:\Jonah\CeAMouse'

# If True, an existing commutator file with the same name is replaced
overwrite = True

In [5]:
# Session output directory: <base_path>/<subject>/<date>_<subject>.
# Each component is passed to os.path.join separately instead of embedding a
# literal backslash, so the separator always matches the running platform.
path = os.path.join(base_path, subject, f'{date}_{subject}')

if not os.path.exists(path):
    os.makedirs(path)
    print(f'Created {path}')
else:
    print(f'Path {path} exists!')

Path D:\Jonah\trigger_testing\test\20230428_test exists!


# Commutator Setup

In [6]:
# Serial ports of the sync device and the commutator
sync_device_port = 'COM7'
commutator_port = 'COM4'

# Send 'r' to the commutator to reset its trigger counter
with serial.Serial(commutator_port, baudrate=115200, timeout=0.1) as ino:
    ino.write(b'r')

In [7]:
# Run-time switches
debug = False      # if True, the acquisition loop strips the ',DEBUG:' tail from each line
show_opto = False  # only set to true if there is a "stim" col in ino data

In [8]:
# Sanity-check the commutator link: grab a single line, show it (so the dac
# value can be eyeballed) and report how many comma-separated values it has
with serial.Serial(commutator_port, baudrate=115200, timeout=0.1) as ino:
    line = ino.readline().decode('utf-8').strip('\r\n')

print(line)
print(f'Data has {line.count(",") + 1} elements')

23926,0,1,0,1,328.4375,-36.8750,46.5000,0.07,-0.03,-0.08,0,3.30,0
Data has 14 elements


In [9]:
# hard-coded params -- don't change
n_samples = 4000   # how many thermistor samples to show
q_downsample = 15  # leave at 15; how much to downsample rt output (doesnt affect saved data) (eg if 20, and ino at 1 khz, will be 50 hz)
fs = 500           # fs of the commutator teensy

# Where the commutator data for this session is written
commutator_fname = f'{date}_{subject}{file_suffix}.txt'
commutator_fullfile = os.path.join(path, commutator_fname)

# An existing file is only replaced when `overwrite` is set; otherwise stop here
if exists(commutator_fullfile):
    if not overwrite:
        raise ValueError(f'File {commutator_fullfile} exists! Add a suffix or change subject name.')
    os.remove(commutator_fullfile)

# Azure setup
Shouldn't really need to change this stuff

In [29]:
# Azure serial numbers by role. Units noted previously:
#   bottom: '000343492012' (old), '000693321712' (new)
#   top:    '000364192012' (old)
serial_numbers = dict(bottom='000693321712', top='000500221712')
master = 'top'  # don't change (should be top)

# Delay bookkeeping for the non-master cameras; the value is later passed as
# subordinate_delay_off_master_usec
sync_delay = 0
sync_delay_step = 500

# Filled with one recording process per camera further down
record_processes = {}

file_prefix = os.path.join(path, f'{date}_{subject}{file_suffix}')
print(f'File will be saved to: {file_prefix}.XYZ')

File will be saved to: D:\Jonah\trigger_testing\test\20230428_test\20230428_test.XYZ


In [30]:
# Look up the device index of each camera from its serial number. The result
# is a dict keyed by camera role (see the printed output below).
# NOTE(review): get_camera_indexes is provided by one of the star imports.
# If you get an error here, try unfreezing the azures; or just unplug + re-plug them.
camera_indexes = get_camera_indexes(serial_numbers)
print(camera_indexes)

Index:0	Serial:000693321712	Color:1.6.110	Depth:1.6.80
Index:1	Serial:000500221712	Color:1.6.110	Depth:1.6.80
{'bottom': 0, 'top': 1}


In [31]:
# One interrupt queue per camera; putting a message on it is how the notebook
# asks a recording process to stop (see the acquisition cell below).
interrupt_queues = {camera: Queue() for camera in camera_indexes}
trigger_started_event = Event()

# Count the non-master cameras locally instead of incrementing the global
# `sync_delay` in place, so that re-running this cell yields the same delays.
n_subordinates = 0

for camera, ix in camera_indexes.items():
    if camera == master:
        # The master is also configured as SUBORDINATE; the recording is
        # started with externally_triggered=True.
        k4a = PyK4A(Config(color_resolution=ColorResolution.OFF,  # RES_720P
                           depth_mode=DepthMode.NFOV_UNBINNED,
                           synchronized_images_only=False,
                           wired_sync_mode=WiredSyncMode.SUBORDINATE), device_id=ix)

        p = Process(target=capture_from_azure,
                    args=(k4a, file_prefix+'.'+camera, int(time_in_minutes*60)),
                    kwargs={
                        'display_time': True,
                        'display_frames': True,
                        'externally_triggered': True,
                        'trigger_started_event': trigger_started_event,
                        'interrupt_queue': interrupt_queues[camera]
                    })

    else:
        # Each further camera is delayed by one more step
        n_subordinates += 1
        camera_sync_delay = sync_delay + n_subordinates * sync_delay_step
        k4a = PyK4A(Config(color_resolution=ColorResolution.OFF,
                           depth_mode=DepthMode.NFOV_UNBINNED,
                           synchronized_images_only=False,
                           wired_sync_mode=WiredSyncMode.SUBORDINATE,
                           subordinate_delay_off_master_usec=camera_sync_delay), device_id=ix)

        # Non-master recordings get 5 s more than the master's duration.
        p = Process(target=capture_from_azure,
                    args=(k4a, file_prefix+'.'+camera, int(time_in_minutes*60)+5),
                    kwargs={
                        # Was `camera==False`, a str/bool comparison that is
                        # always False; spelled out here.
                        'display_time': False,
                        'externally_triggered': True,
                        'interrupt_queue': interrupt_queues[camera]})

    record_processes[camera] = p

record_processes

{'bottom': <Process name='Process-8' parent=11596 initial>,
 'top': <Process name='Process-9' parent=11596 initial>}

## Run the experiment!

In [13]:
# Get the header from the arduino, and save it to the file
first_line = 1  # don't change
second_line = 0  # don't change
sync_sent = 0
header_max_attempts = 10
second_line_max_attempts = 10

# Open queue to animator. Use the configured sampling rate and downsample
# factor instead of repeating their values as literals.
thermistor_animator = ThermistorAnimator(n_samples, fs, show_opto=show_opto, q_downsample=q_downsample)

# Mode 'x' refuses to open an existing file, so earlier data is never clobbered here
with open(commutator_fullfile, 'x') as file:
    with serial.Serial(commutator_port, baudrate=115200, timeout=0.1) as ino:
        ino.write('r'.encode('utf-8')) ## reset trigger counter
        reader = ReadLine(ino)

        # These checks get header and process it
        if first_line:
            # Ask the arduino to print the header
            ino.write('h'.encode('utf-8'))

            # Verify first line. First_line becomes false when good (ie, we're no longer on the first line)
            status, first_line, second_line, header, n_attempts, read_lines = first_line_check(header_max_attempts, reader, file=file)
            if not status:
                raise RuntimeError('Didnt receive header!')

            # Extract indices of values we're interested in
            header_fields = header.split(',')
            header_len = len(header_fields)
            print(header)
            thermistor_animator.extract_indices_from_header(header)
            # Raises ValueError if the header has no 'trigger' column
            trigger_idx = header_fields.index('trigger')

        if second_line:
            status, second_line = second_line_check(second_line_max_attempts, reader, header, n_good_thresh=10)
            if not status:
                # The message used 'csv''d', which Python concatenates to "csvd"
                raise RuntimeError("Number of csv'd datapoints doesnt match number of csv'd elements in header!")

time,led1,led2,led3,led4,yaw,roll,pitch,acc_x,acc_y,acc_z,therm,dac,trigger


In [24]:
# Start Azures: every non-master camera first, then the master 5 s later
for camera in camera_indexes:
    if camera == master:
        continue
    record_processes[camera].start()

time.sleep(5)
record_processes[master].start()
print('Azures primed...')

Azures primed...


In [15]:
# timing vars
start_time = dt.datetime.now()
one_mindelta = dt.timedelta(minutes=1)
exp_timedelta = time_in_minutes*one_mindelta # key var to be compared against (now - start_time)

# Main DAQ loop
try:
    with open(commutator_fullfile, 'a') as file:
        with serial.Serial(commutator_port, baudrate=115200, timeout=0.1) as ino, serial.Serial(sync_device_port, baudrate=9600, timeout=0.1) as sync_device:

            # Create more efficient serial reader
            reader = ReadLine(ino)

            # Start thermistor animator
            thermistor_animator.start()

            while (dt.datetime.now() - start_time) < exp_timedelta:

                # Read the line
                line = reader.readline().decode('utf-8').strip('\r\n')

                # Remove the DEBUG output if present and debugging. Splitting
                # leaves the line untouched when the marker is absent; slicing
                # with find() used to chop the last character in that case
                # (find returns -1).
                if debug:
                    line = line.split(',DEBUG:', 1)[0]

                # Check for the typical (but rare) serial read issues BEFORE
                # touching any field, so that an empty or truncated line is
                # skipped instead of aborting the session.
                if len(line) == 0:
                    print('Got empty line, continuing...')
                    continue
                fields = line.split(',')
                if len(fields) != header_len:
                    print('Got line with unexpected length (skipping):')
                    print(line)
                    continue

                if not sync_sent:
                    # No trigger may have been counted before the sync device is started
                    if int(fields[trigger_idx]) != 0:
                        raise RuntimeError(
                            f'Expected trigger value 0 before starting the sync device, got {fields[trigger_idx]}'
                        )

                    # Data looks good, so start the sync device
                    if not (first_line or second_line):
                        print('sending start msg to sync device')
                        # Pulse count requested from the sync device.
                        # NOTE(review): 30 is presumably the trigger rate in Hz
                        # and 300 a safety margin -- confirm.
                        num = b"".join([packIntAsLong(int(time_in_minutes*60*30 + 300))])
                        sync_device.write(num)
                        sync_sent = 1
                        print(f'Sync device said: {sync_device.readline().decode("utf-8")}')

                # typical case -- write line directly to file
                file.write(line)
                file.write('\n')

                thermistor_animator.update(line)

            # After data collection finishes, close animator queue
            print('closing animator queue')
            thermistor_animator.close()

            # Join the Azure processes (ie block until they finish)
            if trigger_started_event.is_set():
                print('joining az processes')
                for proc in record_processes.values():
                    if proc.is_alive():
                        proc.join()
                # Process.join() returns None; the exit status lives on .exitcode
                exit_codes = [proc.exitcode for proc in record_processes.values()]

            # Sync device will finish on its own, but can just stop it here for convenience
#             print('Stopping sync device')
#             response = interrupt_sync_device(sync_device=sync_device)

            print('Done with main loop')

# Catch everything, including KeyboardInterrupt from the notebook's stop
# button, so the hardware is always released; the error is re-raised below.
except BaseException:

    print('Exception')
    # Stop the Azures in the event of an interrupt
    if trigger_started_event.is_set():
        print('halting azures')
        for q in interrupt_queues.values():
            q.put(tuple())

        # Stop the syncing device in the event of an interrupt
        print('Stopping sync device')
        response = interrupt_sync_device(sync_device_port)
        print(f"Sync device said: {response}")
    else:
        print('unfreezing azures')
        response = unfreeze_azures(sync_device_port=sync_device_port)
        print(f"Sync device said: {response}")
        for q in interrupt_queues.values():
            q.put(tuple())

    # Stop the animator process
    print('Stopping animator')
    thermistor_animator.close()
    raise

finally:
    print('Done.')

sending start msg to sync device
Sync device said: 18300 sync pulses started!

closing animator queue
joining az processes
Done with main loop
Done.


In [28]:
# Run this to send a short pulse of triggers to unfreeze the Azures, if necessary
unfreeze_azures(sync_device_port=sync_device_port)

# Then put an empty tuple on every camera's interrupt queue
for q in interrupt_queues.values():
    q.put(())

## Post-experiment summaries

In [65]:
# Load the commutator log of this session. The matches are sorted so the file
# picked is deterministic, and an empty directory gives a clear error instead
# of a bare IndexError.
txt_files = sorted(glob.glob(os.path.join(path, '*.txt')))
if not txt_files:
    raise FileNotFoundError(f'No .txt commutator file found in {path}')
data = pd.read_csv(txt_files[0])

In [66]:
# Count samples where the thermistor reading is past a threshold while the
# dac value is at or beyond the given limit.
# NOTE(review): presumably these flag samples where the dac has run out of
# range -- confirm the intended meaning of the thresholds.
therm_over_thresh_count = ((data.therm > 900) & (data.dac <= 0.1)).sum()
therm_under_thresh_count = ((data.therm < 200) & (data.dac >= 3.25)).sum()
print(f'Therm over: {therm_over_thresh_count}')
print(f'Therm under: {therm_under_thresh_count}')

# total_seconds() covers the whole interval; .seconds silently drops whole days
elapsed_minutes = (dt.datetime.now() - start_time).total_seconds() / 60
print(f'Time elapsed since start: {elapsed_minutes:0.1f} minutes')

Therm over: 4608
Therm under: 0
Time elapsed since start: 66.8 minutes
