In [1]:
import torch
import numpy as np

In [5]:
def _quadratic_weighted_kappa(rater_a, rater_b, min_rating=0, max_rating=4):
    """
    Quadratic weighted kappa between two sequences of integer ratings.

    Kappa measures inter-rater agreement on a discrete numeric scale.  It
    ranges from -1 (complete disagreement) to 1 (complete agreement); 0 is
    the value expected when all agreement is due to chance.  Each
    disagreement is weighted by the squared distance between the two
    ratings, normalised by the squared width of the rating scale.

    Parameters
    ----------
    rater_a, rater_b : sequence
        Ratings from the two raters.  Both are cast to integer NumPy arrays
        and must have the same length.
    min_rating, max_rating : int
        Inclusive bounds of the rating scale.

    Returns
    -------
    float
        ``1 - observed_disagreement / chance_disagreement``.

    Raises
    ------
    AssertionError
        If the two sequences differ in length.
    ZeroDivisionError
        If the input is empty, the scale has a single rating, or the
        chance disagreement is zero (both raters always give the same
        single rating).

    Reference implementation:
    https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/quadratic_weighted_kappa.py
    """
    ratings_a = np.array(rater_a, dtype=int)
    ratings_b = np.array(rater_b, dtype=int)
    assert len(ratings_a) == len(ratings_b)

    # Observed joint counts and each rater's marginal counts.
    observed = confusion_matrix(ratings_a, ratings_b, min_rating, max_rating)
    n_categories = len(observed)
    n_items = float(len(ratings_a))
    counts_a = histogram(ratings_a, min_rating, max_rating)
    counts_b = histogram(ratings_b, min_rating, max_rating)

    observed_disagreement = 0.0
    chance_disagreement = 0.0
    for row, count_a in enumerate(counts_a):
        for col, count_b in enumerate(counts_b):
            # Count expected in this cell if the raters were independent.
            chance_count = (count_a * count_b / n_items)
            # Quadratic penalty: 0 on the diagonal, 1 at opposite ends.
            weight = pow(row - col, 2.0) / pow(n_categories - 1, 2.0)
            observed_disagreement += weight * observed[row][col] / n_items
            chance_disagreement += weight * chance_count / n_items

    return 1.0 - observed_disagreement / chance_disagreement


def confusion_matrix(rater_a, rater_b, min_rating=0, max_rating=4):
    """
    Returns the confusion matrix between two raters' ratings.

    Parameters
    ----------
    rater_a, rater_b : sequence of int
        Ratings from the two raters; must have the same length.
    min_rating, max_rating : int
        Inclusive bounds of the rating scale.

    Returns
    -------
    list of list of int
        Square matrix with ``max_rating - min_rating + 1`` rows; entry
        ``[i][j]`` counts the items rated ``min_rating + i`` by rater A
        and ``min_rating + j`` by rater B.

    Raises
    ------
    AssertionError
        If the two sequences differ in length.
    IndexError
        If any rating lies outside ``[min_rating, max_rating]``.
    """
    assert(len(rater_a) == len(rater_b))
    num_ratings = int(max_rating - min_rating + 1)
    conf_mat = [[0] * num_ratings for _ in range(num_ratings)]
    for a, b in zip(rater_a, rater_b):
        row = a - min_rating
        col = b - min_rating
        # A negative offset would silently wrap to the end of the list and
        # be counted under the wrong rating, so reject it explicitly.
        if row < 0 or col < 0 or row >= num_ratings or col >= num_ratings:
            raise IndexError(
                "rating pair (%s, %s) is outside [%s, %s]"
                % (a, b, min_rating, max_rating)
            )
        conf_mat[row][col] += 1
    return conf_mat


def histogram(ratings, min_rating=0, max_rating=4):
    """
    Returns the counts of each rating that a rater made.

    Parameters
    ----------
    ratings : sequence of int
        The ratings to count.
    min_rating, max_rating : int
        Inclusive bounds of the rating scale.

    Returns
    -------
    list of int
        ``max_rating - min_rating + 1`` counts; entry ``k`` is the number
        of occurrences of the rating ``min_rating + k``.

    Raises
    ------
    IndexError
        If any rating lies outside ``[min_rating, max_rating]``.
    """
    num_ratings = int(max_rating - min_rating + 1)
    hist_ratings = [0] * num_ratings
    for r in ratings:
        offset = r - min_rating
        # A negative offset would silently wrap to the end of the list and
        # be counted under the wrong rating, so reject it explicitly.
        if offset < 0 or offset >= num_ratings:
            raise IndexError(
                "rating %s is outside [%s, %s]" % (r, min_rating, max_rating)
            )
        hist_ratings[offset] += 1
    return hist_ratings

