Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
1962 lines (1758 sloc) 76.7 KB
#!/usr/bin/python
import os, re, math, gc
import cPickle as pickle
from bz2 import BZ2File
from gzip import GzipFile
import wx
from Simulation import *
from Strategies import *
import PrisonersDilemma as PD
from PopulationDynamics import Dynamics
from PyPlotter import Gfx, Simplex, Graph, wxGfx, psGfx
import Logging
import gtk
from gtk import gdk
from PyPlotter import gtkGfx
# Optional acceleration: when the psyco package can be imported, the hot
# spots of the simulation are registered with it; otherwise the program
# runs without it.
try:
    import psyco
    psyco.profile()
    # the regular expression helpers are excluded from compilation
    for _excluded in (re.compile, re.sub):
        psyco.cannotcompile(_excluded)
    # functions and classes that are bound explicitly
    for _hotspot in (PD.ReiteratedPD, PD.GenPayoffMatrix,
                     RichPDDeme, RichSuperDeme,
                     Simulation.dynamics, Simulation.extendedDynamics,
                     Graph.Cartesian, Gfx.nilDriver):
        psyco.bind(_hotspot)
except ImportError:
    pass
# File name extensions of the two supported compression formats.
BZ2_EXTENSION = ".bz2"
GZIP_EXTENSION = ".gz"
# Compression back end and matching extension for pickled results (used by
# StrategySetStatistics.writeTables).  The bz2 variant is kept as a
# commented-out alternative.
#ZIPFile = BZ2File
#ZIP_EXTENSION = BZ2_EXTENSION
ZIPFile = GzipFile
ZIP_EXTENSION = GZIP_EXTENSION
# Pixel size of the bitmap that StrategySetStatistics.createCharts renders.
SAVE_IMAGE_WIDTH, SAVE_IMAGE_HEIGHT = 1280, 960
# NOTE(review): not referenced in the visible part of the file; presumably
# the size at which images are embedded in the generated HTML pages.
HTML_IMAGE_WIDTH, HTML_IMAGE_HEIGHT = 640, 400
# NOTE(review): presumably suppresses the progress dialog - confirm at the
# place of use.
PROGRESS_DIALOG_OFF = True
# Default for SeriesDescriptor.samples.
NUM_SAMPLES = 10
# NOTE(review): presumably an upper limit for the generations of a single
# simulation - confirm at the place of use.
MAX_GENERATIONS = 20000
# Number of random subsets that the AutomataSubsets series requests.
AUTOMATA_SERIES_SIZE = 500
# Processing switches.  NOTE(review): none of them is read in the visible
# part of the file; the names suggest that they control which artefacts are
# regenerated on a re-run - confirm at the place of use.
REDO_IMAGES = True
REDO_AUTOMATA_ONLY = True
REDO_PAGES = False
REPICKLE_RESULTS = False
REPLAY = True
FIX_AUTOMATA_HEADINGS = True
# HTML skeleton of a two-row frameset page with a "navigation" frame on top
# and a "content" frame below.
# NOTE(review): $NAV and $CONT look like placeholders that are replaced by
# the frame URLs before the page is written - confirm at the place of use.
FRAME_SET = """
<html>
<head>
<title>CoopSim - Simulation Series</title>
</head>
<frameset rows="160,*">
<frame src="$NAV" name="navigation" />
<frame src="$CONT" name="content" />
</frameset>
"""
# JavaScript snippet defining switchnav(nav), which loads the URL 'nav'
# into the frame named "navigation" of the parent frameset.
FRAME_SCRIPT = """
<script type="text/javascript">
function switchnav(nav) {
parent["navigation"].location.href = nav;
}
</script>
"""
def genericName(strgySet):
    """Produce a generic name for a strategy set.

    The name is the concatenation of the string representations of all
    strategies in 'strgySet', joined by "x".  Blanks, parentheses and
    dots inside a strategy name are replaced by underscores.

    -> name string (empty if 'strgySet' is empty)
    """
    # A raw string with a character class replaces the former non-raw
    # pattern " |\(|\)|\." whose backslash sequences are invalid string
    # escapes; the set of replaced characters is the same.
    return "x".join([re.sub(r"[ ().]", "_", str(s)) for s in strgySet])
def genStrategySubsets(strgySet, minSize, maxSize, N=-1):
    """Generate N random strategy selections from 'strgySet' with sizes
    varying between 'minSize' and 'maxSize'.  (If N < 0, the generator
    only stops when no new selection is left.)

    Every selection is a list of distinct elements of 'strgySet' in random
    order.  No selection is yielded twice; two selections count as equal
    if genericName() gives them the same name, so the order of the
    strategies matters.

    The generator stops early once as many selections have been yielded as
    there are ordered selections of the requested sizes; formerly it
    looped forever in that case.  This bound assumes that different
    strategies have different names.

    'strgySet' may be any sequence (list or tuple); it is not modified.
    If 'maxSize' exceeds the number of strategies, drawing a selection can
    still fail with an IndexError, as before.
    """
    candidates = list(strgySet)
    total = len(candidates)
    # Number of distinct ordered selections with an admissible size.
    limit = 0
    for size in range(max(minSize, 0), min(maxSize, total) + 1):
        arrangements = 1
        for k in range(size):
            arrangements *= total - k
        limit += arrangements
    generated = set()
    while N != 0 and len(generated) < limit:
        num = random.randint(minSize, maxSize)
        pool = candidates[:]
        selection = []
        while num > 0:
            s = random.choice(pool)
            pool.remove(s)
            selection.append(s)
            num -= 1
        name = genericName(selection)
        if name not in generated:
            generated.add(name)
            yield selection
            # only new selections count towards N
            if N > 0: N -= 1
def extList(l, item):
    """Append 'item' to the list 'l'.

    -> index at which 'item' is stored afterwards
    """
    position = len(l)
    l.append(item)
    return position
def genAutomataDegenerators(automata, p):
    """Create mutators for the given automata.

    An automaton whose program string contains at least three "D"
    degenerates to 'Dove' ("DDDDD"), any other automaton to 'Hawk'
    ("HHHHH"), each with probability 'p'.

    If the list 'automata' does not contain 'Dove' or 'Hawk', the missing
    automaton is appended to 'automata' (the list is modified in place),
    because the mutators refer to their targets by list index.

    -> list with one Mutator per automaton; the empty list if p == 0.0
    Raises an AssertionError if 'p' is not within [0.0, 1.0].
    """
    # The message is formatted as a whole; formerly "%" was applied to the
    # second string fragment only, so a failing check raised a TypeError.
    assert p >= 0.0 and p <= 1.0, \
        "Mutation probability p=%1.3f out of range [0.0, 1.0] !" % p
    if p == 0.0:
        return []
    dove, hawk = -1, -1
    for i, automaton in enumerate(automata):
        if automaton.progString == "DDDDD": dove = i
        if automaton.progString == "HHHHH": hawk = i
    if dove < 0: dove = extList(automata, TwoStateAutomaton("DDDDD"))
    if hawk < 0: hawk = extList(automata, TwoStateAutomaton("HHHHH"))
    mutators = []
    for i, automaton in enumerate(automata):
        if automaton.progString.count("D") >= 3: target = dove
        else: target = hawk
        mutators.append(Mutator(i, target, p))
    return mutators
# generator functions for parameterized TFTs
def genParameterizedTFTs(grFrom, grTo, grStep,
                         erFrom, erTo, erStep):
    """Generate a list of parameterized TitForTat strategies.

    The gr* and er* parameters define the range for the good- and
    evilrates (see class ParameterizedTFT for an explanation).  For every
    goodrate in grFrom, grFrom+grStep, ... <= grTo one strategy per
    evilrate in erFrom, erFrom+erStep, ... <= erTo is created, the
    evilrate varying fastest.

    The rates are computed as multiples of the step width instead of by
    repeated addition, so accumulated rounding errors can no longer drop
    the end point of a range (0.1+0.1+0.1 is greater than 0.3).

    Raises a ValueError for a step width <= 0 on a non-empty range
    (formerly an endless loop).
    """
    def rates(first, last, step):
        """-> list of the values first, first+step, ... up to 'last'."""
        if first > last:
            return []
        if step <= 0.0:
            raise ValueError("step width must be positive, got %r" % (step,))
        count = int(math.floor((last - first) / step + 1e-9)) + 1
        # min() keeps a value that overshoots by a rounding error in range
        return [min(first + i * step, last) for i in range(count)]

    goodrates = rates(grFrom, grTo, grStep)
    if not goodrates:
        return []
    evilrates = rates(erFrom, erTo, erStep)
    return [ParameterizedTFT(goodrate, evilrate)
            for goodrate in goodrates
            for evilrate in evilrates]
## manyTFTs = genParameterizedTFTs(0.0, 1.0, 0.25, 0.0, 1.0, 0.25)
def genTFTDegenerators(tfts, p):
    """Generate a list of mutators for parameterized TFT strategies.

    TFTs are degenerated by rounding their good- and evilrates to 0 or 1,
    i.e. every strategy mutates with probability 'p' to the one of the
    four "corner" strategies (goodrate, evilrate) = (1,0) 'dove',
    (0,1) 'hawk', (0,0) 'tft' or (1,1) 'inverted tft' that its rates
    round to.

    Corner strategies that are missing from the list 'tfts' are appended
    to it (the list is modified in place), because the mutators refer to
    their targets by list index.

    -> list with one Mutator per strategy; the empty list if p == 0.0
    Raises an AssertionError if 'p' is not within [0.0, 1.0].
    """
    def xround(f):
        """Round f, but if f = 0.5 round f randomly to 1 or 0."""
        if f == 0.5: return round(random.random())
        else: return round(f)

    # The message is formatted as a whole; formerly "%" was applied to the
    # second string fragment only, so a failing check raised a TypeError.
    assert p >= 0.0 and p <= 1.0, \
        "Mutation probability p=%1.3f out of range [0.0, 1.0] !" % p
    if p == 0.0:
        return []
    # dove, hawk, tft, inverted tft - in the order in which missing
    # strategies are appended
    corners = ((1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0))
    index = {}
    for i, tft in enumerate(tfts):
        rates = (tft.goodrate, tft.evilrate)
        if rates in corners:
            index[rates] = i        # the last match wins
    for goodrate, evilrate in corners:
        if (goodrate, evilrate) not in index:
            index[(goodrate, evilrate)] = extList(
                tfts, ParameterizedTFT(goodrate, evilrate))
    mutators = []
    for i, tft in enumerate(tfts):
        gr = xround(tft.goodrate); er = xround(tft.evilrate)
        mutators.append(Mutator(i, index[(gr, er)], p))
    return mutators
# Strategy collections that the series descriptors below use as their
# strategy lists.
# NOTE(review): genAllAutomata is not defined in the visible part of the
# file; presumably it comes from one of the star imports and returns the
# list of all two state automata.
twostateautomata = genAllAutomata()
# Parameterized TFTs with good- and evilrates running from 0.0 to 1.0 in
# steps of 0.2.
manyTFTs = genParameterizedTFTs(0.0, 1.0, 0.2, 0.0, 1.0, 0.2)
# A small mix of hand-written strategies.  Note that this is a tuple,
# whereas the two collections above are lists.
strategy_mix = (Tester(), TitForTat(), Pavlov(), Grim(), Random(),
Hawk(), Tranquillizer(), Dove(), Joss())
def strategyColor(s):
    """Returns a color (r,g,b) that indicates the degree of
    altruism of the strategy "s".

    Parameterized TFTs and two state automata get a color computed from
    their parameters; the other known strategies have a fixed color.
    For a strategy of an unknown type None is returned.
    """
    if isinstance(s, ParameterizedTFT):
        return (s.evilrate, s.goodrate, 0.35)
    if isinstance(s, TwoStateAutomaton):
        # share of "D" entries in the program string
        green = s.progString.count("D") / 5.0
        return (1.0 - green, green, 0.0)
    # The table is searched in order: the first matching class wins.
    palette = ((Dove,          (0.0, 1.0, 0.0)),
               (Hawk,          (1.0, 0.0, 0.0)),
               (TitForTat,     (0.0, 0.0, 1.0)),
               (Grim,          (0.5, 0.0, 1.0)),
               (Pavlov,        (0.6, 0.0, 0.5)),
               (Tester,        (0.8, 0.0, 0.3)),
               (Random,        (0.5, 0.5, 0.5)),
               (Tranquillizer, (0.9, 0.0, 0.2)),
               (Joss,          (0.7, 0.0, 0.3)))
    for strategyClass, color in palette:
        if isinstance(s, strategyClass):
            return color
    return None
