In [169]:
import numpy as np
from pandas import rolling_sum
from matplotlib import pyplot as plt
from statsmodels.tsa.arima_model import ARIMA
from collections import defaultdict
from scipy.optimize import fmin_slsqp, fmin_cobyla
from scipy.stats import gamma, norm, chi2, ncx2
%matplotlib inline

In [718]:
class NormalNoiseModel(object):
    """
    Detection model on a graph whose edges carry independent zero-mean Gaussian noise.

    The attacker strategy is a 1-d array of length tau * len(self.positive_edges).
    It is laid out edge by edge: elements [k*tau, (k+1)*tau) are his intensity along
    the k-th edge of self.positive_edges, indexed by time period.  The edge order is
    the row-major order of the positive entries of sigmamat.
    """

    def __init__(self, tau, d, sigmamat, r, lrd, b1=None, b2=None, c1=None, c2=None,
                 pi=None, **kwargs):
        """
        Parameters
        ----------
        tau : int
            Number of time periods.
        d : float
            Log threshold of the simple likelihood test.
        sigmamat : 2-d numpy array
            Noise standard deviation per edge; an entry > 0 marks an existing edge.
        r : 2-d numpy array
            Reward matrix, indexed like sigmamat.
        lrd : float
            Log threshold of the likelihood ratio test.
        b1, b2, c1, c2, pi : optional
            Stored on the instance; no method of this class reads them.
        **kwargs
            'effort', if given, is stored and used by effort_constraint; any other
            key is ignored.
        """
        self.tau = tau  # Number of time periods
        self.d = d  # Simple likelihood log threshold
        self.sigmamat = sigmamat  # Normal distribution parameters
        self.r = r  # Reward matrix
        self.lrd = lrd  # Likelihood ratio log threshold
        self.b1 = b1
        self.b2 = b2
        self.c1 = c1
        self.c2 = c2
        self.pi = pi
        self.effort = kwargs.get('effort')  # Per-period bound read by effort_constraint

        sigma = np.asarray(sigmamat)
        positive = sigma > 0
        # Index the edges with the same mask that is used for the noise terms so the
        # edge list and the sigma values can never get out of step.
        self.positive_edges = [(int(i), int(j)) for i, j in zip(*np.nonzero(positive))]
        self.edges_into_node = self.get_node_edge_into()
        self.edges_outof_node = self.get_node_edge_out()
        self.sumlogsigma = np.sum(np.log(sigma[positive]))
        # Right-hand side of  sum (x/sigma)^2 < -2 (d + nk/2 ln(2 pi) + k sum ln(sigma)).
        self.param_contribution = -2 * (d +
                                        tau * len(self.positive_edges) / 2. * np.log(2 * np.pi)
                                        + self.tau * self.sumlogsigma)

    def get_node_edge_out(self):
        """Map each node to the list of positive edges leaving it."""
        outin = defaultdict(list)
        for e in self.positive_edges:
            outin[e[0]].append(e)
        return outin

    def get_node_edge_into(self):
        """Map each node to the list of positive edges entering it."""
        inout = defaultdict(list)
        for e in self.positive_edges:
            inout[e[1]].append(e)
        return inout

    def get_attacker_dict(self, attackerstrat):
        """Split the flat strategy vector into a {edge: length-tau array} dict."""
        astratdict = {}
        for ix, elem in enumerate(self.positive_edges):
            astratdict[elem] = np.asarray(attackerstrat[ix * self.tau:self.tau * (ix + 1)])
        return astratdict

    def general_constraint(self, x):
        """
        Inequality constraints (each entry must be >= 0) for the flat strategy x:
        the non-negativity terms followed by the traversal balances.  The effort
        constraint is intentionally not included.
        """
        astrat = self.get_attacker_dict(x)
        l1 = self.nonzero_constraint(x)
        l3 = self.traversal_constraint(astrat)
        return list(l1) + list(l3)

    def nonzero_constraint(self, x):
        """Non-negativity: every intensity itself must be >= 0."""
        return x

    def effort_constraint(self, astrat):
        """
        Return effort minus the total intensity over all edges, per time period.

        Raises
        ------
        ValueError
            If the model was built without an 'effort' keyword argument.
        """
        if self.effort is None:
            raise ValueError("effort_constraint requires an 'effort' keyword argument "
                             "to be passed to NormalNoiseModel")
        # list(...) keeps this working when values() is a view rather than a list.
        activity = np.asarray(list(astrat.values()))
        totalact = np.sum(activity, axis=0)
        return -totalact + self.effort

    def _total_on_edges(self, astrat, edges):
        """Sum the attacker's intensity over `edges` per period (zeros if no edges)."""
        total = np.zeros(self.tau)
        for e in edges:
            total = total + np.asarray(astrat[e])
        return total

    def traversal_constraint(self, astrat):
        """
        For every node other than node 0 that has outgoing edges, return
        (cumulative inflow before period t) - (outflow in period t) for each t.

        Returns a flat list; it is empty when no such node exists.
        """
        allbalances = []
        for key, val in self.edges_outof_node.items():
            if key != 0:  # No traversal for first node
                total_out_at_t = self._total_on_edges(astrat, val)
                total_in_at_t = self._total_on_edges(astrat, self.edges_into_node.get(key, []))
                # Shift by one period: only what arrived strictly before t counts.
                total_in_by_t = np.cumsum(np.hstack((np.array([0]), total_in_at_t)))[:-1]
                allbalances.append(total_in_by_t - total_out_at_t)
        if not allbalances:
            # np.hstack rejects an empty sequence.
            return []
        return list(np.hstack(tuple(allbalances)))

    def prob_no_alarm_simple(self, astrat):
        """
        Probability that the simple likelihood test raises no alarm.

        Given by P(chi2_nk(mu) < -2(d + nk/2 ln(2pi) + k sum_J ln(sigma_j)))

        where mu is given by sum_IJ (aij/sigma_j)^2 and d is the log threshold.
        """
        muhat = 0
        for edge in self.positive_edges:
            muhat += np.sum(astrat[edge]**2) / float(self.sigmamat[edge])**2
        dof = self.tau * len(self.positive_edges)
        if muhat > 0:
            return ncx2.cdf(self.param_contribution, dof, muhat)
        # No attacker activity: the statistic is a central chi-square.
        return chi2.cdf(self.param_contribution, dof)

    def prob_no_alarm_lr(self, astrat, astratbelief):
        """
        Probability that the likelihood ratio test raises no alarm when the attacker
        plays astrat and the defender believes he plays astratbelief.

        Both arguments are {edge: length-tau numpy array} dicts.
        """
        stdev = self.get_sd_from_belief(astratbelief)
        evalat = -2 * self.lrd
        for key, belief in astratbelief.items():
            evalat += np.sum(belief**2 - 2 * belief * astrat[key]) \
                / float(self.sigmamat[key])**2
        if stdev == 0:
            # Degenerate belief (all zeros): the statistic is the constant 0, and
            # norm.cdf with scale=0 would return nan.
            return 1.0 if evalat > 0 else 0.0
        return norm.cdf(evalat, loc=0, scale=stdev)

    def get_sd_from_belief(self, astratbelief):
        """Standard deviation of the likelihood ratio statistic implied by the belief."""
        var = 0
        for key, belief in astratbelief.items():
            var += np.sum(4 * belief**2) / float(self.sigmamat[key]**2)
        return var**.5

    def _expected_reward(self, astrat):
        """Reward collected by the attacker: sum over edges of r[edge] * total intensity."""
        reward = 0
        for edge in self.positive_edges:
            reward += self.r[edge] * np.sum(astrat[edge])
        return reward

    def attacker_expected_utility_simple(self, x):
        """Negated expected attacker utility under the simple test (for minimisers)."""
        astrat = self.get_attacker_dict(x)
        pnoalarm = self.prob_no_alarm_simple(astrat)
        return - pnoalarm * self._expected_reward(astrat)

    def attacker_expected_utility_lr(self, x, astratbelief=None):
        """
        Negated expected attacker utility under the likelihood ratio test.

        Raises
        ------
        ValueError
            If astratbelief is not supplied.
        """
        if astratbelief is None:
            raise ValueError("astratbelief must be provided")
        astrat = self.get_attacker_dict(x)
        pnoalarm = self.prob_no_alarm_lr(astrat, astratbelief)
        return - pnoalarm * self._expected_reward(astrat)

    def defender_expected_utility_simple(self, astrat):
        """Placeholder: the defender's utility has not been written yet."""
        raise NotImplementedError("defender_expected_utility_simple is not implemented")

    def attacker_strat_from_res(self, res):
        """Turn a flat optimiser result into a {edge: array} dict rounded to 3 decimals."""
        astar = {}
        for ix, edge in enumerate(self.positive_edges):
            astar[edge] = np.round(res[ix * self.tau: (ix + 1) * self.tau], decimals=3)
        return astar

    def x_from_astrat(self, astrat):
        """Flatten a {edge: array} dict back into a list ordered by self.positive_edges."""
        x = np.array([])
        for edge in self.positive_edges:
            x = np.hstack((x, astrat[edge]))
        return list(x)

