In [2]:
%matplotlib inline

import pandas as pd
import numpy as np

from pandas.api.types import CategoricalDtype

from collections import defaultdict, Counter

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

import datetime as dt
import matplotlib.dates as mdates

import scipy.stats as stats
import statsmodels.api as sm
import statsmodels.stats.api as sms
import statsmodels.formula.api as smf

import dataStatsAnalysis as dsa
import dataStatsPlotting as dsp

dsp.SetParams()

In [3]:
# Load the Power Test superclass
class PowerTest():
    """Power test superclass.

    Estimates the power of a hypothesis test by simulation: each run asks the
    child class for a test statistic and a null-hypothesis random variable
    (anything exposing a ``cdf`` method), and the power is the fraction of
    runs in which the null hypothesis is rejected at significance ``alpha``.

    All child classes must provide PrepareData and ComputeRVandTestStat methods.

    Args:
        data: Raw data, stored as-is; interpreted by the child's PrepareData.
        alpha (float, optional): Significance level. Defaults to 0.05.
        alternative (str, optional): 'two-sided', 'right', or 'left'.
            Defaults to 'two-sided'.
        num_runs (int, optional): Number of simulated runs. Defaults to 1000.
    """

    def __init__(self, data, alpha=0.05, alternative='two-sided', num_runs=1000):
        self.data = data
        self.alpha = alpha
        self.alternative = alternative
        self.num_runs = num_runs
        # Defined up front so the attribute exists even before Power() is called
        self.pvalue_count = 0
        self.PrepareData()

    # Provide functionality to convert the data into format needed for use in BuildRv
    # Ex. Convert to array, split data into component groups, etc.
    # See child classes for examples
    def PrepareData(self):
        """Hook for child classes: convert self.data into the form they need."""
        raise NotImplementedError("Child classes must implement PrepareData")

    # Provide functionality that creates the run data and then computes the run's test stat and rv
    # This involves doing one resample to simulate pulling an additional sample from the population,
    # then calculating the test_stat, building a sampling distribution, and computing the rv
    # See child classes for examples
    def ComputeRVandTestStat(self):
        """Hook for child classes: return (test_stat, rv) for one simulated run."""
        raise NotImplementedError("Child classes must implement ComputeRVandTestStat")

    # Computes the pvalue of test stat from an rv,
    # and adds to pvalue_count if less than significance level
    def _RunPvalueCount(self):
        """Execute one simulated run and increment pvalue_count if it rejects the null.

        Raises:
            ValueError: If self.alternative is not 'two-sided', 'right', or 'left'.
        """
        # Validate before running the (potentially expensive) simulation
        if self.alternative not in ('two-sided', 'right', 'left'):
            raise ValueError("alternative has to be 'two-sided', 'right', or 'left'")

        test_stat, rv = self.ComputeRVandTestStat()

        p_value_right = 1 - rv.cdf(test_stat)
        p_value_left = rv.cdf(test_stat)

        if self.alternative == 'two-sided':
            # Two-sided test: alpha is split evenly between the two tails
            rejected = (p_value_right < self.alpha/2) or (p_value_left < self.alpha/2)
        elif self.alternative == 'right':
            # One-sided test using the right side of the distribution
            rejected = p_value_right < self.alpha
        else:
            # One-sided test using the left side of the distribution
            rejected = p_value_left < self.alpha

        if rejected:
            self.pvalue_count += 1

    # Method for computing power
    def Power(self):
        """Run num_runs simulations and return the fraction that rejected the null."""
        self.pvalue_count = 0
        for _ in range(self.num_runs):
            self._RunPvalueCount()

        return self.pvalue_count / self.num_runs

