<a href="https://colab.research.google.com/github/open-spaced-repetition/fsrs4anki/blob/main/fsrs4anki_optimizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Upload your Anki Collection Package file or Anki Deck Package file. No need to include media. Need to include scheduling information.

You can export it via `File -> Export...` or `Ctrl + E` in the main window of Anki.

Then replace the `filename` with yours in the next code cell.

After that, just run all (`Runtime -> Run all` or `Ctrl + F9`).

In [1]:
filename = "ALL__Learnig.apkg" 
# If you upload deck file, replace it with your deck filename. E.g., ALL__Learnig.apkg 
# If you upload collection file, replace it with your colpgk filename. E.g., collection-2022-09-18@13-21-58.colpkg

import zipfile
# Extract the collection file to get the .anki21 database.
with zipfile.ZipFile(f'./{filename}', 'r') as zip_ref:
    zip_ref.extractall('./')
    print("Extract successfully!")

Extract successfully!


In [2]:
import sqlite3
import time
import tqdm
import pandas as pd
from datetime import timedelta
from tqdm import tqdm

The following code cell will extract the review logs from your Anki collection and preprocess them to a trainset which is saved in `revlog_history.tsv`.

 The time-series features are important in optimizing the model's parameters. For more detail, please see my paper: https://www.maimemo.com/paper/

In [3]:
timezone = 'Asia/Shanghai'  # Replace it with your timezone. I'm in China, so I use Asia/Shanghai.
next_day_starts_at = 6  # Replace it with your Anki's setting in Prefernces -> Scheduling.

con = sqlite3.connect("collection.anki21")
cur = con.cursor()
res = cur.execute("SELECT * FROM revlog")
revlog = res.fetchall()