def altruismIndex_crude(s):
    """Classifies the strategy 's' by one of the three values 0,1,2:
    0 means that the strategy tries to exploit other strategies (like
    Hawk, Tester etc.), 1 means that it is cooperative (Grim, TitForTat)
    and 2 means that it is altruistic (Dove, TitForTwoTats).

    For a strategy of an unknown type None is returned.
    """
    if isinstance(s, ParameterizedTFT):
        balance = s.goodrate - s.evilrate
        if balance >= 0.5:
            return 2
        if balance < 0.0:
            return 0
        return 1
    if isinstance(s, TwoStateAutomaton):
        prog = s.progString
        doves = prog.count("D")
        if doves >= 4:
            return 2
        if doves < 2:
            return 0
        if doves == 3 and prog[1] == "D" and prog[3] == "D":
            return 1
        if prog[:2] == "DD":
            return 1
        return 0
    # The table is searched in order: the first matching class wins.
    table = ((Dove, 2), (Hawk, 0), (TitForTat, 1), (Grim, 1),
             (Pavlov, 0), (Tester, 0), (Random, 0),
             (Tranquillizer, 0), (Joss, 0))
    for strategyClass, index in table:
        if isinstance(s, strategyClass):
            return index
    return None
def discreteColors(s):
    """Maps the crude altruism index of the strategy 's' to one of three
    colors: green for a very altruistic strategy (index 2), blue for a
    cooperative strategy (index 1) and red for anything else, i.e. for
    exploitive strategies."""
    GREEN, BLUE, RED = (0.0, 1.0, 0.0), (0.0, 0.0, 1.0), (1.0, 0.0, 0.0)
    palette = {2: GREEN, 1: BLUE}
    return palette.get(altruismIndex_crude(s), RED)
########################################################################
#
# Simulation series
#
########################################################################
# The following generator functions act as filters on the
# parameter values defined in the SeriesDescriptor records.
# For example, the 'BoundariesOperator' yields the lowest
# and highest value in the list.
def BoundariesOperator(valuelist):
    """Picks the lowest and the highest value from 'valuelist'.  If the
    first entry is neither a float nor an int, the left most and the
    right most value are picked instead.  A sequence with a single entry
    yields just that entry.

    Raises an AssertionError (on the first iteration) if 'valuelist' is
    empty.
    """
    # len() also catches empty tuples; the former comparison with []
    # was always true for the tuples used by the series descriptors.
    assert len(valuelist) > 0, "'valuelist' must not be empty!"
    if len(valuelist) == 1:
        yield valuelist[0]
    elif isinstance(valuelist[0], (float, int)):
        yield min(valuelist)
        yield max(valuelist)
    else:
        yield valuelist[0]
        yield valuelist[-1]
def LeftRightOperator(valuelist):
    """Picks the left most and the right most value from 'valuelist'.
    (If the value list is an ordered list of numbers this function
    yields the same results as the boundaries operator.)  A sequence
    with a single entry yields just that entry.

    Raises an AssertionError (on the first iteration) if 'valuelist' is
    empty.
    """
    # len() also catches empty tuples; the former comparison with []
    # was always true for the tuples used by the series descriptors.
    assert len(valuelist) > 0, "'valuelist' must not be empty!"
    yield valuelist[0]
    if len(valuelist) > 1:
        yield valuelist[-1]
def LeftOperator(valuelist):
    """Picks only the left most value from 'valuelist'.

    Raises an AssertionError (on the first iteration) if 'valuelist' is
    empty.
    """
    # len() also catches empty tuples; the former comparison with []
    # was always true for the tuples used by the series descriptors.
    assert len(valuelist) > 0, "'valuelist' must not be empty!"
    yield valuelist[0]
def MonteCarloOperator(valuelist):
    """Picks one random value.

    If the first entry of 'valuelist' is a float, a random float between
    the lowest and the highest value of the list is picked; if it is an
    int, a random int from that range (both boundaries included).
    Otherwise a random entry of the list is picked.

    Raises an AssertionError (on the first iteration) if 'valuelist' is
    empty.
    """
    # len() also catches empty tuples; the former comparison with []
    # was always true for the tuples used by the series descriptors.
    assert len(valuelist) > 0, "'valuelist' must not be empty!"
    first = valuelist[0]
    if isinstance(first, float):
        yield random.uniform(min(valuelist), max(valuelist))
    elif isinstance(first, int):
        yield random.randint(min(valuelist), max(valuelist))
    else:
        yield random.choice(valuelist)
def DiscreteMCOperator(valuelist):
    """Randomly picks one value from 'valuelist'.

    Raises an AssertionError (on the first iteration) if 'valuelist' is
    empty.
    """
    # len() also catches empty tuples; the former comparison with []
    # was always true for the tuples used by the series descriptors.
    assert len(valuelist) > 0, "'valuelist' must not be empty!"
    yield random.choice(valuelist)
def NullOperator(valuelist):
    """The identity filter: passes on every value of 'valuelist' in its
    original order."""
    for value in valuelist:
        yield value
def MetaOperator(operator, valuelist, entryType = str):
    """Delivers all entries of 'valuelist' unchanged if its first entry
    is of type 'entryType'; otherwise delivers what the operator
    'operator' produces for 'valuelist'.
    """
    if isinstance(valuelist[0], entryType):
        selected = valuelist
    else:
        selected = operator(valuelist)
    for value in selected:
        yield value
class SeriesDescriptor(object):
    """Object that describes the characteristics of a simulation series
    by keeping a tuple of different values for every parameter of the
    simulation.

    The first value of every tuple is treated as the standard value of
    that parameter (see deriveStandardSetup and genLimitedSeries).
    Subclasses fill in the attributes that are set up in __init__.
    """

    def __init__(self):
        self.name = "SeriesDescriptor"
        self.strategySets = tuple()  # tuple of strings (i.e. names)
        self.strategyLists = {}      # strategy set name -> strategy list
        self.correlation = tuple()
        self.gameNoise = tuple()
        self.noise = tuple()
        self.payoff = tuple()
        self.demes = tuple()         # (demes, minSize, maxSize, interval)
        self.mutators = tuple()      # the probabilities of the degs.
        self.mutatorDicts = {}       # dictionary of mutator generating functions
        self.nameTemplate = "%s_C%1.3f_G%1.3f_N%1.3f_P%1.1f_%1.1f_%1.1f_%1.1f_M%i_%i_%i_%i_D%1.3f"
        self.samples = NUM_SAMPLES
        self.state = None            # state of the generator

    def _parameterValues(self):
        """-> the value tuples of all parameters, in the same order as
        the names returned by getParameterNames."""
        return (self.strategySets, self.correlation, self.gameNoise,
                self.noise, self.payoff, self.demes, self.mutators)

    def _stateName(self, s, c, g, n, p, m, d):
        """Stores the given parameter combination in self.state and
        -> name of the setup with these parameters.  A deme structure
        'm' that is false (e.g. None) is represented by (1,0,0,0).
        """
        if m: x = m
        else: x = (1, 0, 0, 0)
        self.state = (s, c, g, n, p[0], p[1], p[2], p[3],
                      x[0], x[1], x[2], x[3], d)
        return self.nameTemplate % self.state

    def getParameterList(self, operator):
        """-> parameter tuples (filtered by 'operator') in one ordered
        tuple."""
        return tuple([tuple(operator(values))
                      for values in self._parameterValues()])

    def getParameterNames(self):
        """-> (ordered) list of parameter names."""
        return ("strategy set", "correlation", "game noise",
                "evolutionary noise", "payoff", "deme structure",
                "mutation rate")

    def numberOfParameters(self):
        """-> number of parameters.

        Derived from getParameterNames; the formerly hard coded value 6
        did not count the seventh parameter ("mutation rate").
        """
        return len(self.getParameterNames())

    def parameterVariations(self, operator = NullOperator):
        """-> tuple that contains the number of variations for
        each variable parameter.
        """
        if operator == NullOperator:
            # no need to run the filter, the tuples are delivered as is
            return tuple([len(values) for values in self._parameterValues()])
        return tuple([len(p) for p in self.getParameterList(operator)])

    def numberOfVariableParameters(self, operator = NullOperator):
        """-> number of variable parameters, i.e. of parameters with
        more than one value."""
        pv = self.parameterVariations(operator)
        return sum([1 for variations in pv if variations > 1])

    def numberOfSimulations(self, operator = NullOperator):
        """-> the number of single simulations in the series, if no
        filters are applied."""
        count = 1
        for variations in self.parameterVariations(operator):
            count *= variations
        return count

    def deriveStandardSetup(self, strgys = ""):
        """-> SimSetup object.  Picks the first values as default values.

        'strgys' is the name of the strategy set to be used; if it is
        empty, the first strategy set is used.  The mutators are
        generated for the chosen strategy set (formerly the mutator
        generator of the first strategy set was applied to whatever
        strategy list had been chosen).
        """
        if strgys == "": strgys = self.strategySets[0]
        sgl = self.strategyLists[strgys]
        deg_list = self._getMutators(strgys, sgl, self.mutators[0])
        setup = SimSetup(name = self.__class__.__name__+"_reference_setup",
                         strategyList = sgl,
                         correlation = self.correlation[0],
                         gameNoise = self.gameNoise[0],
                         noise = self.noise[0],
                         iterations = 200,
                         samples = self.samples,
                         payoff = self.payoff[0],
                         demes = self.demes[0],
                         mutators = deg_list)
        return setup

    def deriveSetup(self, **kw):
        """-> the standard setup with the attributes named by the keyword
        arguments replaced.

        Raises a KeyError if a keyword is not an attribute of the setup.
        If the size of the population no longer matches the number of
        strategies, the population is replaced by a uniform distribution.
        """
        setup = self.deriveStandardSetup()
        wrongKeys = sorted(set(kw.keys()) - set(setup.__dict__.keys()))
        if wrongKeys: raise KeyError(wrongKeys)
        setup.__dict__.update(kw)
        ns = len(setup.strategyList)
        if len(setup.population) != ns:
            setup.population = Dynamics.UniformDistribution(ns)
##        for d in setup.mutators:
##            if d.original >= ns or d.mutated >= ns:
##                setup.mutators = []
##                break
        return setup

    def _getMutators(self, s, sgl, d):
        """Returns the mutator list for strategy list 'sgl', named 's' and
        a mutation frequency of 'd'.  Without mutation (d <= 0.0) the
        list is empty and no mutator generator is required.
        """
        if d > 0.0: return self.mutatorDicts[s](sgl, d)
        else: return []

    def generate(self, operator = NullOperator, repeat = 1):
        """Generate simulation setups from the series descriptor with
        'operator' acting as filter (or modifier respectively).

        All combinations of the (filtered) parameter values are
        generated, 'repeat' times over.
        """
        # The loops are nested on purpose: 'operator' is called anew on
        # every pass of an inner loop, so randomizing operators draw
        # fresh values each time.
        for i in range(repeat):
            for s in operator(self.strategySets):
                for c in operator(self.correlation):
                    for g in operator(self.gameNoise):
                        for n in operator(self.noise):
                            for p in operator(self.payoff):
                                for m in operator(self.demes):
                                    for d in operator(self.mutators):
                                        nameStr = self._stateName(
                                            s, c, g, n, p, m, d)
                                        sgl = self.strategyLists[s]
                                        deg_list = self._getMutators(
                                            s, sgl, d)
                                        setup = SimSetup(
                                            name = nameStr,
                                            strategyList = sgl,
                                            correlation = c,
                                            gameNoise = g,
                                            noise = n,
                                            iterations = 200,
                                            samples = self.samples,
                                            payoff = p,
                                            demes = m,
                                            mutators = deg_list)
                                        yield setup

    def genLimitedSeries(self, operator = NullOperator, repeat = 1):
        """Generate simulation setups by varying only one parameter
        at a time while the other parameters stick to the standard
        value (which is assumed to be the first in the list).

        Setups that compare equal to the standard setup of their
        strategy set are left out.
        """
        for i in range(repeat):
            c0 = self.correlation[0]; g0 = self.gameNoise[0]
            n0 = self.noise[0]; p0 = self.payoff[0]
            m0 = self.demes[0]; d0 = self.mutators[0]
            for s in operator(self.strategySets):
                sgl = self.strategyLists[s]
                deg_list = self._getMutators(s, sgl, d0)
                std = self.deriveStandardSetup(s)
                for c in operator(self.correlation):
                    setup = self.deriveSetup(
                        name = self._stateName(s, c, g0, n0, p0, m0, d0),
                        strategyList = sgl, correlation = c,
                        mutators = deg_list)
                    if setup != std: yield setup
                for g in operator(self.gameNoise):
                    setup = self.deriveSetup(
                        name = self._stateName(s, c0, g, n0, p0, m0, d0),
                        strategyList = sgl, gameNoise = g,
                        mutators = deg_list)
                    if setup != std: yield setup
                for n in operator(self.noise):
                    setup = self.deriveSetup(
                        name = self._stateName(s, c0, g0, n, p0, m0, d0),
                        strategyList = sgl, noise = n,
                        mutators = deg_list)
                    if setup != std: yield setup
                for p in operator(self.payoff):
                    setup = self.deriveSetup(
                        name = self._stateName(s, c0, g0, n0, p, m0, d0),
                        strategyList = sgl, payoff = p,
                        mutators = deg_list)
                    if setup != std: yield setup
                for m in operator(self.demes):
                    setup = self.deriveSetup(
                        name = self._stateName(s, c0, g0, n0, p0, m, d0),
                        strategyList = sgl, demes = m,
                        mutators = deg_list)
                    if setup != std: yield setup
                for d in operator(self.mutators):
                    # the mutators depend on the varied mutation rate
                    varied = self._getMutators(s, sgl, d)
                    setup = self.deriveSetup(
                        name = self._stateName(s, c0, g0, n0, p0, m0, d),
                        strategyList = sgl, mutators = varied)
                    if setup != std: yield setup
