In [None]:
import json
import itertools
import matplotlib
import numpy as np
import pandas as pd
import seaborn as sns

import matplotlib.animation

from datetime import datetime, timedelta
from dateutil import rrule
from matplotlib import pyplot as plt
from matplotlib import dates as mdates
from matplotlib import colors as mcolors
from matplotlib import patheffects as mpeffects

import IPython
from IPython.display import HTML

In [None]:
%matplotlib inline

In [None]:
sns.set_theme(style='white')

In [None]:
FULL_SIZE_FIG = (16, 8)
SAVEFIG_OPTIONS = {}

# Relative vs. Absolute Pitch Figure

In [None]:
def make_packet(t, freq, t0, width=0.5):
    envelope = np.exp(-(t-t0)**2/(width**2))
    carrier = np.sin(2 * np.pi * freq * t)
    return carrier * envelope


This figure shows multiple waveforms at different frequencies. It's not easy to tell by eye what frequencies they are, but you can tell which ones are higher and lower frequency than one another.

In [None]:
N = 2**16

ew = 1
hz_freqs = [2.5, 7.5, 15]
window =  6 * ew * len(hz_freqs)

t = np.linspace(0, window, N)
plt.figure(figsize=FULL_SIZE_FIG)

for ii, freq in enumerate(hz_freqs):
    plt.plot(t, make_packet(t, freq, t0=(3 + 6 * ii) * ew, width=ew), lw=2.5)

plt.plot(t, [0] * len(t), 'k-', lw=2.5)

ax = plt.gca()
ax.tick_params(axis='x', labelsize=20)
ax.set_yticks([])
ax.set_xlim(0, window)
ax.set_xlabel("Time (ms)", fontsize=32);
plt.tight_layout()
plt.savefig("images/frequencies.png", **SAVEFIG_OPTIONS)

This figure represents the three waveforms in the Fourier domain (plotting frequency vs. amplitude), where it is easy to see the absolute frequency of each wave.

In [None]:
def do_fft(signal, convolution_kernel_size=0):
    output = np.real(np.sqrt(np.fft.rfft(signal)**2))
    if convolution_kernel_size > 0:
        kernel = np.ones(convolution_kernel_size) / convolution_kernel_size
        output = np.convolve(output, kernel, mode="same")
    return output

signal_width = window * 30

t2 = np.linspace(0, signal_width, N)
d = t2[1]-t2[0]
f = np.fft.rfftfreq(N, d=d)
ss = slice(0, np.where(f <= max(hz_freqs) + (0.75 * min(hz_freqs)))[0].max())

spectrum_max = 0
plt.figure(figsize=FULL_SIZE_FIG)
for freq in hz_freqs:
    spectrum = do_fft(make_packet(t2, freq, t0=signal_width/2, width=ew))
    spectrum /= spectrum.sum()
    spectrum_max = max(spectrum_max, spectrum.max())
    plt.plot(f[ss], spectrum[ss], lw=5)

plt.plot(f[ss], [0] * len(f[ss]), '-k', lw=5)

ax = plt.gca()
ax.set_yticks([])
ax.tick_params(axis='x', labelsize=20)
ax.set_xlabel("Frequency (kHz)", fontsize=32);
ax.set_xlim(f[ss].min(), f[ss].max())
ax.set_ylim(0, spectrum_max.real * 1.1)
plt.tight_layout()
plt.savefig("images/frequencies_fft.png", **SAVEFIG_OPTIONS)

# Analysis of Progression

In [None]:
color_dates = [
    ("yellow", datetime(2023, 5, 24)),
    ("blue", datetime(2023, 6, 11)),
    ("black", datetime(2023, 7, 13)),
    ("green", datetime(2023, 8, 7)),
    ("orange", datetime(2023, 8, 21)),
    ("purple", datetime(2023, 11, 11)),
    ("pink", datetime(2023, 11, 28)),
]

import itertools
a, b = itertools.tee(iter(color_dates))
next(b)
for (old_color, old_date), (new_color, new_date) in zip(a, b):
    diff = (new_date - old_date) / timedelta(days=1)
    print(f"{old_color} → {new_color}: {diff:0.1f} days")

In [None]:
color_dates = [
    ("orange", datetime(2023, 12, 10)),
    ("purple", datetime(2023, 12, 27)),
    ("pink", datetime(2024, 1, 14)),
    ("brown", datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)),
]

import itertools
a, b = itertools.tee(iter(color_dates))
next(b)
for (old_color, old_date), (new_color, new_date) in zip(a, b):
    diff = (new_date - old_date) / timedelta(days=1)
    print(f"{old_color} → {new_color}: {diff:0.1f} days")