In [4]:
# Load the chi square power test
class PTChiSquare(PowerTest):
    """Calculates the power of a chi square hypothesis test
    using resampling of the expected sequence to simulate the null hypothesis
    and build the null hypothesis sampling distribution.
    Takes data in the form of two sequences: data = observed, expected

    Each run resamples the observed sequence (assuming the alternative
    hypothesis) to obtain the run's test statistic, then resamples the
    expected sequence NUM_NULL_RESAMPLES times to build that run's
    null sampling distribution.
    """
    # Number of null-hypothesis resamples used to build each run's sampling distribution
    NUM_NULL_RESAMPLES = 100

    def PrepareData(self):
        """Unpack data into observed and expected count arrays."""
        observed, expected = self.data
        self.observed = np.array(observed)
        self.expected = np.array(expected)

    @staticmethod
    def _ResampleCounts(total, probs):
        """Draw `total` category indices with probabilities `probs` and return per-category counts."""
        num_categories = len(probs)
        draws = np.random.choice(num_categories, size=total, replace=True, p=probs)
        # minlength keeps categories that received zero draws in the result
        return np.bincount(draws, minlength=num_categories)

    def ComputeRVandTestStat(self):
        """Simulate one run.

        Returns:
            test_stat: Chi square statistic of the resampled observed counts
                against the expected counts.
            rv: dsa.DiscreteRv built from the resampled null chi square statistics.
        """
        # Create run data (run_observed) by resampling the observed sequence (assuming the alternative hypothesis)
        # int(round(...)) so float-valued counts can still be used as a sample size
        n_obs = int(round(self.observed.sum()))
        p_obs = self.observed / self.observed.sum()
        run_observed = self._ResampleCounts(n_obs, p_obs)

        # Calculate chi square test_stat for the run data
        test_stat = np.sum((run_observed - self.expected)**2 / self.expected)

        # Build a chi square sampling distribution for the run using the expected sequence (null hypothesis)
        # The sample size and probabilities do not change between iterations, so compute them once
        n_exp = int(round(self.expected.sum()))
        p_exp = self.expected / self.expected.sum()

        chis = []
        for _ in range(self.NUM_NULL_RESAMPLES):
            model_observed = self._ResampleCounts(n_exp, p_exp)
            chis.append(np.sum((model_observed - self.expected)**2 / self.expected))

        rv = dsa.DiscreteRv(chis)

        return test_stat, rv

In [5]:
# Load the chi square contingency resampling function
def ResampleChiSquareContingency(observed, iters=1000):
    """Generates a chi-squared statistic sampling distribution
    from a contingency table.
    Can then make an rv of this distribution to plot cdf and
    compute a p-value for the actual chi-squared statistic (eg. rv.cdf at actual statistic (test_chi)).
    Can also use the 'min' and 'max' built-ins to find what the most extreme values are from the simulations.

    Args:
        observed (array-like): observed contingency table
        iters (int, optional): Number of iterations to run when building distribution. Defaults to 1000.

    Returns:
        test_chi: Original actual chi squared value
        chis (array): Sampling distribution for the null hypothesis obtained from resampling
    """
    # Put the data into array form
    observed = np.asarray(observed, dtype=np.float64)

    # Calculate the test chi square statistic and the expected array
    test_chi, _, _, expected = stats.chi2_contingency(observed)

    # Calculate variables to be used in resampling
    expected = np.asarray(expected, dtype=np.float64)
    expected_total = expected.sum()
    cell_probs = (expected / expected_total).ravel()  # Flattened; reshaped back after each resample
    num_cells = cell_probs.size
    # Round instead of truncating: float round-off in the expected table could
    # otherwise leave the total just below an integer and drop one observation
    n = int(round(expected_total))

    # Compute resampled expected values and compute chi square
    # to build a sampling distribution that represents the null hypothesis
    chis = []
    for _ in range(iters):
        draws = np.random.choice(num_cells, size=n, replace=True, p=cell_probs)
        # minlength keeps cells that received zero draws so the reshape stays valid
        resampled_expected = np.bincount(draws, minlength=num_cells).reshape(expected.shape)

        chis.append(stats.chi2_contingency(resampled_expected)[0])

    return test_chi, np.array(chis)

In [6]:
# Create the chi square contingency power test
class PTChiSquareContingency(PowerTest):
    """Calculates the power of a chi square contingency table hypothesis test
    using resampling of the expected sequence to simulate the null hypothesis
    and build the null hypothesis sampling distribution.
    Takes data in the form of a single observed contingency table (array-like)

    Each run resamples the observed table (assuming the alternative
    hypothesis), computes its chi square statistic and expected table, then
    resamples that expected table NUM_NULL_RESAMPLES times to build the run's
    null sampling distribution.
    """
    # Number of null-hypothesis resamples used to build each run's sampling distribution
    NUM_NULL_RESAMPLES = 100

    def PrepareData(self):
        """Store the observed contingency table as an array."""
        self.observed = np.array(self.data)

    @staticmethod
    def _ResampleTable(cell_probs, total, shape):
        """Draw `total` cell indices with flat probabilities `cell_probs`; return the counts in `shape`."""
        draws = np.random.choice(cell_probs.size, size=total, replace=True, p=cell_probs)
        # minlength keeps cells that received zero draws so the reshape stays valid
        return np.bincount(draws, minlength=cell_probs.size).reshape(shape)

    def ComputeRVandTestStat(self):
        """Simulate one run.

        Returns:
            test_stat: Chi square statistic of the resampled observed table.
            rv: dsa.DiscreteRv built from the resampled null chi square statistics.
        """
        # Create run data by resampling the observed data (assuming the alternative hypothesis)
        observed_total = np.sum(self.observed)
        observed_ps = (self.observed / observed_total).ravel()
        resampled_observed = self._ResampleTable(
            observed_ps, int(round(observed_total)), self.observed.shape)

        # Calculate chi square test_stat and expected contingency table from the run data
        test_stat, _, _, expected = stats.chi2_contingency(resampled_observed)

        # Build a chi square sampling distribution for the run using the expected table (null hypothesis)
        # Shape, probabilities and sample size are fixed for the run, so compute them once
        expected_total = np.sum(expected)
        expected_ps = (expected / expected_total).ravel()
        # Round instead of truncating: float round-off in the expected table could
        # otherwise leave the total just below an integer and drop one observation
        n = int(round(expected_total))

        chis = []
        for _ in range(self.NUM_NULL_RESAMPLES):
            resampled_expected = self._ResampleTable(expected_ps, n, expected.shape)
            chis.append(stats.chi2_contingency(resampled_expected)[0])

        rv = dsa.DiscreteRv(chis)

        return test_stat, rv

