diff --git a/README.md b/README.md index 08d913b..f09d739 100644 --- a/README.md +++ b/README.md @@ -59,9 +59,17 @@ explorePolicy = "random" # Default: 0 and 0 minInterval = "80ms" maxInterval = "3000ms" - # procResetSchedProbability is a probability for resetting process scheduling attributes (for Process inspector) - # Default: 0.1 (10%) - procResetSchedProbability = 0.1 + # for Process inspector, you can specify how to schedule processes + # "extreme": pick some processes and execute them with the SCHED_FIFO scheduler. The others are executed with the SCHED_IDLE scheduler. + # "dirichlet": execute processes with the SCHED_DEADLINE scheduler. A Dirichlet distribution is used for deciding runtime values. + # Default: "extreme" + procPolicy = "extreme" + +[explorePolicyParam.procPolicyParam] + # prioritized is the number of processes that are highly prioritized. + # effective only when procPolicy is "extreme". + # Default: 3 + prioritized = 10 [container] # Default: false @@ -73,17 +81,32 @@ explorePolicy = "random" ``` For other parameters, please refer to [`config.go`](earthquake/util/config/config.go) and [`randompolicy.go`](earthquake/explorepolicy/random/randompolicy.go). -If you don't want to use containers, you can also use Earthquake with an arbitrary process tree. +If you don't want to use containers, you can also use Earthquake (process inspector) with an arbitrary process tree. 
$ go get github.com/osrg/earthquake/earthquake $ sudo earthquake inspectors proc -root-pid $TARGET_PID -watch-interval 1s -autopilot config.toml +For Ethernet inspector, + + $ iptables -A OUTPUT -p tcp -m owner --uid-owner $(id -u johndoe) -j NFQUEUE --queue-num 42 + $ sudo earthquake inspectors ethernet -nfq-number 42 -autopilot config.toml + $ sudo -u johndoe $TARGET_PROGRAM + $ iptables -D OUTPUT -p tcp -m owner --uid-owner $(id -u johndoe) -j NFQUEUE --queue-num 42 + +For Filesystem inspector, + + $ mkdir /tmp/{eqfs-orig,eqfs} + $ sudo earthquake inspectors fs -original-dir /tmp/eqfs-orig -mount-point /tmp/eqfs -autopilot config.toml + $ $TARGET_PROGRAM_WHICH_ACCESSES_TMP_EQFS + $ sudo fusermount -u /tmp/eqfs + For full-stack (fully-distributed) Earthquake environment, please refer to [doc/how-to-setup-env-full.md](doc/how-to-setup-env-full.md). [The slides for the presentation at FOSDEM](http://www.slideshare.net/AkihiroSuda/tackling-nondeterminism-in-hadoop-testing-and-debugging-distributed-systems-with-earthquake-57866497/42) might be also helpful. ## Talks + * [ApacheCon Core North America](http://events.linuxfoundation.org/events/apachecon-north-america/program/schedule) (May 11-13, 2016, Vancouver) * [FOSDEM](https://fosdem.org/2016/schedule/event/nondeterminism_in_hadoop/) (January 30-31, 2016, Brussels) * The poster session of [ACM Symposium on Cloud Computing (SoCC)](http://acmsocc.github.io/2015/) (August 27-29, 2015, Hawaii) @@ -148,5 +171,5 @@ func main(){ Please refer to [example/template](example/template) for further information. ## Known Limitation -After running Earthquake (process inspector) many times, `sched_setattr(2)` can fail with `EBUSY`. +After running Earthquake (process inspector with `explorePolicyParam.procPolicy="dirichlet"`) many times, `sched_setattr(2)` can fail with `EBUSY`. This seems to be a bug of kernel; We're looking into this. 
diff --git a/earthquake/explorepolicy/random/randomproc.go b/earthquake/explorepolicy/random/dirichlet.go similarity index 69% rename from earthquake/explorepolicy/random/randomproc.go rename to earthquake/explorepolicy/random/dirichlet.go index 0fd8fbe..1dfb7a1 100644 --- a/earthquake/explorepolicy/random/randomproc.go +++ b/earthquake/explorepolicy/random/dirichlet.go @@ -16,7 +16,6 @@ package random import ( - "fmt" "github.com/AkihiroSuda/go-linuxsched" log "github.com/cihub/seelog" "github.com/leesper/go_rng" @@ -30,42 +29,33 @@ var ( drng = rng.NewDirichletGenerator(time.Now().UnixNano()) ) -func (r *Random) makeActionForProcSetEvent(event *signal.ProcSetEvent) (signal.Action, error) { - procs, err := r.parseProcSetEvent(event) +type dirichlet struct { + r *Random +} + +// implements procPolicyIntf +func (d *dirichlet) Action(event *signal.ProcSetEvent) (signal.Action, error) { + procs, err := parseProcSetEvent(event) if err != nil { return nil, err } - attrs := r.dirichletSchedDeadline(procs, time.Millisecond, 1.0) + attrs := d.dirichletSchedDeadline(procs, time.Millisecond, 1.0) for pidStr, attr := range attrs { log.Debugf("For PID=%s, setting Attr=%v", pidStr, attr) } return signal.NewProcSetSchedAction(event, attrs) } -// parses *ProcSetEvent and returns array of PIDs. -// -// due to JSON nature, we use string for PID representation. -func (r *Random) parseProcSetEvent(event *signal.ProcSetEvent) ([]string, error) { - option := event.Option() - procs, ok := option["procs"].([]string) - if !ok { - // FIXME: this may not work with REST endpoint. - // we need to convert []interface{} to []string here..? - return nil, fmt.Errorf("no procs? this should be an implementation error. event=%#v", event) - } - return procs, nil -} - // Returns map of linuxsched.SchedAttr{Policy: linuxsched.Deadline} for procs. // The runtime value is base * r * eff * numCPU, where r is dirichlet-distributed random numbers. // (we should improve this strategy.) 
// // due to JSON nature, we use string for PID representation. -func (r *Random) dirichletSchedDeadline(procs []string, base time.Duration, eff float64) map[string]linuxsched.SchedAttr { +func (d *dirichlet) dirichletSchedDeadline(procs []string, base time.Duration, eff float64) map[string]linuxsched.SchedAttr { attrs := make(map[string]linuxsched.SchedAttr, len(procs)) ratios := drng.FlatDirichlet(len(procs)) for i, pidStr := range procs { - if rand.Intn(999) < int(r.ProcResetSchedProbability*1000.0) { + if rand.Intn(999) < int(d.r.PPPDirichlet.ResetProbability*1000.0) { attrs[pidStr] = linuxsched.SchedAttr{ Policy: linuxsched.Normal, } diff --git a/earthquake/explorepolicy/random/extreme.go b/earthquake/explorepolicy/random/extreme.go new file mode 100644 index 0000000..ed1b880 --- /dev/null +++ b/earthquake/explorepolicy/random/extreme.go @@ -0,0 +1,63 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package random + +import ( + "github.com/AkihiroSuda/go-linuxsched" + log "github.com/cihub/seelog" + "github.com/osrg/earthquake/earthquake/signal" + "math/rand" +) + +type extreme struct { + r *Random +} + +// Action implements procPolicyIntf: it parses the PID list of event and wraps the attrs chosen by extremeSched into a ProcSetSchedAction. +func (e *extreme) Action(event *signal.ProcSetEvent) (signal.Action, error) { + procs, err := parseProcSetEvent(event) + if err != nil { + return nil, err + } + attrs := e.extremeSched(procs, e.r.PPPExtreme.Prioritized) + for pidStr, attr := range attrs { + log.Debugf("For PID=%s, setting Attr=%v", pidStr, attr) + } + return signal.NewProcSetSchedAction(event, attrs) +} + +// extremeSched returns a linuxsched.SchedAttr for every PID in procs: up to nprio randomly drawn entries (indices are drawn with replacement, so fewer may be picked) get the FIFO policy with a random priority in 1..99 (0 is not valid for SCHED_FIFO), all others get the Idle policy. An empty procs yields an empty map. +// Due to JSON nature, we use string for PID representation. +func (e *extreme) extremeSched(procs []string, nprio int) map[string]linuxsched.SchedAttr { + prios := make(map[int]bool) + for i := 0; i < nprio && len(procs) > 0; i++ { + prios[int(rand.Int31n(int32(len(procs))))] = true + } + attrs := make(map[string]linuxsched.SchedAttr, len(procs)) + for i, pidStr := range procs { + if prios[i] { + attrs[pidStr] = linuxsched.SchedAttr{ + Policy: linuxsched.FIFO, + Priority: uint32(1 + rand.Int31n(99)), + } + } else { + attrs[pidStr] = linuxsched.SchedAttr{ + Policy: linuxsched.Idle, + } + } + } + return attrs +} diff --git a/earthquake/explorepolicy/random/randompolicy.go b/earthquake/explorepolicy/random/randompolicy.go index 4aad27d..acd60a0 100644 --- a/earthquake/explorepolicy/random/randompolicy.go +++ b/earthquake/explorepolicy/random/randompolicy.go @@ -56,25 +56,53 @@ type Random struct { // parameter "faultActionProbability” FaultActionProbability float64 - // parameter "procResetSchedProbability” - ProcResetSchedProbability float64 + // parameter "procPolicy" + ProcPolicy string + + procPolicy procPolicyIntf + + // parameter "procPolicyParam" (for "extreme" procPolicy) + PPPExtreme pppExtreme + + // parameter "procPolicyParam" (for "dirichlet" procPolicy) + PPPDirichlet pppDirichlet +} + +type procPolicyIntf interface { 
+ Action(event *signal.ProcSetEvent) (signal.Action, error) +} + +type pppExtreme struct { + // parameter "procPolicyParam.prioritized": number of processes to prioritize under the "extreme" procPolicy (default: 3) + Prioritized int +} + +type pppDirichlet struct { + // parameter "procPolicyParam.resetProbability": probability (0.0-1.0) for resetting ProcSetSchedAction under the "dirichlet" procPolicy (default: 0.1) + ResetProbability float64 } func New() *Random { nextActionChan := make(chan signal.Action) q := queue.NewBasicTBQueue() r := &Random{ - nextActionChan: nextActionChan, - queue: q, - queueDeqCh: q.GetDequeueChan(), - shelActionRoutineRunning: false, - MinInterval: time.Duration(0), - MaxInterval: time.Duration(0), - PrioritizedEntities: make(map[string]bool, 0), - ShellActionInterval: time.Duration(0), - ShellActionCommand: "", - FaultActionProbability: 0.0, - ProcResetSchedProbability: 0.1, + nextActionChan: nextActionChan, + queue: q, + queueDeqCh: q.GetDequeueChan(), + shelActionRoutineRunning: false, + MinInterval: time.Duration(0), + MaxInterval: time.Duration(0), + PrioritizedEntities: make(map[string]bool, 0), + ShellActionInterval: time.Duration(0), + ShellActionCommand: "", + FaultActionProbability: 0.0, + ProcPolicy: "extreme", + PPPExtreme: pppExtreme{ + Prioritized: 3, + }, + PPPDirichlet: pppDirichlet{ + ResetProbability: 0.1, + }, } go r.dequeueEventRoutine() return r @@ -102,7 +130,13 @@ func (this *Random) Name() string { // // - faultActionProbability(float64): probability (0.0-1.0) of PacketFaultAction/FilesystemFaultAction (default: 0.0) // -// - procResetSchedProbability(float64): probability (0.0-1.0) for resetting ProcSetSchedAction (default: 0.1) +// - procPolicy(string): "extreme" or "dirichlet" (default: "extreme") 
+// +// - procPolicyParam(map[string]interface{}) for "extreme": +// -- prioritized: prioritized processes count +// +// - procPolicyParam(map[string]interface{}) for "dirichlet": +// -- resetProbability(float64): probability (0.0-1.0) for resetting ProcSetSchedAction (default: 0.1) // // should support dynamic reloading func (r *Random) LoadConfig(cfg config.Config) error { @@ -176,13 +210,39 @@ func (r *Random) LoadConfig(cfg config.Config) error { return fmt.Errorf("bad faultActionProbability %f", r.FaultActionProbability) } - paramProcResetSchedProbability := epp + "procResetSchedProbability" - if cfg.IsSet(paramProcResetSchedProbability) { - r.ProcResetSchedProbability = cfg.GetFloat64(paramProcResetSchedProbability) - log.Infof("Set procResetSchedProbability=%f", r.ProcResetSchedProbability) + return r.loadProcConfig(cfg) +} + +func (r *Random) loadProcConfig(cfg config.Config) error { + paramProcPolicy := cfg.GetString("explorepolicyparam.procPolicy") + if paramProcPolicy != "" { + r.ProcPolicy = paramProcPolicy } - if r.ProcResetSchedProbability < 0.0 || r.ProcResetSchedProbability > 1.0 { - return fmt.Errorf("bad procResetSchedProbability %f", r.ProcResetSchedProbability) + paramPrefix := "explorepolicyparam.procPolicyParam." + switch r.ProcPolicy { + case "extreme": + log.Infof("Set procPolicy=extreme") + // should we move the pppExtreme struct to extreme.go? + paramPrio := paramPrefix + "prioritized" + if cfg.IsSet(paramPrio) { + r.PPPExtreme.Prioritized = cfg.GetInt(paramPrio) + log.Infof("Set procPolicyParam.prioritized=%d", r.PPPExtreme.Prioritized) + } + r.procPolicy = &extreme{r: r} + case "dirichlet": + log.Infof("Set procPolicy=dirichlet") + // should we move the pppDirichlet struct to dirichlet.go? 
+ paramResetProbability := paramPrefix + "resetProbability" + if cfg.IsSet(paramResetProbability) { + r.PPPDirichlet.ResetProbability = cfg.GetFloat64(paramResetProbability) + log.Infof("Set procPolicyParam.resetProbability=%f", r.PPPDirichlet.ResetProbability) + } + if r.PPPDirichlet.ResetProbability < 0.0 || r.PPPDirichlet.ResetProbability > 1.0 { + return fmt.Errorf("bad procPolicyParam.resetProbability %f", r.PPPDirichlet.ResetProbability) + } + r.procPolicy = &dirichlet{r: r} + default: + return fmt.Errorf("bad procPolicy %s", r.ProcPolicy) } return nil } @@ -218,7 +278,7 @@ func (r *Random) shellFaultInjectionRoutine() { func (r *Random) makeActionForEvent(event signal.Event) (signal.Action, error) { switch event.(type) { case *signal.ProcSetEvent: - return r.makeActionForProcSetEvent(event.(*signal.ProcSetEvent)) + return r.procPolicy.Action(event.(*signal.ProcSetEvent)) } defaultAction, defaultActionErr := event.DefaultAction() faultAction, faultActionErr := event.DefaultFaultAction() diff --git a/earthquake/explorepolicy/random/randompolicy_test.go b/earthquake/explorepolicy/random/randompolicy_test.go index 03e6fb6..432cb87 100644 --- a/earthquake/explorepolicy/random/randompolicy_test.go +++ b/earthquake/explorepolicy/random/randompolicy_test.go @@ -71,7 +71,7 @@ explorePolicy = "random" assert.Zero(t, policy.ShellActionInterval) assert.Empty(t, policy.ShellActionCommand) assert.True(t, policy.FaultActionProbability < 0.01) - assert.True(t, policy.ProcResetSchedProbability > 0.09) + assert.True(t, policy.PPPDirichlet.ResetProbability > 0.09) badPolicyNameAllowedForExtensibility := ` explorePolicy = "randomBADBAD" @@ -82,8 +82,11 @@ explorePolicy = "randomBADBAD" shellActionInterval = "10s" shellActionCommand = "echo hello world" faultActionProbability = 0.1 - procResetSchedProbability = 0.0 thisParameterDoesNotExistButShouldNotMatter = 42 + procPolicy = "dirichlet" + +[explorePolicyParam.procPolicyParam] + resetProbability = 0.0 ` policy, err = 
newPolicyFromConfigString(badPolicyNameAllowedForExtensibility, "toml") assert.NoError(t, err) @@ -94,7 +97,8 @@ explorePolicy = "randomBADBAD" assert.Equal(t, policy.ShellActionInterval, 10*time.Second) assert.Equal(t, policy.ShellActionCommand, "echo hello world") assert.True(t, policy.FaultActionProbability > 0.09) - assert.True(t, policy.ProcResetSchedProbability < 0.01) + assert.Equal(t, policy.ProcPolicy, "dirichlet") + assert.True(t, policy.PPPDirichlet.ResetProbability < 0.01) } func TestRandomPolicyWithPacketEvent_10_2(t *testing.T) { @@ -113,15 +117,18 @@ func TestRandomPolicyShouldNotBlockWithPacketEvent_10_10(t *testing.T) { tester.XTestPolicyWithPacketEvent(t, newPolicy(t), 10, 10, false) } -func TestRandomPolicyWithProcEvent_100(t *testing.T) { - testRandomPolicyWithProcEvent(t, 100) +func TestRandomPolicyDirichlet_100(t *testing.T) { + testRandomPolicyDirichlet(t, 100) } -func testRandomPolicyWithProcEvent(t *testing.T, n int) { +func testRandomPolicyDirichlet(t *testing.T, n int) { cfg := ` explorePolicy = "random" [explorePolicyParam] - procResetSchedProbability = 0.5 + procPolicy = "dirichlet" + +[explorePolicyParam.procPolicyParam] + resetProbability = 0.5 ` policy, err := newPolicyFromConfigString(cfg, "toml") assert.NoError(t, err) @@ -157,3 +164,35 @@ explorePolicy = "random" assert.True(t, deadlineCount > 0) assert.True(t, othersCount == 0) } + +func TestRandomPolicyExtreme_100(t *testing.T) { + testRandomPolicyExtreme(t, 100) +} + +func testRandomPolicyExtreme(t *testing.T, n int) { + assert.True(t, n > 10) + cfg := ` +explorePolicy = "random" +[explorePolicyParam] + procPolicy = "extreme" + +[explorePolicyParam.procPolicyParam] + prioritized = 4 +` + policy, err := newPolicyFromConfigString(cfg, "toml") + assert.NoError(t, err) + actionChan := policy.ActionChan() + procs := make([]string, n) + for i := 0; i < n; i++ { + procs[i] = fmt.Sprintf("%d", i) + } + event, err := signal.NewProcSetEvent("foo", + procs, + map[string]interface{}{}) + 
+ assert.NoError(t, err) + policy.QueueEvent(event) + action := <-actionChan + option := action.JSONMap()["option"].(map[string]interface{}) + attrs := option["attrs"].(map[string]linuxsched.SchedAttr) + assert.NotNil(t, attrs) +} diff --git a/earthquake/explorepolicy/random/util.go b/earthquake/explorepolicy/random/util.go new file mode 100644 index 0000000..febd358 --- /dev/null +++ b/earthquake/explorepolicy/random/util.go @@ -0,0 +1,35 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package random + +import ( + "fmt" + "github.com/osrg/earthquake/earthquake/signal" +) + +// parseProcSetEvent reads the "procs" option of event and returns it as a slice of PID strings; it returns an error when that option is not a []string. +// +// Due to JSON nature, we use string for PID representation. +func parseProcSetEvent(event *signal.ProcSetEvent) ([]string, error) { + option := event.Option() + procs, ok := option["procs"].([]string) + if !ok { + // FIXME: this may not work with REST endpoint. + // we need to convert []interface{} to []string here..? + return nil, fmt.Errorf("no procs? this should be an implementation error. event=%#v", event) + } + return procs, nil +}