결합 확률 분포를 정의하고 이 확률에 기반하여 추론하는 방법을 이해하기 위한 실습. 여기 제공하는 코드는 GitHub aima-python의 코드를 기반으로 일부 수정한 것임.

## 확률 분포

In [1]:
import numpy as np

class ProbDist:
    """이산 확률 분포를 정의하는 클래스.
    생성자에 확률변수(random variable)를 정의함.
    이후에 확률변수 각 값에 대한 확률값을 지정함(딕셔너리 이용)."""

    def __init__(self, var_name='?', freq=None):
        """var_name: 확률변수. freq가 None이 아니라면, (확률변수의 값, 빈도) 쌍으로 구성된 딕셔너리여야 함.
        이후, 빈도를 바탕으로 합이 1이 되는 확률이 되도록 정규화함."""
        self.prob = {} # 실제 확률값은 여기에 저장됨
        self.var_name = var_name
        self.values = []
        if freq:
            for (v, p) in freq.items():
                self[v] = p
            self.normalize()

    def __getitem__(self, val):
        """확률변수의 값(val)이 주어지면, 확률 P(val) 리턴."""
        try:
            return self.prob[val]
        except KeyError:
            return 0

    def __setitem__(self, val, p):
        """확률변수의 값 val에 대한 확률 P(val) = p로 설정"""
        if val not in self.values:
            self.values.append(val)
        self.prob[val] = p

    def normalize(self):
        """모든 확률값의 합이 1이 되도록 정규화하여 정규화된 분포를 리턴함.
        확률값 합이 0이면, ZeroDivisionError가 발생됨."""
        total = sum(self.prob.values())
        if not np.isclose(total, 1.0):
            for val in self.prob:
                self.prob[val] /= total
        return self

    def show_approx(self, numfmt='{:.3g}'):
        """확률변수의 값에 따라 정렬하고 확률값은 반올림하여 리턴함."""
        return ', '.join([('{}: ' + numfmt).format(v, p) for (v, p) in sorted(self.prob.items())])

    def __repr__(self):
        return "P({})".format(self.var_name)

In [2]:
# 동전 던지기 확률 분포
p = ProbDist('Flip') # 확률변수(Flip)
p['H'], p['T'] = 0.25, 0.75 # 각 값에 대한 확률 설정; P(Flip=H)=0.25, P(Flip=T)=0.75
p['T']

0.75

In [3]:
# 확률변수 정의와 각 값에 대한 빈도를 생성자에 설정.
# 확률값이 되도록 총합이 1이 되도록 정규화됨.
p = ProbDist('X', freq={'low': 125, 'medium': 375, 'high': 500})
print(p.var_name)
print(p.values)
print((p['low'], p['medium'], p['high']))

X
['low', 'medium', 'high']
(0.125, 0.375, 0.5)


In [4]:
# 빈도를 생성자에 한꺼번에 설정하지 않고, 별도로 지정하는 경우 자동으로 정규화되지 않음.
# 이 경우에는 normalize() 함수를 호출하여 정규화 가능
p = ProbDist('Y')
p['Cat'] = 50
p['Dog'] = 114
p['Mice'] = 64
(p['Cat'], p['Dog'], p['Mice'])

(50, 114, 64)

In [5]:
p.normalize()
(p['Cat'], p['Dog'], p['Mice'])

(0.21929824561403508, 0.5, 0.2807017543859649)

In [6]:
p.show_approx()

'Cat: 0.219, Dog: 0.5, Mice: 0.281'

## 결합 확률 분포

In [7]:
from collections import defaultdict

class JointProbDist(ProbDist):
    """변수 집합에 대한 이산 확률 분포(결합 확률 분포) 클래스. ProbDist의 서브 클래스."""

    def __init__(self, variables):
        """variables: 대상 확률변수들."""
        self.prob = {}
        self.variables = variables
        self.vals = defaultdict(list) # k: [v1, v2, ...]

    def __getitem__(self, values):
        """주어진 변수 값들에 대한 결합 확률 리턴."""
        values = event_values(values, self.variables)
        return ProbDist.__getitem__(self, values)

    def __setitem__(self, values, p):
        """확률값 P(values) = p로 설정. values는 각 변수에 대해 하나의 값을 가진 튜플/딕셔너리.
        또한 각 변수에 대해 지금까지 관측된 각 변수의 값을 기록함."""
        values = event_values(values, self.variables)
        self.prob[values] = p
        for var, val in zip(self.variables, values):
            if val not in self.vals[var]:
                self.vals[var].append(val)

    def values(self, var):
        """변수에 대해 가능한 값의 집합을 리턴."""
        return self.vals[var]

    def __repr__(self):
        return "P({})".format(self.variables)

    
