# Simple animal position tracking

To run this notebook:

- open Anaconda prompt via Windows start button -> type "Anac.." -> launch
- in Anaconda prompt type "jupyter notebook"
- in the Jupyter browser go to Desktop/postrack and click on "experiment"

## Import modules

Make sure that Python 3 and the following modules (recommended version ID) are installed on your computer:

In [1]:
# to install with pip run:
# pip install -r requirements.txt

with open('requirements.txt', 'r') as f:
    print(''.join(f.readlines()))

jupyter==1.0.0
pyFirmata==1.1.0
numpy==1.18.4
opencv-python==4.2.0.34
sounddevice


In [2]:
import numpy as np                        # Import numpy module
import sounddevice as sd                  # Import sounddevice module for "real-time" sound playback

from pyfirmata import Arduino             # Arduino support
from collections import OrderedDict       # keep order of session settings
from scipy.io  import wavfile             # WAV-file import filter

import cv2                                # Import opencv module for image processing
import threading                          # use a separate thread for TTL pulses
import math                               # Import math module
import time                               # Import time module for time measurements and pausing
import random                             # Import random module for random number generation
import json                               # JSON to read / write session settings
import datetime                           # session date/time management
import os, shutil                         # file/folder path handling

## Load experiment settings

For every experimental cofiguration you can copy the original 'settings.json' file, build your own specific experimental preset, save it in this folder as e.g. 'settings_elena.json' and load it here instead of 'settings.json'.

In [3]:
settings_filename = 'settings_test.json'

In [17]:
with open(settings_filename) as json_file:
    cfg = OrderedDict(json.load(json_file))
cfg['experiment_date'] = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

print(json.dumps(cfg, indent=4))

{
    "trial_number": 50,
    "session_duration": 3600,
    "trial_duration": 60,
    "start_radius": 67,
    "start_x": 195,
    "start_y": 195,
    "target_radius": 80,
    "target_duration": 5,
    "subject": "003901",
    "experiment_type": "aSIT",
    "background_color": "T",
    "init_duration": 0.2,
    "resolution_x": 1024,
    "resolution_y": 768,
    "arena_x": 512,
    "arena_y": 384,
    "arena_radius": 430,
    "distractor_island": 0,
    "capture_background": true,
    "MCSArduinoPort": "fake",
    "experiment_date": "2021-07-15_18-03-41"
}


## Initialize session folder

Run the upcoming cell, to create a session folder and to save the chosen experimetal parameters to a JSON-file ("experiment_id_parameters.json"). The session folder will be created here where this notebook is located.

In [18]:
# This session's protocols will be saved to this folder
experiment_id = "%s_%s_%s" % (cfg['subject'], cfg['experiment_type'], cfg['experiment_date'])
save_to = os.path.join('sessions', experiment_id)
             
if not os.path.exists(save_to):
    os.makedirs(save_to)

# Saves all parameters to a JSON file with the user-defined "Experiment ID" as filename
with open(os.path.join(save_to, experiment_id + '_parameters.json'), 'w') as f:
    json.dump(cfg, f, indent=4)

## Prepare the audio stream

The following cell initiates the audio stream, to which we will later feed our stimuli. The default sample rate is set to 44.1 kHz. The cell also loads sound files with the stimuli. Here, we use short pure tones as stimuli and a silent sound object, which is fed to the audiostream between stimuli. In our setup, we found this to be necessarry to reduce undesired clicking sounds at stimulus on- and offset, even though the sounds are ramped. Whether this will be necessary for you, will strongly depend on your audio hardware. 

The audio stimulation provided by this notebook differs from the MATLAB version in two important aspects: Firstly, the MATLAB version generates the stimuli on the fly, while this notebook uses sound files as input. Feel free to change the code if you prefer the other solution. Secondly, the MATLAB version stimulates at fixed time intervals and the sample rate of the video tracking is locked to the stimulation interval, i.e. high temporal precision in the sound stimulation comes with the cost of lower temporal resolution of the animal tracking. Here, we chose the opposite approach, with the video feed defining the cycle frequency (approx. 14 Hz with the given Camera and a resolution of 800x600 px) and the audio stimulation being locked to the framerate of the camera. Thus, higher temporal resolution of the animal tracking comes with the cost that inter-stimulus intervals cannot freely be chosen, but only be multiple integers (3 or higher) of the mean video frame duration. In the example setup and the code below, we decided for the stimulus to be played every three cycles (approx. every 215 ms). 

