From 326de7879d22e83648b5a17b95b1df4bbedee847 Mon Sep 17 00:00:00 2001 From: Abhishek Patro Date: Mon, 22 Apr 2024 16:03:25 -0700 Subject: [PATCH] Add support for reliable acks based on configuration --- config/config.go | 61 +++++++++++++++++++------ config/config_initializer.go | 5 --- config/config_initializer_test.go | 20 +++------ config/test_configs_test.go | 1 - datastore/googlepubsub/publisher.go | 31 +++++++++---- datastore/kafka/kafka.go | 30 +++++++++---- datastore/kinesis/kinesis.go | 31 +++++++++---- datastore/simple/logger.go | 4 ++ datastore/zmq/zmq.go | 34 ++++++++++---- server/streaming/server.go | 69 ++++++++++++++++++++++++----- server/streaming/socket.go | 41 +++-------------- telemetry/producer.go | 1 + telemetry/serializer.go | 11 +---- telemetry/serializer_test.go | 11 +++-- 14 files changed, 226 insertions(+), 124 deletions(-) diff --git a/config/config.go b/config/config.go index 81bf4eb..b9bd3aa 100644 --- a/config/config.go +++ b/config/config.go @@ -53,11 +53,8 @@ type Config struct { // RateLimit is a configuration for the ratelimit RateLimit *RateLimit `json:"rate_limit,omitempty"` - // ReliableAck if true, the server will send an ack back to the client only when the message has been stored in a datastore - ReliableAck bool `json:"reliable_ack,omitempty"` - - // ReliableAckWorkers is the number of workers that will handle the acknowledgment - ReliableAckWorkers int `json:"reliable_ack_workers,omitempty"` + // ReliableAckSources + ReliableAckSources map[string]telemetry.Dispatcher `json:"reliable_ack_sources,omitempty"` // Kafka is a configuration for the standard librdkafka configuration properties // seen here: https://raw.githubusercontent.com/confluentinc/librdkafka/master/CONFIGURATION.md @@ -245,12 +242,13 @@ func (c *Config) prometheusEnabled() bool { return false } -func (c *Config) ReliableAcksDisabled() bool { - return c.ReliableAck == false && c.ReliableAckWorkers == 0 -} - // ConfigureProducers validates and establishes 
connections to the producers (kafka/pubsub/logger) func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) { + reliableAckSources, err := c.configureReliableAckSources() + if err != nil { + return nil, err + } + producers := make(map[telemetry.Dispatcher]telemetry.Producer) producers[telemetry.Logger] = simple.NewProtoLogger(logger) @@ -266,7 +264,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l return nil, errors.New("Expected Kafka to be configured") } convertKafkaConfig(c.Kafka) - kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger) + kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger) if err != nil { return nil, err } @@ -277,7 +275,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if c.Pubsub == nil { return nil, errors.New("Expected Pubsub to be configured") } - googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, logger) + googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger) if err != nil { return nil, err } @@ -293,7 +291,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l maxRetries = *c.Kinesis.MaxRetries } streamMapping := c.CreateKinesisStreamMapping(recordNames) - kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger) + kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, 
c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kinesis], logger) if err != nil { return nil, err } @@ -304,7 +302,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if c.ZMQ == nil { return nil, errors.New("Expected ZMQ to be configured") } - zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, logger) + zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger) if err != nil { return nil, err } @@ -327,6 +325,43 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l return dispatchProducerRules, nil } +func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher][]string, error) { + reliableAckSources := make(map[telemetry.Dispatcher][]string, 0) + for txType, dispatchRule := range c.ReliableAckSources { + if dispatchRule == telemetry.Logger { + return nil, fmt.Errorf("Logger cannot be configured as reliable ack for record: %s", txType) + } + dispatchers, ok := c.Records[txType] + if !ok { + return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s since no record mapping exists", dispatchRule, txType) + } + dispatchRuleFound := false + validDispatchers := parseValidDispatchers(dispatchers) + for _, dispatcher := range validDispatchers { + if dispatcher == dispatchRule { + dispatchRuleFound = true + reliableAckSources[dispatchRule] = append(reliableAckSources[dispatchRule], txType) + break + } + } + if !dispatchRuleFound { + return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s. Only %v are configured by the service", dispatchRule, txType, validDispatchers) + } + } + return reliableAckSources, nil +} + +// parseValidDispatchers removes no-op dispatcher from the input i.e.
Logger +func parseValidDispatchers(input []telemetry.Dispatcher) []telemetry.Dispatcher { + var result []telemetry.Dispatcher + for _, v := range input { + if v != telemetry.Logger { + result = append(result, v) + } + } + return result +} + // convertKafkaConfig will prioritize int over float // see: https://github.com/confluentinc/confluent-kafka-go/blob/cde2827bc49655eca0f9ce3fc1cda13cb6cdabc9/kafka/config.go#L108-L125 func convertKafkaConfig(input *confluent.ConfigMap) { diff --git a/config/config_initializer.go b/config/config_initializer.go index 6bd01e2..05fcbfe 100644 --- a/config/config_initializer.go +++ b/config/config_initializer.go @@ -2,7 +2,6 @@ package config import ( "encoding/json" - "errors" "flag" "log" "os" @@ -55,10 +54,6 @@ func loadApplicationConfig(configFilePath string) (*Config, error) { } config.MetricCollector = metrics.NewCollector(config.Monitoring, logger) - // TODO disble this check when reliable acks are properly supported - if !config.ReliableAcksDisabled() { - return nil, errors.New("reliable acks not support yet. 
Unset `reliable_ack` and `reliable_ack_workers` in the config file") - } config.AckChan = make(chan *telemetry.Record) return config, err } diff --git a/config/config_initializer_test.go b/config/config_initializer_test.go index ce7e3ce..3462e9b 100644 --- a/config/config_initializer_test.go +++ b/config/config_initializer_test.go @@ -17,14 +17,12 @@ import ( var _ = Describe("Test application config initialization", func() { It("loads the config properly", func() { expectedConfig := &Config{ - Host: "127.0.0.1", - Port: 443, - StatusPort: 8080, - Namespace: "tesla_telemetry", - TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"}, - RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30}, - ReliableAck: false, - ReliableAckWorkers: 0, + Host: "127.0.0.1", + Port: 443, + StatusPort: 8080, + Namespace: "tesla_telemetry", + TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"}, + RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30}, Kafka: &confluent.ConfigMap{ "bootstrap.servers": "some.broker1:9093,some.broker1:9093", "ssl.ca.location": "kafka.ca", @@ -73,11 +71,6 @@ var _ = Describe("Test application config initialization", func() { Expect(loadedConfig).To(Equal(expectedConfig)) }) - It("fails when reliable acks are set", func() { - _, err := loadTestApplicationConfig(TestReliableAckConfig) - Expect(err).Should(MatchError("reliable acks not support yet.
Unset `reliable_ack` and `reliable_ack_workers` in the config file")) - }) - It("returns an error if config is not appropriate", func() { _, err := loadTestApplicationConfig(BadTopicConfig) Expect(err).To(MatchError("invalid character '}' looking for beginning of object key string")) diff --git a/config/test_configs_test.go b/config/test_configs_test.go index 9a0a31d..2ee17bd 100644 --- a/config/test_configs_test.go +++ b/config/test_configs_test.go @@ -66,7 +66,6 @@ const TestReliableAckConfig = ` "status_port": 8080, "namespace": "tesla_telemetry", "reliable_ack": true, - "reliable_ack_workers": 15, "kafka": { "bootstrap.servers": "some.broker1:9093,some.broker1:9093", "ssl.ca.location": "kafka.ca", diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index 88fe7a0..8302388 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -19,13 +19,15 @@ import ( // Producer client to handle google pubsub interactions type Producer struct { - pubsubClient *pubsub.Client - projectID string - namespace string - metricsCollector metrics.MetricCollector - prometheusEnabled bool - logger *logrus.Logger - airbrakeHandler *airbrake.AirbrakeHandler + pubsubClient *pubsub.Client + projectID string + namespace string + metricsCollector metrics.MetricCollector + prometheusEnabled bool + logger *logrus.Logger + airbrakeHandler *airbrake.AirbrakeHandler + ackChan chan (*telemetry.Record) + reliableAckTxTypes []string } // Metrics stores metrics reported from this package @@ -54,7 +56,7 @@ func configurePubsub(projectID string) (*pubsub.Client, error) { } // NewProducer establishes the pubsub connection and define the dispatch method -func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) { +func NewProducer(ctx context.Context, 
prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metricsCollector) pubsubClient, err := configurePubsub(projectID) if err != nil { @@ -69,6 +71,8 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, metricsCollector: metricsCollector, logger: logger, airbrakeHandler: airbrakeHandler, + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, } p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace}) return p, nil @@ -103,12 +107,24 @@ func (p *Producer) Produce(entry *telemetry.Record) { p.ReportError("pubsub_err", err, logInfo) metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType}) return + } else { + p.ProcessReliableAck(entry) } metricsRegistry.publishBytesTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType}) metricsRegistry.publishCount.Inc(map[string]string{"record_type": entry.TxType}) } +// ProcessReliableAck sends the entry to the ack channel if its TxType is configured as a reliable ack source +func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { + for _, txType := range p.reliableAckTxTypes { + if entry.TxType == txType { + p.ackChan <- entry + break + } + } +} + func (p *Producer) createTopicIfNotExists(ctx context.Context, topic string) (*pubsub.Topic, error) { pubsubTopic := p.pubsubClient.Topic(topic) exists, err := pubsubTopic.Exists(ctx) diff --git a/datastore/kafka/kafka.go b/datastore/kafka/kafka.go index 5144473..7ee1bfd 100644 --- a/datastore/kafka/kafka.go +++ b/datastore/kafka/kafka.go @@ -16,13 +16,15 @@ import ( // Producer client to handle kafka interactions type Producer struct { - kafkaProducer *kafka.Producer - namespace string - prometheusEnabled bool - metricsCollector metrics.MetricCollector - logger *logrus.Logger - airbrakeHandler
*airbrake.AirbrakeHandler - deliveryChan chan kafka.Event + kafkaProducer *kafka.Producer + namespace string + prometheusEnabled bool + metricsCollector metrics.MetricCollector + logger *logrus.Logger + airbrakeHandler *airbrake.AirbrakeHandler + deliveryChan chan kafka.Event + ackChan chan (*telemetry.Record) + reliableAckTxTypes []string } // Metrics stores metrics reported from this package @@ -41,7 +43,7 @@ var ( ) // NewProducer establishes the kafka connection and define the dispatch method -func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) { +func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metricsCollector) kafkaProducer, err := kafka.NewProducer(config) @@ -57,6 +59,8 @@ func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bo logger: logger, airbrakeHandler: airbrakeHandler, deliveryChan: make(chan kafka.Event), + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, } go producer.handleProducerEvents() @@ -120,6 +124,7 @@ func (p *Producer) handleProducerEvents() { p.logError(fmt.Errorf("opaque_record_missing %v", ev)) continue } + p.ProcessReliableAck(entry) metricsRegistry.producerAckCount.Inc(map[string]string{"record_type": entry.TxType}) metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType}) default: @@ -128,6 +133,16 @@ } } } +// ProcessReliableAck sends the entry to the ack channel if its TxType is configured as a reliable ack source +func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { + for _, txType := range p.reliableAckTxTypes { + if entry.TxType == txType { + p.ackChan <- entry
+ break + } + } +} + func (p *Producer) logError(err error) { p.ReportError("kafka_err", err, nil) metricsRegistry.errorCount.Inc(map[string]string{}) diff --git a/datastore/kinesis/kinesis.go b/datastore/kinesis/kinesis.go index 60e6b5e..5ca3b35 100644 --- a/datastore/kinesis/kinesis.go +++ b/datastore/kinesis/kinesis.go @@ -18,12 +18,14 @@ import ( // Producer client to handle kinesis interactions type Producer struct { - kinesis *kinesis.Kinesis - logger *logrus.Logger - prometheusEnabled bool - metricsCollector metrics.MetricCollector - streams map[string]string - airbrakeHandler *airbrake.AirbrakeHandler + kinesis *kinesis.Kinesis + logger *logrus.Logger + prometheusEnabled bool + metricsCollector metrics.MetricCollector + streams map[string]string + airbrakeHandler *airbrake.AirbrakeHandler + ackChan chan (*telemetry.Record) + reliableAckTxTypes []string } // Metrics stores metrics reported from this package @@ -39,7 +41,7 @@ var ( ) // NewProducer configures and tests the kinesis connection -func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) { +func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metricsCollector) config := &aws.Config{ @@ -69,6 +71,8 @@ func NewProducer(maxRetries int, streams map[string]string, overrideHost string, metricsCollector: metricsCollector, streams: streams, airbrakeHandler: airbrakeHandler, + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, }, nil } @@ -91,13 +95,25 @@ func (p *Producer) Produce(entry *telemetry.Record) { p.ReportError("kinesis_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType}) return + } else { + p.ProcessReliableAck(entry) } - p.logger.Log(logrus.DEBUG, "kinesis_err", logrus.LogInfo{"vin": entry.Vin, "record_type": entry.TxType, "txid": entry.Txid, "shard_id": *kinesisRecordOutput.ShardId, "sequence_number": *kinesisRecordOutput.SequenceNumber}) + p.logger.Log(logrus.DEBUG, "kinesis_message_dispatched", logrus.LogInfo{"vin": entry.Vin, "record_type": entry.TxType, "txid": entry.Txid, "shard_id": *kinesisRecordOutput.ShardId, "sequence_number": *kinesisRecordOutput.SequenceNumber}) metricsRegistry.publishCount.Inc(map[string]string{"record_type": entry.TxType}) metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType}) } +// ProcessReliableAck sends the entry to the ack channel if its TxType is configured as a reliable ack source +func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { + for _, txType := range p.reliableAckTxTypes { + if entry.TxType == txType { + p.ackChan <- entry + break + } + } +} + // ReportError to airbrake and logger func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) { p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo) diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index 845fec4..e624bd1 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -15,6 +15,10 @@ func NewProtoLogger(logger *logrus.Logger) telemetry.Producer { return &ProtoLogger{logger: logger} } +// ProcessReliableAck no-op for logger datastore +func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) { +} + // Produce sends the data to the logger func (p *ProtoLogger) Produce(entry *telemetry.Record) { data, err := entry.GetJSONPayload() diff --git a/datastore/zmq/zmq.go b/datastore/zmq/zmq.go index 25bdfc5..7aa43ad 100644 --- a/datastore/zmq/zmq.go +++ b/datastore/zmq/zmq.go @@ -59,11 +59,13 @@ const MonitorSocketAddr = "inproc://zmq_socket_monitor.rep" //
ZMQProducer implements the telemetry.Producer interface by publishing to a // bound zmq socket. type ZMQProducer struct { - namespace string - ctx context.Context - sock *zmq4.Socket - logger *logrus.Logger - airbrakeHandler *airbrake.AirbrakeHandler + namespace string + ctx context.Context + sock *zmq4.Socket + logger *logrus.Logger + airbrakeHandler *airbrake.AirbrakeHandler + ackChan chan (*telemetry.Record) + reliableAckTxTypes []string } // Publish the record to the socket. @@ -71,13 +73,16 @@ func (p *ZMQProducer) Produce(rec *telemetry.Record) { if p.ctx.Err() != nil { return } - if nBytes, err := p.sock.SendMessage(telemetry.BuildTopicName(p.namespace, rec.TxType), rec.Payload()); err != nil { + nBytes, err := p.sock.SendMessage(telemetry.BuildTopicName(p.namespace, rec.TxType), rec.Payload()) + if err != nil { metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType}) p.ReportError("zmq_dispatch_error", err, nil) + return } else { - metricsRegistry.byteTotal.Add(int64(nBytes), map[string]string{"record_type": rec.TxType}) - metricsRegistry.publishCount.Inc(map[string]string{"record_type": rec.TxType}) + p.ProcessReliableAck(rec) } + metricsRegistry.byteTotal.Add(int64(nBytes), map[string]string{"record_type": rec.TxType}) + metricsRegistry.publishCount.Inc(map[string]string{"record_type": rec.TxType}) } // ReportError to airbrake and logger @@ -97,8 +102,18 @@ func (p *ZMQProducer) Close() error { return nil } +// ProcessReliableAck sends the entry to the ack channel if its TxType is configured as a reliable ack source +func (p *ZMQProducer) ProcessReliableAck(entry *telemetry.Record) { + for _, txType := range p.reliableAckTxTypes { + if entry.TxType == txType { + p.ackChan <- entry + break + } + } +} + // NewProducer creates a ZMQProducer with the given config.
-func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (producer telemetry.Producer, err error) { +func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes []string, logger *logrus.Logger) (producer telemetry.Producer, err error) { registerMetricsOnce(metrics) sock, err := zmq4.NewSocket(zmq4.PUB) if err != nil { @@ -159,6 +174,8 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl sock: sock, logger: logger, airbrakeHandler: airbrakeHandler, + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, }, nil } diff --git a/server/streaming/server.go b/server/streaming/server.go index 00dc5e0..ce620b4 100644 --- a/server/streaming/server.go +++ b/server/streaming/server.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "fmt" "net/http" + "sync" "time" "github.com/google/uuid" @@ -14,6 +15,7 @@ import ( logrus "github.com/teslamotors/fleet-telemetry/logger" "github.com/teslamotors/fleet-telemetry/messages" "github.com/teslamotors/fleet-telemetry/metrics" + "github.com/teslamotors/fleet-telemetry/metrics/adapter" "github.com/teslamotors/fleet-telemetry/server/airbrake" "github.com/teslamotors/fleet-telemetry/telemetry" ) @@ -25,8 +27,19 @@ var ( ReadBufferSize: 1024, WriteBufferSize: 1024, } + + serverMetricsRegistry ServerMetrics + serverMetricsOnce sync.Once ) +// ServerMetrics stores metrics reported from this package +type ServerMetrics struct { + reliableAckCount adapter.Counter + reliableAckMissCount adapter.Counter +} + +var () + // Server stores server resources type Server struct { // DispatchRules is a mapping of topics (records type) to their dispatching methods (loaded from Records json) @@ -36,34 +49,47 @@ type Server struct { // Metrics collects metrics for the application metricsCollector metrics.MetricCollector -
reliableAck bool - airbrakeHandler *airbrake.AirbrakeHandler + + registry *SocketRegistry + + ackChan chan (*telemetry.Record) } // InitServer initializes the main server func InitServer(c *config.Config, airbrakeHandler *airbrake.AirbrakeHandler, producerRules map[string][]telemetry.Producer, logger *logrus.Logger, registry *SocketRegistry) (*http.Server, *Server, error) { - reliableAck := false - if c.Kafka != nil { - reliableAck = c.ReliableAck - } socketServer := &Server{ DispatchRules: producerRules, metricsCollector: c.MetricCollector, - reliableAck: reliableAck, logger: logger, airbrakeHandler: airbrakeHandler, + registry: registry, + ackChan: c.AckChan, } mux := http.NewServeMux() - mux.HandleFunc("/", socketServer.ServeBinaryWs(c, registry)) + mux.HandleFunc("/", socketServer.ServeBinaryWs(c)) mux.Handle("/status", socketServer.airbrakeHandler.WithReporting(http.HandlerFunc(socketServer.Status()))) server := &http.Server{Addr: fmt.Sprintf("%v:%v", c.Host, c.Port), Handler: serveHTTPWithLogs(mux, logger)} + go socketServer.handleAcks() return server, socketServer, nil } +func (s *Server) handleAcks() { + for record := range s.ackChan { + if record.Serializer != nil { + if socket := s.registry.GetSocket(record.SocketID); socket != nil { + serverMetricsRegistry.reliableAckCount.Inc(map[string]string{"record_type": record.TxType}) + socket.respondToVehicle(record, nil) + } else { + serverMetricsRegistry.reliableAckMissCount.Inc(map[string]string{"record_type": record.TxType}) + } + } + } +} + // serveHTTPWithLogs wraps a handler and logs the request func serveHTTPWithLogs(h http.Handler, logger *logrus.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -89,7 +115,7 @@ func (s *Server) Status() func(w http.ResponseWriter, r *http.Request) { } // ServeBinaryWs serves a http query and upgrades it to a websocket -- only serves binary data coming from the ws -func (s *Server) ServeBinaryWs(config *config.Config, 
registry *SocketRegistry) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) ServeBinaryWs(config *config.Config) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { if ws := s.promoteToWebsocket(w, r); ws != nil { ctx := context.WithValue(context.Background(), SocketContext, map[string]interface{}{"request": r}) @@ -99,10 +125,10 @@ func (s *Server) ServeBinaryWs(config *config.Config, registry *SocketRegistry) } socketManager := NewSocketManager(ctx, requestIdentity, ws, config, s.logger) - registry.RegisterSocket(socketManager) - defer registry.DeregisterSocket(socketManager) + s.registry.RegisterSocket(socketManager) + defer s.registry.DeregisterSocket(socketManager) - binarySerializer := telemetry.NewBinarySerializer(requestIdentity, s.DispatchRules, s.reliableAck, s.logger) + binarySerializer := telemetry.NewBinarySerializer(requestIdentity, s.DispatchRules, s.logger) socketManager.ProcessTelemetry(binarySerializer) } } @@ -145,3 +171,22 @@ func extractCertFromHeaders(ctx context.Context, r *http.Request) (*x509.Certifi return r.TLS.PeerCertificates[nbCerts-1], nil } + +func registerServerMetricsOnce(metricsCollector metrics.MetricCollector) { + serverMetricsOnce.Do(func() { registerServerMetrics(metricsCollector) }) +} + +func registerServerMetrics(metricsCollector metrics.MetricCollector) { + + serverMetricsRegistry.reliableAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "reliable_ack", + Help: "The number of reliable acknowledgements.", + Labels: []string{"record_type"}, + }) + + serverMetricsRegistry.reliableAckMissCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "reliable_ack_miss", + Help: "The number of missing reliable acknowledgements.", + Labels: []string{"record_type"}, + }) +} diff --git a/server/streaming/socket.go b/server/streaming/socket.go index cc0a7e7..f420ecc 100644 --- a/server/streaming/socket.go +++ 
b/server/streaming/socket.go @@ -67,11 +67,6 @@ type Metrics struct { socketErrorCount adapter.Counter recordSizeBytesTotal adapter.Counter recordCount adapter.Counter - kafkaWriteCount adapter.Counter - kafkaWriteBytesTotal adapter.Counter - kafkaWriteMs adapter.Timer - reliableAckCount adapter.Counter - reliableAckMissCount adapter.Counter } var ( @@ -257,11 +252,16 @@ func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySeria sm.processRecord(record) // respond instantly to the client if we are not doing reliable ACKs - if !serializer.ReliableAck() { + if !sm.reliableAck(record) { sm.respondToVehicle(record, nil) } } +func (sm *SocketManager) reliableAck(record *telemetry.Record) bool { + _, ok := sm.config.ReliableAckSources[record.TxType] + return ok +} + func (sm *SocketManager) processRecord(record *telemetry.Record) { record.Dispatch() metricsRegistry.dispatchCount.Inc(map[string]string{"record_type": record.TxType}) @@ -383,33 +383,4 @@ func registerMetrics(metricsCollector metrics.MetricCollector) { Labels: []string{"record_type"}, }) - metricsRegistry.kafkaWriteCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ - Name: "kafka_write_total", - Help: "The number of writes to Kafka.", - Labels: []string{"record_type"}, - }) - - metricsRegistry.kafkaWriteBytesTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{ - Name: "kafka_write_total_bytes", - Help: "The number of bytes written to Kafka.", - Labels: []string{"record_type"}, - }) - - metricsRegistry.kafkaWriteMs = metricsCollector.RegisterTimer(adapter.CollectorOptions{ - Name: "kafka_write_ms", - Help: "The ms spent writing to Kafka.", - Labels: []string{}, - }) - - metricsRegistry.reliableAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ - Name: "reliable_ack", - Help: "The number of reliable acknowledgements.", - Labels: []string{"record_type"}, - }) - - metricsRegistry.reliableAckMissCount = 
metricsCollector.RegisterCounter(adapter.CollectorOptions{ - Name: "reliable_ack_miss", - Help: "The number of missing reliable acknowledgements.", - Labels: []string{"record_type"}, - }) } diff --git a/telemetry/producer.go b/telemetry/producer.go index 16c051c..fbc7656 100644 --- a/telemetry/producer.go +++ b/telemetry/producer.go @@ -30,5 +30,6 @@ func BuildTopicName(namespace, recordName string) string { // Producer handles dispatching data received from the vehicle type Producer interface { Produce(entry *Record) + ProcessReliableAck(entry *Record) ReportError(message string, err error, logInfo logrus.LogInfo) } diff --git a/telemetry/serializer.go b/telemetry/serializer.go index e40aeec..92d9b70 100644 --- a/telemetry/serializer.go +++ b/telemetry/serializer.go @@ -20,17 +20,15 @@ type BinarySerializer struct { DispatchRules map[string][]Producer RequestIdentity *RequestIdentity - logger *logrus.Logger - reliableAck bool + logger *logrus.Logger } // NewBinarySerializer returns a dedicated serializer for a current socket connection -func NewBinarySerializer(requestIdentity *RequestIdentity, dispatchRules map[string][]Producer, reliableAck bool, logger *logrus.Logger) *BinarySerializer { +func NewBinarySerializer(requestIdentity *RequestIdentity, dispatchRules map[string][]Producer, logger *logrus.Logger) *BinarySerializer { return &BinarySerializer{ DispatchRules: dispatchRules, RequestIdentity: requestIdentity, logger: logger, - reliableAck: reliableAck, } } @@ -93,11 +91,6 @@ func (bs *BinarySerializer) Dispatch(record *Record) { } } -// ReliableAck returns true if serializer supports reliable acks (only ack to car once datastore acked the data) -func (bs *BinarySerializer) ReliableAck() bool { - return bs.reliableAck -} - // Logger returns logger for the serializer func (bs *BinarySerializer) Logger() *logrus.Logger { return bs.logger diff --git a/telemetry/serializer_test.go b/telemetry/serializer_test.go index 07dbaaa..022b30d 100644 --- 
a/telemetry/serializer_test.go +++ b/telemetry/serializer_test.go @@ -13,20 +13,25 @@ import ( ) type CallbackTester struct { - counter int - errors int + counter int + errors int + reliableAck int } func (c *CallbackTester) Produce(entry *telemetry.Record) { c.counter++ } +func (c *CallbackTester) ProcessReliableAck(entry *telemetry.Record) { + c.reliableAck++ +} + func (c *CallbackTester) ReportError(message string, err error, logInfo logrus.LogInfo) { c.errors++ } var _ = Describe("BinarySerializer", func() { - DispatchKafkaGlobal := &CallbackTester{counter: 0, errors: 0} + DispatchKafkaGlobal := &CallbackTester{counter: 0, errors: 0, reliableAck: 0} DispatchRules := map[string][]telemetry.Producer{ "T": {DispatchKafkaGlobal}, "D7": {DispatchKafkaGlobal},