`_230210-0310_stats-library`

In [43]:
import aqt

from pylib.anki.cards import Card
from pylib.anki.notes import Note

from matplotlib import pyplot as plt
import pandas as pd
import seaborn as sns
import scipy

from IPython.display import display, Markdown, Latex
from jupyter_dash import JupyterDash

import dash
from dash.dependencies import Input, Output
import dash_bio as dashbio
from dash import html, dcc

from waitress import create_server

import itertools
import logging
import threading

In [335]:
def get_rtag_scorecard_for_card(card, rtag_filter_fn, weight_factor = 0.10):
    """
    Compute and return this card's decaying-average review-tag scorecard.

    Higher scores indicate the tag has been seen more often or more recently.

    The card's reviews are walked in ascending review-ID order. On every
    review, each score already on the scorecard is multiplied by
    ``1 - weight_factor`` (decay). Every tag of the review that passes
    ``rtag_filter_fn`` then gains ``penalty_factor * weight_factor``, where
    ``penalty_factor`` depends on the button chosen for that review:
    1 -> 1.00, 2 -> 0.75, 3 -> 0.50, anything else -> 0.25.

    A tag listed more than once on the same review is counted once.

    Args:
        card: Object exposing an ``id`` attribute identifying the card.
        rtag_filter_fn: Predicate selecting which review tags are scored.
        weight_factor: Weight of a single review in the decaying average.

    Returns:
        Dict mapping each selected review tag to its score.

    Note:
        ``mw`` is not defined in this block; it is expected to be available
        as a global (presumably the Anki main window -- confirm).
    """
    # Score weight per answer button; unlisted buttons use the lowest weight.
    penalty_by_button = {1: 1.00, 2: 0.75, 3: 0.50}
    default_penalty = 0.25

    decay_factor = 1. - weight_factor
    rtag_scores = {}
    # sorted() leaves the list returned by the collection untouched.
    review_ids = sorted(mw.col.review_ids_of_card(card.id))
    for review_id in review_ids:
        review = mw.col.get_revlog_entry(review_id)
        # Ordered de-duplication: a tag repeated within one review must not
        # be treated as both "already seen" and "seen for the first time",
        # which would overwrite its decayed score with a fresh one.
        rtags = dict.fromkeys(filter(rtag_filter_fn, review.tags))
        penalty_factor = penalty_by_button.get(
            review.button_chosen, default_penalty
        )
        gain = penalty_factor*weight_factor
        # For every tag we've already seen, we want a weighted decaying
        # average of its score. If the user had trouble with a particular
        # tag during one review but not in the subsequent review, we
        # consider that trouble point improved for the card and therefore
        # reduce its score.
        for rtag in rtag_scores:
            rtag_scores[rtag] *= decay_factor
        # Tags on this review gain the weighted score. A tag seen for the
        # first time has a previous score of zero, so it starts at the gain.
        for rtag in rtags:
            rtag_scores[rtag] = rtag_scores.get(rtag, 0.) + gain
    return rtag_scores

In [39]:
def get_rtag_scorecards_for_cards(cards, rtag_filter_fn, weight_factor = 0.10):
    """
    Compute one review-tag scorecard per card.

    Returns a dict of dicts: the outer key is the card's ``id`` and the
    value is whatever ``get_rtag_scorecard_for_card`` returns for that card
    with the same filter function and weight factor.
    """
    return {
        card.id: get_rtag_scorecard_for_card(card, rtag_filter_fn, weight_factor)
        for card in cards
    }

In [40]:
def get_ntag_card_id_filter_fn(ntag):
    """
    Build a predicate over card IDs for the given note tag.

    The returned function looks the card up through ``mw.col.get_card`` and
    reports whether ``ntag`` is among the tags of the card's note. A falsy
    lookup result yields ``False``.
    """
    def ntag_card_id_filter_fn(card_id):
        card = mw.col.get_card(card_id)
        if not card:
            return False
        return ntag in card.note().tags
    return ntag_card_id_filter_fn

In [41]:
def get_avg_rtag_scores_for_ntag(ntag, rtag_scores_dict, empty_rtag_scores):
    """
    Average the per-card rtag scores over every card carrying this ntag.

    Only cards that appear in ``rtag_scores_dict`` and whose note has
    ``ntag`` take part. Each rtag's summed score is divided by the number
    of such cards, so a card lacking a given rtag still counts in the
    divisor. The result starts as a copy of ``empty_rtag_scores``; rtags
    never seen on the matching cards keep the value they have there.
    """
    has_ntag = get_ntag_card_id_filter_fn(ntag)
    # Cards that have a scorecard and also carry this ntag.
    matching_card_ids = [cid for cid in rtag_scores_dict if has_ntag(cid)]

    # Running per-rtag totals across the matching cards.
    totals = {}
    for cid in matching_card_ids:
        for rtag, score in rtag_scores_dict[cid].items():
            totals[rtag] = totals.get(rtag, 0) + score

    card_count = len(matching_card_ids)
    # Turn totals into averages on top of the zeroed template.
    averages = empty_rtag_scores.copy()
    for rtag, total in totals.items():
        averages[rtag] = total/card_count
    return averages