# Analysis of responses

In [None]:
# Load the data from a history file
with open("data/cim_history.json") as f:
    json_data = json.load(f)

# Should only be one profile in here, so we'll extract that
assert len(json_data) == 1, "Need to select a profile in multi-profile histories"
json_data = next(iter(json_data.values()))

In [None]:
COLORS = [
    "red",
    "yellow",
    "blue",
    "black",
    "green",
    "orange",
    "purple",
    "pink",
    "brown"
]

def to_idx(color):
    return COLORS.index(color)

In [None]:
def set_date_formatter(ax):
    start = datetime.max
    end = datetime.min
    for line in ax.lines:
        x_data = line.get_xdata()
        x_min = min(x_data).replace(tzinfo=None)
        x_max = max(x_data).replace(tzinfo=None)
        start = min(start, x_min)
        end = max(end, x_max)
    
    if end - start < timedelta(days=120):
        rule = mdates.rrulewrapper(rrule.MONTHLY, bymonthday=range(1, 31, 3))
    elif end - start < timedelta(days=210):
        rule = mdates.rrulewrapper(rrule.MONTHLY, bymonthday=range(1,31,5))
    elif end - start < timedelta(days=365):
        rule = mdates.rrulewrapper(rrule.WEEKLY, bymonthday=1)
    else:
        rule = mdates.rrulewrapper(rrule.MONTHLY, bymonthday=1)

    xrange = (start, end)

    locator = mdates.RRuleLocator(rule)
    
    ax.set_xticks(ax.get_xticks(), labels=ax.get_xticklabels(), rotation=45)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(
        mdates.ConciseDateFormatter(
            ax.xaxis.get_major_locator(),
            show_offset=False,
            formats=[
                "%Y",
                r"$\bf{%b}$",
                "%d",
                "%H:%M",
                "%H:%M",
                "%S.%f",
            ],
        )
    )



In [None]:
def get_session_history(history):
    all_sessions = []

    def to_datetime(df, col):
        return pd.to_datetime(df[col], unit="s", utc=True).dt.tz_convert("America/New_York")
    
    for color, data in history.items():
        if not data:
            continue
        session_df = pd.DataFrame(data)
        session_df = session_df[["current_chord", "start_time", "updated_time", "identifications", "correct"]]
        all_sessions.append(session_df)

    joined_df = pd.concat(all_sessions)
    joined_df = joined_df.rename({"current_chord": "color"}, axis=1)
    joined_df["start_time"] = to_datetime(joined_df, "start_time")
    joined_df["updated_time"] = to_datetime(joined_df, "updated_time")
    joined_df = joined_df.sort_values(by="start_time")
    joined_df["percent_correct"] = (joined_df["correct"] / joined_df["identifications"]) * 100
    return joined_df

In [None]:
session_history = get_session_history(json_data)
session_history = session_history[session_history["color"] != "red"] # These are not interesting

In December 2023, we realized that we had moved on from Orange too quickly, because all through Purple and Pink, almost all the confusion was coming because he didn't have Orange down, so we jumped back to Orange. At the same time, we also switched to the "new" chords (using Tone.js), so I've divided the history into "old" and "new".

This history starts after we had already been using the program for a few months.

In [None]:
switch_point = datetime(2023, 12, 10, 10)
o_df = session_history.loc[session_history["start_time"].dt.tz_localize(None) < switch_point]
n_df = session_history.loc[session_history["start_time"].dt.tz_localize(None) >= switch_point]

o_df = o_df[o_df["color"] != "red"]

In [None]:
def plot_session_history(df):
    fig = plt.figure(figsize=(12, 5))
    plt.plot(df["start_time"], df["percent_correct"].rolling(window=12).mean(), "-", zorder=0)
    plt.scatter(df["start_time"], df["percent_correct"], color=df["color"], zorder=1)
    set_date_formatter(plt.gca())
    ax = fig.gca()
    ax.set_xlim(df["start_time"].min() - timedelta(days=2), df["start_time"].max() + timedelta(days=2))
    ax.set_ylim(50, 102)
    ax.set_ylabel("% Correct", weight="bold")
    plt.tight_layout()

with sns.axes_style("dark"):
    plot_session_history(o_df)
    plt.savefig("images/response_history.png", **SAVEFIG_OPTIONS)
    plot_session_history(n_df)
    plt.savefig("images/response_history_phase2.png", **SAVEFIG_OPTIONS)

