__Dependencies__

In [1]:
import copy  # copy big/deep objects by value
import datetime  # datetime operations
import itertools  # operate with iterators
import json  # read/write from/into json format
import os  # OS operations (read/write files/folders)
import warnings  # hide warnings

# process parallelization
from multiprocessing import Lock, Manager, Pool, RawArray, cpu_count

import matplotlib.pyplot as plt  # mother of plots focr Python
import matplotlib.ticker as ticker  # matplotlib ticker utils
import numpy as np  # array/matrix operations (e.g. linear algebra)
import pandas as pd  # operate with dataframes
import seaborn as sns  # matplotlib plotting nice with shortcuts
from IPython.display import display  # print nicely
from tqdm.notebook import tqdm  # mother of progressbars for Python

# from matplotlib.ticker import FormatStrFormatter  # tick formatter

__Options and constants__

In [2]:
warnings.filterwarnings("ignore")
# set default float display format to 2 decimals
pd.options.display.float_format = "{:.3f}".format

STYLE = "darkgrid"
sns.set_style(STYLE)  # set seaborn plotting style

PATH_PROC = "./data/processed"

# progress bar customized format
B_FORMAT = """📄 {n_fmt} of {total_fmt} {desc} processed: {bar} 
            {percentage:3.0f}% ⏱️{elapsed} ⏳{remaining} ⚙️{rate_fmt}{postfix}"""

CORES = cpu_count()  # number of cpu threads for multiprocessing
print(f"Total CPU threads: {CORES}")

Total CPU threads: 16


__Helper functions__

In [3]:
def pbar_fork_hack(l=None):
    """
    Hack to enforce progress bars to be displayed by fork processes on
    IPython Apps like Jupyter Notebooks.

    Avoids [IPKernelApp] WARNING | WARNING: attempted to send message from fork

    Important: pass this function as argument for the initializer parameter
    while initializing a multiprocessing pool to make it work. E.g.:

    pool = Pool(processes=N_CORES, initializer=pbar_fork_hack)

    Source:
     - https://github.com/ipython/ipython/issues/11049#issue-306086846
     - https://github.com/tqdm/tqdm/issues/485#issuecomment-473338308
    """
    if l:
        global lock
        lock = l
    print(" ", end="", flush=True)


def angle(a, b):
    """
    Angle (in degrees) between two vectors.

    Given two 3D vectors (a and b):
    a = [xa, ya, za] , b = [xb, yb, zb]

    And the basic geometric formula for the dot product:
    a · b = |a| * |b| * cos(α) => α = arccos[(a · b) / (|a| * |b|)]

    In other words:
    angle = arccos[(xa * xb + ya * yb + za * zb) / (√(xa2 + ya2 + za2) * √(xb2 + yb2 + zb2))]

    Source: https://www.omnicalculator.com/math/angle-between-two-vectors

    Parameters:
        a (numpy.array): First vector
        b (numpy.array): Second vector
    Returns:
        float: Angle between vectors a and b in degrees.
    """
    # a · b
    dotp = np.dot(a, b)

    # magnitude vectors |a| and |b|
    m_a = np.linalg.norm(a)
    m_b = np.linalg.norm(b)

    # |a| * |b|
    m_prod = m_a * m_b

    # (a * b) / (|a| * |b|)
    res = dotp / m_prod

    # α = arccos[(a · b) / (|a| * |b|)]
    an = np.arccos(res)

    # calculated angle from radians to degrees
    return np.degrees(an)

__Read participants cleaned data__

In [4]:
parts = pd.read_csv("./participants_clean.csv")
parts = parts.set_index("id")  # set uid column as index (remove default)
parts

Unnamed: 0_level_0,date,expo,side,condition,questionnaire,nulls_%
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
4f583872ffed48249874b00f3d389cfc,2019-05-14 13:21:00,MSW,right,TaxiDriver,No,6.739
33d68ad55ef544fab9fd08108e3913ca,2019-05-14 16:44:00,MSW,right,AVAS,No,0.225
8598a83e2af441b8bcd0ae5d84beb875,2019-05-14 16:51:00,MSW,right,RadioTalk,No,46.237
37d7e67934974217830bb429bba7fd76,2019-05-14 16:54:00,MSW,left,AVAS,No,8.049
42bac596059749b5b8e8e83ae61de9b4,2019-05-14 16:56:00,MSW,right,TaxiDriver,No,26.881
...,...,...,...,...,...,...
cfe9482181f74f80b88cd4b1c048ab94,2019-12-30 15:29:00,BMBF,right,AVAS,No,34.182
611d35a7c3e940cc82495e53c2a8532d,2020-01-03 16:12:00,BMBF,right,TaxiDriver,No,7.413
3b6fda285d9e412eb081986b2f22a2e3,2020-01-03 16:13:00,BMBF,left,AVAS,No,7.151
18ffb0abdc8642098c479380bfa533d1,2020-01-03 16:15:00,BMBF,left,RadioTalk,Yes,13.890


