In [None]:
# Half-Life Regression (HLR) as implemented by Duolingo

In [1]:
import math
import os
import random
from collections import defaultdict

import numpy as np
import pandas as pd

In [None]:
# Load the processed review log plus the per-user and per-word feature tables,
# then join them into a single frame (`dff`) used by the following cells.
current_dir = os.getcwd()
filename = 'df_processed.csv'
filepath = os.path.normpath(os.path.join(current_dir, '../data/processed/', filename))

chunk_size = 10000

# Stream the CSV in chunks and drop incomplete rows as each chunk arrives.
# `dropna()` returns a new frame, so no chunk is mutated in place.
chunks = [chunk.dropna() for chunk in pd.read_csv(filepath, chunksize=chunk_size)]

# De-duplicate only AFTER concatenation: de-duplicating chunk by chunk misses
# duplicate rows that happen to land in different chunks.
df = pd.concat(chunks, ignore_index=True).drop_duplicates().reset_index(drop=True)

df_users = pd.read_csv(os.path.normpath(os.path.join(current_dir, '../data/features/', 'users_behaviur.csv')))
# The word-feature file is tab-separated.
df_words = pd.read_csv(os.path.normpath(os.path.join(current_dir, '../data/features/', 'word_complexity_features.csv')), sep='\t')

# Inner joins: rows without a matching lexeme or (user, language combination)
# in the feature tables are dropped.
df_1 = df.merge(df_words, on='lexeme_id', how='inner')
df_2 = df_1.merge(df_users, on=['user_id', 'lang_combination'], how='inner')

# Remove the columns that are not carried forward into `dff`.
dff = df_2.drop(columns=['timestamp', 'lexeme_id', 'word', 'user_id', 'POS', 'person', 'number', 'gender', 'tense', 'def'])

In [2]:
# Small three-row scratch frame with an integer column and a float column.
a = pd.DataFrame(
    {
        'lang': [1, 2, 3],
        'halo': [0.01, 0, 0.99],
    }
)

In [None]:
# Changes to dataset before fitting
# NOTE: `pclip` and `hclip` are defined in the utilities cell further down the
# notebook; that cell must have been run before this one.
# NOTE: this cell rescales `dff['delta']` in place, so running it twice on the
# same `dff` divides the column twice. Rebuild `dff` before re-running.
dff['p_recall'] = pclip(dff['p_recall'])
dff['delta'] = dff['delta'] / (60 * 60 * 24)  # convert time delta to days
# Half-life from observed recall: h = -t / log2(p).
# np.log2 is element-wise; math.log(x, 2) only accepts scalars and fails on a column.
dff['half_life'] = hclip(-dff['delta'] / np.log2(dff['p_recall']))

dff_final = dff.drop(columns=['learning_language', 'ui_language'])


# original                 
# h = hclip(-t/(math.log(p, 2)))
# lang = '%s->%s' % (row['ui_language'], row['learning_language'])
# lexeme_id = row['lexeme_id']
# lexeme_string = row['lexeme_string']
# timestamp = int(row['timestamp'])
# user_id = row['user_id']
# seen = int(row['history_seen'])
# right = int(row['history_correct'])
# wrong = seen - right
# right_this = int(row['session_correct'])
# wrong_this = int(row['session_seen']) - right_this

# they left only lang combination 
# used lexeme id 
# used timestamp and lexeme_string and user_id 
# h = hclip(-t/(math.log(p, 2)))

In [None]:
# Data instance object: scratch cell assembling `Instance` records for the model.
# NOTE(review): this cell is not runnable as written:
#   * `namedtuple` is not imported by the import cell (only `defaultdict` is).
#   * `intern` is not a builtin in Python 3 (it lives at `sys.intern`), and `sys`
#     is not imported either.
#   * `right`, `wrong`, `seen`, `row`, `lexeme_string`, `p`, `t`, `h`, `lang`,
#     `right_this`, `wrong_this`, `timestamp` and `user_id` are only assigned in
#     commented-out lines of the previous cell.
#   * the indented lines after `instances.append(...)` follow a top-level
#     statement, which is an IndentationError.
instances = list()
# NOTE(review): the field names `p_recal`, `feature_vector` and `h_recall` do not
# match the attributes the HalfLifeRegression class reads (`inst.p`, `inst.fv`,
# `inst.h`). Confirm the intended names before building instances.
Instance = namedtuple('Instance', 'p_recal t feature_vector h_recall a lang right wrong ts user_id lexeme'.split())

