Skip to content
This repository has been archived by the owner on Dec 13, 2021. It is now read-only.

[EXPERIMENTAL] new exploration policy: replayable #137

Merged
merged 1 commit into from
Apr 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions earthquake/explorepolicy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package explorepolicy
import (
dumb "github.com/osrg/earthquake/earthquake/explorepolicy/dumb"
random "github.com/osrg/earthquake/earthquake/explorepolicy/random"
replayable "github.com/osrg/earthquake/earthquake/explorepolicy/replayable"
)

func RegisterKnownExplorePolicies() {
RegisterPolicy(dumb.Name, func() ExplorePolicy { return dumb.New() })
RegisterPolicy(random.Name, func() ExplorePolicy { return random.New() })
RegisterPolicy(replayable.Name, func() ExplorePolicy { return replayable.New() })
}
126 changes: 126 additions & 0 deletions earthquake/explorepolicy/replayable/replayablepolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package replayable provides the EXPERIMENTAL semi-deterministic replayable policy.
package replayable

import (
"hash/fnv"
"os"
"time"

log "github.com/cihub/seelog"
"github.com/osrg/earthquake/earthquake/historystorage"
"github.com/osrg/earthquake/earthquake/signal"
"github.com/osrg/earthquake/earthquake/util/config"
)

type Replayable struct {
// channel
actionCh chan signal.Action

// parameter "maxInterval"
MaxInterval time.Duration

// parameter "seed"
Seed string
}

func New() *Replayable {
log.Warnf("The replayable explorer is EXPERIMENTAL feature.")
r := &Replayable{
actionCh: make(chan signal.Action),
MaxInterval: time.Duration(0),
Seed: "",
}
return r
}

const Name = "replayable"

// returns "replayable"
func (r *Replayable) Name() string {
return Name
}

// parameters:
// - maxInterval(duration): max interval (default: 10 msecs)
// - seed(string): seed for replaying (default: empty). can be overriden by EQ_REPLAY_SEED.
//
// should support dynamic reloading
func (r *Replayable) LoadConfig(cfg config.Config) error {
log.Debugf("CONFIG: %s", cfg.AllSettings())
paramMaxInterval := "explorepolicyparam.maxInterval"
if cfg.IsSet(paramMaxInterval) {
r.MaxInterval = cfg.GetDuration(paramMaxInterval)
log.Infof("Set maxInterval=%s", r.MaxInterval)
} else {
r.MaxInterval = 10 * time.Millisecond
log.Infof("Using default maxInterval=%s", r.MaxInterval)
}

paramSeed := "explorepolicyparam.seed"
if cfg.IsSet(paramSeed) {
r.Seed = cfg.GetString(paramSeed)
log.Infof("Set seed=%s", r.Seed)
} else {
r.Seed = ""
log.Infof("Using default seed=%s", r.Seed)
}

envSeed := "EQ_REPLAY_SEED"
if v := os.Getenv(envSeed); v != "" {
r.Seed = v
log.Infof("Overriding seed=%s (%s)", r.Seed, envSeed)
}

return nil
}

func (d *Replayable) SetHistoryStorage(storage historystorage.HistoryStorage) error {
return nil
}

func (d *Replayable) ActionChan() chan signal.Action {
return d.actionCh
}

func (r *Replayable) determineInterval(event signal.Event) time.Duration {
if r.MaxInterval == 0 {
log.Warnf("MaxInterval is zero")
return 0
}
hint := event.ReplayHint()
h := fnv.New64a()
h.Write([]byte(r.Seed))
h.Write([]byte(hint))
ui64 := h.Sum64()
t := time.Duration(ui64 % uint64(r.MaxInterval))
log.Debugf("REPLAYABLE: Determined interval %s for (seed=%s,hint=%s)",
t, r.Seed, hint)
return t
}

func (r *Replayable) QueueEvent(event signal.Event) {
interval := r.determineInterval(event)
action, err := event.DefaultAction()
if err != nil {
panic(log.Critical(err))
}
go func() {
<-time.After(interval)
r.actionCh <- action
}()
}
110 changes: 110 additions & 0 deletions earthquake/explorepolicy/replayable/replayablepolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package replayable

import (
"flag"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/osrg/earthquake/earthquake/signal"
"github.com/osrg/earthquake/earthquake/util/config"
logutil "github.com/osrg/earthquake/earthquake/util/log"
testutil "github.com/osrg/earthquake/earthquake/util/test"
)

func TestMain(m *testing.M) {
flag.Parse()
logutil.InitLog("", true)
signal.RegisterKnownSignals()
os.Exit(m.Run())
}

func TestReplayableWithPacketEvent_10_2(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 2, true)
}

func TestReplayableWithPacketEvent_10_10(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 10, true)
}

func TestReplayableShouldNotBlockWithPacketEvent_10_2(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 2, false)
}

