package kafka

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"sort"
	"sync"
	"time"
)

// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Instances of Writer are safe to use concurrently from multiple goroutines.
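//
// A minimal usage sketch (the broker address and topic name below are
// placeholders):
//
//	w := NewWriter(WriterConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "example-topic",
//	})
//	defer w.Close()
//
//	err := w.WriteMessages(context.Background(),
//		Message{Key: []byte("key"), Value: []byte("value")},
//	)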
type Writer struct {
	config WriterConfig

	mutex  sync.RWMutex
	closed bool

	join sync.WaitGroup
	msgs chan writerMessage
	done chan struct{}

	// writer stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *writerStats
}

// WriterConfig is a configuration type used to create new instances of Writer.
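//
// For example (a sketch; the values shown are illustrative, not recommended
// settings):
//
//	config := WriterConfig{
//		Brokers:      []string{"localhost:9092"},
//		Topic:        "example-topic",
//		Balancer:     &Hash{},
//		BatchSize:    256,
//		BatchTimeout: 100 * time.Millisecond,
//	}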
type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required; attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string

	// The topic that the writer will produce messages to.
	//
	// This field is required; attempting to create a writer with an empty
	// topic will panic.
	Topic string

	// The dialer used by the writer to establish connections to the kafka
	// cluster.
	//
	// If nil, the default dialer is used instead.
	Dialer *Dialer

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// A hint on the capacity of the writer's internal message queue.
	//
	// The default is to use a queue capacity of 100 messages.
	QueueCapacity int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit on the maximum size of a request in bytes before it is sent to
	// a partition.
	//
	// The default is the kafka default of 1048576 bytes (1 MiB).
	BatchBytes int

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// This interval defines how often the list of partitions is refreshed from
	// kafka. It allows the writer to automatically handle when new partitions
	// are added to a topic.
	//
	// The default is to refresh partitions every 15 seconds.
	RebalanceInterval time.Duration

	// Connections that were idle for this duration will not be reused.
	//
	// Defaults to 9 minutes.
	IdleConnTimeout time.Duration

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request (defaults to -1, which means
	// to wait for all replicas).
	RequiredAcks int

	// Setting this flag to true causes the WriteMessages method to never
	// block. It also means that errors are ignored since the caller will not
	// receive the returned value. Use this only if you don't care about
	// guarantees of whether the messages were written to kafka.
	Async bool

	// CompressionCodec sets the codec to be used to compress Kafka messages.
	// Note that messages are allowed to override the compression codec
	// individually.
	CompressionCodec

	// If not nil, specifies a logger used to report internal changes within
	// the writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer
	// falls back to using Logger instead.
	ErrorLogger Logger

	// newPartitionWriter allows the constructor of the per-partition writers
	// to be overridden; when nil, the default implementation is used.
	newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter
}

// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
	Dials      int64 `metric:"kafka.writer.dial.count"      type:"counter"`
	Writes     int64 `metric:"kafka.writer.write.count"     type:"counter"`
	Messages   int64 `metric:"kafka.writer.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.writer.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.writer.rebalance.count" type:"counter"`
	Errors     int64 `metric:"kafka.writer.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.writer.dial.seconds"`
	WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
	WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
	Retries    SummaryStats  `metric:"kafka.writer.retries.count"`
	BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
	BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`

	MaxAttempts       int64         `metric:"kafka.writer.attempts.max"       type:"gauge"`
	MaxBatchSize      int64         `metric:"kafka.writer.batch.max"          type:"gauge"`
	BatchTimeout      time.Duration `metric:"kafka.writer.batch.timeout"      type:"gauge"`
	ReadTimeout       time.Duration `metric:"kafka.writer.read.timeout"       type:"gauge"`
	WriteTimeout      time.Duration `metric:"kafka.writer.write.timeout"      type:"gauge"`
	RebalanceInterval time.Duration `metric:"kafka.writer.rebalance.interval" type:"gauge"`
	RequiredAcks      int64         `metric:"kafka.writer.acks.required"      type:"gauge"`
	Async             bool          `metric:"kafka.writer.async"              type:"gauge"`
	QueueLength       int64         `metric:"kafka.writer.queue.length"       type:"gauge"`
	QueueCapacity     int64         `metric:"kafka.writer.queue.capacity"     type:"gauge"`

	ClientID string `tag:"client_id"`
	Topic    string `tag:"topic"`
}

// writerStats is a struct that contains statistics on a writer.
//
// Since atomic is used to mutate the statistics, the values must be 64-bit
// aligned. This is easily accomplished by always allocating this struct
// directly (i.e. using a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
	dials          counter
	writes         counter
	messages       counter
	bytes          counter
	rebalances     counter
	errors         counter
	dialTime       summary
	writeTime      summary
	waitTime       summary
	retries        summary
	batchSize      summary
	batchSizeBytes summary
}

// Validate validates the WriterConfig properties, returning an error if any
// required field is missing.
func (config *WriterConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a kafka writer with an empty list of brokers")
	}

	if len(config.Topic) == 0 {
		return errors.New("cannot create a kafka writer with an empty topic")
	}

	return nil
}

// NewWriter creates and returns a new Writer configured with config.
func NewWriter(config WriterConfig) *Writer {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.Balancer == nil {
		config.Balancer = &RoundRobin{}
	}

	if config.newPartitionWriter == nil {
		config.newPartitionWriter = func(partition int, config WriterConfig, stats *writerStats) partitionWriter {
			return newWriter(partition, config, stats)
		}
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 10
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.BatchSize == 0 {
		config.BatchSize = 100
	}

	if config.BatchBytes == 0 {
		// 1048576 == 1MB which is the Kafka default.
		config.BatchBytes = 1048576
	}

	if config.BatchTimeout == 0 {
		config.BatchTimeout = 1 * time.Second
	}

	if config.ReadTimeout == 0 {
		config.ReadTimeout = 10 * time.Second
	}

	if config.WriteTimeout == 0 {
		config.WriteTimeout = 10 * time.Second
	}

	if config.RebalanceInterval == 0 {
		config.RebalanceInterval = 15 * time.Second
	}

	if config.IdleConnTimeout == 0 {
		config.IdleConnTimeout = 9 * time.Minute
	}

	w := &Writer{
		config: config,
		msgs:   make(chan writerMessage, config.QueueCapacity),
		done:   make(chan struct{}),
		stats: &writerStats{
			dialTime:  makeSummary(),
			writeTime: makeSummary(),
			waitTime:  makeSummary(),
			retries:   makeSummary(),
		},
	}

	w.join.Add(1)
	go w.run()
	return w
}

// WriteMessages writes a batch of messages to the kafka topic configured on
// this writer.
//
// Unless the writer was configured to write messages asynchronously, the
// method blocks until all messages have been written, or until the maximum
// number of attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be
// assembled or the batch timeout is reached. The batch size and timeouts are
// evaluated per partition, so the choice of Balancer can also influence the
// flushing behavior. For example, the Hash balancer will require on average
// N * batch size messages to trigger a flush, where N is the number of
// partitions. The best way to achieve good batching behavior is to share one
// Writer amongst multiple goroutines.
//
// When the method returns an error, there is no way to know which messages
// have succeeded or failed.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	if len(msgs) == 0 {
		return nil
	}

	var err error
	var res chan error

	if !w.config.Async {
		res = make(chan error, len(msgs))
	}

	t0 := time.Now()

	for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
		w.mutex.RLock()

		if w.closed {
			w.mutex.RUnlock()
			return io.ErrClosedPipe
		}

		for i, msg := range msgs {
			if int(msg.size()) > w.config.BatchBytes {
				err := MessageTooLargeError{
					Message:   msg,
					Remaining: msgs[i+1:],
				}
				w.mutex.RUnlock()
				return err
			}
			select {
			case w.msgs <- writerMessage{
				msg: msg,
				res: res,
			}:
			case <-ctx.Done():
				w.mutex.RUnlock()
				return ctx.Err()
			}
		}

		w.mutex.RUnlock()

		if w.config.Async {
			break
		}

		var retry []Message

		for i := 0; i != len(msgs); i++ {
			select {
			case e := <-res:
				if e != nil {
					if we, ok := e.(*writerError); ok {
						w.stats.retries.observe(1)
						retry, err = append(retry, we.msg), we.err
					} else {
						err = e
					}
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		if msgs = retry; len(msgs) == 0 {
			break
		}

		timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
		select {
		case <-timer.C:
			// Only clear the error (so we retry the loop) if we have more
			// retries, otherwise we risk silencing the error.
			if attempt < w.config.MaxAttempts-1 {
				err = nil
			}
		case <-ctx.Done():
			err = ctx.Err()
		case <-w.done:
			err = io.ErrClosedPipe
		}
		timer.Stop()

		if err != nil {
			break
		}
	}

	w.stats.writeTime.observeDuration(time.Since(t0))
	return err
}

// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
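//
// A reporting sketch (the interval and the metrics sink are placeholders):
//
//	go func() {
//		for range time.Tick(10 * time.Second) {
//			stats := w.Stats()
//			// forward the stats snapshot to a metrics collection system
//			_ = stats
//		}
//	}()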
func (w *Writer) Stats() WriterStats {
	return WriterStats{
		Dials:             w.stats.dials.snapshot(),
		Writes:            w.stats.writes.snapshot(),
		Messages:          w.stats.messages.snapshot(),
		Bytes:             w.stats.bytes.snapshot(),
		Rebalances:        w.stats.rebalances.snapshot(),
		Errors:            w.stats.errors.snapshot(),
		DialTime:          w.stats.dialTime.snapshotDuration(),
		WriteTime:         w.stats.writeTime.snapshotDuration(),
		WaitTime:          w.stats.waitTime.snapshotDuration(),
		Retries:           w.stats.retries.snapshot(),
		BatchSize:         w.stats.batchSize.snapshot(),
		BatchBytes:        w.stats.batchSizeBytes.snapshot(),
		MaxAttempts:       int64(w.config.MaxAttempts),
		MaxBatchSize:      int64(w.config.BatchSize),
		BatchTimeout:      w.config.BatchTimeout,
		ReadTimeout:       w.config.ReadTimeout,
		WriteTimeout:      w.config.WriteTimeout,
		RebalanceInterval: w.config.RebalanceInterval,
		RequiredAcks:      int64(w.config.RequiredAcks),
		Async:             w.config.Async,
		QueueLength:       int64(len(w.msgs)),
		QueueCapacity:     int64(cap(w.msgs)),
		ClientID:          w.config.Dialer.ClientID,
		Topic:             w.config.Topic,
	}
}

// Close flushes all buffered messages and closes the writer. The call to Close
// aborts any concurrent calls to WriteMessages, which then return with the
// io.ErrClosedPipe error.
func (w *Writer) Close() (err error) {
	w.mutex.Lock()

	if !w.closed {
		w.closed = true
		close(w.msgs)
		close(w.done)
	}

	w.mutex.Unlock()
	w.join.Wait()
	return
}
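
// run is the writer's main loop: it periodically refreshes the list of
// partitions for the topic and routes each incoming message to the
// per-partition writer selected by the configured Balancer.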
func (w *Writer) run() {
	defer w.join.Done()

	ticker := time.NewTicker(w.config.RebalanceInterval)
	defer ticker.Stop()

	var rebalance = true
	var writers = make(map[int]partitionWriter)
	var partitions []int
	var err error

	for {
		if rebalance {
			w.stats.rebalances.observe(1)
			rebalance = false

			var newPartitions []int
			var oldPartitions = partitions

			if newPartitions, err = w.partitions(); err == nil {
				for _, partition := range diffp(oldPartitions, newPartitions) {
					w.close(writers[partition])
					delete(writers, partition)
				}

				for _, partition := range diffp(newPartitions, oldPartitions) {
					writers[partition] = w.open(partition)
				}

				partitions = newPartitions
			}
		}

		select {
		case wm, ok := <-w.msgs:
			if !ok {
				for _, writer := range writers {
					w.close(writer)
				}
				return
			}

			if len(partitions) != 0 {
				selectedPartition := w.config.Balancer.Balance(wm.msg, partitions...)
				writers[selectedPartition].messages() <- wm
			} else {
				// No partitions were found because the topic doesn't exist.
				if err == nil {
					err = fmt.Errorf("failed to find any partitions for topic %s", w.config.Topic)
				}

				if wm.res != nil {
					wm.res <- &writerError{msg: wm.msg, err: err}
				}
			}

		case <-ticker.C:
			rebalance = true
		}
	}
}
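
// partitions queries the brokers in random order and returns the sorted list
// of partition IDs for the writer's topic, stopping at the first broker that
// answers successfully.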
func (w *Writer) partitions() (partitions []int, err error) {
	for _, broker := range shuffledStrings(w.config.Brokers) {
		var conn *Conn
		var plist []Partition

		if conn, err = w.config.Dialer.Dial("tcp", broker); err != nil {
			continue
		}

		conn.SetReadDeadline(time.Now().Add(w.config.ReadTimeout))
		plist, err = conn.ReadPartitions(w.config.Topic)
		conn.Close()

		if err == nil {
			partitions = make([]int, len(plist))
			for i, p := range plist {
				partitions[i] = p.ID
			}
			break
		}
	}

	sort.Ints(partitions)
	return
}

func (w *Writer) open(partition int) partitionWriter {
	return w.config.newPartitionWriter(partition, w.config, w.stats)
}

func (w *Writer) close(writer partitionWriter) {
	w.join.Add(1)
	go func() {
		writer.close()
		w.join.Done()
	}()
}
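
// diffp returns the elements of new that are not present in old. old must be
// sorted because the lookup uses a binary search; both slices come from
// partitions, which sorts its result.
// For example, diffp([]int{0, 1, 2}, []int{0, 2}) returns []int{1}.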
func diffp(new []int, old []int) (diff []int) {
	for _, p := range new {
		if i := sort.SearchInts(old, p); i == len(old) || old[i] != p {
			diff = append(diff, p)
		}
	}
	return
}

type partitionWriter interface {
	messages() chan<- writerMessage
	close()
}
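
// writer is the default partitionWriter implementation: it batches the
// messages received on its channel and flushes them to a single partition of
// the topic.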
type writer struct {
	brokers         []string
	topic           string
	partition       int
	requiredAcks    int
	batchSize       int
	maxMessageBytes int
	batchTimeout    time.Duration
	writeTimeout    time.Duration
	idleConnTimeout time.Duration
	dialer          *Dialer
	msgs            chan writerMessage
	join            sync.WaitGroup
	stats           *writerStats
	codec           CompressionCodec
	logger          Logger
	errorLogger     Logger
}

func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
	w := &writer{
		brokers:         config.Brokers,
		topic:           config.Topic,
		partition:       partition,
		requiredAcks:    config.RequiredAcks,
		batchSize:       config.BatchSize,
		maxMessageBytes: config.BatchBytes,
		batchTimeout:    config.BatchTimeout,
		writeTimeout:    config.WriteTimeout,
		idleConnTimeout: config.IdleConnTimeout,
		dialer:          config.Dialer,
		msgs:            make(chan writerMessage, config.QueueCapacity),
		stats:           stats,
		codec:           config.CompressionCodec,
		logger:          config.Logger,
		errorLogger:     config.ErrorLogger,
	}
	w.join.Add(1)
	go w.run()
	return w
}

func (w *writer) close() {
	close(w.msgs)
	w.join.Wait()
}

func (w *writer) messages() chan<- writerMessage {
	return w.msgs
}

func (w *writer) withLogger(do func(Logger)) {
	if w.logger != nil {
		do(w.logger)
	}
}

func (w *writer) withErrorLogger(do func(Logger)) {
	if w.errorLogger != nil {
		do(w.errorLogger)
	} else {
		w.withLogger(do)
	}
}
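
// run is the main loop of the per-partition writer: it accumulates incoming
// messages into a batch and flushes the batch when it is full (by message
// count or by bytes), when the batch timeout fires, or when the writer is
// closed.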
func (w *writer) run() {
	defer w.join.Done()

	batchTimer := time.NewTimer(0)
	<-batchTimer.C
	batchTimerRunning := false
	defer batchTimer.Stop()

	var conn *Conn
	var done bool
	var batch = make([]Message, 0, w.batchSize)
	var resch = make([](chan<- error), 0, w.batchSize)
	var lastMsg writerMessage
	var batchSizeBytes int
	var idleConnDeadline time.Time

	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	for !done {
		var mustFlush bool

		// lastMsg gets set when the next message would put the batch over the
		// maxMessageBytes limit. If a lastMsg exists we need to add it to the
		// batch so we don't lose it.
		if len(lastMsg.msg.Value) != 0 {
			batch = append(batch, lastMsg.msg)
			if lastMsg.res != nil {
				resch = append(resch, lastMsg.res)
			}
			batchSizeBytes += int(lastMsg.msg.size())
			lastMsg = writerMessage{}
			if !batchTimerRunning {
				batchTimer.Reset(w.batchTimeout)
				batchTimerRunning = true
			}
		}

		select {
		case wm, ok := <-w.msgs:
			if !ok {
				done, mustFlush = true, true
			} else {
				if int(wm.msg.size())+batchSizeBytes > w.maxMessageBytes {
					// If the size of the current message puts us over the
					// maxMessageBytes limit, store the message but don't send
					// it in this batch.
					mustFlush = true
					lastMsg = wm
					break
				}
				batch = append(batch, wm.msg)
				if wm.res != nil {
					resch = append(resch, wm.res)
				}
				batchSizeBytes += int(wm.msg.size())
				mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes
			}
			if !batchTimerRunning {
				batchTimer.Reset(w.batchTimeout)
				batchTimerRunning = true
			}

		case <-batchTimer.C:
			mustFlush = true
			batchTimerRunning = false
		}

		if mustFlush {
			w.stats.batchSizeBytes.observe(int64(batchSizeBytes))

			if batchTimerRunning {
				if stopped := batchTimer.Stop(); !stopped {
					<-batchTimer.C
				}
				batchTimerRunning = false
			}

			if conn != nil && time.Now().After(idleConnDeadline) {
				conn.Close()
				conn = nil
			}

			if len(batch) == 0 {
				continue
			}

			var err error
			if conn, err = w.write(conn, batch, resch); err != nil {
				if conn != nil {
					conn.Close()
					conn = nil
				}
			}

			idleConnDeadline = time.Now().Add(w.idleConnTimeout)

			for i := range batch {
				batch[i] = Message{}
			}

			for i := range resch {
				resch[i] = nil
			}

			batch = batch[:0]
			resch = resch[:0]
			batchSizeBytes = 0
		}
	}
}
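
// dial tries the brokers in random order and returns the first successful
// connection to the leader of the writer's topic and partition.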
func (w *writer) dial() (conn *Conn, err error) {
	for _, broker := range shuffledStrings(w.brokers) {
		t0 := time.Now()
		if conn, err = w.dialer.DialLeader(context.Background(), "tcp", broker, w.topic, w.partition); err == nil {
			t1 := time.Now()
			w.stats.dials.observe(1)
			w.stats.dialTime.observeDuration(t1.Sub(t0))
			conn.SetRequiredAcks(w.requiredAcks)
			break
		}
	}
	return
}
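
// write flushes a batch of messages on conn (dialing a new connection if conn
// is nil), reports the per-message outcome on the response channels, and
// returns the connection so the caller can decide whether to reuse it.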
func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
	w.stats.writes.observe(1)

	if conn == nil {
		if conn, err = w.dial(); err != nil {
			w.stats.errors.observe(1)
			w.withErrorLogger(func(logger Logger) {
				logger.Printf("error dialing kafka brokers for topic %s (partition %d): %s", w.topic, w.partition, err)
			})
			for i, res := range resch {
				res <- &writerError{msg: batch[i], err: err}
			}
			return
		}
	}

	t0 := time.Now()
	conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))

	if _, err = conn.WriteCompressedMessages(w.codec, batch...); err != nil {
		w.stats.errors.observe(1)
		w.withErrorLogger(func(logger Logger) {
			logger.Printf("error writing messages to %s (partition %d): %s", w.topic, w.partition, err)
		})
		for i, res := range resch {
			res <- &writerError{msg: batch[i], err: err}
		}
	} else {
		for _, m := range batch {
			w.stats.messages.observe(1)
			w.stats.bytes.observe(int64(len(m.Key) + len(m.Value)))
		}
		for _, res := range resch {
			res <- nil
		}
	}

	t1 := time.Now()
	w.stats.waitTime.observeDuration(t1.Sub(t0))
	w.stats.batchSize.observe(int64(len(batch)))

	ret = conn
	return
}
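
// writerMessage pairs a message with the channel on which its delivery result
// is reported (res is nil when the writer operates in async mode).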
type writerMessage struct {
	msg Message
	res chan<- error
}
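
// writerError wraps a delivery error together with the message that failed,
// so WriteMessages can collect the failed messages and retry them.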
type writerError struct {
	msg Message
	err error
}

func (e *writerError) Cause() error {
	return e.err
}

func (e *writerError) Error() string {
	return e.err.Error()
}

func (e *writerError) Temporary() bool {
	return isTemporary(e.err)
}

func (e *writerError) Timeout() bool {
	return isTimeout(e.err)
}
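
// shuffledStrings returns a copy of list in random order; it is used to spread
// the load of metadata requests and dials across brokers.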
func shuffledStrings(list []string) []string {
	shuffledList := make([]string, len(list))
	copy(shuffledList, list)

	shufflerMutex.Lock()

	for i := range shuffledList {
		j := shuffler.Intn(i + 1)
		shuffledList[i], shuffledList[j] = shuffledList[j], shuffledList[i]
	}

	shufflerMutex.Unlock()
	return shuffledList
}

var (
	shufflerMutex = sync.Mutex{}
	shuffler      = rand.New(rand.NewSource(time.Now().Unix()))
)