def elimDoublettes(generator, emergencyExit=1000):
    """Filters the stream of setups produced by 'generator' so that
    every setup is passed on only once.

    Setups are identified by their name alone (which saves memory and
    time): a setup whose name has been seen before is dropped.  Once a
    run of dropped setups grows beyond 'emergencyExit' setups without
    a new one in between, the iterator stops.  A negative
    'emergencyExit' disables this emergency exit.
    """
    seenNames = set()
    dropped = 0
    for setup in generator:
        if setup.name in seenNames:
            if dropped == emergencyExit:
                break
            dropped += 1
            continue
        seenNames.add(setup.name)
        dropped = 0
        yield setup
def emptyMutatorGenerator(sl, d):
    """Mutator generating function for series without mutation: ignores
    the strategy list 'sl' and the mutation rate 'd'.

    -> a new empty list
    """
    noMutators = []
    return noMutators
class TestSeries(SeriesDescriptor):
    """A small series for testing purposes: three strategy sets of three
    strategies each, a few values for correlation and game noise, two
    deme settings and no mutation.
    """

    def __init__(self):
        super(TestSeries, self).__init__()
        self.name = "TestSeries"
        self.strategySets = ("SET_A", "SET_B", "SET_C")
        self.strategyLists = { "SET_A": [TitForTat(), Random(), Tester()],
                               "SET_B": [Grim(), Dove(), Tester()],
                               "SET_C": [Hawk(), Random(), TitForTat()]}
        self.correlation = (0.0, 0.1, 0.2)
        self.gameNoise = (0.0, 0.05)
        self.noise = (0.0,)
        self.payoff = ((5., 3., 1., 0.),)
        self.demes = (None, (10,1,2,2))
        self.mutators = (0.0,)
        # no strategy set has mutators
        self.mutatorDicts.update(
            dict.fromkeys(self.strategySets, emptyMutatorGenerator))
class BigSimulationSeries(SeriesDescriptor):  # refined
    """A series that examines two strategy sets (the two state automata
    and the parameterized TFTs) under varying conditions: correlation,
    in game noise, evolutionary noise, different (but fixed) payoffs
    and degenerating mutations of varying strength.
    """

    def __init__(self):
        super(BigSimulationSeries, self).__init__()
        self.name = "BigSimulationSeriesRefined"
        self.strategySets = ("Automata", "TFTs")
        self.strategyLists = { "Automata": twostateautomata,
                               "TFTs": manyTFTs }
        self.mutatorDicts.update({ "Automata": genAutomataDegenerators,
                                   "TFTs": genTFTDegenerators })
        self.mutators = (0.0, 0.01, 0.05)
        self.correlation = (0.0, 0.1, 0.2)
        self.gameNoise = (0.0, 0.05, 0.10)
        self.noise = (0.0, 0.05, 0.10, 0.15)
        self.payoff = ((5., 3., 1., 0.), (3.5, 3., 1., 0.),
                       (5.5, 3., 1., 0.), (5., 3., 2., 0.))
        self.demes = (None,)
class AlternativeSeries(SeriesDescriptor):
    """A series over three different strategy sets (two state automata,
    parameterized TFTs and a small strategy mix) without degeneration.
    """

    def __init__(self):
        super(AlternativeSeries, self).__init__()
        self.name = "AlternativeSeries"
        self.strategySets = ("Automata", "TFTs", "Small")
        self.strategyLists = { "Automata": twostateautomata,
                               "TFTs": manyTFTs,
                               "Small": strategy_mix }
        self.correlation = (0.0, 0.1, 0.2)
        self.gameNoise = (0.0, 0.05, 0.10)
        self.noise = (0.0, 0.08, 0.16)
        self.payoff = ((5., 3., 1., 0.), (3.5, 3., 1., 0.),
                       (5.5, 3., 1., 0.), (5., 3., 2., 0.))
        self.demes = (None,)
        self.mutators = (0.0,)
        # no strategy set has mutators
        self.mutatorDicts.update(
            dict.fromkeys(self.strategySets, emptyMutatorGenerator))
class StdSimulation(SeriesDescriptor):
    """A simulation series that contains just the default values with
    the set of two state automata as default set.
    """

    def __init__(self):
        SeriesDescriptor.__init__(self)
        self.name = "StdSimulation"
        self.strategySets = ("Automata",)
        # The attribute was formerly misspelled "strategyListe", which
        # left 'strategyLists' empty, so every lookup of "Automata" failed.
        self.strategyLists = { "Automata": twostateautomata }
        self.correlation = (0.0,)
        self.gameNoise = (0.0,)
        self.noise = (0.0, )
        self.payoff = ((5., 3., 1., 0.),)
        # No deme structure, as in the other series of this file.  With
        # the inherited empty tuple no setup can be derived or generated.
        self.demes = (None,)
        self.mutators = (0.0, )
        self.mutatorDicts["Automata"] = emptyMutatorGenerator
class SubsetSeries(SeriesDescriptor):
    """A series descriptor for a series of subsets of a set of
    strategies.

    The first strategy set ("Subset" followed by zeros) is the complete
    strategy list; the randomly chosen subsets are numbered from 1, the
    numbers being padded with zeros to the number of digits of N.

    All strategy sets are always delivered completely; the operator that
    is passed to generate or genLimitedSeries is only applied to the
    other parameters.
    """

    def __init__(self, name, strgys, N, a, b):
        """Build N subsets of list 'strgys' with sizes ranging
        between a and b."""
        SeriesDescriptor.__init__(self)
        self.name = name+str(N)+"-"+str(a)+"-"+str(b)
        # The subclasses in this file never set a deme structure; with
        # the inherited empty tuple generate() fails in MetaOperator.
        self.demes = (None,)
        digits = len(str(N))       # number of decimal digits of N
        template = "Subset%0" + str(digits) + "i"
        sname = template % 0
        l = [sname]; cnt = 1
        self.strategyLists[sname] = strgys
        for s in genStrategySubsets(strgys, a, b, N):
            sname = template % cnt
            l.append(sname); self.strategyLists[sname] = s; cnt += 1
        self.strategySets = tuple(l)

    def generate(self, operator = NullOperator, repeat = 1):
        """See SeriesDescriptor.generate.  The strategy set names (i.e.
        the value tuples made up of strings) bypass 'operator'."""
        return SeriesDescriptor.generate(self,
            lambda vl: MetaOperator(operator, vl, str), repeat)

    def genLimitedSeries(self, operator = NullOperator, repeat = 1):
        """See SeriesDescriptor.genLimitedSeries.  The strategy set names
        (i.e. the value tuples made up of strings) bypass 'operator'."""
        return SeriesDescriptor.genLimitedSeries(self,
            lambda vl: MetaOperator(operator, vl, str), repeat)
class AutomataSubsets(SubsetSeries):
    """A simulation series over AUTOMATA_SERIES_SIZE random subsets of
    the two state automata, each subset consisting of three automata.
    """

    def __init__(self):
        super(AutomataSubsets, self).__init__(
            "Automata", twostateautomata, AUTOMATA_SERIES_SIZE, 3, 3)
        self.mutators = (0.0,)
        self.payoff = ((5., 3., 1., 0.), (3.5, 3., 1., 0.),
                       (5.5, 3., 1., 0.), (5., 3., 2., 0.))
        self.noise = (0.0, 0.05, 0.10, 0.15)
        self.gameNoise = (0.0, 0.05, 0.10)
        self.correlation = (0.0, 0.1, 0.2)
class TestAutomata(SubsetSeries):
    """A small test series over ten random subsets of the two state
    automata, each subset consisting of three automata.
    """

    def __init__(self):
        super(TestAutomata, self).__init__(
            "TestAutomata", twostateautomata, 10, 3, 3)
        self.mutators = (0.0,)
        self.payoff = ((5., 3., 1., 0.),)
        self.noise = (0.0,)
        self.gameNoise = (0.0, 0.05)
        self.correlation = (0.0, 0.1)
