-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
174 lines (151 loc) · 7.09 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import random
import collections
# General code for representing a weighted CSP (Constraint Satisfaction Problem).
# All variables are being referenced by their index instead of their original
# names.
class CSP:
def __init__(self):
# Total number of variables in the CSP.
self.numVars = 0
# The list of variable names in the same order as they are added. A
# variable name can be any hashable objects, for example: int, str,
# or any tuple with hashtable objects.
self.variables = []
# Each key K in this dictionary is a variable name.
# values[K] is the list of domain values that variable K can take on.
self.values = {}
# Each entry is a unary factor table for the corresponding variable.
# The factor table corresponds to the weight distribution of a variable
# for all added unary factor functions. If there's no unary function for
# a variable K, there will be no entry for K in unaryFactors.
# E.g. if B \in ['a', 'b'] is a variable, and we added two
# unary factor functions f1, f2 for B,
# then unaryFactors[B]['a'] == f1('a') * f2('a')
self.unaryFactors = {}
# Each entry is a dictionary keyed by the name of the other variable
# involved. The value is a binary factor table, where each table
# stores the factor value for all possible combinations of
# the domains of the two variables for all added binary factor
# functions. The table is represented as a dictionary of dictionary.
#
# As an example, if we only have two variables
# A \in ['b', 'c'], B \in ['a', 'b']
# and we've added two binary functions f1(A,B) and f2(A,B) to the CSP,
# then binaryFactors[A][B]['b']['a'] == f1('b','a') * f2('b','a').
# binaryFactors[A][A] should return a key error since a variable
# shouldn't have a binary factor table with itself.
self.binaryFactors = {}
def add_variable(self, var, domain):
"""
Add a new variable to the CSP.
"""
if var in self.variables:
raise Exception("Variable name already exists: %s" % str(var))
self.numVars += 1
self.variables.append(var)
self.values[var] = domain
self.unaryFactors[var] = None
self.binaryFactors[var] = dict()
def get_neighbor_vars(self, var):
"""
Returns a list of variables which are neighbors of |var|.
"""
return self.binaryFactors[var].keys()
def add_unary_factor(self, var, factorFunc):
"""
Add a unary factor function for a variable. Its factor
value across the domain will be *merged* with any previously added
unary factor functions through elementwise multiplication.
How to get unary factor value given a variable |var| and
value |val|?
=> csp.unaryFactors[var][val]
"""
factor = {val:float(factorFunc(val)) for val in self.values[var]}
if self.unaryFactors[var] is not None:
assert len(self.unaryFactors[var]) == len(factor)
self.unaryFactors[var] = {val:self.unaryFactors[var][val] * \
factor[val] for val in factor}
else:
self.unaryFactors[var] = factor
def add_binary_factor(self, var1, var2, factor_func):
"""
Takes two variable names and a binary factor function
|factorFunc|, add to binaryFactors. If the two variables already
had binaryFactors added earlier, they will be *merged* through element
wise multiplication.
How to get binary factor value given a variable |var1| with value |val1|
and variable |var2| with value |val2|?
=> csp.binaryFactors[var1][var2][val1][val2]
"""
# never shall a binary factor be added over a single variable
try:
assert var1 != var2
except:
print '!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'
print '!! Tip: !!'
print '!! You are adding a binary factor over the same variable... !!'
print '!! Please check your code and avoid doing this. !!'
print '!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'
raise
self.update_binary_factor_table(var1, var2,
{val1: {val2: float(factor_func(val1, val2)) \
for val2 in self.values[var2]} for val1 in self.values[var1]})
self.update_binary_factor_table(var2, var1, \
{val2: {val1: float(factor_func(val1, val2)) \
for val1 in self.values[var1]} for val2 in self.values[var2]})
def update_binary_factor_table(self, var1, var2, table):
"""
Private method you can skip for 0c, might be useful for 1c though.
Update the binary factor table for binaryFactors[var1][var2].
If it exists, element-wise multiplications will be performed to merge
them together.
"""
if var2 not in self.binaryFactors[var1]:
self.binaryFactors[var1][var2] = table
else:
currentTable = self.binaryFactors[var1][var2]
for i in table:
for j in table[i]:
assert i in currentTable and j in currentTable[i]
currentTable[i][j] *= table[i][j]
def get_delta_weight(csp, assignment, var, val):
"""
Given a CSP, a partial assignment, and a proposed new value for a variable,
return the change of weights after assigning the variable with the proposed
value.
@param assignment: A dictionary of current assignment. Unassigned variables
do not have entries, while an assigned variable has the assigned value
as value in dictionary. e.g. if the domain of the variable A is [5,6],
and 6 was assigned to it, then assignment[A] == 6.
@param var: name of an unassigned variable.
@param val: the proposed value.
@return w: Change in weights as a result of the proposed assignment. This
will be used as a multiplier on the current weight.
"""
assert var not in assignment
w = 1.0
if csp.unaryFactors[var]:
w *= csp.unaryFactors[var][val]
if w == 0: return w
for var2, factor in csp.binaryFactors[var].iteritems():
if var2 not in assignment: continue # Not assigned yet
w *= factor[val][assignment[var2]]
if w == 0: return w
return w
def weightedRandomChoice(weightDict):
weights = []
elems = []
for elem in weightDict:
weights.append(weightDict[elem])
elems.append(elem)
total = sum(weights)
key = random.uniform(0, total)
runningTotal = 0.0
chosenIndex = None
for i in range(len(weights)):
weight = weights[i]
runningTotal += weight
if runningTotal > key:
chosenIndex = i
return elems[chosenIndex]
raise Exception('Should not reach here')