Skip to content

Commit

Permalink
fix: increase times in concurrency tests to increase stability
Browse files Browse the repository at this point in the history
  • Loading branch information
peterlgh7 committed Jun 15, 2023
1 parent 30f8171 commit e73936b
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
8 changes: 4 additions & 4 deletions filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestFilter(t *testing.T) {
start := time.Now()
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5}).
Filter(func(i int) bool {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return i%2 == 1
}, jpipe.Concurrent(3))

Expand All @@ -49,7 +49,7 @@ func TestFilter(t *testing.T) {

slices.Sort(mappedValues) // The output order with concurrency is unpredictable
assert.Equal(t, []int{1, 3, 5}, mappedValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand All @@ -58,15 +58,15 @@ func TestFilter(t *testing.T) {
start := time.Now()
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5}).
Filter(func(i int) bool {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return i%2 == 1
}, jpipe.Concurrent(3), jpipe.Ordered(3))

mappedValues := drainChannel(channel)
elapsed := time.Since(start)

assert.Equal(t, []int{1, 3, 5}, mappedValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand Down
4 changes: 2 additions & 2 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestForEach(t *testing.T) {
lock := sync.Mutex{}
start := time.Now()
<-channel.ForEach(func(value int) {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
lock.Lock()
values = append(values, value)
lock.Unlock()
Expand All @@ -59,7 +59,7 @@ func TestForEach(t *testing.T) {

slices.Sort(values) // The output order with concurrency is unpredictable
assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, values)
assert.Less(t, elapsed, 40*time.Millisecond) // It would have taken 100ms serially, but it takes about 20ms with 10 elements and concurrency 5
assert.Less(t, elapsed, 400*time.Millisecond) // It would have taken 1000ms serially, but it takes about 200ms with 10 elements and concurrency 5
assert.NoError(t, pipeline.Error())
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})
Expand Down
12 changes: 6 additions & 6 deletions transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMap(t *testing.T) {
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5})
start := time.Now()
mappedChannel := jpipe.Map(channel, func(i int) string {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("%dA", i)
}, jpipe.Concurrent(3))

Expand All @@ -51,7 +51,7 @@ func TestMap(t *testing.T) {

slices.Sort(mappedValues) // The output order with concurrency is unpredictable
assert.Equal(t, []string{"1A", "2A", "3A", "4A", "5A"}, mappedValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand All @@ -60,15 +60,15 @@ func TestMap(t *testing.T) {
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5})
start := time.Now()
mappedChannel := jpipe.Map(channel, func(i int) string {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("%dA", i)
}, jpipe.Concurrent(3), jpipe.Ordered(3))

mappedValues := drainChannel(mappedChannel)
elapsed := time.Since(start)

assert.Equal(t, []string{"1A", "2A", "3A", "4A", "5A"}, mappedValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand Down Expand Up @@ -124,7 +124,7 @@ func TestFlatMap(t *testing.T) {
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5})
start := time.Now()
mappedChannel := jpipe.FlatMap(channel, func(i int) *jpipe.Channel[string] {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return jpipe.FromSlice(pipeline, []string{fmt.Sprintf("%dA", i), fmt.Sprintf("%dB", i)})
}, jpipe.Concurrent(3))

Expand All @@ -133,7 +133,7 @@ func TestFlatMap(t *testing.T) {

slices.Sort(mappedValues) // The output order with concurrency is unpredictable
assert.Equal(t, []string{"1A", "1B", "2A", "2B", "3A", "3B", "4A", "4B", "5A", "5B"}, mappedValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})
}
Expand Down
8 changes: 4 additions & 4 deletions utility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestTap(t *testing.T) {
start := time.Now()
channel := jpipe.FromSlice(pipeline, slice).
Tap(func(i int) {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
valuesFromTap[i-1] = i // not using append to avoid the need for synchronization
}, jpipe.Concurrent(3))

Expand All @@ -112,7 +112,7 @@ func TestTap(t *testing.T) {
slices.Sort(outputValues) // The output order with concurrency is unpredictable
assert.Equal(t, slice, valuesFromTap)
assert.Equal(t, slice, outputValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand All @@ -123,7 +123,7 @@ func TestTap(t *testing.T) {
start := time.Now()
channel := jpipe.FromSlice(pipeline, slice).
Tap(func(i int) {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
valuesFromTap[i-1] = i // not using append to avoid the need for synchronization
}, jpipe.Concurrent(3), jpipe.Ordered(3))

Expand All @@ -133,7 +133,7 @@ func TestTap(t *testing.T) {
slices.Sort(valuesFromTap) // The tap execution order with concurrency is unpredictable
assert.Equal(t, slice, valuesFromTap)
assert.Equal(t, slice, outputValues)
assert.Less(t, elapsed, 30*time.Millisecond) // It would have taken 50ms serially, but it takes about 20ms with 5 elements and concurrency 3
assert.Less(t, elapsed, 300*time.Millisecond) // It would have taken 500ms serially, but it takes about 200ms with 5 elements and concurrency 3
assertPipelineDone(t, pipeline, 10*time.Millisecond)
})

Expand Down

0 comments on commit e73936b

Please sign in to comment.