In [1]:
import numpy as np
import pandas as pd
import pymc3 as pm
import arviz as az
import theano.tensor as tt
import scipy as sp
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

In [2]:
# getting the data

data = pd.read_csv('cleaned_data.csv' , sep = ' ' , header = None)
data

Unnamed: 0,0,1,2,3,4
0,0,0,0,0,0
1,0,0,0,0,0
2,0,0,0,0,0
3,0,0,0,0,1
4,0,0,0,0,1
...,...,...,...,...,...
995,1,1,1,1,1
996,1,1,1,1,1
997,1,1,1,1,1
998,1,1,1,1,1


In [3]:
data_matrix = []

for row in data.iterrows():
    data_matrix.append(list(row[1].values))

data_matrix = np.array(data_matrix)
data_matrix

array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       ...,
       [1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1]])

Rasch Model

$$\Pr(X_{ij}=1) = \frac{e^{-\alpha_j + \beta\times\theta_i}}{1 + e^{-\alpha_j + \beta\times\theta_i}}$$

$\beta_n$ = ability of student n

$\delta_i$ = difficulty of item i

https://towardsdatascience.com/a-bayesian-approach-to-rasch-models-item-response-theory-cc08805cbb37

On doing some simple mathematics, we get the likelihood to be:

$\mathcal{L}(x | \{\alpha_j\}_{j=1}^Q , \{\theta_i\}_{i=1}^N , \beta) = \frac{\exp\left(\beta\sum\limits_{i=1}^N \theta_i r_i - \sum\limits_{j=1}^Q\alpha_j s_j\right)}{\prod\limits_{i=1}^Q\prod\limits_{j=1}^N 1 + \exp\left({\beta\theta_i - \alpha_j}\right)}$

where $r_i$ is the score of the $i^{th}$ student and $s_j$ is the score achieved on the $j^{th}$ question.


We also have our priors (all the means are zero):

$\pi(\alpha_j) = \frac{1}{\sqrt{2\pi\sigma_\alpha^2}} \exp\left({-\frac{\alpha_j^2}{2\sigma_\alpha^2}}\right)$

$\pi(\theta_i) = \frac{1}{\sqrt{2\pi\sigma_\theta^2}} \exp\left({-\frac{\theta_i^2}{2\sigma_\theta^2}}\right)$

$\pi(\beta) = \frac{1}{\sqrt{2\pi\sigma_\beta^2}} \exp\left({-\frac{\beta^2}{2\sigma_\beta^2}}\right) \times \mathbf{1}\left(\beta \geq 0\right)$

Thus, the posterior distribution is given by:

$\pi(\{\alpha_j\}_{j=1}^Q , \{\theta_i\}_{i=1}^N , \beta | x) = \mathcal{L}(x | \{\alpha_j\}_{j=1}^Q , \{\theta_i\}_{i=1}^N , \beta) \times \prod\limits_{j=1}^Q\pi(\alpha_j) \times \prod\limits_{i=1}^N\pi(\theta_i) \times \pi(\beta)$

For more details about the mathematics, please refer to the paper.

Since we wish to sample for $\{\alpha_j\}_{j=1}^Q , \{\theta_i\}_{i=1}^N , \beta$ , we would have to obtain the  conditional distributions of $\alpha_j , \theta_i , \beta$.

$\pi(\alpha_j | \alpha_1 , \ldots , \alpha_{j-1} , \alpha_{j+1} , \ldots, \alpha_Q , \{\theta_i\}_{i=1}^N , \beta, x) = \frac{\exp\left(-\alpha_j s_j - \frac{\alpha_j^2 }{2\sigma_\alpha^2}\right)}{\prod\limits_{i=1}^Q\prod\limits_{j=1}^N 1 + \exp\left({\beta\theta_i - \alpha_j}\right)}$