In [719]:
# Baseline configuration for the 3-node example model.
parameters = dict(
    tau=5,        # number of time periods
    d=-50.,       # simple likelihood log threshold
    lrd=0.,       # likelihood ratio log threshold
    sigmamat=np.array([[0, 1., 0], [1., 0, 1.], [1., 1., 0]]),  # noise parameters per edge
    r=np.array([[0, 0, 0], [0, 0, 1], [0, 0, 0]]),              # reward matrix
    # NOTE(review): meaning of b1/b2/c1/c2/pi is not visible in this notebook -- confirm.
    b1=1,
    b2=1,
    c1=1,
    c2=1,
    pi=.1,
)
Model1 = NormalNoiseModel(**parameters)

In [499]:
# NOTE(review): `astratres` is only assigned by the optimisation cell that follows
# (In [500]), so on a fresh kernel this cell must be run after that one.
Model1.prob_no_alarm_lr(astratres, astratres)

1.0387815325489515e-06

In [500]:
# Start from a small uniform intensity in every (edge, period) slot; the vector
# length follows the model instead of being hard-coded.
x = np.zeros(shape=Model1.tau * len(Model1.positive_edges)) + .01
# NormalNoiseModel has no `expected_utility` attribute.  The simple-likelihood
# objective is the one that can be minimised without extra arguments.
# NOTE(review): confirm this is the objective that produced the recorded output.
res = fmin_slsqp(Model1.attacker_expected_utility_simple, x,
                 f_ieqcons=Model1.general_constraint)
