Skip to content
This repository has been archived by the owner on Dec 13, 2021. It is now read-only.

Commit

Permalink
[EXPERIMENTAL] new exploration policy: replayable
Browse files Browse the repository at this point in the history
If event has "replay_hint" hash string (that does not contain time-dependent/random things),
we can semi-deterministically replay a scenario using time.Duration(hash(seed,replay_hint) % maxInterval).

The seed can be set as EQ_REPLAY_SEED.

I tested with example/zk-found-2212.nfqhook/config_replayable_experimental.toml.

 - Test count: 100 times for each
 - Config: replayable(maxInterval=100msec)
 -- Reproducibility with random seed: 5% (lower than the result in FOSDEM slice because I used different intervals in this test)
 -- Reproducibility with a certain fixed seed: 24% (up!)
  • Loading branch information
AkihiroSuda committed Apr 21, 2016
1 parent 08086d0 commit 13aa33b
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 3 deletions.
2 changes: 2 additions & 0 deletions earthquake/explorepolicy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package explorepolicy
import (
dumb "github.com/osrg/earthquake/earthquake/explorepolicy/dumb"
random "github.com/osrg/earthquake/earthquake/explorepolicy/random"
replayable "github.com/osrg/earthquake/earthquake/explorepolicy/replayable"
)

func RegisterKnownExplorePolicies() {
RegisterPolicy(dumb.Name, func() ExplorePolicy { return dumb.New() })
RegisterPolicy(random.Name, func() ExplorePolicy { return random.New() })
RegisterPolicy(replayable.Name, func() ExplorePolicy { return replayable.New() })
}
126 changes: 126 additions & 0 deletions earthquake/explorepolicy/replayable/replayablepolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package replayable provides the EXPERIMENTAL semi-deterministic replayable policy.
package replayable

import (
"hash/fnv"
"os"
"time"

log "github.com/cihub/seelog"
"github.com/osrg/earthquake/earthquake/historystorage"
"github.com/osrg/earthquake/earthquake/signal"
"github.com/osrg/earthquake/earthquake/util/config"
)

type Replayable struct {
// channel
actionCh chan signal.Action

// parameter "maxInterval"
MaxInterval time.Duration

// parameter "seed"
Seed string
}

func New() *Replayable {
log.Warnf("The replayable explorer is EXPERIMENTAL feature.")
r := &Replayable{
actionCh: make(chan signal.Action),
MaxInterval: time.Duration(0),
Seed: "",
}
return r
}

const Name = "replayable"

// returns "replayable"
func (r *Replayable) Name() string {
return Name
}

// parameters:
// - maxInterval(duration): max interval (default: 10 msecs)
// - seed(string): seed for replaying (default: empty). can be overriden by EQ_REPLAY_SEED.
//
// should support dynamic reloading
func (r *Replayable) LoadConfig(cfg config.Config) error {
log.Debugf("CONFIG: %s", cfg.AllSettings())
paramMaxInterval := "explorepolicyparam.maxInterval"
if cfg.IsSet(paramMaxInterval) {
r.MaxInterval = cfg.GetDuration(paramMaxInterval)
log.Infof("Set maxInterval=%s", r.MaxInterval)
} else {
r.MaxInterval = 10 * time.Millisecond
log.Infof("Using default maxInterval=%s", r.MaxInterval)
}

paramSeed := "explorepolicyparam.seed"
if cfg.IsSet(paramSeed) {
r.Seed = cfg.GetString(paramSeed)
log.Infof("Set seed=%s", r.Seed)
} else {
r.Seed = ""
log.Infof("Using default seed=%s", r.Seed)
}

envSeed := "EQ_REPLAY_SEED"
if v := os.Getenv(envSeed); v != "" {
r.Seed = v
log.Infof("Overriding seed=%s (%s)", r.Seed, envSeed)
}

return nil
}

func (d *Replayable) SetHistoryStorage(storage historystorage.HistoryStorage) error {
return nil
}

func (d *Replayable) ActionChan() chan signal.Action {
return d.actionCh
}

func (r *Replayable) determineInterval(event signal.Event) time.Duration {
if r.MaxInterval == 0 {
log.Warnf("MaxInterval is zero")
return 0
}
hint := event.ReplayHint()
h := fnv.New64a()
h.Write([]byte(r.Seed))
h.Write([]byte(hint))
ui64 := h.Sum64()
t := time.Duration(ui64 % uint64(r.MaxInterval))
log.Debugf("REPLAYABLE: Determined interval %s for (seed=%s,hint=%s)",
t, r.Seed, hint)
return t
}

func (r *Replayable) QueueEvent(event signal.Event) {
interval := r.determineInterval(event)
action, err := event.DefaultAction()
if err != nil {
panic(log.Critical(err))
}
go func() {
<-time.After(interval)
r.actionCh <- action
}()
}
110 changes: 110 additions & 0 deletions earthquake/explorepolicy/replayable/replayablepolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package replayable

import (
"flag"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/osrg/earthquake/earthquake/signal"
"github.com/osrg/earthquake/earthquake/util/config"
logutil "github.com/osrg/earthquake/earthquake/util/log"
testutil "github.com/osrg/earthquake/earthquake/util/test"
)

func TestMain(m *testing.M) {
flag.Parse()
logutil.InitLog("", true)
signal.RegisterKnownSignals()
os.Exit(m.Run())
}

func TestReplayableWithPacketEvent_10_2(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 2, true)
}

func TestReplayableWithPacketEvent_10_10(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 10, true)
}

