Skip to content

Commit

Permalink
Add support for reliable acks based on configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Apr 22, 2024
1 parent d30d947 commit 326de78
Show file tree
Hide file tree
Showing 14 changed files with 226 additions and 124 deletions.
61 changes: 48 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ type Config struct {
// RateLimit is a configuration for the ratelimit
RateLimit *RateLimit `json:"rate_limit,omitempty"`

// ReliableAck if true, the server will send an ack back to the client only when the message has been stored in a datastore
ReliableAck bool `json:"reliable_ack,omitempty"`

// ReliableAckWorkers is the number of workers that will handle the acknowledgment
ReliableAckWorkers int `json:"reliable_ack_workers,omitempty"`
// ReliableAckSources maps each record type to the dispatcher whose successful publish triggers the ack back to the client
ReliableAckSources map[string]telemetry.Dispatcher `json:"reliable_ack_sources,omitempty"`

// Kafka is a configuration for the standard librdkafka configuration properties
// seen here: https://raw.githubusercontent.com/confluentinc/librdkafka/master/CONFIGURATION.md
Expand Down Expand Up @@ -245,12 +242,13 @@ func (c *Config) prometheusEnabled() bool {
return false
}

// ReliableAcksDisabled reports whether reliable acks are fully turned off:
// the ReliableAck flag is unset and no ack workers are configured.
func (c *Config) ReliableAcksDisabled() bool {
	return !c.ReliableAck && c.ReliableAckWorkers == 0
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, err
}

producers := make(map[telemetry.Dispatcher]telemetry.Producer)
producers[telemetry.Logger] = simple.NewProtoLogger(logger)

Expand All @@ -266,7 +264,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
return nil, errors.New("Expected Kafka to be configured")
}
convertKafkaConfig(c.Kafka)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger)
if err != nil {
return nil, err
}
Expand All @@ -277,7 +275,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
if c.Pubsub == nil {
return nil, errors.New("Expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, logger)
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
if err != nil {
return nil, err
}
Expand All @@ -293,7 +291,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
maxRetries = *c.Kinesis.MaxRetries
}
streamMapping := c.CreateKinesisStreamMapping(recordNames)
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kinesis], logger)
if err != nil {
return nil, err
}
Expand All @@ -304,7 +302,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
if c.ZMQ == nil {
return nil, errors.New("Expected ZMQ to be configured")
}
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, logger)
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger)
if err != nil {
return nil, err
}
Expand All @@ -327,6 +325,43 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
return dispatchProducerRules, nil
}

// configureReliableAckSources validates the reliable-ack configuration and
// inverts c.ReliableAckSources (record type -> dispatcher) into a mapping of
// dispatcher -> record types that the dispatcher must reliably ack.
//
// It returns an error when a record type is mapped to the Logger dispatcher
// (a no-op sink that cannot ack), when the record type has no entry in
// c.Records, or when the configured dispatcher is not among the dispatchers
// configured for that record type.
func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher][]string, error) {
	reliableAckSources := make(map[telemetry.Dispatcher][]string)
	for txType, dispatchRule := range c.ReliableAckSources {
		if dispatchRule == telemetry.Logger {
			return nil, fmt.Errorf("Logger cannot be configured as reliable ack for record: %s", txType)
		}
		dispatchers, ok := c.Records[txType]
		if !ok {
			return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s since no record mapping exists", dispatchRule, txType)
		}
		dispatchRuleFound := false
		validDispatchers := parseValidDispatchers(dispatchers)
		for _, dispatcher := range validDispatchers {
			if dispatcher == dispatchRule {
				dispatchRuleFound = true
				reliableAckSources[dispatchRule] = append(reliableAckSources[dispatchRule], txType)
				break
			}
		}
		if !dispatchRuleFound {
			// BUG FIX: the format string has three verbs but the original call
			// passed only two arguments (txType was missing), so the error
			// rendered with a %!s(MISSING) artifact. go vet's printf check
			// flags exactly this mismatch.
			return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s. Only %v are configured by the service", dispatchRule, txType, validDispatchers)
		}
	}
	return reliableAckSources, nil
}

// parseValidDispatchers filters out the Logger dispatcher — a no-op sink that
// cannot participate in reliable acks — and returns the remaining dispatchers.
func parseValidDispatchers(input []telemetry.Dispatcher) []telemetry.Dispatcher {
	var filtered []telemetry.Dispatcher
	for _, d := range input {
		if d == telemetry.Logger {
			continue
		}
		filtered = append(filtered, d)
	}
	return filtered
}