__Collect and display all object groups across all participants__

In [5]:
def collect_groups(uid):
    """
    Collect all unique object groups given a participant UID.
    Parameters:
        uid (str): Participant UID to process.
    """
    cur = pd.read_csv(f"{PATH_PROC}/hits/{uid}.csv")

    gs = cur.group.unique()
    for g in gs:
        if not pd.isnull(g):
            lock.acquire()
            if g not in groups:
                groups.append(g)
            lock.release()


manager = Manager()  # manage shared memory types

# convert into memory-shared dict of lists for multiprocessing
groups = manager.list()

# multiprocessing lock to avoid raicing conditions
l = Lock()

# initialize pool of processes according to the available cpu core threads
pool = Pool(processes=CORES, initializer=pbar_fork_hack, initargs=(l,))

# participants uids to process
uids = parts.index.tolist()

# participants progress bar
parts_pbar = tqdm(
    iterable=pool.imap(func=collect_groups, iterable=uids),
    total=len(uids),
    desc="📂 participants",
    dynamic_ncols=True,
    bar_format=B_FORMAT,
)

# loop necessary for displaying properly the progressbar with multiprocessing
# source: https://stackoverflow.com/a/40133278
for _ in parts_pbar:
    pass

# close pool instance, no more work to submit
pool.close()
# wait for the worker processes to terminate
pool.join()

# cast proxy object to list (otherwise they can't be accessed)
groups = list(groups)
# sort list alphabetically
groups.sort()
display(groups)

                

📄 0 of 24356 📂 participants processed:                                                                        …

['Buildings',
 'Crosswalks',
 'Dynamic Cars',
 'Dynamic Pedestrians',
 'End',
 'Events',
 'Inside Car',
 'Nature',
 'Roads',
 'Sky',
 'Start',
 'Static Cars',
 'Static Pedestrians',
 'Street Objects',
 'Traffic Lights',
 'Traffic Signs']

__Collect and store all object unique groups and names across all participants__

In [None]:
def collect_names(uid):
    """
    Collect all unique object names given a participant UID.
    Parameters:
        uid (str): Participant UID to process.
    """
    cur = pd.read_csv(f"{PATH_PROC}/hits/{uid}.csv")
    # TODO:
    # - Arrange code to collect all unique names per part
    # - Then assign a group per thread and check all unique names per group
    for grp in groups:
        c = cur[cur.group == grp]
        ns = c.name.unique()
        for n in ns:
            if n not in names[grp]:
                names[grp].append(n)


manager = Manager()  # manage shared memory types

# create memory-shared dict of lists for multiprocessing
names = manager.dict(manager.dict{uid: {grp: manager.list() for grp in groups} for uid in uids})

# initialize pool of processes according to the available cpu core threads
pool = Pool(processes=1, initializer=pbar_fork_hack, initargs=(l,))

# participants progress bar
parts_pbar = tqdm(
    iterable=pool.imap(func=collect_names, iterable=uids[:5]),
    total=len(uids),
    total=5,
    desc="📂 participants",
    dynamic_ncols=True,
    bar_format=B_FORMAT,
)

# loop necessary for displaying properly the progressbar with multiprocessing
# source: https://stackoverflow.com/a/40133278
for _ in parts_pbar:
    pass

# close pool instance, no more work to submit
pool.close()
# wait for the worker processes to terminate
pool.join()

                

📄 0 of 24356 📂 participants processed:                                                                        …

Exception ignored in: Exception ignored in: Exception ignored in: Exception ignored in: Exception ignored in: <function _ConnectionBase.__del__ at 0x7f49292ef040><Finalize object, dead><Finalize object, dead><Finalize object, dead>
<Finalize object, dead>



Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 130, in __del__
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/util.py", line 224, in __call__
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/util.py", line 224, in __call__
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/util.py", line 224, in __call__
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/pyth

 

conn = _Client(token.address, authkey=authkey)
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 508, in Client
    

 

answer_challenge(c, authkey)
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 757, in answer_challenge


 

    response = connection.recv_bytes(256)        # reject large message
  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes


 

    buf = self._recv_bytes(maxlength)


 

  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)

 


  File "/home/student/m/mvidaldepalo/.local/share/miniconda3/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    

 

chunk = read(handle, remaining)
KeyboardInterrupt: 

 




Store them