astratres = Model1.attacker_strat_from_res(res)

Optimization terminated successfully.    (Exit mode 0)
            Current function value: -6.58527595213
            Iterations: 9
            Function evaluations: 245
            Gradient evaluations: 9


# Tests 

We want to test the "probability of no alarm" functions, both when an attacker is present and
when he is not, under both the simple likelihood test and the likelihood ratio test.  We test the calculations (and the math) by checking that the closed-form solutions are "sufficiently close" to the estimates obtained via Monte Carlo simulation.

In [701]:
    # Parameters shared by the Monte Carlo test functions in this notebook.
    # b1, b2, c1, c2 and pi are constructor arguments of NormalNoiseModel that the
    # probability calculations do not read; they are listed so that
    # NormalNoiseModel(**testparams) can be built (values mirror `parameters`).
    testparams = { 'tau': 5,
                'd': -50,
                'sigmamat' : np.array([[0,1.,0], [2.,0,2.],[1.,1.,0]]),
                'r' : np.array([[0,0,0], [0,0,1],[0,0,0]]),
                'lrd':-.1,
                'b1': 1,
                'b2': 1,
                'c1': 1,
                'c2': 1,
                'pi' : .1,
             }

In [578]:
def test_p_no_alarm_simple_no_attacker(paramtest, mcsamps=10000):
    """
    Compare the closed-form no-alarm probability of the simple likelihood test
    with a Monte Carlo estimate when the attacker does nothing.

    Parameters
    ----------
    paramtest : dict
        Keyword arguments for NormalNoiseModel; 'tau', 'sigmamat' and 'd' are also
        read directly.
    mcsamps : int
        Number of Monte Carlo draws.

    Returns
    -------
    tuple
        (closed-form probability, fraction of draws that raised no alarm).
    """
    TestModel = NormalNoiseModel(**paramtest)
    n_slots = paramtest['tau'] * len(TestModel.positive_edges)
    astrat = TestModel.attacker_strat_from_res(np.zeros(n_slots))
    p_no_alarm_exact = TestModel.prob_no_alarm_simple(astrat)
    sigmas = paramtest['sigmamat'][paramtest['sigmamat'] > 0]
    # One row per time period, one column per edge; `scale` broadcasts over columns.
    shape = (paramtest['tau'], len(sigmas))
    noalarm = 0
    for _ in range(mcsamps):
        data = np.random.normal(size=shape, scale=sigmas)
        # logpdf avoids the underflow of log(pdf(.)) far out in the tails.
        lhood = np.sum(norm.logpdf(data, scale=sigmas))
        if lhood > paramtest['d']:
            noalarm += 1
    return p_no_alarm_exact, noalarm / float(mcsamps)