The duration of the audio files should not exceed the cycle length.

In [19]:
# Set sample rate for audio output
sd.default.samplerate = 44100
fs = 44100          

# Audio stream
stream = sd.OutputStream(samplerate=fs, channels=1, dtype='float32')

# Cycle counter: sound is played every "delay_cycles" cycles (video frames)
delay_cycles = 3  

# Open sound files
sound_foraging = wavfile.read(os.path.join('assets', '6000Hz-short-68.wav'))[1]
sound_distractor = wavfile.read(os.path.join('assets', '10kHz-short-68.wav'))[1]
sound_target = wavfile.read(os.path.join('assets', '4000Hz-short-68.wav'))[1]
sound_silence = wavfile.read(os.path.join('assets', 'silence-short-68.wav'))[1]

  sound_foraging = wavfile.read(os.path.join('assets', '6000Hz-short-68.wav'))[1]
  sound_distractor = wavfile.read(os.path.join('assets', '10kHz-short-68.wav'))[1]
  sound_target = wavfile.read(os.path.join('assets', '4000Hz-short-68.wav'))[1]
  sound_silence = wavfile.read(os.path.join('assets', 'silence-short-68.wav'))[1]


## Initialize the microcontroller to sync with the Acquisition system

The next cell initializes a microcontroller to send a TTL pulse at the time the first detected animal position is written. If you DON'T have a real arduino connected, you can just still run this experiment with the Fake Arduino. The Fake Arduino will just print the text message here in this notebook when sending a pulse.

In [20]:
pin_diode = 13
pin_TTL = 6

class MCSArduino(Arduino):
    def __init__(self, *args, **kwargs):
        self.last_cmd = False  # False - Arduino LOW, True - Arduino HIGH
        super(MCSArduino, self).__init__(*args, **kwargs)
        
    def start_or_stop(self):
        self.last_cmd = not self.last_cmd
        self.digital[pin_diode].write(self.last_cmd)
        self.digital[pin_TTL].write(self.last_cmd)

class FakeArduino():
    def start_or_stop(self):
        print("Fake Arduino - sending a TTL pulse")
        
    def exit(self):
        print("Fake Arduino - exiting...")
        
# Initialize REAL Arduino if connected
if cfg['MCSArduinoPort'] == 'fake':
    board = FakeArduino()
else:
    board = MCSArduino(cfg['MCSArduinoPort'])  # Windows - 'COM10', Linux - '/dev/ttyACM0', check /dev/tty*

## Initialize the position log file

The following cell generates a CSV-file to which the essential data (i.e. animal position, positions of the target areas, etc.) from each cycle (video frame) of the experiment will be saved. The CSV-file ("ExperimentID_protocol.csv") will be saved to the session folder inside the folder containing this notebook. 

In [21]:
def log_frame_data(args):
    with open(os.path.join(save_to, experiment_id + '_protocol.csv'), 'a') as f:
        f.write(",".join([str(x) for x in args]) + "\n")

headers = [
    'time',       # Time stamp
    'animal_x',   # X-Coordinate of the subject
    'animal_y',   # Y-Coordinate of the subject
]

log_frame_data(headers)   # saves headers to the log file

## Capture a background image

The tracking algorithm used in this notebook compares the frames of the video feed during the experiment with an image of the empty arena to later track the position of the largest object in the arena (which usually is your animal). If you are confident in the stability of your video quality, it should suffice to capture the picture once and to skip this cell in the subsequent experiments. However, since this step only takes a few seconds, we recommend to take a new picture of the arena for each new experiment. In the preview of the video feed that will pop-up if you run the next cell, the space outside the arena is masked, so that the camera preview can also be used to check if the camera/arena are still positioned correctly. 

Before taking the picture, make sure that the conditions in your lab (especially the illumination) are the exact same as they will be during the experiments. Once you are happy with the preview of your background image, press "c" to capture the image. It will be saved as "background.png" to the session folder containing this notebook.