##bigsim_descriptor = BigSimulationSeries()
##seriesA = [stp for stp in bigsim_descriptor.genLimitedSeries()]
######################################################################
#
# classes for generating and storing statistical information
# on the simulation series
#
######################################################################
class StrategySetStatistics(object):
    """Records the statistics for one strategy set over a series of
    simulations.

    Strategies are identified by their string representation.  For
    every strategy the class counts how often it reached which rank in
    the tournament (average payoff) and in the evolutionary simulation
    (final population share), sums up its final population shares and
    counts the simulations it took part in.
    """

    def __init__(self, strategyList):
        keys = [str(s) for s in strategyList]
        size = len(keys)
        self.strategyDict = dict(zip(keys, strategyList))
        self.strategies = keys
        self.tournament_rankings = {}    # name -> [count for rank 1, 2, ...]
        self.evolutionary_rankings = {}  # name -> [count for rank 1, 2, ...]
        self.population_average = {}     # name -> sum of population shares
        for key in keys:
            self.tournament_rankings[key] = [0] * size
            self.evolutionary_rankings[key] = [0] * size
            self.population_average[key] = 0.0
        self.participation = dict.fromkeys(keys, 0)
        self.tournaments = 0
        self.evolutionarySims = 0

    def _rank(self, strategies, ranking, points):
        """Record the ranking to the dictionary 'ranking' according
        to the gained 'points'."""
        rk = [(strategies[i], points[i]) for i in range(len(strategies))]
        rk.sort(key = lambda item: item[1], reverse = True)
        for position, (name, gained) in enumerate(rk):
            ranking[name][position] += 1

    def recordSim(self, sim):
        """Record the results of another simulation.

        Only the strategies that took part in the simulation are
        updated.  (Formerly the population was indexed with the complete
        strategy list of this object and participation was counted for
        every strategy, which is wrong - or fails with an IndexError -
        if the simulation ran on a subset of the strategies.)
        """
        strategies = [str(s) for s in sim.setup.strategyList]
        l = len(strategies)
        points = [sum(sim.payoffMatrix[i])/float(l) for i in range(l)]
        self._rank(strategies, self.tournament_rankings, points)
        self.tournaments += 1
        self._rank(strategies, self.evolutionary_rankings,
                   sim.setup.population)
        self.evolutionarySims += 1
        for i, s in enumerate(strategies):
            self.population_average[s] += sim.setup.population[i]
            self.participation[s] += 1

    def _relativeRk(self, ranking):
        """-> ranking with the counts converted to percentages of the
        number of participations of the respective strategy."""
        relRk = {}
        for key in self.strategies:
            div = float(self.participation[key])
            relRk[key] = [rk*100 / div for rk in ranking[key]]
        return relRk

    def _writeTable(self, f, ranking):
        """Write ranking as table to file f."""
        TAB = "\t"
        f.write("Strategy" + " "*(30 - len("Strategy")) + TAB)
        for i in range(len(self.strategies)): f.write(str(i+1)+TAB)
        f.write("\n\n")
        for s in self.strategies:
            f.write(s + " " * max(0, 30 - len(s)) + TAB)
            for x in ranking[s]:
                f.write(str(x) + TAB)
            f.write("\n")
        f.write("\n\n")

    def _writeSimpleTable(self, f, ranking):
        """Write population to file."""
        TAB = "\t"
        for s in self.strategies:
            f.write(s + " " * max(0, 30 - len(s)) + TAB)
            f.write(str(ranking[s] / float(self.participation[s])))
            f.write("\n")
        f.write("\n\n")

    def _lexical_order(self, ranking):
        """Returns a list of (strategy, rating) pairs, ranked in
        lexical order: more first ranks come first, ties are decided by
        the second ranks and so on.  The rating is the position in that
        order, starting with 1."""
        size = len(self.strategies)
        # lists compare lexicographically, just like the rank counts
        # shall be compared; the sort is stable
        s = sorted(self.strategies, key = lambda x: ranking[x][:size],
                   reverse = True)
        return list(zip(s, range(1, len(s)+1)))

    def _weighted_order(self, ranking):
        """Returns a list of (strategy, rating) pairs ranked in a
        weighted order, where the first rank counts twice as much
        as the second, and the second twice as much as the third..."""
        rating = {}
        for x in self.strategies:
            rating[x] = sum([ranking[x][k]/float(2**k) \
                             for k in range(len(self.strategies))])
        s = sorted(self.strategies, key = lambda x: rating[x],
                   reverse = True)
        return [(x, rating[x]) for x in s]

    def _popav_order(self):
        """Returns a list of (strategy, population share) pairs ordered
        by the population share."""
        share = {}
        for x in self.strategies:
            share[x] = self.population_average[x] / self.participation[x]
        s = sorted(self.strategies, key = lambda x: share[x],
                   reverse = True)
        return [(x, share[x]) for x in s]

    def _writeOrder(self, f, order):
        """Write list of (strategy, rating) pairs to file."""
        TAB = "\t"
        for s, r in order:
            f.write(s + " " * max(0, 30 - len(s)) + TAB)
            f.write(str(r))
            f.write("\n")
        f.write("\n\n")

    def _checkParticipations(self):
        """Replace participation counts of 0 by -1 so that the divisions
        by the participation count do not fail."""
        for s in self.strategies:
            if self.participation[s] == 0: self.participation[s] = -1

    def createCharts(self, fName, colorScheme, suffix = ""):
        """Draw the bar charts of the results and save them as
        fName+suffix+".png" and fName+suffix+".eps".

        'colorScheme' is a function that maps a strategy object to the
        color of its bar.
        """
        def write(gfx, text, region,
                  font=Gfx.SERIF, size=Gfx.NORMAL, weight=Gfx.PLAIN):
            """Write 'text' centered into 'region'."""
            gfx.setColor(Gfx.BLACK)
            gfx.setFont(font, size, weight)
            x1, y1, x2, y2 = Graph.screenRegion(gfx, region)
            w,h = gfx.getTextSize(text)
            x = x1 + (x2-x1)/2 - w/2
            y = y1 + (y2-y1)/2 - h/2
            gfx.writeStr(x, y, text)

        def drawChart(gfx, region, order, prop=False):
            """Draw the (strategy, rating) pairs of 'order' as a stack of
            bars into 'region', the first pair at the bottom.  If 'prop'
            is true, the bar heights are proportional to the ratings,
            otherwise all bars have the same height.
            -> list of the y-coordinates of the lines between the bars
            """
            x1, y1, x2, y2 = Graph.screenRegion(gfx, region)
            x,y = x1,y1
            w = x2-x1+1
            h = y2-y1+1
            strategies = [item[0] for item in order]
            if prop:
                ratings = [item[1] for item in order]
                vH = sum(ratings)
            else:
                ratings = [1.0] * len(order)
                vH = float(len(ratings))
            if vH == 0.0: vH = 1.0
            dY = 0.0; bl = []
            ft,fs,fw = gfx.fontType, gfx.fontSize, gfx.fontWeight
            gfx.setFont(Gfx.SANS, Gfx.SMALL, "")
            for s, r in zip(strategies, ratings):
                dH = h * r / vH
                gfx.setColor(colorScheme(self.strategyDict[s]))
                bl.append(int(y+h-dY-0.00000001))
                base = max(y,int(y+h-0.00000001-dY-dH))
                gfx.fillRect(x, base, w, max(1,bl[-1]-base+1))
                tw, th = gfx.getTextSize(s)
                # the bar is only labeled if the text fits into it
                if th < int(dH)-3:  # should be int(dH)-4
                    #while tw >= w-10 and len(s) > 0:
                    #    s = s[:-1]
                    #    tw, th = gfx.getTextSize(s)
                    gfx.setColor(Gfx.BLACK)
                    gfx.writeStr(x+5, int(y+h-dY-th)-3, s)
                    gfx.setColor(Gfx.WHITE)
                    gfx.writeStr(x+4, int(y+h-dY-th)-2, s)
                dY += dH
            gfx.setFont(ft, fs, fw)
            bl.append(int(y))
            bl = [by for by in bl if by > y]+[y] # <- just a little patch
            gfx.setColor(Gfx.BLACK)
            for by in bl: gfx.drawLine(x, by, x+w-1, by)
            gfx.drawRect(x, y, w, h)
            return bl

        self._checkParticipations()
        w, h = SAVE_IMAGE_WIDTH, SAVE_IMAGE_HEIGHT
        buf = wx.EmptyBitmap(w, h)
        # the charts are drawn twice: to a bitmap and to postscript
        gdList = [None, None]
        gdList[0] = wxGfx.Driver(wx.BufferedDC(None, buf))
        gdList[1] = psGfx.Driver()

        screenR = (0.0, 0.0, 1.0, 1.0)
        titleR = Graph.relativeRegion((0.1, 0.95, 0.9, 1.00), screenR)
        chartsR = Graph.relativeRegion((0.05, 0.00, 0.95, 0.90), screenR)
        chartA = Graph.relativeRegion((0.1, 0.0, 0.3, 1.0), chartsR)
        chartB = Graph.relativeRegion((0.4, 0.0, 0.6, 1.0), chartsR)
        chartC = Graph.relativeRegion((0.7, 0.0, 0.9, 1.0), chartsR)
        rel_cap = (0.0, 0.03, 1.0, 0.13)
        rel_ch = (0.05, 0.15, 0.95, 1.0)
        capA = Graph.relativeRegion(rel_cap, chartA)
        graphA = Graph.relativeRegion(rel_ch, chartA)
        capB = Graph.relativeRegion(rel_cap, chartB)
        graphB = Graph.relativeRegion(rel_ch, chartB)
        capC = Graph.relativeRegion(rel_cap, chartC)
        graphC = Graph.relativeRegion(rel_ch, chartC)

        for gd in gdList:
            gd.clear()
            write(gd, 'Results for strategy set: "'+fName+'" ', titleR,
                  size = Gfx.LARGE)
            drawChart(gd, graphA, self._lexical_order(self.tournament_rankings))
            write(gd, "Tournament Ranking", capA)
##            drawChart(gd, chartB1, self._lexical_order(self.evolutionary_rankings))
##            drawChart(gd, chartB2, self._weighted_order(self.evolutionary_rankings))
#            drawChart(gd, chartB, self._lexical_order(self.evolutionary_rankings)
##            write(gd, "Evolutionary Ranking", capB)
            pop_order = self._popav_order()
            bl1 = drawChart(gd, graphB, pop_order, False)
            write(gd, "Evolutionary Ranking", capB)
            bl2 = drawChart(gd, graphC, pop_order, True)
            x1 = Graph.screenRegion(gd, graphB)[2]
            x2 = Graph.screenRegion(gd, graphC)[0]
            # connect the corresponding bar boundaries of both charts;
            # zip() guards against lists of different length
            for by1, by2 in zip(bl1, bl2):
                gd.drawLine(x1, by1, x2, by2)
            write(gd, "Average Final Population", capC)

        image = wx.ImageFromBitmap(buf)
        image.SaveFile(fName+suffix+".png", wx.BITMAP_TYPE_PNG)
        image.Destroy()
        gdList[1].save(fName+suffix+".eps")

    def writeTables(self, fName):
        """Write the ranking information as TAB separated tables
        to the ASCII text file fName+".txt".

        In addition the charts are created (see createCharts, once for
        each color scheme), the rankings are pickled to the compressed
        file fName+".pickle"+ZIP_EXTENSION and the complete object is
        pickled to "recover_me.pickle" in the current directory.
        """
        self._checkParticipations()
        f = open(fName + ".txt", "w")
        try:
            f.write("Tournament Rankings\n\n")
            self._writeTable(f, self._relativeRk(self.tournament_rankings))
            f.write("Evolutionary Rankings\n\n")
            self._writeTable(f, self._relativeRk(self.evolutionary_rankings))
            f.write("Average Final Population\n\n")
            self._writeSimpleTable(f, self.population_average)
            f.write("Tournament ranking (lexical order)\n\n")
            self._writeOrder(f, self._lexical_order(self.tournament_rankings))
            f.write("Tournament ranking (weighted order)\n\n")
            self._writeOrder(f, self._weighted_order(self.tournament_rankings))
            f.write("Evolutionary ranking (lexical order)\n\n")
            self._writeOrder(f, self._lexical_order(self.evolutionary_rankings))
            f.write("Evolutionary ranking (weighted order)\n\n")
            self._writeOrder(f, self._weighted_order(self.evolutionary_rankings))
            f.write("Ranking by average population shares\n\n")
            self._writeOrder(f, self._popav_order())
        finally:
            f.close()

        self.createCharts(fName, strategyColor)
        self.createCharts(fName, discreteColors, suffix="_scheme2")

        f = ZIPFile(fName + ".pickle"+ZIP_EXTENSION, "w")
        try:
            for data in (self.tournament_rankings,
                         self.evolutionary_rankings,
                         self.population_average,
                         self.strategyDict):
                pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
        finally:
            f.close()

        # binary mode: the highest pickle protocol is a binary format
        f = open("recover_me.pickle", "wb")
        try:
            pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL)
        finally:
            f.close()
## def readTable(self, fName):
## """Unpickle the rankings."""
## if fName.endswith(BZ2_EXTENSION): f = BZ2File(fName, "r")
## elif fName.endswith(GZIP_EXTENSION): f = GzipFile(fName, "r")
## else: f = file(fName)
## try:
## self.tournament_rankings = pickle.load(f)
## self.evolutionary_rankings = pickle.load(f)
## f.close()
class StatisticsInterface(object):
    """Common base for the objects that record and store the
    statistical information of a simulation series.

    'sfilter' is the source text of a function with one argument.  The
    function itself is left out when the object is pickled and is
    rebuilt from its source text when the object is unpickled.
    """

    def __init__(self, series, sfilter="lambda x:True", name=""):
        self.series = series
        self.filterString = sfilter
        # NOTE(review): eval() executes arbitrary code; the filter
        # string must never come from an untrusted source.
        self.sfilter = eval(self.filterString)
        self.name = "Statistics"
        if name != "":
            self.name = "Statistics_" + name

    def __getstate__(self):
        state = dict(self.__dict__)
        state["sfilter"] = None  # rebuilt from the source text on loading
        return state

    def __setstate__(self, d):
        self.__dict__.update(d)
        print("Rebuilding:  %s %s" % (self.name, self.filterString))
        # NOTE(review): eval() on a string taken from the pickle - only
        # load pickles from a trusted source.
        self.sfilter = eval(self.filterString)

    def record(self, sim):
        """Record the information of the (finished) simulation
        'sim' for statistical analysis.  To be implemented by the
        subclasses."""
        raise NotImplementedError

    def write(self):
        """Save the results of the statistical analysis to the
        hard disk.  To be implemented by the subclasses."""
        raise NotImplementedError
class SeriesStatistics(StatisticsInterface):
    """This class records statistical information for a simulation
    series. The information can be written into a text file for
    postprocessing with a spreadsheet calculator. Statistical information
    is collected separately for each strategy set.
    """

    def __init__(self, series, sfilter="lambda x:True", name=""):
        StatisticsInterface.__init__(self, series, sfilter, name)
        self.stgySetStats = {}  # strategy list key -> StrategySetStatistics
        self.crossref = {}      # strategy list key -> name of the strategy set
        for s in series.strategySets:
            sl = series.strategyLists[s]
            key = self._genKey(sl)
            self.crossref[key] = s
            self.stgySetStats[key] = StrategySetStatistics(sl)

    def _genKey(self, strategyList):
        """Generate a dictionary key for identifying a strategy list.
        """
        return " ".join([str(s) for s in strategyList])

    def record(self, sim):
        """Record the simulation 'sim' in the statistics of its strategy
        set, provided that it passes the filter."""
