
sink/kafka: fix send on closed channel panic #912

Merged
merged 8 commits into pingcap:master on Sep 3, 2020

Conversation

amyangfei
Contributor

What problem does this PR solve?

Fix #908

What is changed and how it works?

Generally it is only safe to close a channel from the sender routine, so we move the Close of the producer into the later processor stop procedure.

Besides, there is a bug in the select usage. In Go's select model, if one or more of the communications can proceed, a single one is chosen via uniform pseudo-random selection, so the closeCh case is not guaranteed to be taken even after closeCh is closed. We therefore move the message send into the default branch:

 	select {
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-k.closeCh:
 		return nil
-	case k.asyncClient.Input() <- msg:
+	default:
+		k.asyncClient.Input() <- msg
 	}
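For context, here is a minimal standalone sketch (not from this PR) of why the original select was unsafe: once closeCh is closed, both the closeCh case and the send case can be ready at the same time, and Go chooses between ready cases uniformly at random, so the producer could still attempt a send after close.

package main

import "fmt"

func main() {
	closeCh := make(chan struct{})
	close(closeCh) // the receive case is now always ready

	picks := map[string]int{}
	for i := 0; i < 1000; i++ {
		inputCh := make(chan int, 1) // buffered, so the send case is also ready
		select {
		case <-closeCh:
			picks["closeCh"]++
		case inputCh <- 1: // can still be chosen even though closeCh is closed
			picks["send"]++
		}
	}
	fmt.Println(picks) // roughly 50/50, e.g. map[closeCh:513 send:487]
}

With the send moved into the default branch, it is only reached when no other case is ready, so a closed closeCh always short-circuits the send (though, as the discussion below shows, this alone is not enough).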

Check List

Tests

  • Unit test
  • Integration test

Release note

  • No release note

@amyangfei amyangfei added type/bugfix This PR fixes a bug. component/sink Sink component. labels Sep 2, 2020
@amyangfei amyangfei added this to the v4.0.6 milestone Sep 2, 2020
@amyangfei
Contributor Author

/run-all-tests

1 similar comment
@amyangfei
Contributor Author

/run-all-tests

@zier-one
Contributor

lgtm

@ti-srebot ti-srebot added the status/LGT1 Indicates that a PR has LGTM 1. label Sep 2, 2020
@liuzix
Contributor

liuzix commented Sep 2, 2020

Could you add a unit test where data is sent asynchronously while the Producer is being closed? This is a scenario that might produce a data race (this might have been fixed by the adjustment to the select statement, but just to be sure).

@amyangfei
Contributor Author

amyangfei commented Sep 2, 2020

> Could you add a unit test where data is sent asynchronously while the Producer is being closed? This is a scenario that might produce a data race (this might have been fixed by the adjustment to the select statement, but just to be sure).

Yes, you are right, there is still a data race and a send-on-closed-channel risk, as the following code demonstrates @liuzix

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	inputCh chan struct{}
	closeCh chan struct{}
	count   int64
	l       sync.RWMutex
}

func (w *worker) send(ctx context.Context) bool {
	w.l.Lock() // hold the lock so stop() cannot close the channels mid-send
	defer w.l.Unlock()
	select {
	case <-ctx.Done():
		return false
	case <-w.closeCh: // once closed, this case always wins over default
		return true
	default:
		w.inputCh <- struct{}{}
	}
	return false
}

func (w *worker) produce(ctx context.Context) {
	finish := false
	for !finish {
		finish = w.send(ctx)
	}
}

func (w *worker) consume(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-w.inputCh:
			w.count++
		}
	}
}

func (w *worker) stop(lock bool) {
	if lock {
		w.l.Lock() // wait for any in-flight send to finish
	}
	close(w.closeCh)
	if lock {
		w.l.Unlock()
	}
	close(w.inputCh) // without the lock, a sender may still be blocked on inputCh here
}

func test(i int, enableLock bool) {
	w := &worker{
		inputCh: make(chan struct{}, 1),
		closeCh: make(chan struct{}),
	}
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		w.produce(ctx)
	}()
	go func() {
		defer wg.Done()
		w.consume(ctx)
	}()

	time.Sleep(time.Millisecond * 1)
	w.stop(enableLock)
	cancel()
	wg.Wait()
	fmt.Printf("idx: %d count:%d\n", i, w.count)
}

func main() {
	enableLock := true
	// enableLock := false
	for i := 0; i < 200; i++ {
		test(i, enableLock)
	}
}

If we run with enableLock := false, we get:

➜   go run chan_data_race.go
idx: 0 count:3980
idx: 1 count:2541
idx: 2 count:2494
idx: 3 count:2433
panic: send on closed channel

goroutine 10 [running]:
main.(*worker).send(0xc0000641b0, 0x4f0060, 0xc00007c0c0, 0xffffffffffffff00)
        /home/apple/tmp/can-delete/chan_data_race.go:26 +0x16e
main.(*worker).produce(0xc0000641b0, 0x4f0060, 0xc00007c0c0)
        /home/apple/tmp/can-delete/chan_data_race.go:34 +0x3f
main.test.func1(0xc00001e100, 0xc0000641b0, 0x4f0060, 0xc00007c0c0)
        /home/apple/tmp/can-delete/chan_data_race.go:70 +0x6b
created by main.test
        /home/apple/tmp/can-delete/chan_data_race.go:68 +0x15f
exit status 2
➜  go run -race chan_data_race.go
==================
WARNING: DATA RACE
Write at 0x00c0001000d0 by main goroutine:
  runtime.closechan()
      /home/apple/.gvm/gos/go1.15/src/runtime/chan.go:352 +0x0
  main.(*worker).stop()
      /home/apple/tmp/can-delete/chan_data_race.go:57 +0x85
  main.test()
      /home/apple/tmp/can-delete/chan_data_race.go:78 +0x28e
  main.main()
      /home/apple/tmp/can-delete/chan_data_race.go:88 +0x44

Previous read at 0x00c0001000d0 by goroutine 7:
  runtime.chansend()
      /home/apple/.gvm/gos/go1.15/src/runtime/chan.go:158 +0x0
  main.(*worker).send()
      /home/apple/tmp/can-delete/chan_data_race.go:26 +0x227
  main.(*worker).produce()
      /home/apple/tmp/can-delete/chan_data_race.go:34 +0x4c
  main.test.func1()
      /home/apple/tmp/can-delete/chan_data_race.go:70 +0x89

Goroutine 7 (finished) created at:
  main.test()
      /home/apple/tmp/can-delete/chan_data_race.go:68 +0x224
  main.main()
      /home/apple/tmp/can-delete/chan_data_race.go:88 +0x44
==================
idx: 0 count:557
panic: send on closed channel

goroutine 21 [running]:
main.(*worker).send(0xc000114180, 0x57dfa0, 0xc00013a040, 0xffffffffffffff00)
        /home/apple/tmp/can-delete/chan_data_race.go:26 +0x228
main.(*worker).produce(0xc000114180, 0x57dfa0, 0xc00013a040)
        /home/apple/tmp/can-delete/chan_data_race.go:34 +0x4d
main.test.func1(0xc00011a050, 0xc000114180, 0x57dfa0, 0xc00013a040)
        /home/apple/tmp/can-delete/chan_data_race.go:70 +0x8a
created by main.test
        /home/apple/tmp/can-delete/chan_data_race.go:68 +0x225
exit status 2

But with the lock enabled, everything runs cleanly.
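For illustration, here is a sketch of the same lock-guarded pattern applied to a producer. The names below (producer, clientLock, SendMessage) are hypothetical, not necessarily the PR's actual code. The key point is that the default branch alone does not close the race: Close could still run between the closeCh check and the blocking send, so the lock is what makes the check-and-send atomic with respect to Close. A read lock suffices on the send path, so concurrent senders do not serialize each other.

package main

import (
	"context"
	"sync"
)

type message struct{}

// producer is an illustrative stand-in for the Kafka sink's async producer.
type producer struct {
	inputCh    chan *message
	closeCh    chan struct{}
	clientLock sync.RWMutex
}

func (p *producer) SendMessage(ctx context.Context, msg *message) error {
	p.clientLock.RLock() // concurrent senders may proceed; only Close is exclusive
	defer p.clientLock.RUnlock()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.closeCh: // once closed, this case is always preferred over default
		return nil
	default:
		// Cannot race with close(p.inputCh): Close needs the write lock,
		// which it cannot take while any sender holds the read lock.
		p.inputCh <- msg
	}
	return nil
}

func (p *producer) Close() {
	p.clientLock.Lock() // wait until no send is in flight
	close(p.closeCh)    // later senders observe this and return early
	p.clientLock.Unlock()
	close(p.inputCh) // now safe: nothing can reach the send above
}

func main() {
	p := &producer{inputCh: make(chan *message, 16), closeCh: make(chan struct{})}
	go func() { // drain the input channel, as the consumer loop would
		for range p.inputCh {
		}
	}()
	_ = p.SendMessage(context.Background(), &message{})
	p.Close()
}

One caveat of this pattern: Close blocks until the consumer has drained any in-flight send, so the consumer must keep reading inputCh until shutdown completes.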

@amyangfei
Contributor Author

/run-kafka-tests

@amyangfei
Contributor Author

/run-all-tests

@codecov-commenter

codecov-commenter commented Sep 3, 2020

Codecov Report

Merging #912 into master will not change coverage.
The diff coverage is n/a.

@@             Coverage Diff             @@
##             master       #912   +/-   ##
===========================================
  Coverage   32.6893%   32.6893%           
===========================================
  Files            99         99           
  Lines         11698      11698           
===========================================
  Hits           3824       3824           
  Misses         7491       7491           
  Partials        383        383           

@liuzix
Contributor

liuzix commented Sep 3, 2020

/lgtm

@ti-srebot ti-srebot removed the status/LGT1 Indicates that a PR has LGTM 1. label Sep 3, 2020
@ti-srebot ti-srebot added the status/LGT2 Indicates that a PR has LGTM 2. label Sep 3, 2020
@amyangfei
Contributor Author

/merge

@ti-srebot ti-srebot added the status/can-merge Indicates a PR has been approved by a committer. label Sep 3, 2020
@ti-srebot
Contributor

/run-all-tests

@ti-srebot
Contributor

@amyangfei merge failed.

@amyangfei
Contributor Author

/run-kafka-tests

@amyangfei
Contributor Author

/merge

@ti-srebot
Contributor

/run-all-tests

@ti-srebot
Contributor

@amyangfei merge failed.

@zier-one
Contributor

zier-one commented Sep 3, 2020

/merge

@ti-srebot
Contributor

/run-all-tests

@ti-srebot ti-srebot merged commit bd7de7f into pingcap:master Sep 3, 2020
Labels
component/sink Sink component.
status/can-merge Indicates a PR has been approved by a committer.
status/LGT2 Indicates that a PR has LGTM 2.
type/bugfix This PR fixes a bug.
Development

Successfully merging this pull request may close these issues.

panic found in Kafka sink
5 participants