In [579]:
# Returns (closed-form probability, Monte Carlo estimate) for an idle attacker.
test_p_no_alarm_simple_no_attacker(testparams)

(0.99934597956146143, 0.99912)

In [424]:
def test_p_no_alarm_simple_random_attacker(paramtest, mcsamps=10000):
    """
    Compare the closed-form no-alarm probability of the simple likelihood test
    with a Monte Carlo estimate for a uniformly random attacker strategy.

    Parameters
    ----------
    paramtest : dict
        Keyword arguments for NormalNoiseModel; 'tau', 'sigmamat' and 'd' are also
        read directly.
    mcsamps : int
        Number of Monte Carlo draws.

    Returns
    -------
    tuple
        (closed-form probability, fraction of draws that raised no alarm).
    """
    TestModel = NormalNoiseModel(**paramtest)
    edges = TestModel.positive_edges
    stratarray = np.random.random(paramtest['tau'] * len(edges))
    astrat = TestModel.attacker_strat_from_res(stratarray)
    p_no_alarm_exact = TestModel.prob_no_alarm_simple(astrat)
    sigmas = np.array([paramtest['sigmamat'][edge] for edge in edges])
    # Transpose to (period, edge) so that `scale` lines up with the columns.
    stratmat = np.asarray([astrat[edge] for edge in edges]).T
    shape = (paramtest['tau'], len(edges))
    noalarm = 0
    for _ in range(mcsamps):
        data = np.random.normal(size=shape, scale=sigmas) + stratmat
        # logpdf avoids the underflow of log(pdf(.)) far out in the tails.
        lhood = np.sum(norm.logpdf(data, scale=sigmas))
        if lhood > paramtest['d']:
            noalarm += 1
    return p_no_alarm_exact, noalarm / float(mcsamps)

In [425]:
# Returns (closed-form probability, Monte Carlo estimate) for a random attacker.
test_p_no_alarm_simple_random_attacker(testparams)

(0.65590116545019628, 0.655744)

In [664]:
def test_p_no_alarm_lr_no_attacker(paramtest, mcsamps=1000000):
    """
    Compare the closed-form no-alarm probability of the likelihood ratio test with
    a Monte Carlo estimate when the attacker does nothing and the defender holds a
    small random belief about his strategy.

    Parameters
    ----------
    paramtest : dict
        Keyword arguments for NormalNoiseModel; 'tau', 'sigmamat' and 'lrd' are
        also read directly.
    mcsamps : int
        Number of Monte Carlo draws.

    Returns
    -------
    tuple
        (closed-form probability, fraction of draws that raised no alarm).
    """
    TestModel = NormalNoiseModel(**paramtest)
    edges = TestModel.positive_edges
    n_slots = paramtest['tau'] * len(edges)
    astratbelief = TestModel.attacker_strat_from_res(np.random.random(n_slots) * .05)
    astrat = TestModel.attacker_strat_from_res(np.zeros(n_slots))
    p_no_alarm_exact_lr = TestModel.prob_no_alarm_lr(astrat, astratbelief)
    sigmas = np.array([paramtest['sigmamat'][edge] for edge in edges])
    # Transpose to (period, edge) so that `scale` lines up with the columns.
    stratmat = np.asarray([astrat[edge] for edge in edges]).T
    stratmatbelief = np.asarray([astratbelief[edge] for edge in edges]).T
    shape = (paramtest['tau'], len(edges))
    noalarm = 0
    for _ in range(mcsamps):
        data = np.random.normal(size=shape, scale=sigmas)
        # Log-likelihood of the observation (noise + attack) with no attacker ...
        lhoodnum = np.sum(norm.logpdf(data + stratmat, scale=sigmas))
        # ... and under the believed attacker strategy.
        lhooddenom = np.sum(norm.logpdf(data - stratmatbelief + stratmat, scale=sigmas))
        lr = lhoodnum - lhooddenom
        if lr > paramtest['lrd']:
            noalarm += 1
    return p_no_alarm_exact_lr, noalarm / float(mcsamps)