In [6]:
# Small sample of ratings (values 1-4; 0 is absent) for trying out the counting helpers.
a = [1, 4, 2, 3, 1, 1, 1, 2, 4]

In [7]:
# Per-rating counts from the hand-written helper, using its default 0-4 range.
histogram(a)

[0, 4, 2, 1, 2]

In [9]:
# With default arguments np.histogram splits the data range into 10
# equal-width bins, so it does not give one count per integer rating.
np.histogram(np.array(a))

(array([4, 0, 0, 2, 0, 0, 1, 0, 0, 2]),
 array([1. , 1.3, 1.6, 1.9, 2.2, 2.5, 2.8, 3.1, 3.4, 3.7, 4. ]))

In [10]:
# np.unique reports only the values that occur, so the absent rating 0 gets no entry.
np.unique(a, return_counts=True)

(array([1, 2, 3, 4]), array([4, 2, 1, 2]))

In [11]:
# np.bincount counts every integer from 0 up to the maximum and matches histogram(a) here.
np.bincount(a)

array([0, 4, 2, 1, 2])

In [27]:
# Draw 10 random integer ratings from 0-4 (the upper bound 5 is exclusive).
# No seed is set, so the output differs between runs.
np.random.randint(5, size=10)

array([1, 3, 2, 0, 4, 2, 4, 1, 0, 0])

In [29]:
from sklearn.metrics import cohen_kappa_score

In [47]:
%%timeit -n 100
# Time the pure-Python reference implementation on two random 256-item
# rating vectors; the random draws are part of the timed statement.
_quadratic_weighted_kappa(
    np.random.randint(5, size=256),
    np.random.randint(5, size=256),
)

331 µs ± 29.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [48]:
%%timeit -n 100
# Same benchmark for scikit-learn's cohen_kappa_score with quadratic weights
# and the label set fixed to 0-4; the random draws are timed here too.
cohen_kappa_score(
    np.random.randint(5, size=256),
    np.random.randint(5, size=256),
    labels=[0, 1, 2, 3, 4], weights='quadratic'
)

398 µs ± 18 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [103]:
def dist_mx(size):
    """
    Build the quadratic weight matrix for a rating scale with `size` levels.

    Entry ``[i, j]`` is ``values[|i - j|]`` where ``values`` is
    ``np.linspace(0, 1, size) ** 2``, i.e. the squared distance between
    ratings i and j normalised so the largest distance is 1.

    Returns
    -------
    numpy.ndarray
        Symmetric float array of shape ``(size, size)`` with a zero diagonal.
    """
    values = np.linspace(0, 1, size) ** 2
    idx = np.arange(size)
    # Look up the weight by absolute index distance for every (i, j) pair.
    return values[np.abs(idx[:, None] - idx[None, :])]


def custom_confusion_matrix(rater_a, rater_b, n_values):
    """
    Returns the confusion matrix between two raters' ratings.

    Parameters
    ----------
    rater_a, rater_b : sequence of int
        Ratings used directly as row / column indices; must have the same
        length.
    n_values : int
        Number of distinct rating values (matrix side length).

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(n_values, n_values)``; entry ``[i, j]``
        counts the items rated i by rater A and j by rater B.
    """
    assert(len(rater_a) == len(rater_b))
    conf_mat = np.zeros((n_values, n_values))
    rows = np.asarray(rater_a, dtype=int)
    cols = np.asarray(rater_b, dtype=int)
    # np.add.at accumulates repeated (row, col) pairs, unlike fancy-index +=.
    np.add.at(conf_mat, (rows, cols), 1)
    return conf_mat