##        if self.filterString.find("mutators") >= 0:
##            print sim.setup.mutators
        if self.sfilter(sim):
            stats = self.stgySetStats[self._genKey(sim.setup.strategyList)]
            stats.recordSim(sim)

    def write(self):
        """Write the statistics of every strategy set into the directory
        self.name, which is created if it does not exist yet.

        The working directory is changed to that directory while the
        files are written and is restored afterwards, even if writing
        fails.
        """
        print("Writing Statistics: "+self.name)
        # An existing directory is reused.  The check replaces the former
        # comparison of the error number with the magic value 17 and
        # needs no syntax that depends on the Python version.
        if not os.path.isdir(self.name):
            os.mkdir(self.name)
        previousDir = os.getcwd()
        os.chdir(self.name)
        try:
            for key, stats in self.stgySetStats.items():
                stats.writeTables(self.crossref[key])
        finally:
            os.chdir(previousDir)
class SubsetSeriesStatistics(StatisticsInterface):
    """This class records statistical information for a simulation
    series. The information can be written into a text file for
    postprocessing with a spreadsheet calculator. In contrast to
    SeriesStatistics, one single statistics object is kept for all
    strategy sets together: it is built from the union of the
    strategies of all strategy sets of the series.
    """

    def __init__(self, series, sfilter="lambda x:True", name=""):
        """Collects every strategy of the series once, in the order of
        its first appearance, and creates the common statistics object.
        """
        StatisticsInterface.__init__(self, series, sfilter, name)
        seen = set()
        allStrategies = []
        for strategySet in series.strategySets:
            for strategy in series.strategyLists[strategySet]:
                if strategy in seen:
                    continue
                allStrategies.append(strategy)
                seen.add(strategy)
        self.stats = StrategySetStatistics(allStrategies)

    def record(self, sim):
        """Records 'sim' if it passes the filter."""
        # NOTE(review): SeriesStatistics.record() calls 'recordSim' on the
        # same kind of object, whereas 'record' is called here -- confirm
        # that StrategySetStatistics really offers both methods.
        if not self.sfilter(sim):
            return
        self.stats.record(sim)

    def write(self):
        """Writes the tables, using the name of this object."""
        print("Writing Statistics: "+self.name)
        self.stats.writeTables(self.name)
def CreateStatisticsObject(series, sfilter="lambda x:True", name=""):
    """Factory for statistics objects: returns a SubsetSeriesStatistics
    object if 'series' is a SubsetSeries and a SeriesStatistics object
    otherwise."""
    print(" ".join(["Creating: ", str(name)]))
    if isinstance(series, SubsetSeries):
        statisticsClass = SubsetSeriesStatistics
    else:
        statisticsClass = SeriesStatistics
    return statisticsClass(series, sfilter, name)
def SpawnStatisticsObjects(series):
    """Returns a list of Statistics objects. The first object in the
    list records the statistics for the whole simulation series.
    All following objects record the statistics of those simulations
    in which one parameter (correlation, gameNoise, noise, payoff,
    demes, mutators) has one particular value.  Such objects are only
    created for parameters that take more than one value in the series.
    """
    def demesName(d):
        # 'None' stands for a simulation without demes
        if d is None:
            return "M0"
        return "M%i_%i_%i_%i" % d

    statistics = [CreateStatisticsObject(series)]

    # One entry per parameter of the series:
    # (parameter values, filter template, function: value -> name)
    # '%(v)s' in the template is replaced by the repr() of the value.
    parameters = (
        (series.correlation,
         "lambda s:s.setup.correlation == %(v)s",
         lambda c: "C%1.3f" % c),
        (series.gameNoise,
         "lambda s:s.setup.gameNoise == %(v)s",
         lambda g: "G%1.3f" % g),
        (series.noise,
         "lambda s:s.setup.noise == %(v)s",
         lambda n: "N%1.3f" % n),
        (series.payoff,
         "lambda s:s.setup.payoff == %(v)s",
         lambda p: "P%1.2f_%1.2f_%1.2f_%1.2f" % p),
        (series.demes,
         "lambda s:(s.setup.demes and s.setup.demes.asTuple() == %(v)s)"
         " or (s.setup.demes==%(v)s)",
         demesName),
        (series.mutators,
         "lambda s:(len(s.setup.mutators) > 0 and "
         "s.setup.mutators[0].rate == %(v)s) or "
         "(len(s.setup.mutators)==0 and %(v)s == 0.0)",
         lambda m: "D%1.3f" % m),
    )
    for values, template, makeName in parameters:
        if len(values) > 1:
            for value in values:
                sf = template % {"v": repr(value)}
                statistics.append(
                    CreateStatisticsObject(series, sf, name=makeName(value)))
    return statistics