def event_values(event, variables):
    """이벤트(event)에 존재하는 변수들(variables)에 대한 값들의 튜플을 리턴함."""
    if isinstance(event, tuple) and len(event) == len(variables):
        return event
    else:
        return tuple([event[var] for var in variables])

In [8]:
event = {'A': 10, 'B': 9, 'C': 8}
variables = ['C', 'A']
event_values(event, variables)

(8, 10)

In [9]:
# P(X, Y)
# 확률변수 정의
variables = ['X', 'Y']
j = JointProbDist(variables)
j

P(['X', 'Y'])

In [19]:
# 각 변수 값에 대한 확률값 설정
j[1,1] = 0.2
j[0,1] = 0.5

(j[1,1], j[0,1])

(0.2, 0.5)

## 완전 결합 분포 기반 추론

In [11]:
# 완전 결합 확률 분포 정의
full_joint = JointProbDist(['Cavity', 'Toothache', 'Catch'])
full_joint[dict(Cavity=True, Toothache=True, Catch=True)] = 0.108
full_joint[dict(Cavity=True, Toothache=True, Catch=False)] = 0.012
full_joint[dict(Cavity=True, Toothache=False, Catch=True)] = 0.016
full_joint[dict(Cavity=True, Toothache=False, Catch=False)] = 0.064
full_joint[dict(Cavity=False, Toothache=True, Catch=True)] = 0.072
full_joint[dict(Cavity=False, Toothache=False, Catch=True)] = 0.144
full_joint[dict(Cavity=False, Toothache=True, Catch=False)] = 0.008
full_joint[dict(Cavity=False, Toothache=False, Catch=False)] = 0.576

In [12]:
def enumerate_joint(variables, e, P):
    """증거 e에 포함되지 않은 확률분포 P의 나머지 변수들이 주어졌을 때,
    증거 e와 일관된 P의 엔트리들의 합을 리턴함."""
    if not variables:
        return P[e]
    Y, rest = variables[0], variables[1:]
    return sum([enumerate_joint(rest, extend(e, Y, y), P) for y in P.values(Y)])


def extend(s, var, val):
    """딕셔너리 s를 복사하고 var에 값 val을 세팅하여 확장하여 그 복사본을 리턴함."""
    return {**s, var: val}

In [22]:
# P(Toothache=True)
evidence = dict(Toothache=True)
variables = ['Cavity', 'Catch'] # 증거에 포함되지 않은 변수들
ans1 = enumerate_joint(variables, evidence, full_joint)
ans1

0.19999999999999998

In [14]:
# P(Cavity=True and Toothache=True)
evidence = dict(Cavity=True, Toothache=True)
variables = ['Catch'] # 증거에 포함되지 않은 변수들
ans2 = enumerate_joint(variables, evidence, full_joint)
ans2

0.12

In [15]:
# P(Cavity=True | Toothache=True) = P(Cavity=True and Toothache=True) / P(Toothache=True)
ans2/ans1

0.6

In [16]:
def enumerate_joint_ask(X, e, P):
    """결합 확률 분포 P에서 {var:val} 관측들(증거들) e가 주어졌을 때,
    질의 변수 X의 값들에 대한 확률 분포를 리턴함."""
    assert X not in e, "질의 변수는 증거에 포함된 변수일 수 없음"
    Q = ProbDist(X)  # X에 대한 확률 분포, 초기에는 비어 있음.
    Y = [v for v in P.variables if v != X and v not in e]  # 미관측 변수
    for xi in P.values(X):
        Q[xi] = enumerate_joint(Y, extend(e, X, xi), P)
    return Q.normalize()

In [17]:
# P(Cavity | Toothache=True)
query_variable = 'Cavity'
evidence = dict(Toothache=True)
ans = enumerate_joint_ask(query_variable, evidence, full_joint)
(ans[True], ans[False])

(0.6, 0.39999999999999997)