def custom_quadratic_weighted_kappa(rater_a, rater_b, n_values):
    """
    Quadratic weighted kappa computed with NumPy array operations.

    Parameters
    ----------
    rater_a, rater_b : sequence
        Ratings from the two raters, cast to integer arrays; must have the
        same length.  Ratings are used as indices into a scale of
        ``n_values`` levels starting at 0.
    n_values : int
        Number of distinct rating values.

    Returns
    -------
    float
        ``1 - (observed * n) / expected`` where ``observed`` is the
        weighted sum of the confusion matrix, ``expected`` the weighted sum
        of the outer product of the two rating histograms, and ``n`` the
        number of rated items.  When ``expected`` is zero the NumPy
        division yields NaN rather than raising.
    """
    rater_a = np.array(rater_a, dtype=int)
    rater_b = np.array(rater_b, dtype=int)
    assert(len(rater_a) == len(rater_b))
    conf_mat = custom_confusion_matrix(rater_a, rater_b, n_values)
    num_scored_items = rater_a.shape[0]

    # Quadratic disagreement weights for every (rating_a, rating_b) pair.
    weights = dist_mx(n_values)

    # Unnormalised expected counts under independence: outer product of the
    # two marginal histograms (the 1/n factor is applied in the return).
    hist_rater_a = np.bincount(rater_a, minlength=n_values)
    hist_rater_b = np.bincount(rater_b, minlength=n_values)
    expected_counts = np.outer(hist_rater_a, hist_rater_b)

    observed = (weights * conf_mat).sum()
    expected = (weights * expected_counts).sum()

    return 1.0 - (observed * num_scored_items) / expected


# Module-level weight matrix for the 5-point scale.  Built after dist_mx is
# defined so this cell also runs on a fresh kernel.
dmx = dist_mx(5)

In [104]:
%%timeit -n 100
# Same benchmark for the NumPy-based variant with 5 rating values; the
# random draws are part of the timed statement here as well.
custom_quadratic_weighted_kappa(
    np.random.randint(5, size=256),
    np.random.randint(5, size=256),
    5
)

290 µs ± 39.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [60]:
# Print the quadratic weight matrix row by row for a 5-point scale:
# the row for rating j holds ((i - j) / (5 - 1)) ** 2 for every rating i.
is_ = np.arange(5)
js_ = np.arange(5)

for j in js_:
    # Squared, scale-normalised distance from rating j to each rating in is_.
    ds = ((is_ - j) / (5 - 1)) ** 2
    print(ds)

[0.     0.0625 0.25   0.5625 1.    ]
[0.0625 0.     0.0625 0.25   0.5625]
[0.25   0.0625 0.     0.0625 0.25  ]
[0.5625 0.25   0.0625 0.     0.0625]
[1.     0.5625 0.25   0.0625 0.    ]


In [61]:
# Five evenly spaced values from 0 to 1: the normalised distances before squaring.
np.linspace(0, 1, 5)

array([0.  , 0.25, 0.5 , 0.75, 1.  ])

In [88]:
# Start from an all-zero 5x5 matrix to be filled with the weights.
d = np.zeros((5, 5))
d

array([[0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])

In [89]:
# Squared normalised distances: values[k] is the weight for ratings k steps apart.
values = np.linspace(0, 1, 5) ** 2
values

array([0.    , 0.0625, 0.25  , 0.5625, 1.    ])

In [90]:
# Starting row/column index for filling `d`.
i = 0

In [95]:
# Fill every row/column pair in a single pass so that one run of this cell
# (for example after Restart & Run All) produces the full matrix.  The
# previous version advanced `i` by one per execution and therefore had to be
# re-run by hand once per index.  Re-running this cell is harmless: it writes
# the same values again.
for i in range(5):
    # Row i from the diagonal rightwards and column i from the diagonal
    # downwards both receive the weights for distances 0, 1, 2, ...
    d[i, i:] = values[:5 - i]
    d[i:, i] = values[:5 - i]
d

array([[0.    , 0.0625, 0.25  , 0.5625, 1.    ],
       [0.0625, 0.    , 0.0625, 0.25  , 0.5625],
       [0.25  , 0.0625, 0.    , 0.0625, 0.25  ],
       [0.5625, 0.25  , 0.0625, 0.    , 0.0625],
       [1.    , 0.5625, 0.25  , 0.0625, 0.    ]])

In [67]:
# Mirror the weights around zero distance: reversed `values` followed by `values`.
# NOTE(review): this rebinds `a`, which earlier held the sample ratings list.
a = np.concatenate((np.flip(values), values), axis=None)
a

array([1.    , 0.5625, 0.25  , 0.0625, 0.    , 0.    , 0.0625, 0.25  ,
       0.5625, 1.    ])