df = pd.DataFrame(revlog)
df.columns = ['id', 'cid', 'usn', 'r', 'ivl', 'last_lvl', 'factor', 'time', 'type']
df = df[(df['cid'] <= time.time() * 1000) & (df['id'] <= time.time() * 1000)]
df['create_date'] = pd.to_datetime(df['cid'] // 1000, unit='s')
df['create_date'] = df['create_date'].dt.tz_localize('UTC').dt.tz_convert(timezone)
df['review_date'] = pd.to_datetime(df['id'] // 1000, unit='s')
df['review_date'] = df['review_date'].dt.tz_localize('UTC').dt.tz_convert(timezone)
df.sort_values(by=['cid', 'id'], inplace=True, ignore_index=True)
df.to_csv("revlog.csv", index=False)
df = df[(df['type'] == 0) | (df['type'] == 1)]
df['real_date'] = df['review_date'].map(lambda x: x - timedelta(days=1) if x.hour < next_day_starts_at else x)
df['real_date'] = df['real_date'].dt.floor('D')
df.drop(df[df['real_date'].dt.year < 2006].index, inplace=True)
df.drop_duplicates(['cid', 'real_date'], keep='first', inplace=True)
df['delta_t'] = df.real_date.diff().dt.days
df.dropna(inplace=True)
df['delta_t'] = df['delta_t'].astype(dtype=int)
df['i'] = 1
df['r_history'] = ""
df['t_history'] = ""
col_idx = {key: i for i, key in enumerate(df.columns)}


# code from https://github.com/L-M-Sherlock/anki_revlog_analysis/blob/main/revlog_analysis.py
def get_feature(x):
    for idx, log in enumerate(x.itertuples()):
        if idx == 0:
            x.iloc[idx, col_idx['delta_t']] = 0
        if idx == x.shape[0] - 1:
            break
        x.iloc[idx + 1, col_idx['i']] = x.iloc[idx, col_idx['i']] + 1
        x.iloc[idx + 1, col_idx['t_history']] = f"{x.iloc[idx, col_idx['t_history']]},{x.iloc[idx, col_idx['delta_t']]}"
        x.iloc[idx + 1, col_idx['r_history']] = f"{x.iloc[idx, col_idx['r_history']]},{x.iloc[idx, col_idx['r']]}"
    return x


tqdm.pandas()
df = df.groupby('cid', as_index=False).progress_apply(get_feature)
df["t_history"] = df["t_history"].map(lambda x: x[1:] if len(x) > 1 else x)
df["r_history"] = df["r_history"].map(lambda x: x[1:] if len(x) > 1 else x)
df.to_csv('revlog_history.tsv', sep="\t", index=False)
print("Trainset saved")

100%|██████████| 5166/5166 [01:05<00:00, 79.32it/s] 


Trainset saved


In [4]:
import math
import sys
import torch
import datetime
import numpy as np
import matplotlib.pyplot as plt
from torch import nn
from sklearn.utils import shuffle

The default parameters of FSRS.

In [5]:
defaultDifficulty = 5
defaultStability = 2
difficultyDecay = -0.7
stabilityDecay = -0.2
increaseFactor = 3
lapsesBase = -0.3

FSRS is a time-series model for predicting memory states.

In [6]:
class FSRS(nn.Module):
    def __init__(self):
        super(FSRS, self).__init__()
        self.f_s = nn.Parameter(torch.FloatTensor([defaultStability]))
        # init stability
        self.f_d = nn.Parameter(torch.FloatTensor([defaultDifficulty]))
        # init difficulty
        self.s_w = nn.Parameter(torch.FloatTensor(
            [increaseFactor, difficultyDecay, stabilityDecay, lapsesBase]))
        self.zero = torch.FloatTensor([0.0])

    def forward(self, x, s, d, l):
        '''
        :param x: [review interval, review response]
        :param s: stability
        :param d: difficulty
        :param l: lapses
        :return:
        '''
        if torch.equal(s, torch.FloatTensor([0.0])):
            # first learn, init memory states
            next_s = self.f_s[0] * 0.25 * torch.pow(2, x[1] - 1)
            next_d = self.f_d[0] - x[1] + 3
            next_l = torch.relu(2-x[1])
        else:
            next_s = (1 - torch.relu(2-x[1])) * s * \
                             (1 + torch.exp(self.s_w[0]) * torch.pow(d + 0.1, self.s_w[1]) *
                              torch.pow(s, self.s_w[2]) *
                              (torch.exp(1 - torch.exp(np.log(0.9) * x[0] / s)) - 1)) + \
                             torch.relu(2-x[1]) * self.f_s[0] * torch.exp(self.s_w[3] * l)
            next_d = torch.relu(d + torch.exp(np.log(0.9) * x[0] / s) - 0.25 * torch.pow(2, x[1] - 1) + 0.1)
            next_l = l + torch.relu(2-x[1])
        return next_s, next_d, next_l

    def loss(self, s, t, r):
        return - (r * np.log(0.9) * t / s + (1 - r) * torch.log(1 - torch.exp(np.log(0.9) * t / s)))


class WeightClipper(object):
    def __init__(self, frequency=1):
        self.frequency = frequency

    def __call__(self, module):
        if hasattr(module, 'f_s'):
            w = module.f_s.data
            w = w.clamp(0.1, 10)
            module.f_s.data = w
        if hasattr(module, 'f_d'):
            w = module.f_d.data
            w = w.clamp(1, 10)
            module.f_d.data = w
        if hasattr(module, 's_w'):
            w = module.s_w.data
            w[0] = w[0].clamp(0.01, 10)
            w[1] = w[1].clamp(-1, -0.01)
            w[2] = w[2].clamp(-1, -0.01)
            w[3] = w[3].clamp(-1, -0.01)
            module.s_w.data = w


def lineToTensor(line):
    ivl = line[0].split(',')
    response = line[1].split(',')
    tensor = torch.zeros(len(response), 2)
    for li, response in enumerate(response):
        tensor[li][0] = int(ivl[li])
        tensor[li][1] = int(response)
    return tensor

Training approximately spends (the number of logs / 10000) minutes to optimize the parameters.

In [7]:
model = FSRS()
clipper = WeightClipper()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

dataset = pd.read_csv("./revlog_history.tsv", sep='\t', index_col=None)
dataset = dataset[dataset['i'] > 1]

n_epoch = 1
print_len = dataset.shape[0] // 10

checkpoint = {
    "net": model.state_dict(),
    'optimizer': optimizer.state_dict(),
    "epoch": -1
}

for k in range(n_epoch):
    dataset = shuffle(dataset, random_state=2022 + k)
    epoch_len = len(dataset)
    for i, (_, row) in enumerate(tqdm(dataset.iterrows(), total=epoch_len, position=0, leave=False, desc="train",file=sys.stdout)):
        model.train()
        optimizer.zero_grad()
        line_tensor = lineToTensor(
            list(zip([row['t_history']], [row['r_history']]))[0])
        output_t = [(model.zero, model.zero, model.zero)]
        for input_t in line_tensor:
            output_t.append(model(input_t, *output_t[-1]))
        loss = model.loss(output_t[-1][0], row['delta_t'], {1: 0, 2: 1, 3: 1, 4: 1}[row['r']])
        loss.backward()
        optimizer.step()
        model.apply(clipper)

        if (k * epoch_len + i) % print_len == 0:
            tqdm.write(f"iteration: {k * epoch_len + i + 1}")
            for name, param in model.named_parameters():
                tqdm.write(f"{name}: {param}")

            checkpoint = {
                "net": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "iteration": (k * epoch_len + i) // print_len
            }

end = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
torch.save(checkpoint, f'./model-{end}.pth')

defaultStability = round(float(dict(model.named_parameters())['f_s'].data),4)
defaultDifficulty = round(float(dict(model.named_parameters())['f_d'].data),4)
increaseFactor, difficultyDecay, stabilityDecay, lapsesBase = map(lambda x: round(float(x), 4), dict(model.named_parameters())['s_w'].data)

print("Training finished!")

iteration: 0
f_s: Parameter containing:
tensor([2.0001], requires_grad=True)
f_d: Parameter containing:
tensor([5.0001], requires_grad=True)
s_w: Parameter containing:
tensor([ 2.9999, -0.7001, -0.2001, -0.3001], requires_grad=True)
iteration: 5692
f_s: Parameter containing:
tensor([2.0582], requires_grad=True)
f_d: Parameter containing:
tensor([4.8983], requires_grad=True)
s_w: Parameter containing:
tensor([ 3.0868, -0.6227, -0.1480, -0.2712], requires_grad=True)
iteration: 11384
f_s: Parameter containing:
tensor([2.1142], requires_grad=True)
f_d: Parameter containing:
tensor([4.8193], requires_grad=True)
s_w: Parameter containing:
tensor([ 3.1483, -0.5716, -0.1110, -0.2471], requires_grad=True)
iteration: 17076
f_s: Parameter containing:
tensor([2.1938], requires_grad=True)
f_d: Parameter containing:
tensor([4.7605], requires_grad=True)
s_w: Parameter containing:
tensor([ 3.1875, -0.5427, -0.0908, -0.2096], requires_grad=True)
iteration: 22768
f_s: Parameter containing:
tensor([2.261

Copy the optimal parameters for FSRS for you in the output of next code cell after running.

The code of Anki custom scheduling is at https://github.com/open-spaced-repetition/fsrs4anki

In [8]:
print(f"const defaultDifficulty = {defaultDifficulty};")
print(f"const defaultStability = {defaultStability};")
print(f"const difficultyDecay = {difficultyDecay};")
print(f"const stabilityDecay = {stabilityDecay};")
print(f"const increaseFactor = {increaseFactor};")
print(f"const lapsesBase = {lapsesBase};")

const defaultDifficulty = 4.5839;
const defaultStability = 2.5952;
const difficultyDecay = -0.5624;
const stabilityDecay = -0.1172;
const increaseFactor = 3.2327;
const lapsesBase = -0.0412;


You can see the memory states and intervals generated by FSRS as if you press the good in each review at the due date scheduled by FSRS.

In [9]:
class Collection:
    def __init__(self):
        self.model = model

    def next_states(self, t_history, r_history):
        with torch.no_grad():
            line_tensor = lineToTensor(list(zip([t_history], [r_history]))[0])
            output_t = [(self.model.zero, self.model.zero, self.model.zero)]
            for input_t in line_tensor:
                output_t.append(self.model(input_t, *output_t[-1]))
            return output_t[-1]

my_collection = Collection()
t_history = "0"
r_history = "3"
for i in range(15):
    states = my_collection.next_states(t_history, r_history)
    print(states)
    next_t = round(float(states[0]))
    t_history += f',{int(next_t)}'
    r_history += ",3"
print(t_history)

(tensor(2.5952), tensor(4.5839), tensor(0.))
(tensor(5.5947), tensor(4.5692), tensor(0.))
(tensor(11.0884), tensor(4.5624), tensor(0.))
(tensor(20.3944), tensor(4.5631), tensor(0.))
(tensor(36.1469), tensor(4.5649), tensor(0.))
(tensor(62.6541), tensor(4.5653), tensor(0.))
(tensor(106.1411), tensor(4.5648), tensor(0.))
(tensor(174.9307), tensor(4.5649), tensor(0.))
(tensor(282.0350), tensor(4.5649), tensor(0.))
(tensor(445.2276), tensor(4.5649), tensor(0.))
(tensor(689.3261), tensor(4.5650), tensor(0.))
(tensor(1048.3844), tensor(4.5650), tensor(0.))
(tensor(1568.3268), tensor(4.5650), tensor(0.))
(tensor(2310.3726), tensor(4.5651), tensor(0.))
(tensor(3355.0225), tensor(4.5651), tensor(0.))
0,3,6,11,20,36,63,106,175,282,445,689,1048,1568,2310,3355
