In [14]:
import pandas as pd
import numpy as np
from collections import Counter, defaultdict
import math
import re
import spacy

In [15]:
train_df = pd.read_csv('split/train.csv')
val_df = pd.read_csv('split/val.csv')
test_df = pd.read_csv('split/test.csv')

In [16]:
lex_df = pd.read_csv('NRC-VAD-Lexicon-v2.1/NRC-VAD-Lexicon-v2.1.txt', sep='\t')
lex_df = lex_df.rename(columns={'valence':'v', 'arousal':'a'})[['term','v','a']].dropna()
lex_df['term'] = lex_df['term'].str.strip().str.lower()

lex_df['v'] = pd.to_numeric(lex_df['v'], errors='coerce').clip(-1, 1)
lex_df['a'] = pd.to_numeric(lex_df['a'], errors="coerce").clip(-1, 1)

lex_df = lex_df.dropna(subset=['v','a']).groupby('term', as_index=False).mean()
lex_df = {r.term:(float(r.v), float(r.a)) for r in lex_df.itertuples(index=False)}

In [17]:
NEG_BETA = 0.8
EXCL_BUMP = 0.05
DECAY = [1.0, 0.95, 0.90]
BUT = {'but', 'however', 'though', 'although', 'yet'}
NEG = {'not', 'n\'t', 'never', 'no', 'without', 'none', 'hardly', 'scarcely', 'barely'}
INT = {'very', 'really', 'extremely', 'so', 'super', 'absolutely', 'totally', 'too', 'quite', 'highly', 'truly', 'deeply', 'incredibly', 'terribly', 'awfully'}
DOWN = {'slightly', 'somewhat', 'rather', 'kinda', 'a bit', 'bit', 'mildly', 'partly'}
PUNCT_STOP = {'.', ',', ';', '?', '!', '—', '-', '…', ':', '–'}

nlp = spacy.load('en_core_web_sm', disable=['ner'])


def norm_elong(s: str) -> str:
    return re.sub(r'(.)\1{2,}', r'\1\1', s)


def parse_text(x: str, is_words: bool = False):
    x = norm_elong(x or '')
    ex = x.count('!')
    if is_words:
        items = [w.strip().lower() for w in re.split(r'[,\|;/]+', x) if w.strip()]
        toks = [
            {'lemma': w, 'pos': 'ADJ', 'txt': w, 'punct': False}
            for w in items
        ]
        return toks, ex
    doc = nlp(x)
    toks = [
        {
            'lemma': t.lemma_.lower(),
            'pos': t.pos_,
            'txt': t.text,
            'punct': t.is_punct,
        }
        for t in doc
        if not t.is_space
    ]
    return toks, ex


def build_idf(texts, flags, lex_df):
    N = len(texts)
    df = defaultdict(int)
    for x, iw in zip(texts, flags):
        toks, _ = parse_text(x, bool(iw))
        seen = {t['lemma'] for t in toks if t['lemma'] in lex_df}
        for w in seen:
            df[w] += 1
    return {
        t: math.log((N + 1) / (df.get(t, 0) + 1)) + 1.0
        for t in lex_df.keys()
    }



def left_idxs(toks, i: int, k: int = 3):
    out = []
    j = i - 1
    while j >= 0 and len(out) < k:
        if toks[j]['punct'] and toks[j]['txt'] in PUNCT_STOP:
            break
        out.append(j)
        j -= 1
    return out


def adjust_va(v: float, a: float, toks, i: int):
    g = 0.0
    neg = False
    for k, j in enumerate(left_idxs(toks, i, 3)):
        w = toks[j]['lemma']
        d = DECAY[k] if k < len(DECAY) else DECAY[-1]
        if w in INT:
            g += 0.3 * d
        if w in DOWN:
            g += -0.2 * d
        if w in NEG:
            neg = True
    v = (1.0 + g) * v
    a = (1.0 + g) * a
    if neg:
        v = -NEG_BETA * v
    v = float(np.clip(v, -2.0, 2.0))
    a = float(np.clip(a, -1.0, 1.0))
    return v, a


def but_idx(toks):
    for i, t in enumerate(toks):
        if t['lemma'] in BUT:
            return i
    return None


def token_weights(toks, idf, lex_df):
    wV, wA, vs, as_ = [], [], [], []
    for i, t in enumerate(toks):
        lem = t['lemma']
        if lem not in lex_df:
            wV.append(0.0)
            wA.append(0.0)
            vs.append(0.0)
            as_.append(0.0)
            continue
        v0, a0 = lex_df[lem]
        v, a = adjust_va(v0, a0, toks, i)
        wb = idf.get(lem, 1.0)
        wV.append(wb)
        wA.append(wb)
        vs.append(v)
        as_.append(a)
    return (
        np.asarray(wV, float),
        np.asarray(wA, float),
        np.asarray(vs, float),
        np.asarray(as_, float),
    )