# feature vector is a list of (feature, value) tuples:
# sqrt(1 + count) for the right/wrong counts, plus an indicator feature keyed
# by "<learning_language>:<lexeme_string>" with value 1.
fv = []
fv.append((intern('right'), math.sqrt(1+right)))
fv.append((intern('wrong'), math.sqrt(1+wrong)))
fv.append((intern('%s:%s' % (row['learning_language'], lexeme_string)), 1.))

# The fifth positional field, (right+2)/(seen+4), fills the slot named `a`.
instances.append(Instance(p, t, fv, h, (right+2.)/(seen+4.), lang, right_this, wrong_this, timestamp, user_id, lexeme_string))
    # read data set
    # NOTE(review): `read_data`, `args` and `SpacedRepetitionModel` are not defined
    # in any cell shown here; this fragment looks pasted from a command-line
    # script. Confirm whether it should be removed or ported.
    trainset, testset = read_data(args.input_file, args.method, args.b, args.l, args.max_lines)
    sys.stderr.write('|train| = %d\n' % len(trainset))
    sys.stderr.write('|test|  = %d\n' % len(testset))

    # train model & print preliminary evaluation info
    model = SpacedRepetitionModel(method=args.method, omit_h_term=args.t)
    model.train(trainset)
    model.eval(testset, 'test')

# Convert every row of `df` into a namedtuple called `Row`, without the index.
# NOTE(review): the resulting list is not assigned to a name, so it is only
# echoed as the cell output.
list(df.itertuples(name='Row', index=False))


In [5]:
# Constants
# Lower/upper bounds for half-life values, both expressed in days.
min_half_life = 15.0 / 1440      # 15 minutes in days (24 * 60 = 1440 minutes per day)
max_half_life = 274.0            # 9 months
LN2 = math.log(2.0)              # natural logarithm of 2

# Utility functions
def pclip(p):
    """Bound a recall probability to the range [0.001, 0.9999].

    Keeping `p` strictly inside (0, 1) keeps later logarithms finite and
    non-zero. Works on whatever `np.clip` accepts (scalars, arrays, Series).
    """
    lowest, highest = 0.001, 0.9999
    return np.clip(p, lowest, highest)

def hclip(h):
    """Bound a half-life to the range [min_half_life, max_half_life].

    Both bounds are read from the module-level constants at call time.
    Works on whatever `np.clip` accepts (scalars, arrays, Series).
    """
    shortest, longest = min_half_life, max_half_life
    return np.clip(h, shortest, longest)

def mae(l1, l2):
    """Return the mean absolute error between two equal-length sequences.

    Elements are paired by position.

    Args:
        l1: sized iterable of numbers (e.g. ground-truth values).
        l2: sized iterable of numbers (e.g. predictions), same length as `l1`.

    Returns:
        float: the average of ``abs(a - b)`` over all pairs.

    Raises:
        ValueError: if the two inputs have different lengths. Previously a
            longer `l2` was silently truncated.
        ZeroDivisionError: if both inputs are empty.
    """
    if len(l1) != len(l2):
        raise ValueError(
            "mae() needs inputs of equal length, got %d and %d" % (len(l1), len(l2))
        )
    # zip pairs by position, so this also works for containers whose []
    # operator is not positional.
    abs_errors = [abs(a - b) for a, b in zip(l1, l2)]
    return float(sum(abs_errors)) / len(abs_errors)

def mean(lst):
    """Return the arithmetic mean of `lst` as a float.

    Raises ZeroDivisionError when `lst` is empty.
    """
    total = float(sum(lst))
    count = len(lst)
    return total / count