This notebook will use the main camera of your system as an input device. If you have more than one camera installed (e.g. on a notebook with internal chat camera), make sure to deactivate all cameras other than the camera of your setup  prior to running the notebook. Also make sure that the video dimensions defined here match you arena dimensions defined above and the video dimensions of the video feeds that will be defined in the subsequent cells.

In [24]:
# Define BGR colors
BGR_COLOR = {
    'red': (0,0,255),
    'green': (127,255,0),
    'blue': (255,127,0),
    'yellow': (0,127,255),
    'black': (0,0,0),
    'white': (255,255,255)
}

cfg['arena_radius'] = 350

if cfg['capture_background']:
    # Define video capture device (0 = webcam1) to capture background frame
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)  # if slow https://github.com/opencv/opencv/issues/17687

    # Set picture dimensions
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, cfg['resolution_x'])
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, cfg['resolution_y'])
    cap.set(cv2.CAP_PROP_FPS, 20)
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))    
    
    # Capture Background frame (c = capture)
    while(True):
        ret, img = cap.read()

        # Draw the arena area
        img_arena = cv2.circle(img, (cfg['arena_x'], cfg['arena_y']), cfg['arena_radius'], BGR_COLOR['red'], 2)

        # Mask the space outside the arena
        mask = np.zeros(shape = img.shape, dtype = "uint8")
        cv2.circle(mask, (cfg['arena_x'], cfg['arena_y']), cfg['arena_radius'], BGR_COLOR['white'], -1)
        img_arena = cv2.bitwise_and(src1=img_arena, src2=mask)

        # text for buttons
        img_arena = cv2.putText(img_arena, "c-capture, q-quit", (10, 20), cv2.FONT_HERSHEY_DUPLEX, .5, BGR_COLOR['white'])

        # Display the resulting frame
        cv2.imshow('Press (c)-to capture the background image', img_arena)

        k = cv2.waitKey(33)
        if k == ord('c'):
            cv2.imwrite(os.path.join(save_to, 'background.png'), img_arena)
            break
        if k == ord('q'):
            break        

    # When the background image is captured, release the capture
    cap.release()
    cv2.destroyAllWindows()

else:
    shutil.copyfile(os.path.join('assets', 'background.png'), os.path.join(save_to, 'background.png'))

## Start the experiment

This cell contains code for animal tracking. We hope that the comments provided in the code suffice to understand the individual steps and to adjust them to your own setup and needs, if necessary.

- press 's' to start recording
- press 's' again to stop recording
- press 'q' to quit

The experiment will stop automatically if the pre-defined session duration is reached.

In [10]:
# increase FPS
# https://www.pyimagesearch.com/2017/02/06/faster-video-file-fps-with-cv2-videocapture-and-opencv/
# https://www.pyimagesearch.com/2015/12/21/increasing-webcam-fps-with-python-and-opencv/

# Define BGR colors
BGR_COLOR = {
    'red': (0,0,255),
    'green': (127,255,0),
    'blue': (255,127,0),
    'yellow': (0,127,255),
    'black': (0,0,0),
    'white': (255,255,255)
}

# Loads current background as object img
background = cv2.imread(os.path.join(save_to, 'background.png'), 1)

# Define video capture device for live-stream (0 = webcam1) and tracking
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

# Set picture dimensions
cap.set(cv2.CAP_PROP_FRAME_WIDTH, cfg['resolution_x'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, cfg['resolution_y'])

# Mask the space outside the arena
mask = np.zeros(shape = background.shape, dtype = "uint8")
cv2.circle(mask, (cfg['arena_x'], cfg['arena_y']), cfg['arena_radius'], BGR_COLOR['white'], -1)

# Define the codec and create VideoWriter object
videoName = os.path.join(save_to, experiment_id + '_video.avi')
fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'DIVX')
# Make sure that the frame rate of your output appoximately matches 
# the number of cycles per second, to avoid time lapsed output videos
out = cv2.VideoWriter(videoName, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))

# Define and start the experiment timer
exp_time = time.time()
last_timestamp = time.time()
frame_counter = 0
exp_running = False  # indicates if recording has started

# Start the audio stream
stream.start()