func TestReplayableShouldNotBlockWithPacketEvent_10_2(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 2, false)
}

func TestReplayableShouldNotBlockWithPacketEvent_10_10(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 10, false)
}

func newPolicy(t *testing.T, seed string) *Replayable {
policy := New()
cfg := config.New()
cfg.Set("explorePolicy", "replayable")
cfg.Set("explorePolicyParam", map[string]interface{}{
"maxInterval": 1 * time.Second,
"seed": seed,
})
err := policy.LoadConfig(cfg)
assert.NoError(t, err)
return policy
}

func xTestPolicyWithPacketEvent(t *testing.T, n, entities int, concurrent bool) {
seed := "foobar"
policy := newPolicy(t, seed)
sender := func() {
for i := 0; i < n; i++ {
entityID := fmt.Sprintf("entity-%d", i%entities)
event := testutil.NewPacketEvent(t, entityID, i).(*signal.PacketEvent)
hint := fmt.Sprintf("hint-%s-%d", entityID, i)
event.SetReplayHint(hint)
t.Logf("Test %d: Sending event (hint=%s)", i, hint)
policy.QueueEvent(event)
t.Logf("Test %d: Sent event (hint=%s)", i, hint)
}
}
receiver := func() {
for i := 0; i < n; i++ {
t.Logf("Test %d: Receiving", i)
action := <-policy.ActionChan()
event := action.Event()
t.Logf("Test %d: Received action (event hint=%s)",
i, event.ReplayHint())
}
}

if concurrent {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
sender()
}()
go func() {
defer wg.Done()
receiver()
}()
wg.Wait()
} else {
sender()
receiver()
}
}
14 changes: 14 additions & 0 deletions earthquake/signal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func (this *BasicEvent) SetDeferred(deferred bool) {
this.Set("deferred", deferred)
}

// implements Event
func (this *BasicEvent) ReplayHint() string {
hint, ok := this.Get("replay_hint").(string)
if !ok {
return ""
}
return hint
}

func (this *BasicEvent) SetReplayHint(hint string) {
// use BasicSignal.Set() so as to serialize in JSON
this.Set("replay_hint", hint)
}

// implements Event
func (this *BasicEvent) DefaultAction() (Action, error) {
if this.Deferred() {
Expand Down
9 changes: 9 additions & 0 deletions earthquake/signal/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type Event interface {
// json name: "deferred"
Deferred() bool

// explore policy can use this hash string as a hint for semi-deterministic replaying.
// The hint should not contain time-dependent or random things for better determinism.
// Note that we will not support fully deterministic replaying.
//
// The hint can contain any character.
//
// json name: "replay_hint"
ReplayHint() string

// default positive action. can be NopAction, but cannot be nil.
// (NopAction is used for history storage)
DefaultAction() (Action, error)
Expand Down
15 changes: 15 additions & 0 deletions example/zk-found-2212.nfqhook/config_replayable_experimental.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
init = "init.sh"
run = "run.sh"
validate = "validate.sh"
clean = "clean.sh"
restPort = 10080

# The replayable policy enables "semi-deterministic" replaying.
# You have to specify a "seed" string as EQ_REPLAY_SEED (environmental variable).
# The replayable policy determines delays as `hash(seed, hint) % maxInterval`.
# The "hint" is determined by pyearthquake zookeeper inspector.
explorePolicy = "replayable"

[explorePolicyParam]
# larger interval can increase reproducibility
maxInterval = "100ms"
4 changes: 3 additions & 1 deletion misc/pyearthquake/inspector/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def map_zktraffic_message_to_event(self, zt_msg):
src_entity, dst_entity = self.map_zktraffic_message_to_entity_ids(
zt_msg)
d = self.map_zktraffic_message_to_dict(zt_msg)
event = PacketEvent.from_message(src_entity, dst_entity, d)
## replay_hint is an optional string that helps replaying
replay_hint = str(hash(frozenset(d.items())))
event = PacketEvent.from_message(src_entity, dst_entity, d, replay_hint)

if isinstance(zt_msg, FLE.Message):
LOG.debug(colorama.Back.CYAN + colorama.Fore.BLACK +
Expand Down
3 changes: 2 additions & 1 deletion misc/pyearthquake/signal/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ class PacketEvent(EventBase):
deferred = True

@classmethod
def from_message(cls, src_entity, dst_entity, message):
def from_message(cls, src_entity, dst_entity, message, replay_hint=""):
inst = cls()
# we do not set inst.entity here
inst.option = {
'src_entity': src_entity,
'dst_entity': dst_entity,
'message': message
}
inst.replay_hint = replay_hint
return inst


Expand Down
3 changes: 2 additions & 1 deletion misc/pyearthquake/signal/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class EventBase(SignalBase):
"""
type_name = 'event'
deferred = False
var_names = SignalBase.var_names + ['deferred']
replay_hint = ''
var_names = SignalBase.var_names + ['deferred', 'replay_hint']
# orchestrator sets recv_timestamp
recv_timestamp = -1

Expand Down

1 comment on commit 13aa33b

@AkihiroSuda
Copy link
Member Author

@AkihiroSuda AkihiroSuda commented on 13aa33b Apr 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra test:

  • Config: replayable(maxInterval=300msec)
    • Reproducibility with random seed: 51%
    • Reproducibility with a certain fixed seed: 65%
  • Config: replayable(maxInterval=1000msec)
    • Reproducibility with random seed: 6%
    • Reproducibility with a certain fixed seed: 64%

Please sign in to comment.