Permalink
Fetching contributors…
Cannot retrieve contributors at this time
819 lines (708 sloc) 29.2 KB
"""
(*)~---------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2018 Pupil Labs
Distributed under the terms of the GNU
Lesser General Public License (LGPL v3.0).
See COPYING and COPYING.LESSER for license details.
---------------------------------------------------------------------------~(*)
"""
"""
Fixations general knowledge from literature review
+ Goldberg et al. - fixations rarely < 100ms and range between 200ms and 400ms in duration (Irwin, 1992 - fixations dependent on task between 150ms - 600ms)
+ Very short fixations are considered not meaningful for studying behavior - eye+brain require time for info to be registered (see Munn et al. APGV, 2008)
+ Fixations are rarely longer than 800ms in duration
+ Smooth Pursuit is exception and different motif
+ If we do not set a maximum duration, we will also detect smooth pursuit (which is acceptable since we compensate for VOR)
Terms
+ dispersion (spatial) = how much spatial movement is allowed within one fixation (in visual angular degrees or pixels)
+ duration (temporal) = what is the minimum time required for gaze data to be within dispersion threshold?
"""
import csv
# logging
import logging
import os
from bisect import bisect_left, bisect_right
from collections import deque
from itertools import chain
import msgpack
import numpy as np
from pyglui import ui
from pyglui.cygl.utils import RGBA, draw_circle
from pyglui.pyfontstash import fontstash
from scipy.spatial.distance import pdist
import background_helper as bh
import cv2
import file_methods as fm
import player_methods as pm
from methods import denormalize
from plugin import Analysis_Plugin_Base
logger = logging.getLogger(__name__)
class Empty(object):
pass
class Fixation_Detector_Base(Analysis_Plugin_Base):
icon_chr = chr(0xEC03)
icon_font = "pupil_icons"
def fixation_from_data(dispersion, method, base_data, timestamps=None):
norm_pos = np.mean([gp["norm_pos"] for gp in base_data], axis=0).tolist()
dispersion = np.rad2deg(dispersion) # in degrees
fix = {
"topic": "fixations",
"norm_pos": norm_pos,
"dispersion": dispersion,
"method": method,
"base_data": list(base_data),
"timestamp": base_data[0]["timestamp"],
"duration": (base_data[-1]["timestamp"] - base_data[0]["timestamp"]) * 1000,
"confidence": float(np.mean([gp["confidence"] for gp in base_data])),
}
if method == "pupil":
fix["gaze_point_3d"] = np.mean(
[gp["gaze_point_3d"] for gp in base_data if "gaze_point_3d" in gp], axis=0
).tolist()
if timestamps is not None:
start, end = base_data[0]["timestamp"], base_data[-1]["timestamp"]
start, end = np.searchsorted(timestamps, [start, end])
end = min(end, len(timestamps) - 1) # fix `list index out of range` error
fix["start_frame_index"] = int(start)
fix["end_frame_index"] = int(end)
fix["mid_frame_index"] = int((start + end) // 2)
return fix
class Fixation_Result_Factory(object):
__slots__ = ("_id_counter",)
def __init__(self):
self._id_counter = 0
def from_data(self, *args, **kwargs):
datum = fixation_from_data(*args, **kwargs)
self._set_fixation_id(datum)
fixation_start = datum["timestamp"]
fixation_stop = fixation_start + (datum["duration"] / 1000)
datum = self._serialize(datum)
return (datum, fixation_start, fixation_stop)
def _set_fixation_id(self, fixation):
fixation["id"] = self._id_counter
self._id_counter += 1
def _serialize(self, fixation):
serialization_hook = fm.Serialized_Dict.packing_hook
fixation_serialized = msgpack.packb(
fixation, use_bin_type=True, default=serialization_hook
)
return fixation_serialized
def vector_dispersion(vectors):
distances = pdist(vectors, metric="cosine")
# use 20% biggest distances, but at least 4, see reasoning at
# https://github.com/pupil-labs/pupil/issues/1133#issuecomment-382412175
distances.sort() # sort by distance
cut_off = np.max([distances.shape[0] // 5, 4])
return np.arccos(1.0 - distances[-cut_off:].mean())
def gaze_dispersion(capture, gaze_subset, use_pupil=True):
if use_pupil:
data = [[], []]
# for each eye collect gaze positions that contain pp for the given eye
data[0] = [
gp
for gp in gaze_subset
if any(("3d" in pp["method"] and pp["id"] == 0) for pp in gp["base_data"])
]
data[1] = [
gp
for gp in gaze_subset
if any(("3d" in pp["method"] and pp["id"] == 1) for pp in gp["base_data"])
]
method = "pupil"
# choose eye with more data points. alternatively data that spans longest time range
eye_id = 1 if len(data[1]) > len(data[0]) else 0
base_data = data[eye_id]
all_pp = chain.from_iterable((gp["base_data"] for gp in base_data))
pp_with_eye_id = (pp for pp in all_pp if pp["id"] == eye_id)
vectors = np.array(
[pp["circle_3d"]["normal"] for pp in pp_with_eye_id], dtype=np.float32
)
else:
method = "gaze"
base_data = gaze_subset
locations = np.array([gp["norm_pos"] for gp in gaze_subset])
# denormalize
width, height = capture.frame_size
locations[:, 0] *= width
locations[:, 1] = (1.0 - locations[:, 1]) * height
# undistort onto 3d plane
vectors = capture.intrinsics.unprojectPoints(locations)
dist = vector_dispersion(vectors)
return dist, method, base_data
def detect_fixations(
capture, gaze_data, max_dispersion, min_duration, max_duration, min_data_confidence
):
yield "Detecting fixations...", ()
gaze_data = [
fm.Serialized_Dict(msgpack_bytes=serialized) for serialized in gaze_data
]
if not gaze_data:
logger.warning("No data available to find fixations")
return "Fixation detection complete", ()
use_pupil = "gaze_normal_3d" in gaze_data[0] or "gaze_normals_3d" in gaze_data[0]
logger.info(
"Starting fixation detection using {} data...".format(
"3d" if use_pupil else "2d"
)
)
fixation_result = Fixation_Result_Factory()
Q = deque()
enum = deque(gaze_data)
while enum:
# check if Q contains enough data
if len(Q) < 2 or Q[-1]["timestamp"] - Q[0]["timestamp"] < min_duration:
datum = enum.popleft()
Q.append(datum)
continue
# min duration reached, check for fixation
dispersion, origin, base_data = gaze_dispersion(capture, Q, use_pupil=use_pupil)
if dispersion > max_dispersion:
# not a fixation, move forward
Q.popleft()
continue
left_idx = len(Q)
# minimal fixation found. collect maximal data
# to perform binary search for fixation end
while enum:
datum = enum[0]
if datum["timestamp"] > Q[0]["timestamp"] + max_duration:
break # maximum data found
Q.append(enum.popleft())
# check for fixation with maximum duration
dispersion, origin, base_data = gaze_dispersion(capture, Q, use_pupil=use_pupil)
if dispersion <= max_dispersion:
fixation = fixation_result.from_data(
dispersion, origin, base_data, capture.timestamps
)
yield "Detecting fixations...", fixation
Q.clear() # discard old Q
continue
slicable = list(Q) # deque does not support slicing
right_idx = len(Q)
# binary search
while left_idx + 1 < right_idx:
middle_idx = (left_idx + right_idx) // 2 + 1
dispersion, origin, base_data = gaze_dispersion(
capture, slicable[:middle_idx], use_pupil=use_pupil
)
if dispersion <= max_dispersion:
left_idx = middle_idx - 1
else:
right_idx = middle_idx - 1
middle_idx = (left_idx + right_idx) // 2
dispersion_result = gaze_dispersion(
capture, slicable[:middle_idx], use_pupil=use_pupil
)
fixation = fixation_result.from_data(*dispersion_result, capture.timestamps)
yield "Detecting fixations...", fixation
Q.clear() # clear queue
enum.extendleft(slicable[middle_idx:])
yield "Fixation detection complete", ()
class Offline_Fixation_Detector(Fixation_Detector_Base):
"""Dispersion-duration-based fixation detector.
This plugin detects fixations based on a dispersion threshold in terms of
degrees of visual angle within a given duration window. It tries to maximize
the length of classified fixations within the duration window, e.g. instead
of creating two consecutive fixations of length 300 ms it creates a single
fixation with length 600 ms. Fixations do not overlap. Binary search is used
to find the correct fixation length within the duration window.
If 3d pupil data is available the fixation dispersion will be calculated
based on the positional angle of the eye. These fixations have their method
field set to "pupil". If no 3d pupil data is available the plugin will
assume that the gaze data is calibrated and calculate the dispersion in
visual angle within the coordinate system of the world camera. These
fixations will have their method field set to "gaze".
"""
def __init__(
self,
g_pool,
max_dispersion=3.0,
min_duration=300,
max_duration=1000,
show_fixations=True,
):
super().__init__(g_pool)
# g_pool.min_data_confidence
self.max_dispersion = max_dispersion
self.min_duration = min_duration
self.max_duration = max_duration
self.show_fixations = show_fixations
self.current_fixation_details = None
self.fixation_data = deque()
self.prev_index = -1
self.bg_task = None
self.status = ""
self.notify_all(
{"subject": "fixation_detector.should_recalculate", "delay": 0.5}
)
def init_ui(self):
self.add_menu()
self.menu.label = "Fixation Detector"
def set_max_dispersion(new_value):
self.max_dispersion = new_value
self.notify_all(
{"subject": "fixation_detector.should_recalculate", "delay": 1.0}
)
def set_min_duration(new_value):
self.min_duration = min(new_value, self.max_duration)
self.notify_all(
{"subject": "fixation_detector.should_recalculate", "delay": 1.0}
)
def set_max_duration(new_value):
self.max_duration = max(new_value, self.min_duration)
self.notify_all(
{"subject": "fixation_detector.should_recalculate", "delay": 1.0}
)
def jump_next_fixation(_):
cur_idx = self.last_frame_idx
all_idc = [f["mid_frame_index"] for f in self.g_pool.fixations]
if not all_idc:
logger.warning("No fixations available")
return
# wrap-around index
tar_fix = bisect_right(all_idc, cur_idx) % len(all_idc)
self.notify_all(
{
"subject": "seek_control.should_seek",
"index": int(self.g_pool.fixations[tar_fix]["mid_frame_index"]),
}
)
def jump_prev_fixation(_):
cur_idx = self.last_frame_idx
all_idc = [f["mid_frame_index"] for f in self.g_pool.fixations]
if not all_idc:
logger.warning("No fixations available")
return
# wrap-around index
tar_fix = (bisect_left(all_idc, cur_idx) - 1) % len(all_idc)
self.notify_all(
{
"subject": "seek_control.should_seek",
"index": int(self.g_pool.fixations[tar_fix]["mid_frame_index"]),
}
)
for help_block in self.__doc__.split("\n\n"):
help_str = help_block.replace("\n", " ").replace(" ", "").strip()
self.menu.append(ui.Info_Text(help_str))
self.menu.append(
ui.Info_Text("Press the export button or type 'e' to start the export.")
)
self.menu.append(
ui.Slider(
"max_dispersion",
self,
min=0.01,
step=0.1,
max=5.0,
label="Maximum Dispersion [degrees]",
setter=set_max_dispersion,
)
)
self.menu.append(
ui.Slider(
"min_duration",
self,
min=10,
step=10,
max=4000,
label="Minimum Duration [milliseconds]",
setter=set_min_duration,
)
)
self.menu.append(
ui.Slider(
"max_duration",
self,
min=10,
step=10,
max=4000,
label="Maximum Duration [milliseconds]",
setter=set_max_duration,
)
)
self.menu.append(
ui.Text_Input(
"status", self, label="Detection progress:", setter=lambda x: None
)
)
self.menu.append(ui.Switch("show_fixations", self, label="Show fixations"))
self.current_fixation_details = ui.Info_Text("")
self.menu.append(self.current_fixation_details)
self.next_fix_button = ui.Thumb(
"jump_next_fixation",
setter=jump_next_fixation,
getter=lambda: False,
label=chr(0xE044),
hotkey="f",
label_font="pupil_icons",
)
self.next_fix_button.status_text = "Next Fixation"
self.g_pool.quickbar.append(self.next_fix_button)
self.prev_fix_button = ui.Thumb(
"jump_prev_fixation",
setter=jump_prev_fixation,
getter=lambda: False,
label=chr(0xE045),
hotkey="F",
label_font="pupil_icons",
)
self.prev_fix_button.status_text = "Previous Fixation"
self.g_pool.quickbar.append(self.prev_fix_button)
def deinit_ui(self):
self.remove_menu()
self.current_fixation_details = None
self.g_pool.quickbar.remove(self.next_fix_button)
self.g_pool.quickbar.remove(self.prev_fix_button)
self.next_fix_button = None
self.prev_fix_button = None
def cleanup(self):
if self.bg_task:
self.bg_task.cancel()
self.bg_task = None
def get_init_dict(self):
return {
"max_dispersion": self.max_dispersion,
"min_duration": self.min_duration,
"max_duration": self.max_duration,
"show_fixations": self.show_fixations,
}
def on_notify(self, notification):
if notification["subject"] == "gaze_positions_changed":
logger.info("Gaze postions changed. Recalculating.")
self._classify()
if notification["subject"] == "min_data_confidence_changed":
logger.info("Minimal data confidence changed. Recalculating.")
self._classify()
elif notification["subject"] == "fixation_detector.should_recalculate":
self._classify()
elif notification["subject"] == "should_export":
self.export_fixations(notification["range"], notification["export_dir"])
def _classify(self):
"""
classify fixations
"""
if self.g_pool.app == "exporter":
return
if self.bg_task:
self.bg_task.cancel()
gaze_data = [gp.serialized for gp in self.g_pool.gaze_positions]
cap = Empty()
cap.frame_size = self.g_pool.capture.frame_size
cap.intrinsics = self.g_pool.capture.intrinsics
cap.timestamps = self.g_pool.capture.timestamps
generator_args = (
cap,
gaze_data,
np.deg2rad(self.max_dispersion),
self.min_duration / 1000,
self.max_duration / 1000,
self.g_pool.min_data_confidence,
)
self.fixation_data = deque()
self.fixation_start_ts = deque()
self.fixation_stop_ts = deque()
self.bg_task = bh.IPC_Logging_Task_Proxy('Fixation detection', detect_fixations, args=generator_args)
def recent_events(self, events):
if self.bg_task:
for progress, fixation_result in self.bg_task.fetch():
self.status = progress
if fixation_result:
serialized, start_ts, stop_ts = fixation_result
self.fixation_data.append(
fm.Serialized_Dict(msgpack_bytes=serialized)
)
self.fixation_start_ts.append(start_ts)
self.fixation_stop_ts.append(stop_ts)
if self.fixation_data:
current_ts = self.fixation_stop_ts[-1]
progress = (current_ts - self.g_pool.timestamps[0]) / (
self.g_pool.timestamps[-1] - self.g_pool.timestamps[0]
)
self.menu_icon.indicator_stop = progress
if self.bg_task.completed:
self.status = "{} fixations detected".format(len(self.fixation_data))
self.correlate_and_publish()
self.bg_task = None
self.menu_icon.indicator_stop = 0.0
frame = events.get("frame")
if not frame:
return
self.last_frame_idx = frame.index
frame_window = pm.enclosing_window(self.g_pool.timestamps, frame.index)
fixations = self.g_pool.fixations.by_ts_window(frame_window)
events["fixations"] = fixations
if self.show_fixations:
for f in fixations:
x = int(f["norm_pos"][0] * frame.width)
y = int((1.0 - f["norm_pos"][1]) * frame.height)
pm.transparent_circle(
frame.img,
(x, y),
radius=25.0,
color=(0.0, 1.0, 1.0, 1.0),
thickness=3,
)
cv2.putText(
frame.img,
"{}".format(f["id"]),
(x + 30, y),
cv2.FONT_HERSHEY_DUPLEX,
0.8,
(255, 150, 100),
)
if self.current_fixation_details and self.prev_index != frame.index:
info = ""
for f in fixations:
info += "Current fixation, {} of {}\n".format(
f["id"], len(self.g_pool.fixations)
)
info += " Confidence: {:.2f}\n".format(f["confidence"])
info += " Duration: {:.2f} milliseconds\n".format(f["duration"])
info += " Dispersion: {:.3f} degrees\n".format(f["dispersion"])
info += " Frame range: {}-{}\n".format(
f["start_frame_index"] + 1, f["end_frame_index"] + 1
)
info += " 2d gaze pos: x={:.3f}, y={:.3f}\n".format(*f["norm_pos"])
if "gaze_point_3d" in f:
info += " 3d gaze pos: x={:.3f}, y={:.3f}, z={:.3f}\n".format(
*f["gaze_point_3d"]
)
else:
info += " 3d gaze pos: N/A\n"
if f["id"] > 1:
prev_f = self.g_pool.fixations[f["id"] - 2]
time_lapsed = (
f["timestamp"] - prev_f["timestamp"] + prev_f["duration"] / 1000
)
info += " Time since prev. fixation: {:.2f} seconds\n".format(
time_lapsed
)
else:
info += " Time since prev. fixation: N/A\n"
if f["id"] < len(self.g_pool.fixations):
next_f = self.g_pool.fixations[f["id"]]
time_lapsed = (
next_f["timestamp"] - f["timestamp"] + f["duration"] / 1000
)
info += " Time to next fixation: {:.2f} seconds\n".format(
time_lapsed
)
else:
info += " Time to next fixation: N/A\n"
self.current_fixation_details.text = info
self.prev_index = frame.index
def correlate_and_publish(self):
self.g_pool.fixations = pm.Affiliator(
self.fixation_data, self.fixation_start_ts, self.fixation_stop_ts
)
self.notify_all({"subject": "fixations_changed", "delay": 1})
@classmethod
def csv_representation_keys(self):
return (
"id",
"start_timestamp",
"duration",
"start_frame_index",
"end_frame_index",
"norm_pos_x",
"norm_pos_y",
"dispersion",
"confidence",
"method",
"gaze_point_3d_x",
"gaze_point_3d_y",
"gaze_point_3d_z",
"base_data",
)
@classmethod
def csv_representation_for_fixation(self, fixation):
return (
fixation["id"],
fixation["timestamp"],
fixation["duration"],
fixation["start_frame_index"],
fixation["end_frame_index"],
fixation["norm_pos"][0],
fixation["norm_pos"][1],
fixation["dispersion"],
fixation["confidence"],
fixation["method"],
*fixation.get(
"gaze_point_3d", [None] * 3
), # expanded, hence * at beginning
" ".join(["{}".format(gp["timestamp"]) for gp in fixation["base_data"]]),
)
def export_fixations(self, export_range, export_dir):
"""
between in and out mark
fixation report:
- fixation detection method and parameters
- fixation count
fixation list:
id | start_timestamp | duration | start_frame_index | end_frame_index |
norm_pos_x | norm_pos_y | dispersion | confidence | method |
gaze_point_3d_x | gaze_point_3d_y | gaze_point_3d_z | base_data
"""
if not self.fixation_data:
logger.warning("No fixations in this recording nothing to export")
return
export_window = pm.exact_window(self.g_pool.timestamps, export_range)
fixations_in_section = self.g_pool.fixations.by_ts_window(export_window)
with open(
os.path.join(export_dir, "fixations.csv"), "w", encoding="utf-8", newline=""
) as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(self.csv_representation_keys())
for f in fixations_in_section:
csv_writer.writerow(self.csv_representation_for_fixation(f))
logger.info("Created 'fixations.csv' file.")
with open(
os.path.join(export_dir, "fixation_report.csv"),
"w",
encoding="utf-8",
newline="",
) as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(("fixation classifier", "Dispersion_Duration"))
csv_writer.writerow(
("max_dispersion", "{:0.3f} deg".format(self.max_dispersion))
)
csv_writer.writerow(
("min_duration", "{:0.3f} sec".format(self.min_duration))
)
csv_writer.writerow((""))
csv_writer.writerow(("fixation_count", len(fixations_in_section)))
logger.info("Created 'fixation_report.csv' file.")
class Fixation_Detector(Fixation_Detector_Base):
"""Dispersion-duration-based fixation detector.
This plugin detects fixations based on a dispersion threshold in terms of
degrees of visual angle with a minimal duration. It publishes the fixation
as soon as it complies with the constraints (dispersion and duration). This
might result in a series of overlapping fixations. These will have their id
field set to the same value which can be used to merge overlapping fixations.
If 3d pupil data is available the fixation dispersion will be calculated
based on the positional angle of the eye. These fixations have their method
field set to "pupil". If no 3d pupil data is available the plugin will
assume that the gaze data is calibrated and calculate the dispersion in
visual angle with in the coordinate system of the world camera. These
fixations will have their method field set to "gaze".
The Offline Fixation Detector yields fixations that do not overlap.
"""
order = 0.19
def __init__(
self, g_pool, max_dispersion=3.0, min_duration=300, confidence_threshold=0.75
):
super().__init__(g_pool)
self.history = deque()
self.min_duration = min_duration
self.max_dispersion = max_dispersion
self.confidence_threshold = confidence_threshold
self.id_counter = 0
def recent_events(self, events):
events["fixations"] = []
gaze = events["gaze"]
self.history.extend(
(gp for gp in gaze if gp["confidence"] >= self.confidence_threshold)
)
try:
ts_oldest = self.history[0]["timestamp"]
ts_newest = self.history[-1]["timestamp"]
inconsistent_timestamps = ts_newest < ts_oldest
if inconsistent_timestamps:
self.reset_history()
return
age_threshold = ts_newest - self.min_duration / 1000.
# pop elements until only one element below the age threshold remains:
while self.history[1]["timestamp"] < age_threshold:
self.history.popleft() # remove outdated gaze points
except IndexError:
pass
gaze_3d = [gp for gp in self.history if "3d" in gp["base_data"][0]["method"]]
use_pupil = len(gaze_3d) > 0.8 * len(self.history)
base_data = gaze_3d if use_pupil else self.history
if (
len(base_data) <= 2
or base_data[-1]["timestamp"] - base_data[0]["timestamp"]
< self.min_duration / 1000.0
):
self.recent_fixation = None
return
dispersion, origin, base_data = gaze_dispersion(
self.g_pool.capture, base_data, use_pupil
)
if dispersion < np.deg2rad(self.max_dispersion):
new_fixation = fixation_from_data(dispersion, origin, base_data)
if self.recent_fixation:
new_fixation["id"] = self.recent_fixation["id"]
else:
new_fixation["id"] = self.id_counter
self.id_counter += 1
self.replace_basedata_with_references(new_fixation)
events["fixations"].append(new_fixation)
self.recent_fixation = new_fixation
else:
self.recent_fixation = None
def reset_history(self):
logger.debug("Resetting history")
self.history.clear()
def replace_basedata_with_references(self, fixation):
fixation["base_data"] = [
(gaze["topic"], gaze["timestamp"]) for gaze in fixation["base_data"]
]
def gl_display(self):
if self.recent_fixation:
fs = self.g_pool.capture.frame_size # frame height
pt = denormalize(self.recent_fixation["norm_pos"], fs, flip_y=True)
draw_circle(
pt, radius=48.0, stroke_width=10.0, color=RGBA(1.0, 1.0, 0.0, 1.0)
)
self.glfont.draw_text(pt[0] + 48.0, pt[1], str(self.recent_fixation["id"]))
def init_ui(self):
self.add_menu()
self.menu.label = "Fixation Detector"
for help_block in self.__doc__.split("\n\n"):
help_str = help_block.replace("\n", " ").replace(" ", "").strip()
self.menu.append(ui.Info_Text(help_str))
self.menu.append(
ui.Slider(
"max_dispersion",
self,
min=0.01,
step=0.1,
max=5.0,
label="Maximum Dispersion [degrees]",
)
)
self.menu.append(
ui.Slider(
"min_duration",
self,
min=10,
step=10,
max=4000,
label="Minimum Duration [milliseconds]",
)
)
self.menu.append(
ui.Slider(
"confidence_threshold",
self,
min=0.0,
max=1.0,
label="Confidence Threshold",
)
)
self.glfont = fontstash.Context()
self.glfont.add_font("opensans", ui.get_opensans_font_path())
self.glfont.set_size(22)
self.glfont.set_color_float((0.2, 0.5, 0.9, 1.0))
def deinit_ui(self):
self.remove_menu()
self.glfont = None
def get_init_dict(self):
return {
"max_dispersion": self.max_dispersion,
"min_duration": self.min_duration,
"confidence_threshold": self.confidence_threshold,
}