In [7]:
# Example 2x3 observed contingency table used to try out the power test
observed = [[16,24,8],
            [7,8,17]]

In [8]:
# The power test runs and returns a value.
# TODO: still need to find a way to confirm the result is correct.
ptchicont = PTChiSquareContingency(observed)
ptchicont.Power()

0.849

In [9]:
# results is a tuple: (actual chi square statistic, array of resampled null chi square statistics)
results = ResampleChiSquareContingency(observed)
results

(12.043478260869566,
 array([3.72975138e-01, 1.74633877e+00, 4.89707886e+00, 2.20634791e-02,
        7.96586060e-02, 1.67708578e-01, 1.37560620e+00, 4.08888889e+00,
        2.12420452e+00, 2.14860067e-01, 6.04290816e+00, 1.28174494e+00,
        8.92857143e-02, 2.81200281e-01, 7.73507747e-01, 5.84781989e+00,
        7.56921702e-01, 6.39422751e-01, 1.77107447e-01, 2.96174620e+00,
        3.68528553e+00, 3.38681850e+00, 1.24070355e+00, 1.65046888e+00,
        2.13514022e+00, 8.00000000e-01, 1.11538658e-01, 3.06397306e+00,
        9.34729296e-01, 3.47089947e+00, 2.79241643e+00, 5.84238191e+00,
        7.42229199e+00, 1.06012378e+00, 1.96714458e+00, 8.47916894e-01,
        2.83957675e+00, 1.65208852e-01, 2.77591051e-01, 3.07692308e+00,
        2.25063939e+00, 1.41099198e+00, 7.34708774e+00, 3.07070707e+00,
        7.72677687e-01, 3.36992923e+00, 1.52732061e+00, 4.83514058e+00,
        2.55824544e-01, 1.92096690e-01, 4.76190476e-01, 3.60591789e-01,
        4.07592408e+00, 6.02519432e+00, 1.1

In [11]:
# Pass the resampled null distribution (results[1]) and the actual statistic (results[0]);
# presumably returns the p-value of the statistic under that distribution — verify against dsa
dsa.PValueFromEstimates(results[1], results[0])

0.0019999999999992246

In [12]:
# Found one worked example of computing power for a contingency table here:
# https://stats.idre.ucla.edu/stata/faq/how-can-i-compute-power-for-contingency-tables-in-stata/
# 2x2 table used for the comparison
data = [[35,35],
        [20,10]]

In [13]:
# Comes out fairly close (0.24~0.26); the result reported in the example above is 0.345
ptchicont2 = PTChiSquareContingency(data)
ptchicont2.Power()

0.245

In [14]:
# The same example also computes power after multiplying the count in each cell by 2
data2 = [[70,70],
         [40,20]]

In [15]:
# Still close but lower (0.47~0.51); the result reported in the online example is 0.591
ptchicont3 = PTChiSquareContingency(data2)
ptchicont3.Power()

0.462

In [16]:
# And the same example with the count in each cell multiplied by 4
data3 = [[140,140],
         [80,40]]

In [18]:
# With the default two-sided setting the result is still close but lower (0.80~0.82);
# the result reported in the online example is 0.872.
# Almost exactly the same if a one-sided (right-tail) test is used, as done here.
ptchicont4 = PTChiSquareContingency(data3, alternative='right')
ptchicont4.Power()

0.875