From afdaf45b53131fe23863cc7bc048563b9ccf3495 Mon Sep 17 00:00:00 2001 From: sbogacz Date: Sat, 14 Jan 2017 01:02:31 -0700 Subject: [PATCH] Add annotation to input channels for an additional modicum of safety --- definitions.go | 6 +++--- definitions_test.go | 8 ++++---- examples_test.go | 12 ++++++------ ratelimiter.go | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/definitions.go b/definitions.go index 2ea9cbc..0f8dc88 100644 --- a/definitions.go +++ b/definitions.go @@ -5,16 +5,16 @@ import "sync" // Runner interface exposes functions that take in a chan interface{} // and outputs to a chan interface{} type Runner interface { - Run(chan interface{}) chan interface{} + Run(<-chan interface{}) chan interface{} } // Operator aliases a function that takes one input channel and one output channel -type Operator func(chan interface{}, chan interface{}) +type Operator func(<-chan interface{}, chan interface{}) // Run takes an input channel, and a series of operators, and uses the output // of each successive operator as the input for the next. This makes the Operator // implement the Runner interface -func (o Operator) Run(in chan interface{}) chan interface{} { +func (o Operator) Run(in <-chan interface{}) chan interface{} { out := make(chan interface{}) go func() { o(in, out) diff --git a/definitions_test.go b/definitions_test.go index a435380..1af9950 100644 --- a/definitions_test.go +++ b/definitions_test.go @@ -60,7 +60,7 @@ func TestPipeline(t *testing.T) { } func multiplier(x int) Operator { - return Operator(func(in chan interface{}, out chan interface{}) { + return Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) out <- (int(n) * x) @@ -68,7 +68,7 @@ func multiplier(x int) Operator { }) } -var ifOdd = Operator(func(in chan interface{}, out chan interface{}) { +var ifOdd = Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) if n%2 == 1 { @@ -77,7 +77,7 @@ var ifOdd = Operator(func(in chan interface{}, out chan interface{}) { } }) -var ifEven = Operator(func(in chan interface{}, out chan interface{}) { +var ifEven = Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) if n%2 == 0 { @@ -86,7 +86,7 @@ var ifEven = Operator(func(in chan interface{}, out chan interface{}) { } }) -var summer = Operator(func(in chan interface{}, out chan interface{}) { +var summer = Operator(func(in <-chan interface{}, out chan interface{}) { total := 0 for m := range in { n := m.(int) diff --git a/examples_test.go b/examples_test.go index a2acc19..3b1d193 100644 --- a/examples_test.go +++ b/examples_test.go @@ -38,7 +38,7 @@ func ExampleFlow() { // multiplier takes an int and returns an Operator which multiplies the // input by the given int multiplier := func(x int) pipeline.Operator { - return pipeline.Operator(func(in chan interface{}, out chan interface{}) { + return pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) out <- (int(n) * x) @@ -47,7 +47,7 @@ func ExampleFlow() { } // ifEven is an operator which filters out odds and passes evens through - ifEven := pipeline.Operator(func(in chan interface{}, out chan interface{}) { + ifEven := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) if n%2 == 0 { @@ -57,7 +57,7 @@ func ExampleFlow() { }) // ifOdd is an operator which filters out evens and passes odds through - ifOdd := pipeline.Operator(func(in chan interface{}, out chan interface{}) { + ifOdd := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { for m := range in { n := m.(int) if n%2 == 1 { @@ -68,7 +68,7 @@ func ExampleFlow() { // summer is an operator which aggregates input integers, and outputs the // total once the input channel closes - summer := pipeline.Operator(func(in chan interface{}, out chan interface{}) { + summer := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { total := 0 for m := range in { n := m.(int) @@ -114,7 +114,7 @@ func ExampleFlow_wordCount() { // wordCount is an operator that takes in strings (words) and emits a tuple // of (word, 1) - wordCount := pipeline.Operator(func(in chan interface{}, out chan interface{}) { + wordCount := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { for word := range in { out <- tuple{word.(string), 1} } @@ -122,7 +122,7 @@ func ExampleFlow_wordCount() { // countAggregator takes in tuples and aggregates their counts. Outputs // the word and count output as a string. - countAggregator := pipeline.Operator(func(in chan interface{}, out chan interface{}) { + countAggregator := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) { counts := make(map[string]int) for t := range in { counts[t.(tuple).token] += t.(tuple).count diff --git a/ratelimiter.go b/ratelimiter.go index e888279..2b57ef9 100644 --- a/ratelimiter.go +++ b/ratelimiter.go @@ -10,7 +10,7 @@ import ( // of the given `rate.Limiter`. Passing the limiter in allows you to share it // across multiple instances of this Operator. func RateLimiter(l *rate.Limiter) Operator { - return func(in chan interface{}, out chan interface{}) { + return func(in <-chan interface{}, out chan interface{}) { for n := range in { _ = l.Wait(context.Background()) out <- n