In [93]:
import numpy as np
import pandas as pd
import math
 
# check independence, get variance and correlation from scratch

# Input: two variables and their joint probablity table

# you can use this table as an example
distr_table = pd.DataFrame({
    'X': [0, 0, 1, 1],
    'Y': [1, 2, 1, 2],
    'pr': [0.3, 0.25, 0.15, 0.3]
})

class CheckIndependence:

    def __init__(self):
        self.version = 1

    def get_X_value(self, distr_table: pd.DataFrame):
        return distr_table['X'].unique()

    def get_Y_value(self, distr_table: pd.DataFrame):
        return distr_table['Y'].unique()

    def get_prob_X(self, distr_table: pd.DataFrame, x_val):
        return distr_table.loc[distr_table['X']==x_val,['pr']].sum(axis=0).values[0]

    def get_prob_Y(self, distr_table: pd.DataFrame, y_val):
        return distr_table.loc[distr_table['Y']==y_val,['pr']].sum(axis=0).values[0]

    def get_joint_prob(self, distr_table: pd.DataFrame, x_val, y_val):

        joint_x_y = distr_table.loc[(distr_table['X']==x_val) & (distr_table['Y']==y_val),['pr']].iloc[0][0]
        #print(" get_joint_prob joint_x_y: ", x_val, y_val, joint_x_y,)

        return joint_x_y

    def is_indepence(self, distr_table: pd.DataFrame):
        # check independence

        independence = True
        values_X = self.get_X_value(distr_table)
        values_Y = self.get_Y_value(distr_table)

        for x_val in values_X:
            prob_x = self.get_prob_X(distr_table, x_val)
            print("prob_x: ", prob_x)

            for y_val in values_Y:
                prob_y = self.get_prob_Y(distr_table, y_val)

                joint_x_y = self.get_joint_prob(distr_table, x_val, y_val)
                print("joint_x_y: ", x_val, y_val, joint_x_y, prob_x, prob_y)
                if joint_x_y != prob_x * prob_y:
                    return False

        return True


    def get_mean_x(self, distr_table: pd.DataFrame):
        # get E(X)
        mean_x = 0.0

        values_X = self.get_X_value(distr_table)
        for x_val in values_X:
            prob_x = self.get_prob_X(distr_table, x_val)
            
            mean_x += prob_x * x_val

        return mean_x

    
    def get_mean_y(self, distr_table: pd.DataFrame):
        # get E(Y)

        mean_y = 0.0

        values_Y = self.get_Y_value(distr_table)
        for y_val in values_Y:
            prob_y = self.get_prob_Y(distr_table, y_val)
            
            mean_y += prob_y * y_val

        return mean_y
    

    def get_x_std(self, distr_table):
        # get std(X)

        values_X = self.get_X_value(distr_table)

        mean_x = self.get_mean_x(distr_table)

        std_x = 0.0
        for x_val in values_X:
            diff_sq = (x_val - mean_x)**2
            prob_x = self.get_prob_X(distr_table, x_val)
            std_x += prob_x * diff_sq
        
        return math.sqrt(std_x)


    def get_y_std(self, distr_table):
        # get std(Y)

        values_Y = self.get_Y_value(distr_table)
        mean_y = self.get_mean_y(distr_table)


        std_y = 0.0
        for y_val in values_Y:
            diff_sq = (y_val - mean_y)**2
            prob_y = self.get_prob_Y(distr_table, y_val)
            std_y += prob_y * diff_sq
        
        return math.sqrt(std_y)


    def get_cov_XY(self, distr_table):
        # get the correlation cov(X, Y)

        values_X = self.get_X_value(distr_table)
        values_Y = self.get_Y_value(distr_table)

        mean_x = self.get_mean_x(distr_table)
        mean_y = self.get_mean_y(distr_table)

        cov_xy = 0.0
        for x_val in values_X:
            for y_val in values_Y:
                joint_x_y = self.get_joint_prob(distr_table, x_val, y_val)
                cov_xy += joint_x_y * (x_val - mean_x) * (y_val - mean_y)
        return cov_xy

        
    def get_correlation(self, distr_table):
        cov_xy = self.get_cov_XY(distr_table)

        std_x = self.get_x_std(distr_table)
        std_y = self.get_y_std(distr_table)

        return cov_xy / (std_x * std_y)


    def check_independence(self, distr_table: pd.DataFrame):
        # write your solution here

        prob = self.get_prob_X(distr_table, 0)
        #print("prob :", prob)
        indep = self.is_indepence(distr_table)
        print("indep :", indep)

        cov_xy = self.get_cov_XY(distr_table)

        corr = self.get_correlation(distr_table)

        print("prob: ", prob, cov_xy, corr)
        
        return {'are_independent': indep, 'cov': cov_xy, 'corr':corr}


CheckIndependenceObj = CheckIndependence()

CheckIndependenceObj.check_independence(distr_table)

prob_x:  0.55
joint_x_y:  0 1 0.3 0.55 0.44999999999999996
indep : False
prob:  0.55 0.0525 0.21212121212121213


{'are_independent': False, 'corr': 0.21212121212121213, 'cov': 0.0525}

In [94]:
# USE numpy inside
# you can use this table as an example
distr_table = pd.DataFrame({
    'X': [0, 0, 1, 1],
    'Y': [1, 2, 1, 2],
    'pr': [0.3, 0.25, 0.15, 0.3]
})