In [None]:
# Define the HLR Model
class HalfLifeRegression:
    """Half-life regression (HLR) model trained one instance at a time.

    The predicted half-life is ``2 ** (w . x)`` (clipped by `hclip`), and the
    predicted recall probability is ``2 ** (-t / h)`` (clipped by `pclip`).

    Every training/evaluation instance must expose these attributes:
        p  -- observed recall probability
        t  -- elapsed time (delta t), in the same unit as the half-life
        fv -- feature vector as a list of ``(feature_key, value)`` tuples
        h  -- observed half-life

    Relies on the module-level helpers `hclip`, `pclip`, `mae` and on the
    constants `LN2` and `max_half_life`.
    """

    def __init__(self, learning_rate=0.001, hlwt=0.01, l2wt=0.1, sigma=1., initial_weights=None):
        """Create a model.

        Args:
            learning_rate: base step size for the weight updates.
            hlwt: weight of the half-life loss term.
            l2wt: weight of the L2 regularization term.
            sigma: scale of the L2 penalty (the penalty is divided by sigma**2).
            initial_weights: optional mapping ``feature_key -> weight`` copied
                into the model. Features not listed start at 0.0.
        """
        self.weights = defaultdict(float)  # Feature weights
        self.fcounts = defaultdict(int)    # Feature counts for adaptive learning rates
        self.learning_rate = learning_rate # Base learning rate
        self.hlwt = hlwt                   # Weight for half-life loss
        self.l2wt = l2wt                   # L2 regularization weight
        self.sigma = sigma
        if initial_weights is not None:
            self.weights.update(initial_weights)

    def halflife(self, inst):
        """Compute the predicted half-life from the instance's feature vector.

        Returns ``hclip(2 ** (w . x))``. If the exponentiation overflows, the
        maximum half-life is returned.
        """
        # inst.fv is the feature vector: a list of (key, value) tuples
        dp = sum(self.weights[k] * x_k for (k, x_k) in inst.fv)
        try:
            return hclip(2 ** dp)
        except OverflowError:
            # Catch only the overflow of 2 ** dp, so that malformed instances
            # still raise instead of being scored silently.
            return max_half_life

    def predict(self, inst):
        """Return ``(recall probability, half-life)`` predicted for `inst`."""
        h = self.halflife(inst)
        p = 2 ** (-inst.t / h)  # inst.t is the delta t
        return pclip(p), h

    def losses(self, inst):
        """Return ``(slp, slh, p, h)`` for one instance.

        `slp` and `slh` are the squared errors of the predicted recall
        probability `p` and predicted half-life `h` against `inst.p` / `inst.h`.
        """
        p, h = self.predict(inst)
        slp = (inst.p - p) ** 2
        slh = (inst.h - h) ** 2
        return slp, slh, p, h

    def train_update(self, inst):
        """Update weights using one training instance."""
        p_pred, h_pred = self.predict(inst)

        # Gradient factors of the two squared-error terms; each is multiplied
        # by the feature value x_k below.
        dlp_dw = 2*(p_pred - inst.p)*(LN2 ** 2)*p_pred*(inst.t/h_pred)
        dlh_dw = 2*(h_pred - inst.h)*LN2*h_pred

        # Update weights
        for (k, x_k) in inst.fv:
            # Per-feature step size: shrinks as the feature is seen more often
            # and as the observed recall inst.p grows.
            rate = (1./(1+inst.p)) * self.learning_rate / math.sqrt(1 + self.fcounts[k])
            self.weights[k] -= rate * dlp_dw * x_k  # Update for recall probability loss
            self.weights[k] -= rate * self.hlwt * dlh_dw * x_k  # Update for half-life loss
            self.weights[k] -= rate * self.l2wt * self.weights[k] / self.sigma**2  # L2 regularization
            self.fcounts[k] += 1

    def train(self, trainset):
        """Run one pass of updates over `trainset` in random order.

        Note: `trainset` is shuffled in place, so it must be a mutable
        sequence and the caller's ordering is not preserved.
        """
        random.shuffle(trainset)
        for inst in trainset:
            self.train_update(inst)

    def evaluate(self, testset):
        """Evaluate the model on a test dataset and print summary metrics.

        Prints the summed squared losses, the mean absolute errors for recall
        probability and half-life, and the total regularized loss.

        Raises:
            ValueError: if `testset` contains no instances.
        """
        results = {'p': [], 'h': [], 'pp': [], 'hh': [], 'slp': [], 'slh': []}
        for inst in testset:
            slp, slh, p, h = self.losses(inst)
            results['p'].append(inst.p)     # ground truth
            results['h'].append(inst.h)
            results['pp'].append(p)         # predictions
            results['hh'].append(h)
            results['slp'].append(slp)      # loss function values
            results['slh'].append(slh)

        if not results['p']:
            raise ValueError("cannot evaluate on an empty testset")

        # Aggregate once, after the loop, instead of on every iteration.
        mae_p = mae(results['p'], results['pp'])
        mae_h = mae(results['h'], results['hh'])
        total_slp = sum(results['slp'])
        total_slh = sum(results['slh'])
        total_l2 = sum([x**2 for x in self.weights.values()])
        total_loss = total_slp + self.hlwt*total_slh + self.l2wt*total_l2
        print(f"SLP Loss: {total_slp}, SLH Loss: {total_slh}, MAE_P: {mae_p}, MAE_H: {mae_h}, total loss {total_loss}")

In [None]:
# Fit a model with the default hyper-parameters on `trainset`, then print the
# evaluation metrics computed on `testset`.
# NOTE(review): `trainset` and `testset` are not assigned by any runnable cell
# shown in this notebook. Confirm where they are built before running this cell.
model = HalfLifeRegression()
model.train(trainset)
model.evaluate(testset)