In [21]:
def get_tags(items, filter_fn):
    """
    Collect the distinct tags found on ``items`` that pass ``filter_fn``.

    Args:
        items: Iterable of objects exposing an iterable ``tags`` attribute.
        filter_fn: Predicate applied once to each distinct tag (passed to
            the builtin ``filter``).

    Returns:
        List of the accepted tags without duplicates, in the order in which
        each tag is first encountered while walking ``items``.
    """
    all_tags = itertools.chain.from_iterable(item.tags for item in items)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result is reproducible from run to run (iteration order of a set of
    # strings is not).
    distinct_tags = dict.fromkeys(all_tags)
    return list(filter(filter_fn, distinct_tags))

In [59]:
def get_rtag_ntag_scoring_dataframe(
    weight_factor = None,
    rtag_filter_fn = None,
    ntag_filter_fn = None,
):
    """
    Build a DataFrame of average review-tag scores per note tag.

    The cards are taken from the rows of the Browser instance registered
    in ``aqt.dialogs``. Rows of the result are note tags (ntags) accepted
    by ``ntag_filter_fn``; columns are review tags (rtags) accepted by
    ``rtag_filter_fn``; each cell is the average rtag score over the cards
    carrying that ntag.

    Args:
        weight_factor: Per-review weight for the decaying average;
            defaults to 0.50 when ``None``.
        rtag_filter_fn: Predicate selecting review tags; defaults to tags
            starting with ``'.feedback::trouble::'``.
        ntag_filter_fn: Predicate selecting note tags; defaults to tags
            starting with ``'.Texts::CET::'``.

    Returns:
        ``pandas.DataFrame`` indexed by ntag with one column per rtag.

    Raises:
        RuntimeError: If no Browser instance is registered.

    Note:
        ``mw`` is not defined in this block; it is expected to be available
        as a global (presumably the Anki main window -- confirm).
    """
    if weight_factor is None:
        weight_factor = 0.50
    if rtag_filter_fn is None:
        rtag_filter_fn = lambda tag: tag.startswith('.feedback::trouble::')
    if ntag_filter_fn is None:
        ntag_filter_fn = lambda tag: tag.startswith('.Texts::CET::')

    # NOTE(review): relies on private aqt internals; the instance slot is
    # presumably None while no Browser window is open -- confirm.
    browser = aqt.dialogs._dialogs['Browser'][1]
    if browser is None:
        raise RuntimeError(
            "No Browser instance is registered; open the card browser first."
        )
    model = browser.table._model

    # Get cards for browser rows.
    cards = [model._state.get_card(item) for item in model._items]
    card_ids = [card.id for card in cards]
    # Get corresponding reviews and notes.
    review_ids = set(model._state.review_ids_from_card_ids(card_ids))
    reviews = [mw.col.get_revlog_entry(review_id) for review_id in review_ids]
    note_ids = set(card.nid for card in cards)
    notes = [Note(mw.col, id=note_id) for note_id in note_ids]

    # Filter tags.
    rtags = get_tags(reviews, rtag_filter_fn)
    ntags = get_tags(notes, ntag_filter_fn)

    # Get scorecards.
    rtag_scores_dict = get_rtag_scorecards_for_cards(cards, rtag_filter_fn, weight_factor)
    empty_rtag_scores = {rtag:0 for rtag in rtags}

    data = {}
    for ntag in ntags:
        avg_scores = get_avg_rtag_scores_for_ntag(
            ntag,
            rtag_scores_dict,
            empty_rtag_scores,
        )
        # Look scores up by rtag so every column lines up with the index
        # regardless of the ordering of the returned dict.
        data[ntag] = [avg_scores[rtag] for rtag in empty_rtag_scores]

    # Passing the index to the constructor also covers the case where no
    # ntag matched: an empty frame cannot be given a non-empty index
    # after construction.
    df = pd.DataFrame(data, index=list(empty_rtag_scores))

    return df.T

In [44]:
class FlaskAppWorker(threading.Thread):
  """
  Daemon thread that serves a WSGI application with a waitress server.

  Attributes:
    flask: The application object handed to ``create_server``.
    host: Interface the server is asked to bind to.
    port: Port the server is asked to bind to.
    is_shutdown: True once ``shutdown()`` has been requested.
    server: The waitress server, or None until ``run()`` has created it.
  """

  def __init__(self, flask, host="127.0.0.1", port=5000) -> None:
    # Daemon thread: it must not keep the interpreter alive on exit.
    super().__init__(daemon=True)
    self.flask = flask
    self.host = host
    self.port = port
    self.is_shutdown = False
    self.server = None
    # One event per worker, so one worker becoming ready is never
    # mistaken for another one being ready.
    self._ready = threading.Event()

  def run(self):
    """Create the server, signal readiness, and serve until shut down."""
    try:
      # idempotent if logging has already been set up
      logging.basicConfig()
      logging.getLogger("waitress").setLevel(logging.ERROR)

      # shutdown() was requested before the thread got this far.
      if self.is_shutdown:
        return

      self.server = create_server(
        self.flask,
        host=self.host,
        port=self.port,
        clear_untrusted_proxy_headers=True,
      )
      print(f"Serving on http://{self.server.effective_host}:{self.server.effective_port}")
      self._ready.set()
      self.server.run()

    except Exception:
      # Errors raised while the server is being torn down are expected;
      # anything else is a real failure and must propagate.
      if not self.is_shutdown:
        raise

  def shutdown(self) -> None:
    """Close the server's open channels and stop its task dispatcher.

    Safe to call before the server has been created; in that case only
    the shutdown flag is set.
    """
    self.is_shutdown = True
    server = self.server
    if server is None:
      return
    # Snapshot the map first: closing a channel may modify it.
    channels = list(server._map.values())  # type: ignore
    for channel in channels:
      channel.handle_close()
    server.task_dispatcher.shutdown()