Tests must start with 'test'

In [1]:
import numpy as np
import itertools

In [2]:
def setUpBayesianInference(eventAMeanSampleSpaceSize, eventBMeanSampleSpaceSize, numberOfObservedSamples, randomSeed, isPriorUniform=False):
    np.random.seed(randomSeed)
    aProbabilities = constructRandomPMF(eventAMeanSampleSpaceSize, 'a')
    bProbabilities = constructRandomPMF(eventBMeanSampleSpaceSize, 'b')
    jointProbabilities = getJointPMFOfTwoEvents(aProbabilities, bProbabilities)
    
    sampleSpaceANames = list(aProbabilities.keys())
    sampleSpaceBNames = list(bProbabilities.keys())
    
    aPrior = constructPrior(sampleSpaceANames,isPriorUniform)
    bPrior = constructPrior(sampleSpaceBNames,isPriorUniform)
    
    likelihood = generateLikelihood(numberOfObservedSamples, jointProbabilities)
    
    return({'jointProbabilities':jointProbabilities, 'priorOfA': aPrior, 'priorOfB': bPrior, 'likelihood': likelihood})


In [8]:
generativeDistribution, priorA, priorB, likelihood = setUpBayesianInference(5, 5, 100, 2).values()
print(generativeDistribution)
print(priorA)
print(priorB)
print(likelihood)

{('a0', 'b0'): 0.17570541041399762, ('a0', 'b1'): 0.14968292489841234, ('a0', 'b2'): 0.03806977382780534, ('a0', 'b3'): 0.14528021147121237, ('a1', 'b0'): 0.1696694184426647, ('a1', 'b1'): 0.1445408810034426, ('a1', 'b2'): 0.03676196635259063, ('a1', 'b3'): 0.14028941358987443}
{'a0': 0.1901883045926098, 'a1': 0.8098116954073902}
{'b0': 0.3754696289252865, 'b1': 0.21730244836408819, 'b2': 0.3722099802058252, 'b3': 0.03501794250480023}
{('a0', 'b0'): 0.16, ('a0', 'b1'): 0.13, ('a0', 'b2'): 0.05, ('a0', 'b3'): 0.21, ('a1', 'b0'): 0.2, ('a1', 'b1'): 0.11, ('a1', 'b2'): 0.03, ('a1', 'b3'): 0.11}


## Helper Functions

In [4]:
def normalizeDictionaryValues(unnormalizedDictionary):
    totalSum = sum(unnormalizedDictionary.values())
    normalizedDictionary = {originalKey: val/totalSum for originalKey, val in unnormalizedDictionary.items()}
    return(normalizedDictionary)

def constructRandomPMF(meanSampleSpaceSize, eventIDString):
    sampleSpaceSize = np.maximum(2, np.random.poisson(meanSampleSpaceSize))
    outcomeNames = [eventIDString+str(outcomeIndex) for outcomeIndex in range(sampleSpaceSize)]
    unnormalizedProbabilities = {outcome: np.random.random() for outcome in outcomeNames}
    eventPMF = normalizeDictionaryValues(unnormalizedProbabilities)
    return(eventPMF)

def getJointPMFOfTwoEvents(eventAPMF, eventBPMF):
    jointPMF = {(keya, keyb):eventAPMF[keya]*eventBPMF[keyb] for keya, keyb in itertools.product(eventAPMF.keys(), eventBPMF.keys())}
    return(jointPMF)

def generateLikelihood(numberSamples, jointDistribution):
    jointSampleSpaceSize = len(jointDistribution.items())
    observedData = np.random.choice(a=jointSampleSpaceSize, size = numberSamples, replace = True, p = list(jointDistribution.values()))
    likelihood = {key: list(observedData).count(index)/numberSamples for index, key in enumerate(jointDistribution.keys())}
    return(likelihood)

def constructPrior(namesOfAllOutcomes, isUniform = False):
    if isUniform:
        prior = {a: 1/len(namesOfAllOutcomes) for a in namesOfAllOutcomes}
    else:
        unnormalizedPrior = {outcome: np.random.random() for outcome in namesOfAllOutcomes}
        prior = normalizeDictionaryValues(unnormalizedPrior)
    return(prior)


In [13]:
def getPosterior(priorOfA, priorOfB, likelihood):
    normalizedPriorOfA = normalizeDictionaryValues(priorOfA)
    normalizedPriorOfB = normalizeDictionaryValues(priorOfB)
    
    unnormalizedJointPosterior = {(aEvent, bEvent): calculateOutcomePosterier(normalizedPriorOfA, normalizedPriorOfB, likelihood, aEvent, bEvent) \
                      for aEvent, bEvent in likelihood.keys()}
    jointPosterior = normalizeDictionaryValues(unnormalizedJointPosterior)
    
    
    marginalOfA = {keyOfA : getMarginalizedOutcomeProbability(keyOfA, jointPosterior) for keyOfA in normalizedPriorOfA.keys()}
    marginalOfB = {keyOfB : getMarginalizedOutcomeProbability(keyOfB, jointPosterior) for keyOfB in normalizedPriorOfB.keys()}
    
    return(jointPosterior)



def getMarginalizedOutcomeProbability(outcome, jointPMF):
    probabilityOfOutcome = sum([jointPMF[(keyOne, keyTwo)] if keyOne==outcome else 0 for (keyOne, keyTwo) in jointPMF.keys()])
    return(probabilityOfOutcome)

def calculateOutcomePosterier(priorOfA, priorOfB, likelihood, aEvent, bEvent):
    posterior = priorOfA[aEvent]*priorOfB[bEvent]*likelihood[(aEvent, bEvent)]
    return(posterior)

In [14]:
getPosterior(priorA, priorB, likelihood)

{('a0', 'b0'): 0.10016521177788656,
 ('a0', 'b1'): 0.04710099584037416,
 ('a0', 'b2'): 0.031029882829566164,
 ('a0', 'b3'): 0.01226117348995686,
 ('a1', 'b0'): 0.5331226869104922,
 ('a1', 'b1'): 0.1696991472030058,
 ('a1', 'b2'): 0.0792741553998165,
 ('a1', 'b3'): 0.027346746548901565}

## Examples

In [19]:
toyPMFofA = {'a0': .8, 'a1': .2}
toyPMFofB = {'b0': .25, 'b1': .25, 'b2': .25, 'b3': .25}

In [20]:
toyJointPMF = getJointPMFOfTwoEvents(toyPMFofA, toyPMFofB)
toyJointPMF

{('a0', 'b0'): 0.2,
 ('a0', 'b1'): 0.2,
 ('a0', 'b2'): 0.2,
 ('a0', 'b3'): 0.2,
 ('a1', 'b0'): 0.05,
 ('a1', 'b1'): 0.05,
 ('a1', 'b2'): 0.05,
 ('a1', 'b3'): 0.05}

In [21]:
toypriorofA = {'a0': .5, 'a1': .5}
toypriorofB = {'b0': .25, 'b1': .25, 'b2': .25, 'b3': .25}

In [22]:
getPosterior(toypriorofA, toypriorofB, toyJointPMF)

{('a0', 'b0'): 0.19999999999999996,
 ('a0', 'b1'): 0.19999999999999996,
 ('a0', 'b2'): 0.19999999999999996,
 ('a0', 'b3'): 0.19999999999999996,
 ('a1', 'b0'): 0.04999999999999999,
 ('a1', 'b1'): 0.04999999999999999,
 ('a1', 'b2'): 0.04999999999999999,
 ('a1', 'b3'): 0.04999999999999999}