$\pi(\beta | \{\alpha_j\}_{j=1}^Q , \{\theta_i\}_{i=1}^N , x) = \frac{\exp\left(\beta\sum\limits_{i=1}^N \theta_i r_i  - \frac{\beta^2 }{2\sigma_\beta^2}\right)}{\prod\limits_{i=1}^Q\prod\limits_{j=1}^N 1 + \exp\left({\beta\theta_i - \alpha_j}\right)}$

$\pi(\theta_i | \theta_1 , \ldots , \theta_{i-1} , \theta_{i+1} , \ldots, \theta_N , \{\alpha_j\}_{j=1}^Q , \beta, x) = \frac{\exp\left(\beta\theta_i r_i  - \frac{\theta_i^2 }{2\sigma_\theta^2}\right)}{\prod\limits_{i=1}^Q\prod\limits_{j=1}^N 1 + \exp\left({\beta\theta_i - \alpha_j}\right)}$

In [4]:
score_per_question = np.sum(data, axis = 0)
score_per_student = np.sum(data, axis = 1)

In [5]:
# define some constants according to the constraints in the question
N = 1000
Q = 5
sigma_alpha = 1
mean_alpha = 0
mean_beta = 0
sigma_beta = 10
mean_theta = 0
sigma_theta = 1

In [6]:
def initialize_chain():

    alpha = np.random.normal(mean_alpha , sigma_alpha , size = Q)
    alpha -= alpha.mean()

    # halfFlat is a uniform distribution between 0 and infinity
    # beta = 1

    beta = np.random.normal(mean_beta , sigma_beta)
    beta *= (beta >= 0)
    # beta = 1.5
    theta = np.random.normal(mean_theta , sigma_theta , size = N)
    return alpha, beta, theta

In [7]:
def log_den(alpha , beta , theta):
    log_den = np.float(0)
    for i in range(N):
        for j in range(Q):
            log_den += np.log(1 + np.exp(beta * theta[i] - alpha[j]))
    if np.isnan(log_den) or log_den > 2e5:
        return 2e5
    return log_den

def target_alpha(alpha, beta, theta , j , log_den):
    m = (-alpha[j] * score_per_question[j] - 0.5 * (alpha[j]/sigma_alpha)**2)# - log_den)
    return m

def target_beta(alpha, beta, theta , log_den):
    m = (np.sum(np.dot(theta , score_per_student)) - 0.5 * (beta/sigma_beta)**2)# - log_den)
    return m

def target_theta(alpha, beta, theta , i , log_den):
    m = (beta * theta[i] * score_per_student[i] - 0.5 * (theta[i]/sigma_theta)**2)# - log_den)
    return m