In [None]:
def make_confusion_matrix(history, color):
    history_size = COLORS.index(color) + 1
    sessions = history[color]
    arr = np.zeros((history_size, history_size, len(sessions)))
    for ii, session in enumerate(sessions):
        for target_color in session["confusion_matrix"]:
            for act_color, num in session["confusion_matrix"][target_color].items():
                arr[to_idx(target_color), to_idx(act_color), ii] = num
    
    timestamps = np.asarray([session["start_time"] for session in sessions]).astype("datetime64[s]")
    return arr, timestamps

In [None]:
def replace_value(arr, value, new_value):
    arr_cp = arr.copy()
    arr_cp[arr_cp == value] = new_value
    return arr_cp

def normalize_cm(cm):
    off_diagonal = np.eye(cm.shape[0], dtype=bool)
    n_cm = cm.copy()
    n_cm[off_diagonal] = n_cm[off_diagonal] / (n_cm[off_diagonal].sum() or 1)
    n_cm[~off_diagonal] = n_cm[~off_diagonal] / (n_cm[~off_diagonal].sum() or 1)

    return n_cm

default_palette = sns.diverging_palette(10, 220, sep=10, n=20, as_cmap=True)

def plot_cm(cm, i, ts=None, cmap=default_palette, subsample=1, ax=None, title=None, suptitle=None, show_numbers=True):
    if ax is None:
        ax = plt.gca()

    off_diagonal_mask = ~np.eye(cm.shape[0], dtype=bool)
    all_off_diagonal_mask = np.tile(np.atleast_3d(off_diagonal_mask), (1, 1, cm.shape[2]))
    vmin = (-cm[all_off_diagonal_mask]).min()

    tick_labels = COLORS[:cm.shape[0]]
    sub_cm = cm[:, :, i*subsample:(i + 1)*subsample].sum(axis=2)
    new_cm = normalize_cm(sub_cm)
    if ts is not None:
        ts_ss = ts[i*subsample:(i + 1)*subsample]
        ts_start = ts_ss.min()
        ts_end = ts_ss.max()
    else:
        ts_start = None
        ts_end = None
    max_val = 1
    new_cm[off_diagonal_mask] = -new_cm[off_diagonal_mask]
    annot = sub_cm[::-1, :] if show_numbers else False
    hm = sns.heatmap(new_cm[::-1,:],
                cmap=cmap,
                vmax=max_val,
                vmin=vmin,
                norm=mcolors.TwoSlopeNorm(vcenter=0),
                xticklabels=tick_labels,
                yticklabels=tick_labels[::-1],
                cbar=False,
                annot=annot,
                fmt='g',
                ax=ax)

    if title is None:
        if ts_start is None:
            title = f"{i}"
        else:
            title = f"{np.datetime_as_string(ts_start, unit='D')} — {np.datetime_as_string(ts_end, unit='D')}"

    if suptitle is not None:
        if not title:
            title = suptitle
        elif title:
            title = f"{suptitle}\n{title}"
    if title:
        ax.set_title(title, weight="bold", fontsize=16 if suptitle is not None else 18)

    ax.set_xlabel("Guess", weight="bold")
    ax.set_ylabel("Correct", weight="bold")
    plt.tight_layout()

    return hm