######################################################################
#
# functions for running simulation series
#
######################################################################
def checkEquilibrium(record):
    """Checks if the evolutionary process has reached an equilibrium
    or quasi equilibrium (when no significant changes in the
    population shares occur any more.) 'record' is a list of the
    population shares. If an equilibrium is reached, 'True' is returned,
    otherwise 'False'.  Records with less than 50 entries (including
    the empty record) are never considered to be in equilibrium.

    WARNING: The algorithm used is only very rough,
    it is intended to stop automated simulations 'when nothing is
    happening any more', but it is certainly no proof for the existence
    of an equilibrium.
    """
    rlen = len(record)
    # Test the length first: record[0] does not exist in an empty record.
    if rlen < 50:
        return False
    N = len(record[0])
    # Samples from the last fifth of the record, rlen//50 entries apart.
    check = [record[i] for i in range(rlen * 4 // 5, rlen, rlen // 50)]
    for i in range(N):
        values = [population[i] for population in check]
        a, b = min(values), max(values)
        # Shares below 1/(100*N) are ignored; any other share must not
        # vary by more than 1/500 of its maximum among the samples.
        if b > 1.0 / (100.0 * N) and b - a > b / 500.0:
            return False
    return True
def pickleSimulationResults(fName, sim, record):
    """Writes a list of the strategy names, the payoff matrix
    and the record of the evolutionary development to a file.

    The three objects are pickled one after the other into the
    compressed file "Pickles/<fName>.pickle" + ZIP_EXTENSION below the
    current directory.  The directory "Pickles" is created if needed.

    Raises OSError if the directory cannot be created or entered.
    """
    try:
        os.mkdir("Pickles")
    except OSError as errobj:
        # 17 is EEXIST: an already existing directory is fine.
        if errobj.errno != 17:
            raise
    if not fName.endswith(".pickle"):
        fName += ".pickle"
    strategies = [str(s) for s in sim.setup.strategyList]
    previousDir = os.getcwd()
    os.chdir("Pickles")
    try:
        f = ZIPFile(fName + ZIP_EXTENSION, "w")
        try:
            for obj in (strategies, sim.payoffMatrix, record):
                pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
        finally:
            f.close()
    finally:
        # Go back where we came from, even if pickling fails.
        os.chdir(previousDir)
dirCache = {}   # directory path -> sorted list of its entries without "nav"
def matchName(fName):
    """Returns the name of the entry of the current directory that
    belongs to the simulation number given by the first six characters
    of 'fName'.

    The entries of the directory (without those that contain "nav")
    are sorted, and entry number N-1 is taken for simulation number N.
    The listing of each directory is read only once and then cached in
    'dirCache'; entries that are added later are therefore not seen.

    Raises ValueError if 'fName' does not start with a number and
    IndexError if there is no entry for that number.
    """
    key = os.getcwd()
    if key in dirCache:
        dl = dirCache[key]
    else:
        dl = sorted([d for d in os.listdir(key) if d.find("nav") < 0])
        dirCache[key] = dl
    index = int(fName[0:6]) - 1
    if index < 0:
        # A negative list index would silently pick an entry from the
        # end of the listing.
        raise IndexError("simulation numbers start with 1: " + repr(fName))
    return dl[index]
# see dirty hack below
def genTrueNames():
    """Returns the names (string representations) of all automata
    that genAllAutomata() yields."""
    names = []
    for automaton in genAllAutomata():
        names.append(str(automaton))
    return names
# computed once, when the module is loaded
true_names = genTrueNames()
# end dirty hack
def unpickleSimulationResults(fName):
    """->(strategyNames, payoffMatrix, record)

    Reads the results of a simulation from the directory "Pickles"
    below the current directory.  The following files are tried in
    turn: the uncompressed file "<fName>.pickle", the same name with
    ZIP_EXTENSION appended and, finally, whatever matchName() selects
    by the simulation number (read as a bz2 file).

    If REPICKLE_RESULTS is set, the results are written back with the
    current compression.

    WARNING: unpickling executes code from the file.  Only read pickle
    files that come from a trusted source.
    """
    previousDir = os.getcwd()
    os.chdir("Pickles")
    try:
        if not fName.endswith(".pickle"):
            fName += ".pickle"
        try:
            # pickles are binary data
            f = open(fName, "rb")
        except IOError:
            try:
                f = ZIPFile(fName + ZIP_EXTENSION, "r")
            except IOError:
                fName = matchName(fName)
                f = BZ2File(fName, "r")
        print("Unpickeling: "+fName)
        try:
            strategies = pickle.load(f)
            # begin dirty hack
            # A stored name is replaced by the first automaton name
            # that has the same first nine characters.
            for k in range(len(strategies)):
                for name in true_names:
                    if strategies[k][:9] == name[:9]:
                        strategies[k] = name
                        break
            # end of dirty hack
            PM = pickle.load(f)
            record = pickle.load(f)
        finally:
            f.close()
        if REPICKLE_RESULTS:
            print("Repickeling: "+fName)
            f = ZIPFile(fName+ZIP_EXTENSION, "w")
            try:
                for obj in (strategies, PM, record):
                    pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
            finally:
                f.close()
    finally:
        # Go back where we came from, even if reading fails.
        os.chdir(previousDir)
    return (strategies, PM, record)
class SimInterface(object):
    """Abstract wrapper around a simulation object that adds services
    such as the saving of HTML logs.  All methods must be overridden.
    """

    def newSetup(self, setup, maxGenerations = MAX_GENERATIONS):
        """Start a fresh simulation with the given setup."""
        raise NotImplementedError

    def getSimulationObject(self):
        """-> the wrapped Simulation.Simulation object"""
        raise NotImplementedError

    def continueSim(self):
        """Carry the simulation on.  False is returned as soon as an
        equilibrium or the maximum number of generations is reached."""
        raise NotImplementedError

    def writeResults(self):
        """Store the results of the simulation as an html file."""
        raise NotImplementedError

    def log(self, str):
        """Emit a piece of logging information."""
        raise NotImplementedError

    def dumpLog(self):
        """Save the log on the disk."""
        raise NotImplementedError
class Notifier(Logging.LogNotificationInterface):
    """Notification interface for simulations that run without the
    CoopSim main window: status hints are printed, and progress is
    shown in the progress dialog 'MainWindow', unless
    PROGRESS_DIALOG_OFF is set.
    """

    def __init__(self):
        # 'MainWindow' is only created when this file is run as a
        # script.  Look it up defensively, so that a Notifier can also
        # be created when the global does not exist.
        self.progressDialog = globals().get("MainWindow")
        self.title = ""

    def updateLog(self, logStr):
        """Ignored."""
        pass

    def logToStart(self):
        """Ignored."""
        pass

    def statusBarHint(self, hint):
        """Prints the hint, except for the two routine messages."""
        # print hint+"\r",
        if hint not in ("Running...", "Ready."):
            print(hint)

    def progressIndicator(self, f, title = "Please wait...",
                          message = "Determining payoff table..."):
        """Shows the progress f (a fraction; the dialog value is f*1000,
        capped at 999) in the progress dialog.  Does nothing if the
        dialog is switched off or does not exist."""
        # print message + " [%3i%%]\r"% int(f*100),
        if PROGRESS_DIALOG_OFF or self.progressDialog is None:
            return
        if title != self.title:
            self.title = title
            self.progressDialog.SetTitle(title)
        self.progressDialog.Update(min(999, int(f*1000)), message)
class dummyStgy(Strategy):
    """Stand-in for a strategy of which nothing but the name is known
    (see OfflineSim.replay, where the names come from a pickle file).
    """

    def __init__(self, name):
        Strategy.__init__(self)
        # replace whatever name the base class may have assigned
        self.name = name
class OfflineSim(SimInterface):
    """A SimInterface that does not need the CoopSim main application
    to run.
    """

    def __init__(self):
        """Sets up the empty record, the text log, the beginning of the
        summary page, the HTML log and the notifier.  The simulation
        itself is not created before newSetup() is called."""
        self.record = []
        self.maxGenerations = MAX_GENERATIONS
        self.logstr = []
        # list of html fragments; the links are added by writeResults()
        # and fixIndex(), the closing tags by dumpLog()
        self.summaryPage = ["<html>\n<head>\n<title>Summary</title>"
                            "\n</head>\n<body>\n"]
        self.htmlLog = Logging.HTMLLog()
        self.notifier = Notifier()
        self.simSetup = None
def constructObjects(self):
    """Creates the graph, the simplex diagram (both on nil drivers)
    and the simulation object that draws on them."""
    graphTitle = "Population dynamical simulation of the reiterated "+\
                 "Prisoners Dilemma"
    simplexTitle = "Simplex diagram of the population dynamics of the "+\
                   "reiterated Prisoners Dilemma"
    self.graphDriver = Gfx.nilDriver()
    self.simplexDriver = Gfx.nilDriver()
    self.graph = Graph.Cartesian(self.graphDriver, 0, 0.0, 50, 1.0,
                                 graphTitle, "Generations", "Population")
    self.simplex = Simplex.Diagram(self.simplexDriver, lambda p:p,
                                   simplexTitle,
                                   "Strategy 1", "Strategy 2", "Strategy 3",
                                   styleFlags = Simplex.VECTORS)
    self.simulation = Simulation(self.graph, self.simplex,
                                 self.htmlLog, self.notifier)
def close(self):
    """Nothing needs to be released by an offline simulation."""
    return None
def newSetup(self, setup, maxGenerations = MAX_GENERATIONS):
    """Creates fresh graph and simulation objects, hands the setup
    over to the simulation and empties the record."""
    self.simSetup = setup
    self.constructObjects()
    showProgress = self.notifier.progressIndicator
    self.simulation.newSetup(self.simSetup, showProgress)
    # the preparation of the simulation is finished now
    showProgress(1.0)
    self.maxGenerations = maxGenerations
    self.record = []
def getSimulationObject(self):
    """-> the simulation object created by the last newSetup()"""
    simulation = self.simulation
    return simulation
def continueSim(self):
    """Continues the simulation and appends the new generations to
    self.record.  Returns False if an equilibrium (see
    checkEquilibrium) or the maximum number of generations has been
    reached, True otherwise."""
    # Float division, so that the fraction is not truncated to zero,
    # measured against the limit that also ends this simulation.
    progress = float(self.simulation.lastGeneration) / \
               max(1, self.maxGenerations)
    self.notifier.progressIndicator(progress,
                                    message="Evolutionary Sim...")
    # the simplex diagram is only drawn for three strategies
    self.simulation.continueSim(self.record,
            withSimplex = (len(self.simSetup.strategyList)==3))
    finished = checkEquilibrium(self.record) or \
               len(self.record) >= self.maxGenerations
    return not finished
def wx_dumpHTMLImages(self, w = SAVE_IMAGE_WIDTH, h = SAVE_IMAGE_HEIGHT):
"""Writes the images of the simulation to the directory
self.simulation.imgdirName, drawing them on wx bitmaps.

For the simplex diagram (only if the setup has exactly three
strategies) and for every entry of self.simulation.rangeStack three
files are written: a PNG image of the size w x h, a compressed
postscript file (".eps"+ZIP_EXTENSION) and a PNG image of the size
HTML_IMAGE_WIDTH x HTML_IMAGE_HEIGHT (suffix "_web.png").

Raises OSError if the image directory cannot be created for any
other reason than that it exists already.
"""
#print self.simulation.imgdirName
try:
os.mkdir(self.simulation.imgdirName)
except OSError, errobj: # catch dir exists error
# print "Writing images for "+self.simulation.imgdirName
# errno 17 is EEXIST
if errobj.errno != 17: raise OSError, errobj
# N is the number of diagrams (one more than graph ranges) as a float,
# so that count/N is a fraction
N = float(len(self.simulation.rangeStack)+1); count = 1
msg = "Writing images for HTML page..."
self.notifier.progressIndicator(0.0, message = msg)
# buf and gd are for the big images, buf2 and gd2 for the web images
buf = wx.EmptyBitmap(w, h)
buf2 = wx.EmptyBitmap(HTML_IMAGE_WIDTH, HTML_IMAGE_HEIGHT)
gd = wxGfx.Driver(wx.BufferedDC(None, buf))
gd2 = wxGfx.Driver(wx.BufferedDC(None, buf2))
# driver for the postscript output
pd = psGfx.Driver()
bigPen = Gfx.BLACK_PEN
# a copy of the pen, so that Gfx.BLACK_PEN itself stays as it is
smallPen = copy.copy(Gfx.BLACK_PEN); smallPen.fontSize = Gfx.SMALL
if len(self.simSetup.strategyList) == 3:
# presumably changeGfx() redraws the diagram on the given driver,
# so that the bitmap can be saved right afterwards -- TODO confirm
self.simplex.changeGfx(gd)
image = wx.ImageFromBitmap(buf)
path = self.simulation.imgdirName+"/"+self.simulation.simplexName
image.SaveFile(path+".png", wx.BITMAP_TYPE_PNG)
image.Destroy()
pd.clear(); self.simplex.changeGfx(pd)
#pd.save(path+".eps")
f = ZIPFile(path+".eps"+ZIP_EXTENSION, "w")
f.write(pd.getPostscript())
f.close()
self.notifier.progressIndicator(0.5/N, message = msg)
# the small image gets a small title font
self.simplex.changeGfx(gd2)
self.simplex.setStyle(titlePen = smallPen)
image = wx.ImageFromBitmap(buf2)
image.SaveFile(path+"_web.png", wx.BITMAP_TYPE_PNG)
image.Destroy()
# back to the original driver and title pen
self.simplex.changeGfx(self.simplexDriver)
self.simplex.setStyle(titlePen = bigPen)
self.notifier.progressIndicator(1.0/N, message = msg)
# every entry of the range stack: image name and range of the graph
for imgName, x1, y1, x2, y2 in self.simulation.rangeStack:
self.graph.adjustRange(x1, y1, x2, y2)
self.graph.changeGfx(gd)
self.graph.setStyle(titlePen = bigPen)
image = wx.ImageFromBitmap(buf)
path = self.simulation.imgdirName + "/" + imgName
image.SaveFile(path+".png", wx.BITMAP_TYPE_PNG)
image.Destroy()
pd.clear(); self.graph.changeGfx(pd)
#pd.save(path+".eps")
f = ZIPFile(path+".eps"+ZIP_EXTENSION, "w")
f.write(pd.getPostscript())
f.close()
self.notifier.progressIndicator((count+0.5)/N, message = msg)
self.graph.changeGfx(gd2)
self.graph.setStyle(titlePen = smallPen)
image = wx.ImageFromBitmap(buf2)
image.SaveFile(path+"_web.png", wx.BITMAP_TYPE_PNG)
image.Destroy()
count += 1
self.notifier.progressIndicator(count/N, message = msg)
# back to the original driver and title pen
self.graph.changeGfx(self.graphDriver)
self.graph.setStyle(titlePen = bigPen)
self.notifier.progressIndicator(1.0, message = msg)
def gtk_dumpHTMLImages(self, w = SAVE_IMAGE_WIDTH, h = SAVE_IMAGE_HEIGHT):
"""Writes the images of the simulation to the directory
self.simulation.imgdirName, drawing them on gdk pixmaps.

The same files are written as by wx_dumpHTMLImages: for the simplex
diagram (only if the setup has exactly three strategies) and for
every entry of self.simulation.rangeStack a PNG image of the size
w x h, a compressed postscript file (".eps"+ZIP_EXTENSION) and a PNG
image of the size HTML_IMAGE_WIDTH x HTML_IMAGE_HEIGHT ("_web.png").

Raises OSError if the image directory cannot be created for any
other reason than that it exists already.
"""
try:
os.mkdir(self.simulation.imgdirName)
except OSError, errobj: # catch dir exists error
# errno 17 is EEXIST
if errobj.errno != 17: raise OSError, errobj
# N is the number of diagrams (one more than graph ranges) as a float,
# so that count/N is a fraction
N = float(len(self.simulation.rangeStack)+1); count = 1
msg = "Writing images for HTML page..."
self.notifier.progressIndicator(0.0, message = msg)
# cv1, pm, buf and gd are for the big images,
# cv2, pm2, buf2 and gd2 for the web images
cv1 = gtk.DrawingArea(); cv1.set_size_request(w, h)
cv2 = gtk.DrawingArea(); cv2.set_size_request(HTML_IMAGE_WIDTH, HTML_IMAGE_HEIGHT)
pm = gdk.Pixmap(None, w, h, 24)
pm2 = gdk.Pixmap(None, HTML_IMAGE_WIDTH, HTML_IMAGE_HEIGHT, 24)
buf = gdk.Pixbuf(gdk.COLORSPACE_RGB, False, 8, w, h)
buf2 = gdk.Pixbuf(gdk.COLORSPACE_RGB, False, 8, HTML_IMAGE_WIDTH, HTML_IMAGE_HEIGHT)
# the drivers are created for the drawing areas, but draw on the pixmaps
gd = gtkGfx.Driver(cv1, cv1.create_pango_layout(""))
gd.changeDrawable(pm)
gd2 = gtkGfx.Driver(cv2, cv2.create_pango_layout(""))
gd2.changeDrawable(pm2)
# driver for the postscript output
pd = psGfx.Driver()
bigPen = Gfx.BLACK_PEN
# a copy of the pen, so that Gfx.BLACK_PEN itself stays as it is
smallPen = copy.copy(Gfx.BLACK_PEN); smallPen.fontSize = Gfx.SMALL
if len(self.simSetup.strategyList) == 3:
# presumably changeGfx() redraws the diagram on the given driver,
# so that the pixmap can be saved right afterwards -- TODO confirm
self.simplex.changeGfx(gd)
path = self.simulation.imgdirName+"/"+self.simulation.simplexName
# copy the pixmap into the pixbuf, which can be saved as PNG
pw,ph = pm.get_size()
buf.get_from_drawable(pm, pm.get_colormap(), 0,0,0,0, pw, ph)
buf.save(path+".png", "png")
pd.clear(); self.simplex.changeGfx(pd)
#pd.save(path+".eps")
f = ZIPFile(path+".eps"+ZIP_EXTENSION, "w")
f.write(pd.getPostscript())
f.close()
self.notifier.progressIndicator(0.5/N, message = msg)
# the small image gets a small title font
self.simplex.changeGfx(gd2)
self.simplex.setStyle(titlePen = smallPen)
pw,ph = pm2.get_size()
buf2.get_from_drawable(pm2, pm2.get_colormap(), 0,0,0,0, pw, ph)
buf2.save(path+"_web.png", "png")
# back to the original driver and title pen
self.simplex.changeGfx(self.simplexDriver)
self.simplex.setStyle(titlePen = bigPen)
self.notifier.progressIndicator(1.0/N, message = msg)
# every entry of the range stack: image name and range of the graph
for imgName, x1, y1, x2, y2 in self.simulation.rangeStack:
self.graph.adjustRange(x1, y1, x2, y2)
self.graph.changeGfx(gd)
self.graph.setStyle(titlePen = bigPen)
path = self.simulation.imgdirName + "/" + imgName
pw,ph = pm.get_size()
buf.get_from_drawable(pm, pm.get_colormap(), 0,0,0,0, pw, ph)
buf.save(path+".png", "png")
pd.clear(); self.graph.changeGfx(pd)
#pd.save(path+".eps")
f = ZIPFile(path+".eps"+ZIP_EXTENSION, "w")
f.write(pd.getPostscript())
f.close()
self.notifier.progressIndicator((count+0.5)/N, message = msg)
self.graph.changeGfx(gd2)
self.graph.setStyle(titlePen = smallPen)
pw,ph = pm2.get_size()
buf2.get_from_drawable(pm2, pm2.get_colormap(), 0,0,0,0, pw, ph)
buf2.save(path+"_web.png", "png")
count += 1
self.notifier.progressIndicator(count/N, message = msg)
# back to the original driver and title pen
self.graph.changeGfx(self.graphDriver)
self.graph.setStyle(titlePen = bigPen)
self.notifier.progressIndicator(1.0, message = msg)
# The wx implementation is the one that is used, when dumpHTMLImages()
# is called (e.g. by writeResults).
dumpHTMLImages = wx_dumpHTMLImages
def dumpEPSImagesOnly(self):
    """Writes only the compressed postscript versions of the images
    (the simplex diagram, if there are exactly three strategies, and
    one graph for every entry of the range stack) to the image
    directory of the simulation."""
    postscript = psGfx.Driver()

    def savePostscript(path):
        # store what has been drawn on the postscript driver
        epsFile = ZIPFile(path+".eps"+ZIP_EXTENSION, "w")
        epsFile.write(postscript.getPostscript())
        epsFile.close()

    try:
        os.mkdir(self.simulation.imgdirName)
    except OSError as errobj:       # catch dir exists error
        if errobj.errno != 17:
            raise errobj
    total = float(len(self.simulation.rangeStack)+1)
    msg = "Writing EPS images ..."
    self.notifier.progressIndicator(0.0, message = msg)
    if len(self.simSetup.strategyList) == 3:
        path = self.simulation.imgdirName+"/"+self.simulation.simplexName
        postscript.clear()
        self.simplex.changeGfx(postscript)
        #pd.save(path+".eps")
        savePostscript(path)
        self.simplex.changeGfx(self.simplexDriver)
    self.notifier.progressIndicator(1.0/total, message = msg)
    done = 1
    for imgName, x1, y1, x2, y2 in self.simulation.rangeStack:
        self.graph.adjustRange(x1, y1, x2, y2)
        path = self.simulation.imgdirName + "/" + imgName
        postscript.clear()
        self.graph.changeGfx(postscript)
        #pd.save(path+".eps")
        savePostscript(path)
        done += 1
        self.notifier.progressIndicator(done/total, message = msg)
    # draw on the original driver again
    self.graph.changeGfx(self.graphDriver)
    self.notifier.progressIndicator(1.0, message = msg)
def writeResults(self, prefix=""):
    """Saves the results of the simulation: the images, the HTML page
    "<name>.html" (which is also linked on the summary page) and the
    pickled results.  <name> is 'prefix' or, if 'prefix' is empty, the
    file name of the setup.

    An IOError that occurs while the page or the pickle file is
    written is logged and not passed on to the caller.
    """
    self.dumpHTMLImages()
    # Determined before the try block, because the error message
    # needs the name.
    if prefix:
        fName = prefix
    else:
        fName = self.simSetup.fname()
    try:
        pageName = fName + ".html"
        self.summaryPage.append('<a href="'+pageName+'">'+fName+
                                '</a><br />')
        f = open(pageName, "w")
        try:
            f.write(self.htmlLog.getHTMLPage())
        finally:
            f.close()
        gc.collect()
        pickleSimulationResults(fName, self.simulation,
                                self.record)
    except IOError as err:
        # Read the attributes rather than unpacking the exception,
        # which fails for an IOError without exactly two arguments.
        self.log("IOError %s,%s while trying to save %s.html" %
                 (err.errno, err.strerror, fName))
def replay(self, setup, prefix = "", maxGenerations = MAX_GENERATIONS):
"""Restores a simulation from its pickled results (see
unpickleSimulationResults) instead of running it again.

The strategies of the setup are replaced by dummy strategies that
carry the pickled names, the pickled payoff matrix is stored in the
setup as 'cachedPM' and the pickled record becomes self.record.
If REDO_IMAGES or REDO_PAGES is set, the graph and the range stack
of the simulation are filled again from the record.

WARNING: the passed setup object is changed (fname, name, cachedPM,
strategyList, cachedLog).
"""
self.simSetup = setup
if prefix: fName = prefix
else: fName = self.simSetup.fname()
# Look up the name of the existing file; matchName() selects it by the
# number in the first six characters of fName.
fName = matchName(fName)
# The first 7 characters (the counter prefix) and the last 5 characters
# (presumably the extension ".html" -- TODO confirm) are cut off.
self.simSetup.fname = lambda : fName[7:-5] # don't try this at home!!!
self.simSetup.name = self.simSetup.fname() # avoid buggy headings
strategies, PM, record = unpickleSimulationResults(fName)
self.simSetup.cachedPM = PM
#if len(self.simSetup.strategyList) != len(strategies):
dummyStgies = [dummyStgy(s) for s in strategies]
self.simSetup.strategyList = dummyStgies # really dirty hack!!!
lg = Logging.HTMLLog() #dummy
for et in ["evranking","toc","title"]: lg.entryPoint(et)
self.simSetup.cachedLog = lg.backup()
# NOTE(review): the second parameter of newSetup() is 'maxGenerations',
# but the progress indicator function is passed here.  This goes
# unnoticed, because self.maxGenerations is overwritten below.
self.newSetup(self.simSetup, self.notifier.progressIndicator)
#for s in self.simulation.setup.strategyList: print s,
self.maxGenerations = maxGenerations
# Generation numbers at which a graph range is pushed on the range
# stack: 50, 100, 200, ... until maxGenerations is reached, plus one
# more doubling.
generations = [50]; x = 50
while x < self.maxGenerations:
x *= 2; generations.append(x)
generations.append(generations[-1]*2)
self.record = record
if REDO_IMAGES or REDO_PAGES:
g = self.simulation.graph; of = Graph.OptimizingFilter(g)
# a uniform distribution is put in front of the pickled record
self.record.insert(0, Dynamics.UniformDistribution(len(strategies)))
if (not REDO_AUTOMATA_ONLY) or \
self.simulation.imgdirName.find("Automat") > -1:
for i in xrange(len(self.record)):
p = self.record[i]
# the population share of every strategy in generation i
for k in xrange(len(strategies)):
of.addValue(strategies[k],i,p[k])
if i == generations[0]:
imgName = fName[7:-5] + "_gn%i" % i
self.simulation.rangeStack.append((imgName, 0.0, 0.0,
float(i), min(g.y2, 1.0)))
del generations[0]
g.adjustRange(0.0, 0.0, generations[0], g.y2)
of.graphChanged()
self.simulation.setup.population = self.record[-1]
def fixIndex(self, setup, prefix=""):
    """Adds the link to the page of a simulation to the summary page,
    without doing anything else.  The page is named after 'prefix' or,
    if 'prefix' is empty, after the file name of the setup."""
    fName = prefix or setup.fname()
    pageName = fName + ".html"
    link = '<a href="' + pageName + '">' + fName + '</a><br />'
    self.summaryPage.append(link)
def log(self, str):
    """Prints the message, after it has been stored for dumpLog()."""
    message = str
    self.logstr.append(message)
    print(message)
def dumpLog(self):
    """Appends the collected log messages to the file "LOG.TXT" and
    writes the summary page to "index.html" (both in the current
    directory).  An IOError is reported on the console and not passed
    on to the caller."""
    self.summaryPage.append("</body></html>\n")
    try:
        f = open("LOG.TXT", "a")
        try:
            f.write("\n".join(self.logstr))
        finally:
            f.close()
        f = open("index.html", "w")
        try:
            f.write("".join(self.summaryPage))
        finally:
            f.close()
    except IOError as err:
        # The message is one single literal: with "..."+"..." % (...)
        # only the second part would be formatted, which fails.
        print("IOError %s,%s while trying to save "
              "LOG.TXT and summary page" % (err.errno, err.strerror))
fnDir = []  # sorted names of the html pages (without navigation pages)
def fixName(fName):
    """Gives the html page of a simulation the name 'fName'.  The
    page is looked up by the simulation number, i.e. the first six
    characters of 'fName', in the sorted listing of the current
    directory.  The listing is read only once."""
    global fnDir
    if not fnDir:
        fnDir = sorted([entry for entry in os.listdir("./")
                        if entry.endswith(".html") and "nav" not in entry])
    current = fnDir[int(fName[0:6]) - 1]
    if current != fName:
        print("fixing name: "+current)
        os.rename(current, fName)
class SeriesRunner(object):
    """Runs a simulation series.
    """

    def __init__(self, sim_interface, desc, operator = NullOperator,
                 limited = False,
                 directory = os.path.expanduser("~/Simulations")):
        """Initialize with a simulation interface and a series
        descriptor.

        sim_interface - SimInterface object that runs the simulations
        desc          - descriptor of the simulation series
        operator      - operator that is passed to the generator of
                        the series
        limited       - if True, 'desc.genLimitedSeries' is used as
                        generator instead of 'desc.generate'
        directory     - the results are stored in the subdirectory
                        <directory>/<name of the series>, which is
                        created if it does not exist

        Raises OSError if the directory cannot be created.
        """
        self.sim = sim_interface
        self.series = desc
        self.operator = operator
        self.names = []
        self.limited = limited
        if self.limited:
            self.generator = self.series.genLimitedSeries
        else:
            self.generator = self.series.generate
        self.maxGenerations = MAX_GENERATIONS
        self._prepareStatistics()
        # os.path.join copes with a trailing slash as well as with an
        # empty directory name.
        self.dir = os.path.join(directory, self.series.name)
        try:
            # makedirs, because 'directory' itself may not exist yet
            os.makedirs(self.dir)
        except OSError as errobj:
            # 17 is EEXIST: an already existing directory is fine.
            if errobj.errno != 17:
                raise
def _prepareStatistics(self):
    """Creates fresh statistics objects for the series: one single
    SeriesStatistics object for the Monte Carlo operator, otherwise
    the objects that SpawnStatisticsObjects() returns."""
    if self.operator == MonteCarloOperator:
        self.statistics = [SeriesStatistics(self.series)]
        return
    self.statistics = SpawnStatisticsObjects(self.series)
def writeCounter(self, counter):
    """Write the state of the simulation counter to disk, to be
    able to resume the simulation in case the program crashes.

    The counter goes to "COUNTER.TXT".  The series, the statistics,
    the names and the cache data of the PD module are pickled to
    "STATE.PICKLE_NEW", which replaces "STATE.PICKLE" only after it
    has been written completely.  I/O errors are reported on the
    console and not passed on to the caller.
    """
    try:
        f = open("COUNTER.TXT", "w")
        try:
            f.write(str(counter))
        finally:
            f.close()
        # NOTE: opened in the same mode in which readCounter() reads
        # the file.
        f = open("STATE.PICKLE_NEW", "w")
        try:
            for obj in (self.series, self.statistics, self.names,
                        PD.cache, PD.outsourced_match_logs,
                        PD.outsourced_last_name):
                pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
        finally:
            f.close()
    except IOError as err:
        print("IOError %s,%s while trying to save COUNTER.TXT" %
              (err.errno, err.strerror))
        # The new state is incomplete: keep the last good STATE.PICKLE
        # instead of replacing it.
        return
    try:
        os.remove("STATE.PICKLE")
        #print os.listdir("")
    except OSError as err:
        # e.g. when the state is written for the first time
        print("IOError %s, %s may be ignored"%(err.errno, err.strerror))
        print(os.getcwd())
    try:
        os.rename("STATE.PICKLE_NEW", "STATE.PICKLE")
    except OSError as err:
        print("IOError %s, %s"%(err.errno, err.strerror))
def readCounter(self):
    """-> counter

    Returns the number of the simulation with which the series is to
    be continued: one more than the number stored in "COUNTER.TXT" or
    1, if the series has to start from the beginning, because the
    counter or the state in "STATE.PICKLE" cannot be read.  If the
    state can be read, the series, the statistics, the names and the
    cache data of the PD module are restored from it.

    WARNING: unpickling executes code from the file.  Only resume
    from state files that come from a trusted source.
    """
    try:
        f = open("COUNTER.TXT", "r")
        try:
            counter = int(f.read())+1
        finally:
            f.close()
    except IOError as err:
        # 2 is ENOENT: no counter file, i.e. nothing to resume
        if err.errno != 2:
            print("IOError %s,%s while trying to read COUNTER.TXT" %
                  (err.errno, err.strerror))
        counter = 1
        self._prepareStatistics()
    except ValueError:
        print("ValueError!? Resuming simulation series from the beginning!")
        counter = 1
        self._prepareStatistics()
    try:
        # NOTE: opened in the same mode in which writeCounter() writes
        # the file.
        f = open("STATE.PICKLE", "r")
        try:
            self.series = pickle.load(f)
            # the generator must belong to the restored series object
            if self.limited:
                self.generator = self.series.genLimitedSeries
            else:
                self.generator = self.series.generate
            self.statistics = pickle.load(f)
            self.names = pickle.load(f)
            PD.cache = pickle.load(f)
            PD.outsourced_match_logs = pickle.load(f)
            PD.outsourced_last_name = pickle.load(f)
        finally:
            f.close()
    except IOError as err:
        if counter != 1:
            print("IOError %s,%s while trying to recover simulation state" %
                  (err.errno, err.strerror))
            print("Restarting simulation series")
            counter = 1
            self._prepareStatistics()
    except EOFError:
        print("EOFError while trying to recover simulation state")
        print("Restarting simulation series")
        counter = 1
        self._prepareStatistics()
    return counter
def resume(self, replay=False):
"""Resume the simulation series, where it has been stopped.

If 'replay' is True, no simulation is run. Instead, all simulations
of the series are restored from their stored results (see the replay
method of the simulation interface), beginning with the first one.
In both cases the statistics are recorded and written at the end.
"""
# the working directory to which this method returns
cwd = os.getcwd()
os.chdir(self.dir)
if replay:
resume_from = 1
self.sim.log("Replaying simulation series %s from the beginning"%\
self.series.name)
else: resume_from = self.readCounter()
if resume_from > 1:
self.sim.log("Resuming simulation series %s from simulation %i"% \
(self.series.name, resume_from))
else:
if self.limited:
self.sim.log("Running simulation series %s. %i simulations to go."%\
(self.series.name,
self.series.numberOfVariableParameters(self.operator)))
else:
self.sim.log("Running simulation series %s. %i simulations to go."%\
(self.series.name,
self.series.numberOfSimulations(self.operator)))
os.chdir(cwd)
# For the two Monte Carlo operators the generator is called with a
# repeat count that equals the size of the series without operator.
if self.operator == MonteCarloOperator or \
self.operator == DiscreteMCOperator:
if self.limited:
repeat = self.series.numberOfVariableParameters(NullOperator)
else:
repeat = self.series.numberOfSimulations(NullOperator)
else: repeat = 1
# PD.MATCH_LOG_OUTSOURCED is restored at the end of this method.
# NOTE(review): PD.USE_CACHE is switched off for the MonteCarloOperator
# and not switched on again in this method -- confirm that this is
# intended.
save_MLO = PD.MATCH_LOG_OUTSOURCED
if self.operator == MonteCarloOperator:
PD.MATCH_LOG_OUTSOURCED = False
PD.USE_CACHE = False
else: PD.MATCH_LOG_OUTSOURCED = True
counter = 0
for setup in self.generator(self.operator, repeat):
counter += 1;
# file name prefix: the counter with six digits (filled up with
# zeros), an underscore and the file name of the setup
prefix = ("%6i_"%counter).replace(" ","0") + setup.fname()
# simulations before 'resume_from' only get their link on the
# summary page
if counter < resume_from:
self.sim.fixIndex(setup, prefix)
continue
self.names.append(prefix)
if replay:
#self.sim.log("Reading simulation %i. %s"%(counter,setup.name))
os.chdir(self.dir)
self.sim.replay(setup, prefix)
for s in self.statistics:
s.record(self.sim.getSimulationObject())
# which images are written again depends on the REDO_... flags
if REDO_PAGES:
pass
elif REDO_IMAGES:
if REDO_AUTOMATA_ONLY:
if self.sim.simulation.imgdirName.find("Automat") > -1:
print "writing images for", setup.name
self.sim.dumpHTMLImages()
else: self.sim.dumpEPSImagesOnly()
os.chdir(cwd)
else:
self.sim.log("Running simulation %i. %s"%(counter,setup.name))
self.sim.newSetup(setup)
# run until continueSim() reports the end of the simulation
while self.sim.continueSim():
pass
os.chdir(self.dir)
for s in self.statistics:
s.record(self.sim.getSimulationObject())
self.sim.writeResults(prefix)
# store the counter and the state, so that the series can be resumed
self.writeCounter(counter)
os.chdir(cwd)
gc.collect()
os.chdir(self.dir)
if replay: self.writeCounter(counter)
for s in self.statistics: s.write()
if not replay: self.writeMatchLogs()
PD.MATCH_LOG_OUTSOURCED = save_MLO
self.sim.log("Ready.")
if not replay: self.sim.dumpLog()
self.sim.close()
os.chdir(cwd)
def writeMatchLogs(self):
    """Writes the match logs that have been outsourced by the PD
    module to their files.  Does nothing if PD.MATCH_LOG_OUTSOURCED is
    not set.

    Raises OSError if the match log directory cannot be created for
    any other reason than that it exists already.
    """
    if not PD.MATCH_LOG_OUTSOURCED:
        return
    try:
        os.mkdir(PD.MATCH_LOG_DIRECTORY)
    except OSError as errobj:
        # 17 is EEXIST: an already existing directory is fine.
        if errobj.errno != 17:
            raise
    for matchlog, filename in PD.outsourced_pages():
        f = open(filename, "w")
        try:
            f.write("".join(matchlog))
        finally:
            f.close()
def createNavigationPages(self):
    """Creates html pages that help navigating through the results
    via frames.

    For every simulation a page "<name>_nav.html" is written to the
    directory of the series.  It contains one line per parameter with
    the values of that parameter; every value is a link to the
    simulation that differs from the current one only in this value.
    Finally the frame set "index_frames.html" is written.  I/O errors
    are reported on the console and not passed on to the caller.
    """
    def mul(seq):
        # product of all numbers in seq (1 for an empty sequence)
        product = 1
        for factor in seq:
            product *= factor
        return product

    def decompose(i, k):
        # index of the simulation that has the same parameter values
        # as simulation i, but the first value of parameter k
        if k > 0: m = i % modulo[k-1]
        else: m = i
        n = m - m % modulo[k]
        return i - n

    cwd = os.getcwd()
    os.chdir(self.dir)
    try:
        link_tmpl = """<a href="%s.html" target="content" onclick="switchnav('%s_nav.html')">"""
        pl = self.series.getParameterList(self.operator)
        pn = self.series.getParameterNames()
        variations = self.series.parameterVariations(self.operator)
        # modulo[k]: the number of simulations between two successive
        # values of parameter k
        modulo = tuple([mul(variations[i+1:])
                        for i in range(len(variations))])
        for i in range(len(self.names)):
            page = ["<html>\n<head>\n"]
            page.append(FRAME_SCRIPT)
            page.append("</head>\n<body>\n")
            for k in range(len(pn)):
                page.append("<i>"+pn[k]+":</i> ")
                pos = decompose(i,k) # i % modulo[k] ???
                for c in range(variations[k]):
                    idx = c * modulo[k] + pos
                    par = str(pl[k][c])
                    if idx == i:
                        # the value of the current simulation
                        page.append("<b>"+par+"</b>&nbsp;&nbsp;&nbsp;")
                    else:
                        page.append(link_tmpl%(self.names[idx],
                                               self.names[idx]))
                        page.append(par+"</a>&nbsp;&nbsp;&nbsp;")
                        #fixName(self.names[idx])
                page.append("<br />\n")
            page.append("</body>\n</html>\n")
            try:
                f = open(self.names[i]+"_nav.html", "w")
                try:
                    f.write("".join(page))
                finally:
                    f.close()
            except IOError as err:
                print("IOError %s,%s while trying to create navigation pages" %
                      (err.errno, err.strerror))
        try:
            f = open("index_frames.html", "w")
            try:
                f.write(FRAME_SET.replace("$NAV", self.names[0]+"_nav.html").\
                        replace("$CONT", self.names[0]+".html"))
            finally:
                f.close()
        except IOError as err:
            print("IOError %s,%s while trying to navigation index" %
                  (err.errno, err.strerror))
    finally:
        # Go back where we came from, even if something fails.
        os.chdir(cwd)
######################################################################
#
# Simulation Code
#
######################################################################
def limited_series():
    """Runs (or replays, if REPLAY is set) the limited variant of the
    big simulation series under the name "LimitedSeries"."""
    description = BigSimulationSeries()
    description.name = "LimitedSeries"
    SeriesRunner(OfflineSim(), description,
                 limited = True).resume(replay=REPLAY)
def big_series():
    """Runs (or replays, if REPLAY is set) the complete big simulation
    series under the name "BigSeries" and creates its navigation
    pages."""
    description = BigSimulationSeries()
    description.name = "BigSeries"
    seriesRunner = SeriesRunner(OfflineSim(), description, limited = False)
    seriesRunner.resume(replay = REPLAY)
    seriesRunner.createNavigationPages()
def border_condition_series():
    """Runs (or replays, if REPLAY is set) the big simulation series
    with the LeftRightOperator under the name "BorderConditionsSeries"
    and creates its navigation pages."""
    description = BigSimulationSeries()
    description.name = "BorderConditionsSeries"
    seriesRunner = SeriesRunner(OfflineSim(), description,
                                operator = LeftRightOperator,
                                limited = False)
    seriesRunner.resume(replay=REPLAY)
    seriesRunner.createNavigationPages()
def monte_carlo_series():
    """Runs (or replays, if REPLAY is set) the big simulation series
    with the MonteCarloOperator under the name "MonteCarloSeries".
    No navigation pages are created."""
    description = BigSimulationSeries()
    description.name = "MonteCarloSeries"
    seriesRunner = SeriesRunner(OfflineSim(), description,
                                operator = MonteCarloOperator,
                                limited = False)
    seriesRunner.resume(replay=REPLAY)
    # runner.createNavigationPages()
def discrete_monte_carlo_series():
    """Runs (or replays, if REPLAY is set) the big simulation series
    with the DiscreteMCOperator under the name
    "DiscreteMonteCarloSeries".  No navigation pages are created."""
    description = BigSimulationSeries()
    description.name = "DiscreteMonteCarloSeries"
    seriesRunner = SeriesRunner(OfflineSim(), description,
                                operator = DiscreteMCOperator,
                                limited = False)
    seriesRunner.resume(replay=REPLAY)
    # runner.createNavigationPages()
######################################################################
def alternative_series():
    """Runs (or replays, if REPLAY is set) the alternative simulation
    series under the name "AlternativeSeries" and creates its
    navigation pages."""
    description = AlternativeSeries()
    description.name = "AlternativeSeries"
    seriesRunner = SeriesRunner(OfflineSim(), description, limited = False)
    seriesRunner.resume(replay=REPLAY)
    seriesRunner.createNavigationPages()
def alternative_series_with_demes():
    """Runs (or replays, if REPLAY is set) the alternative simulation
    series with the single demes setting (50,3,7,5) under the name
    "AlternativeSeries_with_demes" and creates its navigation pages."""
    description = AlternativeSeries()
    description.name = "AlternativeSeries_with_demes"
    description.demes = ((50,3,7,5),)
    seriesRunner = SeriesRunner(OfflineSim(), description, limited = False)
    seriesRunner.resume(replay=REPLAY)
    seriesRunner.createNavigationPages()
######################################################################
#
# Test Code
#
######################################################################
def system_test():
    """Runs (or replays, if REPLAY is set) the test series with the
    default settings of the SeriesRunner and creates its navigation
    pages."""
    seriesRunner = SeriesRunner(OfflineSim(), TestSeries())
    seriesRunner.resume(replay=REPLAY)
    seriesRunner.createNavigationPages()
def automata_test():
    """Runs the automata test series.  The series is always run, never
    replayed; no navigation pages are created."""
    SeriesRunner(OfflineSim(), TestAutomata()).resume()
if __name__ == "__main__":
    # Exactly one of the series below is meant to be activated at a time.
    #automata_test()
    #limited_series()
    # MainWindow = wxGfx.Window(title="CoopSim - Simulation Series")
    # The application object must exist before the dialog is created;
    # the Notifier objects pick up the dialog from the global 'MainWindow'.
    myApp = wx.PySimpleApp()
    MainWindow = wx.ProgressDialog("CoopSim - Simulation Series",
                                   "Please Wait (very long...)", 1000)
    try:
        # system_test()
        # set 1
        # limited_series()
        # big_series()
        # border_condition_series()
        monte_carlo_series()
        # discrete_monte_carlo_series()
        # set 2
        # alternative_series()
        # alternative_series_with_demes()
    finally:
        # Close the dialog even if the series ends with an error.
        MainWindow.Destroy()