In [714]:
def test_p_no_alarm_lr_correct_belief(paramtest, mcsamps=10000):
    """
    Compare the closed-form no-alarm probability of the likelihood ratio test with
    a Monte Carlo estimate when the defender's belief equals the attacker's actual
    (small, random) strategy.

    Parameters
    ----------
    paramtest : dict
        Keyword arguments for NormalNoiseModel; 'tau', 'sigmamat' and 'lrd' are
        also read directly.
    mcsamps : int
        Number of Monte Carlo draws.

    Returns
    -------
    tuple
        (closed-form probability, fraction of draws that raised no alarm).
    """
    TestModel = NormalNoiseModel(**paramtest)
    edges = TestModel.positive_edges
    n_slots = paramtest['tau'] * len(edges)
    astratbelief = TestModel.attacker_strat_from_res(np.random.random(n_slots) * .05)
    astrat = astratbelief  # the belief is exactly right
    p_no_alarm_exact_lr = TestModel.prob_no_alarm_lr(astrat, astratbelief)
    sigmas = np.array([paramtest['sigmamat'][edge] for edge in edges])
    # Transpose to (period, edge) so that `scale` lines up with the columns.
    stratmat = np.asarray([astrat[edge] for edge in edges]).T
    stratmatbelief = np.asarray([astratbelief[edge] for edge in edges]).T
    shape = (paramtest['tau'], len(edges))
    noalarm = 0
    for _ in range(mcsamps):
        data = np.random.normal(size=shape, scale=sigmas)
        # Log-likelihood of the observation (noise + attack) with no attacker ...
        lhoodnum = np.sum(norm.logpdf(data + stratmat, scale=sigmas))
        # ... and under the believed attacker strategy.
        lhooddenom = np.sum(norm.logpdf(data - stratmatbelief + stratmat, scale=sigmas))
        lr = lhoodnum - lhooddenom
        if lr > paramtest['lrd']:
            noalarm += 1
    return p_no_alarm_exact_lr, noalarm / float(mcsamps)

In [715]:
# Returns (closed-form probability, Monte Carlo estimate) when belief == strategy.
test_p_no_alarm_lr_correct_belief(testparams)

(0.73912439764243332, 0.7309)

In [716]:
def test_p_no_alarm_lr_incorrect_belief(paramtest, mcsamps=10000):
    """
    Compare the closed-form no-alarm probability of the likelihood ratio test with
    a Monte Carlo estimate when the defender's belief and the attacker's actual
    strategy are drawn independently.

    Parameters
    ----------
    paramtest : dict
        Keyword arguments for NormalNoiseModel; 'tau', 'sigmamat' and 'lrd' are
        also read directly.
    mcsamps : int
        Number of Monte Carlo draws.

    Returns
    -------
    tuple
        (closed-form probability, fraction of draws that raised no alarm).
    """
    TestModel = NormalNoiseModel(**paramtest)
    edges = TestModel.positive_edges
    n_slots = paramtest['tau'] * len(edges)
    astratbelief = TestModel.attacker_strat_from_res(np.random.random(n_slots) * .05)
    astrat = TestModel.attacker_strat_from_res(np.random.random(n_slots) * .05)
    p_no_alarm_exact_lr = TestModel.prob_no_alarm_lr(astrat, astratbelief)
    sigmas = np.array([paramtest['sigmamat'][edge] for edge in edges])
    # Transpose to (period, edge) so that `scale` lines up with the columns.
    stratmat = np.asarray([astrat[edge] for edge in edges]).T
    stratmatbelief = np.asarray([astratbelief[edge] for edge in edges]).T
    shape = (paramtest['tau'], len(edges))
    noalarm = 0
    for _ in range(mcsamps):
        data = np.random.normal(size=shape, scale=sigmas)
        # Log-likelihood of the observation (noise + attack) with no attacker ...
        lhoodnum = np.sum(norm.logpdf(data + stratmat, scale=sigmas))
        # ... and under the believed attacker strategy.
        lhooddenom = np.sum(norm.logpdf(data - stratmatbelief + stratmat, scale=sigmas))
        lr = lhoodnum - lhooddenom
        if lr > paramtest['lrd']:
            noalarm += 1
    return p_no_alarm_exact_lr, noalarm / float(mcsamps)

In [717]:
# Returns (closed-form probability, Monte Carlo estimate) when belief != strategy.
test_p_no_alarm_lr_incorrect_belief(testparams)

(0.78107721744980274, 0.7835)