def animated_cm(cm, ts = None, subsample = 10):
    fig = plt.figure(figsize=(10, 6))
    ax = fig.gca()
        
    def animate(i):
        ax.clear()
        plot_cm(cm, i, ts, subsample=subsample, ax=ax)
    animation = matplotlib.animation.FuncAnimation(fig, animate, frames=cm.shape[2] // subsample)
    plt.close()  # After the animation is done we don't need the plot anymore
    return animation

In [None]:
# Split between old and new
def split_cm(cm, ts):
    mask = ts < switch_point
    old_cm = cm[:, :, mask]
    old_ts = ts[mask]
    new_cm = cm[:, :, ~mask]
    new_ts = ts[~mask]

    return (old_cm, old_ts), (new_cm, new_ts)

In [None]:
# Generate confusion matrices
o_cm, o_ts = make_confusion_matrix(json_data, "orange")
(o_cm1, o_ts1), (o_cm2, o_ts2) = split_cm(o_cm, o_ts)

k_cm, k_ts = make_confusion_matrix(json_data, "black")
k_cm = k_cm[:, :, :-1]
k_ts = k_ts[:-1]

g_cm, g_ts = make_confusion_matrix(json_data, "green")
g_cm = g_cm[:, :, :-1]
g_ts = g_ts[:-1]

p_cm, p_ts = make_confusion_matrix(json_data, "pink")
(p_cm1, p_ts1), (p_cm2, p_ts2) = split_cm(p_cm, p_ts)

v_cm, v_ts = make_confusion_matrix(json_data, "purple")
(v_cm1, v_ts1), (v_cm2, v_ts2) = split_cm(v_cm, v_ts)



In [None]:
HEAT_MAP_FIGSIZE=(7, 4)

In [None]:
plt.figure(figsize=HEAT_MAP_FIGSIZE)
plot_cm(v_cm1, 0, subsample=20, ts=v_ts1, suptitle="Pink");
plt.savefig("images/purple_phase1.png", **SAVEFIG_OPTIONS)

plt.figure(figsize=HEAT_MAP_FIGSIZE)
plot_cm(v_cm2, 5, subsample=10, ts=v_ts, suptitle="Pink");
plt.savefig("images/purple_phase2.png", **SAVEFIG_OPTIONS)

In [None]:
plt.figure(figsize=HEAT_MAP_FIGSIZE)
plot_cm(p_cm1, 0, subsample=25, ts=p_ts1, suptitle="Pink");
plt.savefig("images/pink_phase1.png", **SAVEFIG_OPTIONS)

plt.figure(figsize=HEAT_MAP_FIGSIZE)
plot_cm(p_cm2, 2, subsample=50, ts=p_ts2, suptitle="Pink");
plt.savefig("images/pink_phase2.png", **SAVEFIG_OPTIONS)

In [None]:
HTML(animated_cm(o_cm1, ts=o_ts1).to_jshtml())

In [None]:
HTML(animated_cm(o_cm2, ts=o_ts2, subsample=2).to_jshtml())

In [None]:
# Purple
HTML(animated_cm(v_cm1, ts=v_ts1, subsample=2).to_jshtml())

In [None]:
# Purple
HTML(animated_cm(v_cm2, ts=v_ts2, subsample=2).to_jshtml())

In [None]:
# Black
HTML(animated_cm(k_cm, ts=k_ts, subsample=2).to_jshtml())

In [None]:
# Green
HTML(animated_cm(g_cm, ts=g_ts, subsample=2).to_jshtml())

In [None]:
# Pink
HTML(animated_cm(p_cm2, ts=p_ts2, subsample=2).to_jshtml())

In [None]:
# Pink
HTML(animated_cm(p_cm1, ts=p_ts1, subsample=2).to_jshtml())

# Adaptive Mode Figures

In [None]:
def matrix_to_json(arr):
    out = {}
    for i, ci in zip(range(arr.shape[0]), COLORS):
        out[ci] = {}
        for j, cj in zip(range(arr.shape[0]), COLORS):
            out[ci][cj] = int(arr[i, j])
            
    return json.dumps(out)
        

In [None]:
adaptive_mode_data = [
    {"confusion_matrix": np.array(
      [[ 100,   0,   0,   0,],
       [  0,  100,   0,   0,],
       [  0,   0,  100,   0,],
       [  0,   0,   0,  100,]
      ]),
     "coefficients": np.array([ 0.25, 0.25, 0.25, 0.25 ]),
    },
    {"confusion_matrix": np.array(
      [[ 100,   0,   0,   0,],
       [  0,  100,   0,   0,],
       [  1,   0,  110,   10,],
       [  0,   0,   20,  120,]
      ]),
     "coefficients": np.array([ 0.16072842438638163, 0.1583531274742676, 0.3087885985748218, 0.3721298495645289 ])},
    {"confusion_matrix": np.array(
      [[ 100,   0,   0,   0,],
       [  0,  100,   0,   0,],
       [  1,   0,  110,   0,],
       [  0,   0,   100,  120,]
      ]),
     "coefficients": np.array([ 0.09341923607915324, 0.09203865623561897, 0.24390243902439024, 0.5706396686608376 ])
    }
]

In [None]:
for i, data in enumerate(adaptive_mode_data):
    confusion_matrix = data["confusion_matrix"].astype("float64")
    coefficients = data["coefficients"]
    
    print(matrix_to_json(confusion_matrix))
    plt.figure(figsize=(8, 5))
    plot_cm(np.reshape(confusion_matrix,(*confusion_matrix.shape, 1)), 0, title="", show_numbers=False)
    plt.tight_layout()
    plt.savefig(f"images/adaptive_example_cm_{i}.png")
    plt.figure(figsize=(6, 5))
    plt.pie(coefficients, autopct="%1.1f%%",
            textprops={"color":"w", "fontsize": 14, "fontweight": "bold",
                       "path_effects": [
                           mpeffects.Stroke(linewidth=2, foreground="k"),
                           #mpeffects.SimplePatchShadow(alpha=0.5),
                           mpeffects.Normal()]},
            colors=COLORS[:len(coefficients)])
    plt.tight_layout()
    plt.savefig(f"images/adaptive_example_pie_{i}.png", **SAVEFIG_OPTIONS)