func TestReplayableShouldNotBlockWithPacketEvent_10_10(t *testing.T) {
xTestPolicyWithPacketEvent(t, 10, 10, false)
}

func newPolicy(t *testing.T, seed string) *Replayable {
policy := New()
cfg := config.New()
cfg.Set("explorePolicy", "replayable")
cfg.Set("explorePolicyParam", map[string]interface{}{
"maxInterval": 1 * time.Second,
"seed": seed,
})
err := policy.LoadConfig(cfg)
assert.NoError(t, err)
return policy
}

func xTestPolicyWithPacketEvent(t *testing.T, n, entities int, concurrent bool) {
seed := "foobar"
policy := newPolicy(t, seed)
sender := func() {
for i := 0; i < n; i++ {
entityID := fmt.Sprintf("entity-%d", i%entities)
event := testutil.NewPacketEvent(t, entityID, i).(*signal.PacketEvent)
hint := fmt.Sprintf("hint-%s-%d", entityID, i)
event.SetReplayHint(hint)
t.Logf("Test %d: Sending event (hint=%s)", i, hint)
policy.QueueEvent(event)
t.Logf("Test %d: Sent event (hint=%s)", i, hint)
}
}
receiver := func() {
for i := 0; i < n; i++ {
t.Logf("Test %d: Receiving", i)
action := <-policy.ActionChan()
event := action.Event()
t.Logf("Test %d: Received action (event hint=%s)",
i, event.ReplayHint())
}
}

if concurrent {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
sender()
}()
go func() {
defer wg.Done()
receiver()
}()
wg.Wait()
} else {
sender()
receiver()
}
}
14 changes: 14 additions & 0 deletions earthquake/signal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func (this *BasicEvent) SetDeferred(deferred bool) {
this.Set("deferred", deferred)
}

// implements Event
func (this *BasicEvent) ReplayHint() string {
hint, ok := this.Get("replay_hint").(string)
if !ok {
return ""
}
return hint
}

func (this *BasicEvent) SetReplayHint(hint string) {
// use BasicSignal.Set() so as to serialize in JSON
this.Set("replay_hint", hint)
}

// implements Event
func (this *BasicEvent) DefaultAction() (Action, error) {
if this.Deferred() {
Expand Down
9 changes: 9 additions & 0 deletions earthquake/signal/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type Event interface {
// json name: "deferred"
Deferred() bool

// explore policy can use this hash string as a hint for semi-deterministic replaying.
// The hint should not contain time-dependent or random things for better determinism.
// Note that we will not support fully deterministic replaying.
//
// The hint can contain any character.
//
// json name: "replay_hint"
ReplayHint() string

// default positive action. can be NopAction, but cannot be nil.
// (NopAction is used for history storage)
DefaultAction() (Action, error)
Expand Down
15 changes: 15 additions & 0 deletions example/zk-found-2212.nfqhook/config_replayable_experimental.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
init = "init.sh"
run = "run.sh"
validate = "validate.sh"
clean = "clean.sh"
restPort = 10080

# The replayable policy enables "semi-deterministic" replaying.
# You have to specify a "seed" string as EQ_REPLAY_SEED (environmental variable).
# The replayable policy determines delays as `hash(seed, hint) % maxInterval`.
# The "hint" is determined by pyearthquake zookeeper inspector.
explorePolicy = "replayable"

[explorePolicyParam]
# larger interval can increase reproducibility
maxInterval = "100ms"
4 changes: 3 additions & 1 deletion misc/pyearthquake/inspector/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def map_zktraffic_message_to_event(self, zt_msg):
src_entity, dst_entity = self.map_zktraffic_message_to_entity_ids(
zt_msg)
d = self.map_zktraffic_message_to_dict(zt_msg)
event = PacketEvent.from_message(src_entity, dst_entity, d)
## replay_hint is an optional string that helps replaying
replay_hint = str(hash(frozenset(d.items())))
event = PacketEvent.from_message(src_entity, dst_entity, d, replay_hint)

if isinstance(zt_msg, FLE.Message):
LOG.debug(colorama.Back.CYAN + colorama.Fore.BLACK +
Expand Down
3 changes: 2 additions & 1 deletion misc/pyearthquake/signal/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ class PacketEvent(EventBase):
deferred = True

@classmethod
def from_message(cls, src_entity, dst_entity, message):
def from_message(cls, src_entity, dst_entity, message, replay_hint=""):
inst = cls()
# we do not set inst.entity here
inst.option = {
'src_entity': src_entity,
'dst_entity': dst_entity,
'message': message
}
inst.replay_hint = replay_hint
return inst


Expand Down
3 changes: 2 additions & 1 deletion misc/pyearthquake/signal/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class EventBase(SignalBase):
"""
type_name = 'event'
deferred = False
var_names = SignalBase.var_names + ['deferred']
replay_hint = ''
var_names = SignalBase.var_names + ['deferred', 'replay_hint']
# orchestrator sets recv_timestamp
recv_timestamp = -1

Expand Down