Skip to content
This repository has been archived by the owner on Dec 13, 2021. It is now read-only.

Commit

Permalink
Merge fb5ffed into afa486b
Browse files Browse the repository at this point in the history
  • Loading branch information
AkihiroSuda committed Mar 10, 2016
2 parents afa486b + fb5ffed commit 71de9dd
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 53 deletions.
33 changes: 28 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,17 @@ explorePolicy = "random"
# Default: 0 and 0
minInterval = "80ms"
maxInterval = "3000ms"
# procResetSchedProbability is a probability for resetting process scheduling attributes (for Process inspector)
# Default: 0.1 (10%)
procResetSchedProbability = 0.1
# for Process inspector, you can specify how to schedule processes
# "extreme": pick up some processes and execute them with the SCHED_FIFO scheduler. The others are executed with the SCHED_IDLE scheduler.
# "dirichlet": execute processes with SCHED_DEADLINE scheduler. Dirichlet-distribution is used for deciding runtime values.
# Default: "extreme"
procPolicy = "extreme"

[explorePolicyParam.procPolicyParam]
# prioritized is the number of processes that are highly prioritized.
# effective only when procPolicy is "extreme".
# Default: 3
prioritized = 10

[container]
# Default: false
Expand All @@ -73,17 +81,32 @@ explorePolicy = "random"
```
For other parameters, please refer to [`config.go`](earthquake/util/config/config.go) and [`randompolicy.go`](earthquake/explorepolicy/random/randompolicy.go).

If you don't want to use containers, you can also use Earthquake with an arbitrary process tree.
If you don't want to use containers, you can also use Earthquake (process inspector) with an arbitrary process tree.

$ go get github.com/osrg/earthquake/earthquake
$ sudo earthquake inspectors proc -root-pid $TARGET_PID -watch-interval 1s -autopilot config.toml

For Ethernet inspector,

$ iptables -A OUTPUT -p tcp -m owner --uid-owner $(id -u johndoe) -j NFQUEUE --queue-num 42
$ sudo earthquake inspectors ethernet -nfq-number 42 -autopilot config.toml
$ sudo -u johndoe $TARGET_PROGRAM
$ iptables -D OUTPUT -p tcp -m owner --uid-owner $(id -u johndoe) -j NFQUEUE --queue-num 42

For Filesystem inspector,

$ mkdir /tmp/{eqfs-orig,eqfs}
$ sudo earthquake inspectors fs -original-dir /tmp/eqfs-orig -mount-point /tmp/eqfs -autopilot config.toml
$ $TARGET_PROGRAM_WHICH_ACCESSES_TMP_EQFS
$ sudo fusermount -u /tmp/eqfs

For full-stack (fully-distributed) Earthquake environment, please refer to [doc/how-to-setup-env-full.md](doc/how-to-setup-env-full.md).

[The slides for the presentation at FOSDEM](http://www.slideshare.net/AkihiroSuda/tackling-nondeterminism-in-hadoop-testing-and-debugging-distributed-systems-with-earthquake-57866497/42) might also be helpful.

## Talks

* [ApacheCon Core North America](http://events.linuxfoundation.org/events/apachecon-north-america/program/schedule) (May 11-13, 2016, Vancouver)
* [FOSDEM](https://fosdem.org/2016/schedule/event/nondeterminism_in_hadoop/) (January 30-31, 2016, Brussels)
* The poster session of [ACM Symposium on Cloud Computing (SoCC)](http://acmsocc.github.io/2015/) (August 27-29, 2015, Hawaii)

Expand Down Expand Up @@ -148,5 +171,5 @@ func main(){
Please refer to [example/template](example/template) for further information.

## Known Limitation
After running Earthquake (process inspector) many times, `sched_setattr(2)` can fail with `EBUSY`.
After running Earthquake (process inspector with `explorePolicyParam.procPolicy="dirichlet"`) many times, `sched_setattr(2)` can fail with `EBUSY`.
This seems to be a kernel bug; we're looking into it.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package random

import (
"fmt"
"github.com/AkihiroSuda/go-linuxsched"
log "github.com/cihub/seelog"
"github.com/leesper/go_rng"
Expand All @@ -30,42 +29,33 @@ var (
drng = rng.NewDirichletGenerator(time.Now().UnixNano())
)

func (r *Random) makeActionForProcSetEvent(event *signal.ProcSetEvent) (signal.Action, error) {
procs, err := r.parseProcSetEvent(event)
// dirichlet is the "dirichlet" process scheduling policy: it schedules
// processes with SCHED_DEADLINE, using Dirichlet-distributed random numbers to
// decide the runtime values.
type dirichlet struct {
// r is the owning Random policy; its PPPDirichlet parameters are read
// when scheduling attributes are generated.
r *Random
}

// implements procPolicyIntf
func (d *dirichlet) Action(event *signal.ProcSetEvent) (signal.Action, error) {
procs, err := parseProcSetEvent(event)
if err != nil {
return nil, err
}
attrs := r.dirichletSchedDeadline(procs, time.Millisecond, 1.0)
attrs := d.dirichletSchedDeadline(procs, time.Millisecond, 1.0)
for pidStr, attr := range attrs {
log.Debugf("For PID=%s, setting Attr=%v", pidStr, attr)
}
return signal.NewProcSetSchedAction(event, attrs)
}

// parses *ProcSetEvent and returns array of PIDs.
//
// due to JSON nature, we use string for PID representation.
func (r *Random) parseProcSetEvent(event *signal.ProcSetEvent) ([]string, error) {
option := event.Option()
procs, ok := option["procs"].([]string)
if !ok {
// FIXME: this may not work with REST endpoint.
// we need to convert []interface{} to []string here..?
return nil, fmt.Errorf("no procs? this should be an implementation error. event=%#v", event)
}
return procs, nil
}

// Returns map of linuxsched.SchedAttr{Policy: linuxsched.Deadline} for procs.
// The runtime value is base * r * eff * numCPU, where r is dirichlet-distributed random numbers.
// (we should improve this strategy.)
//
// due to JSON nature, we use string for PID representation.
func (r *Random) dirichletSchedDeadline(procs []string, base time.Duration, eff float64) map[string]linuxsched.SchedAttr {
func (d *dirichlet) dirichletSchedDeadline(procs []string, base time.Duration, eff float64) map[string]linuxsched.SchedAttr {
attrs := make(map[string]linuxsched.SchedAttr, len(procs))
ratios := drng.FlatDirichlet(len(procs))
for i, pidStr := range procs {
if rand.Intn(999) < int(r.ProcResetSchedProbability*1000.0) {
if rand.Intn(999) < int(d.r.PPPDirichlet.ResetProbability*1000.0) {
attrs[pidStr] = linuxsched.SchedAttr{
Policy: linuxsched.Normal,
}
Expand Down
63 changes: 63 additions & 0 deletions earthquake/explorepolicy/random/extreme.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package random

import (
"github.com/AkihiroSuda/go-linuxsched"
log "github.com/cihub/seelog"
"github.com/osrg/earthquake/earthquake/signal"
"math/rand"
)

// extreme is the "extreme" process scheduling policy: a few randomly chosen
// processes are run with the FIFO scheduler and all the others with the Idle
// scheduler.
type extreme struct {
// r is the owning Random policy; its PPPExtreme parameters are read when
// an action is generated.
r *Random
}

// Action implements procPolicyIntf.
//
// It extracts the PID list from event, computes "extreme" scheduling
// attributes for those PIDs (using the configured number of prioritized
// processes), logs each assignment at debug level, and wraps the result in a
// ProcSetSchedAction. A failure to parse the event is returned unchanged.
func (e *extreme) Action(event *signal.ProcSetEvent) (signal.Action, error) {
	pids, parseErr := parseProcSetEvent(event)
	if parseErr != nil {
		return nil, parseErr
	}

	nprio := e.r.PPPExtreme.Prioritized
	schedAttrs := e.extremeSched(pids, nprio)
	for pid := range schedAttrs {
		log.Debugf("For PID=%s, setting Attr=%v", pid, schedAttrs[pid])
	}

	return signal.NewProcSetSchedAction(event, schedAttrs)
}

// extremeSched returns a map of linuxsched.SchedAttr for procs, keyed by PID.
//
// nprio random draws are made from procs; the drawn processes get the FIFO
// policy with a random real-time priority in [1, 99], and every other process
// gets the Idle policy. Draws are made with replacement, so fewer than nprio
// distinct processes may end up prioritized. A non-positive nprio prioritizes
// nothing.
//
// An empty procs yields an empty (non-nil) map.
//
// due to JSON nature, we use string for PID representation.
func (e *extreme) extremeSched(procs []string, nprio int) map[string]linuxsched.SchedAttr {
	attrs := make(map[string]linuxsched.SchedAttr, len(procs))
	if len(procs) == 0 {
		// rand.Intn panics on a zero argument, so there is nothing to draw from.
		return attrs
	}

	// Indices (into procs) of the processes chosen to be prioritized.
	prios := make(map[int]bool)
	for i := 0; i < nprio; i++ {
		prios[rand.Intn(len(procs))] = true
	}

	for i, pidStr := range procs {
		if prios[i] {
			attrs[pidStr] = linuxsched.SchedAttr{
				Policy: linuxsched.FIFO,
				// SCHED_FIFO only accepts priorities 1..99; 0 is rejected
				// by sched_setattr(2), so never generate it.
				Priority: uint32(1 + rand.Intn(99)),
			}
		} else {
			attrs[pidStr] = linuxsched.SchedAttr{
				Policy: linuxsched.Idle,
			}
		}
	}
	return attrs
}
102 changes: 81 additions & 21 deletions earthquake/explorepolicy/random/randompolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,53 @@ type Random struct {
// parameter "faultActionProbability”
FaultActionProbability float64

// parameter "procResetSchedProbability”
ProcResetSchedProbability float64
// parameter "procPolicy"
ProcPolicy string

procPolicy procPolicyIntf

// parameter "procPolicyParam" (for "extreme" procPolicy)
PPPExtreme pppExtreme

// parameter "procPolicyParam" (for "dirichlet" procPolicy)
PPPDirichlet pppDirichlet
}

// procPolicyIntf is the interface of a process scheduling policy
// (e.g. "extreme" or "dirichlet").
type procPolicyIntf interface {
// Action builds the action to take for the given ProcSetEvent.
Action(event *signal.ProcSetEvent) (signal.Action, error)
}

// pppExtreme holds the "procPolicyParam" values used by the "extreme"
// procPolicy.
type pppExtreme struct {
// parameter "procPolicyParam.prioritized":
// the number of processes that are highly prioritized.
Prioritized int
}

// pppDirichlet holds the "procPolicyParam" values used by the "dirichlet"
// procPolicy.
type pppDirichlet struct {
// parameter "procPolicyParam.resetProbability":
// probability (0.0-1.0) for resetting ProcSetSchedAction.
ResetProbability float64
}

func New() *Random {
nextActionChan := make(chan signal.Action)
q := queue.NewBasicTBQueue()
r := &Random{
nextActionChan: nextActionChan,
queue: q,
queueDeqCh: q.GetDequeueChan(),
shelActionRoutineRunning: false,
MinInterval: time.Duration(0),
MaxInterval: time.Duration(0),
PrioritizedEntities: make(map[string]bool, 0),
ShellActionInterval: time.Duration(0),
ShellActionCommand: "",
FaultActionProbability: 0.0,
ProcResetSchedProbability: 0.1,
nextActionChan: nextActionChan,
queue: q,
queueDeqCh: q.GetDequeueChan(),
shelActionRoutineRunning: false,
MinInterval: time.Duration(0),
MaxInterval: time.Duration(0),
PrioritizedEntities: make(map[string]bool, 0),
ShellActionInterval: time.Duration(0),
ShellActionCommand: "",
FaultActionProbability: 0.0,
ProcPolicy: "extreme",
PPPExtreme: pppExtreme{
Prioritized: 3,
},
PPPDirichlet: pppDirichlet{
ResetProbability: 0.1,
},
}
go r.dequeueEventRoutine()
return r
Expand Down Expand Up @@ -102,7 +130,13 @@ func (this *Random) Name() string {
//
// - faultActionProbability(float64): probability (0.0-1.0) of PacketFaultAction/FilesystemFaultAction (default: 0.0)
//
// - procResetSchedProbability(float64): probability (0.0-1.0) for resetting ProcSetSchedAction (default: 0.1)
// - procPolicy(string): "extreme", "dirichlet", ..
//
// - procPolicyParam(map[string]interface{}) for "extreme":
// -- prioritized(int): number of processes to prioritize (default: 3)
//
// - procPolicyParam(map[string]interface{}) for "dirichlet":
// -- resetProbability(float64): probability (0.0-1.0) for resetting ProcSetSchedAction (default: 0.1)
//
// should support dynamic reloading
func (r *Random) LoadConfig(cfg config.Config) error {
Expand Down Expand Up @@ -176,13 +210,39 @@ func (r *Random) LoadConfig(cfg config.Config) error {
return fmt.Errorf("bad faultActionProbability %f", r.FaultActionProbability)
}

paramProcResetSchedProbability := epp + "procResetSchedProbability"
if cfg.IsSet(paramProcResetSchedProbability) {
r.ProcResetSchedProbability = cfg.GetFloat64(paramProcResetSchedProbability)
log.Infof("Set procResetSchedProbability=%f", r.ProcResetSchedProbability)
return r.loadProcConfig(cfg)
}

func (r *Random) loadProcConfig(cfg config.Config) error {
paramProcPolicy := cfg.GetString("explorepolicyparam.procPolicy")
if paramProcPolicy != "" {
r.ProcPolicy = paramProcPolicy
}
if r.ProcResetSchedProbability < 0.0 || r.ProcResetSchedProbability > 1.0 {
return fmt.Errorf("bad procResetSchedProbability %f", r.ProcResetSchedProbability)
paramPrefix := "explorepolicyparam.procPolicyParam."
switch r.ProcPolicy {
case "extreme":
log.Infof("Set procPolicy=extreme")
// should we move the pppExtreme struct to extreme.go?
paramPrio := paramPrefix + "prioritized"
if cfg.IsSet(paramPrio) {
r.PPPExtreme.Prioritized = cfg.GetInt(paramPrio)
log.Infof("Set procPolicyParam.prioritized=%d", r.PPPExtreme.Prioritized)
}
r.procPolicy = &extreme{r: r}
case "dirichlet":
log.Infof("Set procPolicy=dirichlet")
// should we move the pppDirichlet struct to dirichlet.go?
paramResetProbability := paramPrefix + "resetProbability"
if cfg.IsSet(paramResetProbability) {
r.PPPDirichlet.ResetProbability = cfg.GetFloat64(paramResetProbability)
log.Infof("Set procPolicyParam.resetProbability=%f", r.PPPDirichlet.ResetProbability)
}
if r.PPPDirichlet.ResetProbability < 0.0 || r.PPPDirichlet.ResetProbability > 1.0 {
return fmt.Errorf("bad procPolicyParam.resetProbability %f", r.PPPDirichlet.ResetProbability)
}
r.procPolicy = &dirichlet{r: r}
default:
return fmt.Errorf("bad procPolicy %s", r.ProcPolicy)
}
return nil
}
Expand Down Expand Up @@ -218,7 +278,7 @@ func (r *Random) shellFaultInjectionRoutine() {
func (r *Random) makeActionForEvent(event signal.Event) (signal.Action, error) {
switch event.(type) {
case *signal.ProcSetEvent:
return r.makeActionForProcSetEvent(event.(*signal.ProcSetEvent))
return r.procPolicy.Action(event.(*signal.ProcSetEvent))
}
defaultAction, defaultActionErr := event.DefaultAction()
faultAction, faultActionErr := event.DefaultFaultAction()
Expand Down
Loading

0 comments on commit 71de9dd

Please sign in to comment.