diff --git a/earthquake/explorepolicy/register.go b/earthquake/explorepolicy/register.go index 9e68121..8ed4cbc 100644 --- a/earthquake/explorepolicy/register.go +++ b/earthquake/explorepolicy/register.go @@ -18,9 +18,11 @@ package explorepolicy import ( dumb "github.com/osrg/earthquake/earthquake/explorepolicy/dumb" random "github.com/osrg/earthquake/earthquake/explorepolicy/random" + replayable "github.com/osrg/earthquake/earthquake/explorepolicy/replayable" ) func RegisterKnownExplorePolicies() { RegisterPolicy(dumb.Name, func() ExplorePolicy { return dumb.New() }) RegisterPolicy(random.Name, func() ExplorePolicy { return random.New() }) + RegisterPolicy(replayable.Name, func() ExplorePolicy { return replayable.New() }) } diff --git a/earthquake/explorepolicy/replayable/replayablepolicy.go b/earthquake/explorepolicy/replayable/replayablepolicy.go new file mode 100644 index 0000000..ad73e92 --- /dev/null +++ b/earthquake/explorepolicy/replayable/replayablepolicy.go @@ -0,0 +1,126 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package replayable provides the EXPERIMENTAL semi-deterministic replayable policy. +package replayable + +import ( + "hash/fnv" + "os" + "time" + + log "github.com/cihub/seelog" + "github.com/osrg/earthquake/earthquake/historystorage" + "github.com/osrg/earthquake/earthquake/signal" + "github.com/osrg/earthquake/earthquake/util/config" +) + +type Replayable struct { + // channel + actionCh chan signal.Action + + // parameter "maxInterval" + MaxInterval time.Duration + + // parameter "seed" + Seed string +} + +func New() *Replayable { + log.Warnf("The replayable explorer is EXPERIMENTAL feature.") + r := &Replayable{ + actionCh: make(chan signal.Action), + MaxInterval: time.Duration(0), + Seed: "", + } + return r +} + +const Name = "replayable" + +// returns "replayable" +func (r *Replayable) Name() string { + return Name +} + +// parameters: +// - maxInterval(duration): max interval (default: 10 msecs) +// - seed(string): seed for replaying (default: empty). can be overriden by EQ_REPLAY_SEED. +// +// should support dynamic reloading +func (r *Replayable) LoadConfig(cfg config.Config) error { + log.Debugf("CONFIG: %s", cfg.AllSettings()) + paramMaxInterval := "explorepolicyparam.maxInterval" + if cfg.IsSet(paramMaxInterval) { + r.MaxInterval = cfg.GetDuration(paramMaxInterval) + log.Infof("Set maxInterval=%s", r.MaxInterval) + } else { + r.MaxInterval = 10 * time.Millisecond + log.Infof("Using default maxInterval=%s", r.MaxInterval) + } + + paramSeed := "explorepolicyparam.seed" + if cfg.IsSet(paramSeed) { + r.Seed = cfg.GetString(paramSeed) + log.Infof("Set seed=%s", r.Seed) + } else { + r.Seed = "" + log.Infof("Using default seed=%s", r.Seed) + } + + envSeed := "EQ_REPLAY_SEED" + if v := os.Getenv(envSeed); v != "" { + r.Seed = v + log.Infof("Overriding seed=%s (%s)", r.Seed, envSeed) + } + + return nil +} + +func (d *Replayable) SetHistoryStorage(storage historystorage.HistoryStorage) error { + return nil +} + +func (d *Replayable) ActionChan() chan signal.Action { + return d.actionCh +} + +func (r *Replayable) determineInterval(event signal.Event) time.Duration { + if r.MaxInterval == 0 { + log.Warnf("MaxInterval is zero") + return 0 + } + hint := event.ReplayHint() + h := fnv.New64a() + h.Write([]byte(r.Seed)) + h.Write([]byte(hint)) + ui64 := h.Sum64() + t := time.Duration(ui64 % uint64(r.MaxInterval)) + log.Debugf("REPLAYABLE: Determined interval %s for (seed=%s,hint=%s)", + t, r.Seed, hint) + return t +} + +func (r *Replayable) QueueEvent(event signal.Event) { + interval := r.determineInterval(event) + action, err := event.DefaultAction() + if err != nil { + panic(log.Critical(err)) + } + go func() { + <-time.After(interval) + r.actionCh <- action + }() +} diff --git a/earthquake/explorepolicy/replayable/replayablepolicy_test.go b/earthquake/explorepolicy/replayable/replayablepolicy_test.go new file mode 100644 index 0000000..dfd1154 --- /dev/null +++ b/earthquake/explorepolicy/replayable/replayablepolicy_test.go @@ -0,0 +1,110 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package replayable + +import ( + "flag" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/osrg/earthquake/earthquake/signal" + "github.com/osrg/earthquake/earthquake/util/config" + logutil "github.com/osrg/earthquake/earthquake/util/log" + testutil "github.com/osrg/earthquake/earthquake/util/test" +) + +func TestMain(m *testing.M) { + flag.Parse() + logutil.InitLog("", true) + signal.RegisterKnownSignals() + os.Exit(m.Run()) +} + +func TestReplayableWithPacketEvent_10_2(t *testing.T) { + xTestPolicyWithPacketEvent(t, 10, 2, true) +} + +func TestReplayableWithPacketEvent_10_10(t *testing.T) { + xTestPolicyWithPacketEvent(t, 10, 10, true) +} + +func TestReplayableShouldNotBlockWithPacketEvent_10_2(t *testing.T) { + xTestPolicyWithPacketEvent(t, 10, 2, false) +} + +func TestReplayableShouldNotBlockWithPacketEvent_10_10(t *testing.T) { + xTestPolicyWithPacketEvent(t, 10, 10, false) +} + +func newPolicy(t *testing.T, seed string) *Replayable { + policy := New() + cfg := config.New() + cfg.Set("explorePolicy", "replayable") + cfg.Set("explorePolicyParam", map[string]interface{}{ + "maxInterval": 1 * time.Second, + "seed": seed, + }) + err := policy.LoadConfig(cfg) + assert.NoError(t, err) + return policy +} + +func xTestPolicyWithPacketEvent(t *testing.T, n, entities int, concurrent bool) { + seed := "foobar" + policy := newPolicy(t, seed) + sender := func() { + for i := 0; i < n; i++ { + entityID := fmt.Sprintf("entity-%d", i%entities) + event := testutil.NewPacketEvent(t, entityID, i).(*signal.PacketEvent) + hint := fmt.Sprintf("hint-%s-%d", entityID, i) + event.SetReplayHint(hint) + t.Logf("Test %d: Sending event (hint=%s)", i, hint) + policy.QueueEvent(event) + t.Logf("Test %d: Sent event (hint=%s)", i, hint) + } + } + receiver := func() { + for i := 0; i < n; i++ { + t.Logf("Test %d: Receiving", i) + action := <-policy.ActionChan() + event := action.Event() + t.Logf("Test %d: Received action (event hint=%s)", + i, event.ReplayHint()) + } + } + + if concurrent { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + sender() + }() + go func() { + defer wg.Done() + receiver() + }() + wg.Wait() + } else { + sender() + receiver() + } +} diff --git a/earthquake/signal/event.go b/earthquake/signal/event.go index 2ec6d06..0178096 100644 --- a/earthquake/signal/event.go +++ b/earthquake/signal/event.go @@ -43,6 +43,20 @@ func (this *BasicEvent) SetDeferred(deferred bool) { this.Set("deferred", deferred) } +// implements Event +func (this *BasicEvent) ReplayHint() string { + hint, ok := this.Get("replay_hint").(string) + if !ok { + return "" + } + return hint +} + +func (this *BasicEvent) SetReplayHint(hint string) { + // use BasicSignal.Set() so as to serialize in JSON + this.Set("replay_hint", hint) +} + // implements Event func (this *BasicEvent) DefaultAction() (Action, error) { if this.Deferred() { diff --git a/earthquake/signal/interface.go b/earthquake/signal/interface.go index 494d8ba..174029e 100644 --- a/earthquake/signal/interface.go +++ b/earthquake/signal/interface.go @@ -21,6 +21,15 @@ type Event interface { // json name: "deferred" Deferred() bool + // explore policy can use this hash string as a hint for semi-deterministic replaying. + // The hint should not contain time-dependent or random things for better determinism. + // Note that we will not support fully deterministic replaying. + // + // The hint can contain any character. + // + // json name: "replay_hint" + ReplayHint() string + // default positive action. can be NopAction, but cannot be nil. // (NopAction is used for history storage) DefaultAction() (Action, error) diff --git a/example/zk-found-2212.nfqhook/config_replayable_experimental.toml b/example/zk-found-2212.nfqhook/config_replayable_experimental.toml new file mode 100644 index 0000000..1d9f53a --- /dev/null +++ b/example/zk-found-2212.nfqhook/config_replayable_experimental.toml @@ -0,0 +1,15 @@ +init = "init.sh" +run = "run.sh" +validate = "validate.sh" +clean = "clean.sh" +restPort = 10080 + +# The replayable policy enables "semi-deterministic" replaying. +# You have to specify a "seed" string as EQ_REPLAY_SEED (environmental variable). +# The replayable policy determines delays as `hash(seed, hint) % maxInterval`. +# The "hint" is determined by pyearthquake zookeeper inspector. +explorePolicy = "replayable" + +[explorePolicyParam] +# larger interval can increase reproducibility + maxInterval = "100ms" diff --git a/misc/pyearthquake/inspector/zookeeper.py b/misc/pyearthquake/inspector/zookeeper.py index 1487787..b26f0df 100644 --- a/misc/pyearthquake/inspector/zookeeper.py +++ b/misc/pyearthquake/inspector/zookeeper.py @@ -109,7 +109,9 @@ def map_zktraffic_message_to_event(self, zt_msg): src_entity, dst_entity = self.map_zktraffic_message_to_entity_ids( zt_msg) d = self.map_zktraffic_message_to_dict(zt_msg) - event = PacketEvent.from_message(src_entity, dst_entity, d) + ## replay_hint is an optional string that helps replaying + replay_hint = str(hash(frozenset(d.items()))) + event = PacketEvent.from_message(src_entity, dst_entity, d, replay_hint) if isinstance(zt_msg, FLE.Message): LOG.debug(colorama.Back.CYAN + colorama.Fore.BLACK + diff --git a/misc/pyearthquake/signal/event.py b/misc/pyearthquake/signal/event.py index ccfeaf5..7494f84 100644 --- a/misc/pyearthquake/signal/event.py +++ b/misc/pyearthquake/signal/event.py @@ -12,7 +12,7 @@ class PacketEvent(EventBase): deferred = True @classmethod - def from_message(cls, src_entity, dst_entity, message): + def from_message(cls, src_entity, dst_entity, message, replay_hint=""): inst = cls() # we do not set inst.entity here inst.option = { @@ -20,6 +20,7 @@ def from_message(cls, src_entity, dst_entity, message): 'dst_entity': dst_entity, 'message': message } + inst.replay_hint = replay_hint return inst diff --git a/misc/pyearthquake/signal/signal.py b/misc/pyearthquake/signal/signal.py index 2524969..2482ef2 100644 --- a/misc/pyearthquake/signal/signal.py +++ b/misc/pyearthquake/signal/signal.py @@ -101,7 +101,8 @@ class EventBase(SignalBase): """ type_name = 'event' deferred = False - var_names = SignalBase.var_names + ['deferred'] + replay_hint = '' + var_names = SignalBase.var_names + ['deferred', 'replay_hint'] # orchestrator sets recv_timestamp recv_timestamp = -1