In [8]:
def mcmc_sampler(num_chains , num_steps, burn_in):
    eps = 1e-6
    alpha_samples = []
    beta_samples = []
    theta_samples = []
    
    for chain_num in range(num_chains):
        alpha , beta , theta = initialize_chain()

        alpha_chain = []
        beta_chain = []
        theta_chain = []
        
        # we assume proposal distribution to be symmetric
        for step in range(num_steps):
            proposal_std_alpha_theta = 1
            proposal_std_beta = 0.5
            # proposal_std = num_steps
            proposal_alpha = lambda x: np.random.normal(x , proposal_std_alpha_theta)
            proposal_beta = lambda x: np.random.normal(x , proposal_std_beta)
            proposal_theta = lambda x: np.random.normal(x , proposal_std_alpha_theta)

            alpha_new = np.array([proposal_alpha(alpha_j) for alpha_j in alpha])
            alpha_new -= alpha_new.mean()
            beta_new = proposal_beta(beta)
            beta_new = beta_new * (beta_new >= 0)
            theta_new = np.array([proposal_theta(theta_i) for theta_i in theta])

            target_new = 0
            target_old = 0
            log_den_new = log_den(alpha_new , beta_new , theta_new)
            log_den_old = log_den(alpha , beta , theta)


            for j in range(Q):
                target_new += target_alpha(alpha_new, beta_new, theta_new , j , log_den_new)
                target_old += target_alpha(alpha, beta, theta , j , log_den_old)
            
            for i in range(N):
                target_new += target_theta(alpha_new, beta_new, theta_new , i , log_den_new)
                target_old += target_theta(alpha, beta, theta , i , log_den_old)
            
            target_new += target_beta(alpha_new, beta_new, theta_new , log_den_new)
            target_old += target_beta(alpha, beta, theta , log_den_old)

            acceptance_prob = target_new / target_old
            # print('(Chain: {}, Step: {}/{})=> Acceptance Probability = {:.4f}'.format(chain_num+1, step+1, num_steps, acceptance_prob) , end = '\r')

            u = np.random.rand()
            if u < min(1 , acceptance_prob):
                if step > burn_in-1:
                    alpha_chain.append(alpha_new)
                    beta_chain.append(beta_new)
                    theta_chain.append(theta_new)
                
                alpha = alpha_new
                beta = beta_new
                theta = theta_new

            else:
                alpha_chain.append(alpha)
                beta_chain.append(beta)
                theta_chain.append(theta)

        
        alpha_samples.append(np.array(alpha_chain))
        beta_samples.append(np.array(beta_chain))
        theta_samples.append(np.array(theta_chain))
    return alpha_samples , beta_samples , theta_samples


In [9]:
def runner_and_accuracy(num_chains , num_samples , burn_in):
    print("Number of chains: {} , Number of samples = {}".format(num_chains , num_samples))
    
    s_alpha, s_beta, s_theta = mcmc_sampler(num_chains ,  num_samples, burn_in)
    
    means_alpha = np.zeros(5)
    for i in range(len(s_alpha)):
        means_alpha+=np.mean(s_alpha[i], axis = 0)
    means_alpha /= len(s_alpha)
    print("Means of alpha: {}".format(means_alpha))

    means_beta = np.zeros(1)
    for i in range(len(s_beta)):
        means_beta+=np.mean(s_beta[i], axis = 0)
    means_beta /= len(s_beta)
    print("Means of beta: {}".format(means_beta))

    means_theta = np.zeros(1000)
    for i in range(len(s_theta)):
        means_theta+=np.mean(s_theta[i], axis = 0)
    means_theta /= len(s_theta)

    # calculating the accuracy
    error = 0

    for i in range(1000):
        for j in range(5):
            linear_term = means_beta[0] * means_theta[i] - means_theta[j]
            prob = np.exp(linear_term) / (1 + np.exp(linear_term))
            if prob > 0.5:
                term = 1
            else:
                term = 0

            if term != data_matrix[i][j]:
                error += 1

    print("Using {} chains and {} samples, the error is {}, resulting in an accuracy of {}"\
        .format(num_chains , num_samples , error , (1 - error/5000)*100))
    
    return

In [10]:
runner_and_accuracy(25 , 150 , 10)

Number of chains: 25 , Number of samples = 150
Means of alpha: [ 1.56571195 -0.59503457 -1.29266581  1.37136127 -1.04937284]
Means of beta: [5.00816939]
Using 25 chains and 150 samples, the error is 2525, resulting in an accuracy of 49.5


In [11]:
runner_and_accuracy(50 , 150 , 10)

Number of chains: 50 , Number of samples = 150
Means of alpha: [-0.40791813 -0.70830124  0.52051596  0.05942165  0.53628176]
Means of beta: [7.98053909]
Using 50 chains and 150 samples, the error is 2430, resulting in an accuracy of 51.4


In [13]:
runner_and_accuracy(100 , 150 , 10)

Number of chains: 100 , Number of samples = 150
Means of alpha: [-0.77506111  0.70022724 -0.12192929 -0.58529777  0.78206093]
Means of beta: [5.92588193]
Using 100 chains and 150 samples, the error is 2404, resulting in an accuracy of 51.92
