Skip to content

Commit

Permalink
Add annotation to input channels for an additional modicum of safety
Browse files Browse the repository at this point in the history
  • Loading branch information
sbogacz committed Jan 14, 2017
1 parent 376c07c commit afdaf45
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
6 changes: 3 additions & 3 deletions definitions.go
Expand Up @@ -5,16 +5,16 @@ import "sync"
// Runner interface exposes functions that take in a chan interface{}
// and outputs to a chan interface{}
type Runner interface {
Run(chan interface{}) chan interface{}
Run(<-chan interface{}) chan interface{}
}

// Operator aliases a function that takes one input channel and one output channel
type Operator func(chan interface{}, chan interface{})
type Operator func(<-chan interface{}, chan interface{})

// Run takes an input channel, and a series of operators, and uses the output
// of each successive operator as the input for the next. This makes the Operator
// implement the Runner interface
func (o Operator) Run(in chan interface{}) chan interface{} {
func (o Operator) Run(in <-chan interface{}) chan interface{} {
out := make(chan interface{})
go func() {
o(in, out)
Expand Down
8 changes: 4 additions & 4 deletions definitions_test.go
Expand Up @@ -60,15 +60,15 @@ func TestPipeline(t *testing.T) {
}

func multiplier(x int) Operator {
return Operator(func(in chan interface{}, out chan interface{}) {
return Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
out <- (int(n) * x)
}
})
}

var ifOdd = Operator(func(in chan interface{}, out chan interface{}) {
var ifOdd = Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
if n%2 == 1 {
Expand All @@ -77,7 +77,7 @@ var ifOdd = Operator(func(in chan interface{}, out chan interface{}) {
}
})

var ifEven = Operator(func(in chan interface{}, out chan interface{}) {
var ifEven = Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
if n%2 == 0 {
Expand All @@ -86,7 +86,7 @@ var ifEven = Operator(func(in chan interface{}, out chan interface{}) {
}
})

var summer = Operator(func(in chan interface{}, out chan interface{}) {
var summer = Operator(func(in <-chan interface{}, out chan interface{}) {
total := 0
for m := range in {
n := m.(int)
Expand Down
12 changes: 6 additions & 6 deletions examples_test.go
Expand Up @@ -38,7 +38,7 @@ func ExampleFlow() {
// multiplier takes an int and returns an Operator which multiplies the
// input by the given int
multiplier := func(x int) pipeline.Operator {
return pipeline.Operator(func(in chan interface{}, out chan interface{}) {
return pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
out <- (int(n) * x)
Expand All @@ -47,7 +47,7 @@ func ExampleFlow() {
}

// ifEven is an operator which filters out odds and passes evens through
ifEven := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
ifEven := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
if n%2 == 0 {
Expand All @@ -57,7 +57,7 @@ func ExampleFlow() {
})

// ifOdd is an operator which filters out evens and passes odds through
ifOdd := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
ifOdd := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
for m := range in {
n := m.(int)
if n%2 == 1 {
Expand All @@ -68,7 +68,7 @@ func ExampleFlow() {

// summer is an operator which aggregates input integers, and outputs the
// total once the input channel closes
summer := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
summer := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
total := 0
for m := range in {
n := m.(int)
Expand Down Expand Up @@ -114,15 +114,15 @@ func ExampleFlow_wordCount() {

// wordCount is an operator that takes in strings (words) and emits a tuple
// of (word, 1)
wordCount := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
wordCount := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
for word := range in {
out <- tuple{word.(string), 1}
}
})

// countAggregator takes in tuples and aggregates their counts. Outputs
// the word and count output as a string.
countAggregator := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
countAggregator := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
counts := make(map[string]int)
for t := range in {
counts[t.(tuple).token] += t.(tuple).count
Expand Down
2 changes: 1 addition & 1 deletion ratelimiter.go
Expand Up @@ -10,7 +10,7 @@ import (
// of the given `rate.Limiter`. Passing the limiter in allows you to share it
// across multiple instances of this Operator.
func RateLimiter(l *rate.Limiter) Operator {
return func(in chan interface{}, out chan interface{}) {
return func(in <-chan interface{}, out chan interface{}) {
for n := range in {
_ = l.Wait(context.Background())
out <- n
Expand Down

0 comments on commit afdaf45

Please sign in to comment.