In [None]:
# cast proxy object/s (otherwise they can't be accessed)
names = dict(names)

# # cast list(set) on top of list() cast to avoid duplicates
names = {k: list(v) for k, v in names.items()}

with open(f"./groups_and_names.json", "w") as f:
    json.dump(names, f, indent=4)

Process all participants counting and groupping consecutive hitpoints

In [None]:
def nearest_hit(df):
    """
    Add description.

    Parameters:
        df (pd.DataFrame): Add desc.
    Returns:
        found (bool): Add desc.
        row (pd.Series): Add desc.
    """
    idxs = df.index.tolist()
    found = False
    for idx in idxs:
        c = df.loc[idx]
        row = c if c.group not in EXCEPT_GROUPS and c.group != "" else None
        found = not isinstance(row, type(None))
        if found:
            break
    return found, row


def collect_hit_events(uid):
    """
    Collect hit (saccade/gaze) events and related info of a participant given
    its UID.

    Parameters:
        uid (str): Participant unique identifier.
    """
    # read current participant origins and hits datasets
    orgs = pd.read_csv(f"{PATH_PROC}/origins/{uid}.csv")
    orgs = orgs.set_index("frame")  # set frame column as index
    hits = pd.read_csv(f"{PATH_PROC}/hits/{uid}.csv", keep_default_na=False)

    frames = orgs.index.tolist()  # all frames (0-2670)

    # dict to collect gaze events info
    h_events = {
        "name": [],
        "group": [],
        "start": [],
        "end": [],
        "nHits": [],
        "distance": [],
    }

    # dict to check consecutives
    check = {"p": None, "start": None, "cnt": 0}
    store = False
    dist = []
    for f in frames:  # for each frame
        cur = hits[hits.frame == f]
        # closest object hit if not in exceptions nor null, otherwise next
        found, cur = nearest_hit(cur)
        if found:
            d = float(cur.distance)
            name = cur["name"]  # .name calls .Name of the series!!
            group = cur.group
            if f < len(frames) - 1:
                fou, nex = nearest_hit(hits[hits.frame == f + 1])
                nex_name = nex["name"] if fou else None
                nex_grp = nex.group if fou else None
            else:
                pass  # skip last datapoint processing
            if not check["p"]:  # not previous yet or ATM
                dist = []
                dist.append(d)
                check["p"] = name
                check["start"] = f
                check["cnt"] += 1
            else:
                dist.append(d)
                check["p"] = name
                check["cnt"] += 1
            if name != nex_name:
                store = True
            if store:
                print("Storing data!")
                d = sum(dist) / len(dist)
                d = round(d, 3)
                h_events["name"].append(name)
                h_events["group"].append(group)
                h_events["start"].append(check["start"])
                h_events["end"].append(f)
                h_events["nHits"].append(check["cnt"])
                h_events["distance"].append(d)
                # reset
                check = {"p": None, "start": None, "cnt": 0}
                store = False

    h_events = pd.DataFrame(h_events)
    h_events.to_csv(f"{PATH_PROC}/gazes/{uid}.csv", index=False)

In [None]:
# initialize pool of processes according to the available cpu core threads
pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# participants ids
uids = parts.index.tolist()

# participants progress bar
parts_pbar = tqdm(
    iterable=pool.imap_unordered(func=collect_hit_events, iterable=uids),
    total=len(uids),
    desc="📂 participants",
    dynamic_ncols=True,
    bar_format=B_FORMAT,
)

# loop necessary for displaying properly the progressbar with multiprocessing
# source: https://stackoverflow.com/a/40133278
for _ in parts_pbar:
    pass

# close pool instance, no more work to submit
pool.close()
# wait for the worker processes to terminate
pool.join()

In [None]:
# participants ids
uids = parts.index.tolist()

cats = [
    "rest_cnt",
    "gaze_cnt",
    "rest_avg",
    "gaze_avg",
    "rest_len",
    "gaze_len",
    "rest_%",
    "gaze_%",
]

grp_cats = [
    "Buildings",
    "Crosswalks",
    "Dynamic Cars",
    "Dynamic Pedestrians",
    # "End",
    # "Events",
    "Inside Car",
    "Nature",
    "Roads",
    # "Start",
    "Static Cars",
    "Static Pedestrians",
    "Street Objects",
    "Traffic Lights",
    "Traffic Signs",
]

manager = Manager()  # manage shared memory types

# convert into memory-shared dict of dicts for multiprocessing
hev_cnts = manager.dict(
    {uid: manager.dict({cat: None for cat in cats}) for uid in uids}
)

grp_percs = manager.dict(
    {uid: manager.dict({cat: None for cat in grp_cats}) for uid in uids}
)