# Here you can choose different modes of amplitude modulation
ampMod = (random.randrange(2396,2962,1)/100)**np.e/10000 # Unbiased Voltage Ratio -5dB
# ampMod = random.randrange(5623,10001,1)/10000 # Voltage Ratio -5dB
# ampMod = random.randrange(3162,10001,1)/10000 # Power Ratio -5dB
# ampMod = 1 # No modulation
    
while(cap.isOpened() and (time.time() - exp_time) <= cfg['session_duration']):
    
    ret, frame = cap.read()
    if not ret == True:
        break
        
    maskedFrame = cv2.bitwise_and(src1=frame, src2=mask)

    # Substracts background from current frame
    subject = cv2.subtract(maskedFrame, background)

    # Converts subject to grey scale
    subject_gray = cv2.cvtColor(subject, cv2.COLOR_BGR2GRAY)

    # Applies blur and thresholding to the subject
    kernel_size = (25,25)
    frame_blur = cv2.GaussianBlur(subject_gray, kernel_size, 0)
    _, thresh = cv2.threshold(frame_blur, 40, 255, cv2.THRESH_BINARY)

    # Finds contours and selects the contour with the largest area
    contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)

    if (len(contours) == 0):  # If there is no subject, the sreen is blackened
        subject_hull_centroid = np.zeros(frame.shape, np.uint8)
        #subject_hull_centroid = maskedFrame
        subject_hull_centroid = cv2.circle(subject_hull_centroid, (20, 80), 3, BGR_COLOR['yellow'], -1)
    
    else:  # If there is a subject, it is tracked
        contour = contours[np.argmax(list(map(cv2.contourArea, contours)))]
        M = cv2.moments(contour)
        if ((M['m00']) == 0):
            subject_hull_centroid = np.zeros(frame.shape,np.uint8)
            #subject_hull_centroid = maskedFrame
            subject_hull_centroid = cv2.circle(subject_hull_centroid, (20, 60), 3, BGR_COLOR['yellow'], -1)
        else:
            x = int(M['m10'] / M['m00'])
            y = int(M['m01'] / M['m00'])
            hull = cv2.convexHull(contour)
            subject_hull_centroid = maskedFrame

        # Draws contour and centroid of the subject
        cv2.drawContours(subject_hull_centroid, [contour], 0, BGR_COLOR['green'], 1, cv2.LINE_AA)
        subject_hull_centroid = cv2.circle(subject_hull_centroid, (x,y), 3, BGR_COLOR['yellow'], -1)

    # Dot signaling experiment waiting / running
    dot_color = 'red' if exp_running else 'green'
    subject_hull_centroid = cv2.circle(subject_hull_centroid, (20, 40), 10, BGR_COLOR[dot_color], -6)

    # Adds a stopwatch / FPS
    fps = 1.0 / (time.time() - last_timestamp)
    subject_hull_centroid = cv2.putText(subject_hull_centroid,
        str('Time: %.2f; FPS: %.1f' % (time.time() - exp_time, fps) ),
        (10, 20), cv2.FONT_HERSHEY_DUPLEX, .5, BGR_COLOR['white'])
    last_timestamp = time.time()

    frame_counter += 1
    k = cv2.waitKey(33)
    if k == ord('s'):
        command = "stop" if exp_running else "start"
        exp_running = not exp_running
        
        t1 = threading.Timer(0, board.start_or_stop, ())
        t1.start()
        
    if k == ord('q'):
        break

    # save the detected position
    if exp_running:
        log_frame_data([time.time(), x, y])
    
        # Writes the modified frame to the video protocol
        out.write(subject_hull_centroid)

        if frame_counter % delay_cycles == 0:
            stream.write((sound_foraging*ampMod))
            soundPlayed = 'foraging'
        else:
            stream.write(sound_silence)
            soundPlayed = 'false'  
        
    else:
        stream.write(sound_silence)
        soundPlayed = 'false'
        
    # showing a video frame in a window
    cv2.imshow('Press (q)-to end the experiment', subject_hull_centroid)
    
# release objects    
cap.release()
out.release()
cv2.destroyAllWindows()
board.exit()

Fake Arduino - sending a TTL pulse
Fake Arduino - sending a TTL pulse
Fake Arduino - exiting...