// convertKafkaConfig will prioritize int over float
// see: https://github.com/confluentinc/confluent-kafka-go/blob/cde2827bc49655eca0f9ce3fc1cda13cb6cdabc9/kafka/config.go#L108-L125
func convertKafkaConfig(input *confluent.ConfigMap) {
Expand Down
5 changes: 0 additions & 5 deletions config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"encoding/json"
"errors"
"flag"
"log"
"os"
Expand Down Expand Up @@ -55,10 +54,6 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
}
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)

// TODO disable this check when reliable acks are properly supported
if !config.ReliableAcksDisabled() {
return nil, errors.New("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file")
}
config.AckChan = make(chan *telemetry.Record)
return config, err
}
Expand Down
20 changes: 7 additions & 13 deletions config/config_initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
var _ = Describe("Test application config initialization", func() {
It("loads the config properly", func() {
expectedConfig := &Config{
Host: "127.0.0.1",
Port: 443,
StatusPort: 8080,
Namespace: "tesla_telemetry",
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
ReliableAck: false,
ReliableAckWorkers: 0,
Host: "127.0.0.1",
Port: 443,
StatusPort: 8080,
Namespace: "tesla_telemetry",
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
ReliableAck: false,

Check failure on line 26 in config/config_initializer_test.go

View workflow job for this annotation

GitHub Actions / build

unknown field ReliableAck in struct literal of type Config
Kafka: &confluent.ConfigMap{
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
Expand Down Expand Up @@ -73,11 +72,6 @@ var _ = Describe("Test application config initialization", func() {
Expect(loadedConfig).To(Equal(expectedConfig))
})

It("fails when reliable acks are set", func() {
_, err := loadTestApplicationConfig(TestReliableAckConfig)
Expect(err).Should(MatchError("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file"))
})

It("returns an error if config is not appropriate", func() {
_, err := loadTestApplicationConfig(BadTopicConfig)
Expect(err).To(MatchError("invalid character '}' looking for beginning of object key string"))
Expand Down
1 change: 0 additions & 1 deletion config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ const TestReliableAckConfig = `
"status_port": 8080,
"namespace": "tesla_telemetry",
"reliable_ack": true,
"reliable_ack_workers": 15,
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
Expand Down
31 changes: 23 additions & 8 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (

// Producer client to handle google pubsub interactions
type Producer struct {
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
ackChan chan (*telemetry.Record)
reliableAckTxTypes []string
}

// Metrics stores metrics reported from this package
Expand Down Expand Up @@ -54,7 +56,7 @@ func configurePubsub(projectID string) (*pubsub.Client, error) {
}

// NewProducer establishes the pubsub connection and define the dispatch method
func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)
pubsubClient, err := configurePubsub(projectID)
if err != nil {
Expand All @@ -69,6 +71,7 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
}
p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
Expand Down Expand Up @@ -103,12 +106,24 @@ func (p *Producer) Produce(entry *telemetry.Record) {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
} else {
p.ProcessReliableAck(entry)
}
metricsRegistry.publishBytesTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": entry.TxType})

}

// ProcessReliableAck forwards entry to the ack channel when its TxType is one
// of the types configured for reliable acks; otherwise it does nothing.
// NOTE(review): the send on ackChan blocks until a receiver is ready — confirm
// the ack consumer is always running before this producer publishes.
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
	for _, txType := range p.reliableAckTxTypes {
		if entry.TxType == txType {
			p.ackChan <- entry
			break
		}
	}
}

func (p *Producer) createTopicIfNotExists(ctx context.Context, topic string) (*pubsub.Topic, error) {
pubsubTopic := p.pubsubClient.Topic(topic)
exists, err := pubsubTopic.Exists(ctx)
Expand Down
30 changes: 22 additions & 8 deletions datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import (

// Producer client to handle kafka interactions
type Producer struct {
kafkaProducer *kafka.Producer
namespace string
prometheusEnabled bool
metricsCollector metrics.MetricCollector
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
deliveryChan chan kafka.Event
kafkaProducer *kafka.Producer
namespace string
prometheusEnabled bool
metricsCollector metrics.MetricCollector
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
deliveryChan chan kafka.Event
ackChan chan (*telemetry.Record)
reliableAckTxTypes []string
}

// Metrics stores metrics reported from this package
Expand All @@ -41,7 +43,7 @@ var (
)

// NewProducer establishes the kafka connection and define the dispatch method
func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

kafkaProducer, err := kafka.NewProducer(config)
Expand All @@ -57,6 +59,7 @@ func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bo
logger: logger,
airbrakeHandler: airbrakeHandler,
deliveryChan: make(chan kafka.Event),
ackChan: ackChan,
}

go producer.handleProducerEvents()
Expand Down Expand Up @@ -120,6 +123,7 @@ func (p *Producer) handleProducerEvents() {
p.logError(fmt.Errorf("opaque_record_missing %v", ev))
continue
}
p.ProcessReliableAck(entry)
metricsRegistry.producerAckCount.Inc(map[string]string{"record_type": entry.TxType})
metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
default:
Expand All @@ -128,6 +132,16 @@ func (p *Producer) handleProducerEvents() {
}
}

// ProcessReliableAck forwards entry to the ack channel when its TxType is one
// of the types configured for reliable acks; otherwise it does nothing.
// NOTE(review): the send on ackChan blocks until a receiver is ready — confirm
// the ack consumer is always running, or kafka event handling can stall here.
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
	for _, txType := range p.reliableAckTxTypes {
		if entry.TxType == txType {
			p.ackChan <- entry
			break
		}
	}
}

func (p *Producer) logError(err error) {
p.ReportError("kafka_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{})
Expand Down
31 changes: 23 additions & 8 deletions datastore/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (

// Producer client to handle kinesis interactions
type Producer struct {
kinesis *kinesis.Kinesis
logger *logrus.Logger
prometheusEnabled bool
metricsCollector metrics.MetricCollector
streams map[string]string
airbrakeHandler *airbrake.AirbrakeHandler
kinesis *kinesis.Kinesis
logger *logrus.Logger
prometheusEnabled bool
metricsCollector metrics.MetricCollector
streams map[string]string
airbrakeHandler *airbrake.AirbrakeHandler
ackChan chan (*telemetry.Record)
reliableAckTxTypes []string
}

// Metrics stores metrics reported from this package
Expand All @@ -39,7 +41,7 @@ var (
)

// NewProducer configures and tests the kinesis connection
func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

config := &aws.Config{
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewProducer(maxRetries int, streams map[string]string, overrideHost string,
metricsCollector: metricsCollector,
streams: streams,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
}, nil
}

Expand All @@ -91,13 +94,25 @@ func (p *Producer) Produce(entry *telemetry.Record) {
p.ReportError("kinesis_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
} else {
p.ProcessReliableAck(entry)
}

p.logger.Log(logrus.DEBUG, "kinesis_err", logrus.LogInfo{"vin": entry.Vin, "record_type": entry.TxType, "txid": entry.Txid, "shard_id": *kinesisRecordOutput.ShardId, "sequence_number": *kinesisRecordOutput.SequenceNumber})
p.logger.Log(logrus.DEBUG, "kinesis_message_dispatched", logrus.LogInfo{"vin": entry.Vin, "record_type": entry.TxType, "txid": entry.Txid, "shard_id": *kinesisRecordOutput.ShardId, "sequence_number": *kinesisRecordOutput.SequenceNumber})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": entry.TxType})
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
}

// ProcessReliableAck forwards entry to the ack channel when its TxType is one
// of the types configured for reliable acks; otherwise it does nothing.
// NOTE(review): the send on ackChan blocks until a receiver is ready — confirm
// the ack consumer is always running before records are produced.
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
	for _, txType := range p.reliableAckTxTypes {
		if entry.TxType == txType {
			p.ackChan <- entry
			break
		}
	}
}

// ReportError to airbrake and logger
func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
Expand Down
4 changes: 4 additions & 0 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func NewProtoLogger(logger *logrus.Logger) telemetry.Producer {
return &ProtoLogger{logger: logger}
}

// ProcessReliableAck is a no-op: the logger datastore never participates in
// reliable acks (it cannot durably store records).
func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) {
}

// Produce sends the data to the logger
func (p *ProtoLogger) Produce(entry *telemetry.Record) {
data, err := entry.GetJSONPayload()
Expand Down
Loading

0 comments on commit 326de78

Please sign in to comment.