def aggregate_doc(toks, idf, lex_df):
    lem = [t['lemma'] for t in toks if t['lemma'] in lex_df]
    if not lem:
        return 0.0, 0.0
    wV, wA, vs, as_ = token_weights(toks, idf, lex_df)
    p = but_idx(toks)
    if p is not None:
        pre = np.arange(len(toks)) < p
        post = ~pre
        wV[pre] *= 0.3
        wV[post] *= 0.7
        wA[pre] *= 0.3
        wA[post] *= 0.7
    eps = 1e-8
    V = float(np.nan_to_num(np.nansum(wV * vs) / (np.nansum(wV) + eps)))
    A = float(np.nan_to_num(np.nansum(wA * as_) / (np.nansum(wA) + eps)))
    V = float(np.clip(V, -2.0, 2.0))
    A = float(np.clip(A, -1.0, 1.0))
    return V, A


def bump_excl(A: float, ex: int):
    return float(
        np.clip(A + EXCL_BUMP * min(3, int(ex)), -1.0, 1.0)
    )


def fit_idf(train_df: pd.DataFrame, lex_df):
    return build_idf(
        train_df['text'].tolist(),
        train_df['is_words'].tolist(),
        lex_df,
    )



def score_df(df: pd.DataFrame, idf, lex_df):
    VV, AA = [], []
    for x, iw in zip(df['text'], df['is_words']):
        toks, ex = parse_text(str(x), bool(iw))
        v, a = aggregate_doc(toks, idf, lex_df)
        a = bump_excl(a, ex)
        VV.append(v)
        AA.append(a)
    out = df.copy()
    out['V'] = VV
    out['A'] = AA
    return out


def fit_lin(y_true, y_pred):
    x = np.asarray(y_pred)
    y = np.asarray(y_true)
    x = np.c_[np.ones_like(x), x]
    w, *_ = np.linalg.lstsq(x, y, rcond=None)
    return float(w[0]), float(w[1])


def apply_lin(x, w0: float, w1: float, lo: float, hi: float):
    return np.clip(w0 + w1 * np.asarray(x), lo, hi)


def apply_global(dfsc: pd.DataFrame, w0v: float, w1v: float, w0a: float, w1a: float):
    dfsc = dfsc.copy()
    dfsc['V_g'] = apply_lin(dfsc['V'], w0v, w1v, -2.0, 2.0)
    dfsc['A_g'] = apply_lin(dfsc['A'], w0a, w1a, -1.0, 1.0)
    return dfsc


def fit_local(train_g: pd.DataFrame, min_user_n: int = 6):
    cal_v = {}
    cal_a = {}
    for uid, g in train_g.groupby('user_id'):
        if len(g) >= min_user_n:
            cal_v[uid] = fit_lin(g['valence'], g['V_g'])
            cal_a[uid] = fit_lin(g['arousal'], g['A_g'])
    return cal_v, cal_a


def apply_local(dfsc: pd.DataFrame, cal_v, cal_a):
    dfsc = dfsc.copy()
    Vh, Ah = [], []
    for r in dfsc.itertuples(index=False):
        uid = getattr(r, 'user_id')
        v_in = getattr(r, 'V_g')
        a_in = getattr(r, 'A_g')
        if uid in cal_v and uid in cal_a:
            w0, w1 = cal_v[uid]
            vv = apply_lin(v_in, w0, w1, -2.0, 2.0)
            w0, w1 = cal_a[uid]
            aa = apply_lin(a_in, w0, w1, -1.0, 1.0)
        else:
            vv, aa = v_in, a_in
        Vh.append(float(vv))
        Ah.append(float(aa))
    dfsc['V_local'] = Vh
    dfsc['A_local'] = Ah
    return dfsc


def eval(df: pd.DataFrame, v_col_pred: str, a_col_pred: str):
    dv = df['valence'] - df[v_col_pred]
    da = df['arousal'] - df[a_col_pred]
    out = {
        'V_MAE': float(np.mean(np.abs(dv))),
        'V_RMSE': float(np.sqrt(np.mean(dv**2))),
        'A_MAE': float(np.mean(np.abs(da))),
        'A_RMSE': float(np.sqrt(np.mean(da**2))),
    }
    return out


In [18]:
idf = fit_idf(train_df, lex_df)

train_s = score_df(train_df, idf, lex_df)
val_s = score_df(val_df, idf, lex_df)
test_s = score_df(test_df, idf, lex_df)

w0v, w1v = fit_lin(train_df['valence'], train_s['V'])
w0a, w1a = fit_lin(train_df['arousal'], train_s['A'])
train_g = apply_global(train_s, w0v, w1v, w0a, w1a)
val_g = apply_global(val_s, w0v, w1v, w0a, w1a)
test_g = apply_global(test_s, w0v, w1v, w0a, w1a)
cal_v, cal_a = fit_local(train_g, min_user_n=6)
val_final = apply_local(val_g, cal_v, cal_a)
test_final = apply_local(test_g, cal_v, cal_a)

res_val = eval(val_final, 'V_local', 'A_local')
res_test = eval(test_final, 'V_local', 'A_local')

print(res_test)

{'V_MAE': 0.8545317522903663, 'V_RMSE': 1.0674621103034159, 'A_MAE': 0.558384987161822, 'A_RMSE': 0.6760707365269816}