def gazes_vs_saccades(uid):
    """
    Collect hit (saccade/gaze) events and related info of a participant given
    its UID.

    Parameters:
        uid (str): Participant unique identifier.
    """
    h_events = pd.read_csv(f"{PATH_PROC}/gazes/{uid}.csv")

    saccs = h_events[h_events.nHits < 8]
    gazes = h_events[h_events.nHits > 7]
    saccs["length"] = saccs.nHits * 0.033
    gazes["length"] = gazes.nHits * 0.033

    hev_cnts[uid]["rest_cnt"] = saccs.index.size
    hev_cnts[uid]["gaze_cnt"] = gazes.index.size
    hev_cnts[uid]["rest_avg"] = saccs["length"].mean()
    hev_cnts[uid]["gaze_avg"] = gazes["length"].mean()

    sacc_len = saccs["length"].sum()
    gaze_len = gazes["length"].sum()
    hev_cnts[uid]["rest_len"] = sacc_len
    hev_cnts[uid]["gaze_len"] = gaze_len
    total_len = sacc_len + gaze_len

    hev_cnts[uid]["rest_%"] = sacc_len * 100 / total_len
    hev_cnts[uid]["gaze_%"] = gaze_len * 100 / total_len
    grps = gazes.group.unique()
    for grp in grps:
        sel = gazes[gazes.group == grp]
        g_len = sel["length"].sum()
        grp_percs[uid][grp] = g_len * 100 / gaze_len
    for grp in grp_cats:
        if not grp_percs[uid][grp]:
            grp_percs[uid][grp] = 0


# initialize pool of processes according to the available cpu core threads
pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# participants progress bar
parts_pbar = tqdm(
    iterable=pool.imap_unordered(func=gazes_vs_saccades, iterable=uids),
    total=len(uids),
    desc="📂 participants",
    dynamic_ncols=True,
    bar_format=B_FORMAT,
)

# loop necessary for displaying properly the progressbar with multiprocessing
# source: https://stackoverflow.com/a/40133278
for _ in parts_pbar:
    pass

# close pool instance, no more work to submit
pool.close()
# wait for the worker processes to terminate
pool.join()

hev_cnts = dict(hev_cnts)
hev_cnts = {k: dict(v) for k, v in hev_cnts.items()}
hev_cnts = pd.DataFrame(hev_cnts).transpose()
hev_cnts.to_csv("./hit_events_counters.csv")
display(hev_cnts)

grp_percs = dict(grp_percs)
grp_percs = {k: dict(v) for k, v in grp_percs.items()}
grp_percs = pd.DataFrame(grp_percs).transpose()
grp_percs.to_csv("./group_percentages.csv")
display(grp_percs)

In [None]:
hev_cnts[hev_cnts["sacc_%"] == 0]

In [None]:
uids = parts.index.tolist()
right = pd.read_csv(f"{PATH_PROC}/gazes/{uids[0]}.csv")
right

In [None]:
uid = "5b7ec90677be4c1ead048265fe46d3e0"

In [None]:
wrong = pd.read_csv(f"{PATH_PROC}/gazes/{uid}.csv")
wrong

In [None]:
wrong = pd.read_csv(f"{PATH_PROC}/origins/{uid}.csv")
wrong = wrong.set_index("frame")
wrong

In [None]:
wrong = pd.read_csv(f"{PATH_PROC}/hits/{uid}.csv")
wrong

In [None]:
collect_hit_events(uid)

In [None]:
hit_evs = pd.read_csv(f"./group_percentages.csv", index_col=0)
hit_evs

In [None]:
test = hit_evs.join(
    parts[["expo", "side", "condition", "questionnaire"]], how="outer"
)
test

In [None]:
display(hit_evs.describe())

In [None]:
sns.set(rc={"figure.figsize": (23, 7)})

ax = test.groupby(["condition"]).mean().transpose().plot.bar(rot=0)

ax.set_ylabel("%")

plt.title("Gaze events per object group")

plt.show()

In [None]:
parts = pd.read_csv("./participants_clean.csv")
parts = parts.set_index("id")

start = 65

for i in range(0, int(start), 5):
    cur = start - i if start - i >= 1 else 0

    sel = parts[parts["nulls_%"] <= cur]
    sample_size = sel.index.size

    sns.set(rc={"figure.figsize": (23, 7)})

    ax = (
        test.loc[sel.index]
        .groupby(["condition"])
        .mean()
        .transpose()
        .plot.bar(rot=0)
    )

    ax.set_ylabel("%")

    plt.title(f"Gaze events per object group (%nulls<={cur}, N={sample_size})")

    plt.show()