class CheckIndependence:

    def __init__(self):
        self.version = 1

    def get_X_values(self, distr_table: pd.DataFrame):
        return np.array(sorted(distr_table['X'].unique()))

    def get_Y_values(self, distr_table: pd.DataFrame):
        return np.array(sorted(distr_table['Y'].unique()))

    def get_one_prob_X(self, distr_table: pd.DataFrame, x_val):
        return distr_table.loc[distr_table['X']==x_val,['pr']].sum(axis=0).values[0]

    def get_one_prob_Y(self, distr_table: pd.DataFrame, y_val):
        return distr_table.loc[distr_table['Y']==y_val,['pr']].sum(axis=0).values[0]

    def get_all_corresponding_prob_X(self, distr_table: pd.DataFrame):
        x_probs = []

        for x in self.get_X_values(distr_table):
          prob = self.get_one_prob_X(distr_table, x)
          x_probs.append(prob)

        return np.array(x_probs)
        
    def get_all_corresponding_prob_Y(self, distr_table: pd.DataFrame):
        y_probs = []

        for y in self.get_Y_values(distr_table):
          prob = self.get_one_prob_Y(distr_table, y)
          y_probs.append(prob)

        return np.array(y_probs)

    def get_joint_prob(self, distr_table: pd.DataFrame, x_val, y_val):

        joint_x_y = distr_table.loc[(distr_table['X']==x_val) & (distr_table['Y']==y_val),['pr']].iloc[0][0]
        #print(" get_joint_prob joint_x_y: ", x_val, y_val, joint_x_y,)

        return joint_x_y

    def is_indepence(self, distr_table: pd.DataFrame):
        # check independence

        independence = True
        values_X = self.get_X_values(distr_table)
        values_Y = self.get_Y_values(distr_table)

        probs_X = self.get_all_corresponding_prob_X(distr_table)
        probs_Y = self.get_all_corresponding_prob_Y(distr_table)
        
        for i, x_val in enumerate(values_X):
            prob_x = probs_X[i]
            print("prob_x: ", prob_x)

            for j, y_val in enumerate(values_Y):
                prob_y = probs_Y[j]
                joint_x_y = self.get_joint_prob(distr_table, x_val, y_val)
                print("joint_x_y: ", x_val, y_val, joint_x_y, prob_x, prob_y)
                if joint_x_y != prob_x * prob_y:
                    return False

        return True


    def get_mean_X(self, distr_table: pd.DataFrame):
        # get E(X)
        values_X = self.get_X_values(distr_table)

        probs_X = self.get_all_corresponding_prob_X(distr_table)

        mean_X = np.sum(np.dot(values_X, probs_X))

        print("values_X, probs_X: ", values_X, probs_X, mean_X)

        return mean_X

    
    def get_mean_Y(self, distr_table: pd.DataFrame):
        # get E(Y)
        values_Y = self.get_Y_values(distr_table)
        probs_Y = self.get_all_corresponding_prob_Y(distr_table)
        mean_Y = np.sum(np.dot(values_Y, probs_Y))

        print("values_Y, probs_Y: ", values_Y, probs_Y, mean_Y)

        return mean_Y

    

    def get_std_X(self, distr_table):
        # get std(X)

        values_X = self.get_X_values(distr_table)

        mean_X = self.get_mean_X(distr_table)
        std_X = np.sqrt(np.sum((values_X - mean_X)**2))
        
        return std_X


    def get_std_Y(self, distr_table):
        # get std(Y)

        values_Y = self.get_Y_values(distr_table)

        mean_Y = self.get_mean_Y(distr_table)
        std_Y = np.sqrt(np.sum((values_Y - mean_Y)**2))
        
        return std_Y


    def get_cov_XY(self, distr_table):
        # get the correlation cov(X, Y)

        values_X = self.get_X_values(distr_table)
        values_Y = self.get_Y_values(distr_table)

        mean_X = self.get_mean_X(distr_table)
        mean_Y = self.get_mean_Y(distr_table)

        cov_XY = 0.0
        for x_val in values_X:
            for y_val in values_Y:
                joint_x_y = self.get_joint_prob(distr_table, x_val, y_val)
                cov_XY += joint_x_y * (x_val - mean_X) * (y_val - mean_Y)
        return cov_XY

        
    def get_correlation(self, distr_table):
        cov_XY = self.get_cov_XY(distr_table)

        std_X = self.get_std_X(distr_table)
        std_Y = self.get_std_Y(distr_table)

        return cov_XY / (std_X * std_Y)


    def check_independence(self, distr_table: pd.DataFrame):
        # write your solution here

        indep = self.is_indepence(distr_table)
        print("indep :", indep)

        cov_XY = self.get_cov_XY(distr_table)

        corr = self.get_correlation(distr_table)

        print("prob: ", cov_XY, corr)
        
        return {'are_independent': indep, 'cov': cov_XY, 'corr':corr}
        

CheckIndependenceObj = CheckIndependence()

CheckIndependenceObj.check_independence(distr_table)


prob_x:  0.55
joint_x_y:  0 1 0.3 0.55 0.44999999999999996
indep : False
values_X, probs_X:  [0 1] [0.55 0.45] 0.44999999999999996
values_Y, probs_Y:  [1 2] [0.45 0.55] 1.55
values_X, probs_X:  [0 1] [0.55 0.45] 0.44999999999999996
values_Y, probs_Y:  [1 2] [0.45 0.55] 1.55
values_X, probs_X:  [0 1] [0.55 0.45] 0.44999999999999996
values_Y, probs_Y:  [1 2] [0.45 0.55] 1.55
prob:  0.0525 0.10396039603960398


{'are_independent': False, 'corr': 0.10396039603960398, 'cov': 0.0525}