From 6c0f5832b9fab160fe2502dfeaed1fd8faa4422d Mon Sep 17 00:00:00 2001 From: Maksim Lebedev Date: Sun, 4 Sep 2022 21:30:27 +0400 Subject: [PATCH 1/4] Improve retry logic --- server/appconfig/appconfig.go | 17 ++++++ server/appconfig/config.template.yaml | 9 +++ server/config/destination_configs.go | 1 + server/events/native_queue.go | 42 ++++++++----- server/events/queue_factory.go | 16 ++--- server/main.go | 7 +-- server/storages/factory.go | 8 ++- server/storages/mock_factory.go | 2 +- server/storages/streaming.go | 86 +++++++++++++-------------- 9 files changed, 114 insertions(+), 74 deletions(-) diff --git a/server/appconfig/appconfig.go b/server/appconfig/appconfig.go index 650c5410b..07c75a261 100644 --- a/server/appconfig/appconfig.go +++ b/server/appconfig/appconfig.go @@ -35,6 +35,9 @@ type AppConfig struct { EmptyGIFPixelOnexOne []byte + StreamingRetryDelay int + ErrorRetryPeriod int + UaResolver useragent.Resolver AuthorizationService *authorization.Service @@ -93,6 +96,8 @@ func setDefaultParams(containerized bool) { viper.SetDefault("server.max_columns", 100) viper.SetDefault("server.max_event_size", 51200) viper.SetDefault("server.configurator_urn", "/configurator") + viper.SetDefault("server.streaming_retry_delay_minutes", 1) + viper.SetDefault("server.error_retry_period_hours", 24) //unique IDs viper.SetDefault("server.fields_configuration.unique_id_field", "/eventn_ctx/event_id||/eventn_ctx_event_id||/event_id") viper.SetDefault("server.fields_configuration.user_agent_path", "/eventn_ctx/user_agent||/user_agent") @@ -402,6 +407,18 @@ func Init(containerized bool, dockerHubID string) error { enrichWithHTTPContext := viper.GetBool("server.event_enrichment.http_context") appConfig.EnrichWithHTTPContext = enrichWithHTTPContext + appConfig.StreamingRetryDelay = viper.GetInt("server.streaming_retry_delay_minutes") + if appConfig.StreamingRetryDelay < 0 { + return fmt.Errorf("server.streaming_retry_delay_minutes expects to be positive value") + } + logging.Infof("server.streaming_retry_delay_minutes: %d", appConfig.StreamingRetryDelay) + + appConfig.ErrorRetryPeriod = viper.GetInt("server.error_retry_period_hours") + if appConfig.ErrorRetryPeriod < 0 { + return fmt.Errorf("server.error_retry_period_hours expects to be positive value") + } + logging.Infof("server.error_retry_period_hours: %d", appConfig.ErrorRetryPeriod) + Instance = &appConfig return nil } diff --git a/server/appconfig/config.template.yaml b/server/appconfig/config.template.yaml index b846db492..d972d3a08 100644 --- a/server/appconfig/config.template.yaml +++ b/server/appconfig/config.template.yaml @@ -32,6 +32,11 @@ server: ### Sources reloading. If 'sources' key is http or file:/// URL than it will be reloaded every sources_reload_sec #sources_reload_sec: 1 #Optional. Default value is 1. + # Interval to retry failed event. Optional. Default value is 1 + streaming_retry_delay_minutes: 1 + # Clear failed event after retry period. Optional. Default value is 24 + error_retry_period_hours: 24 + ### Application metrics ### At present only Prometheus is supported. Read more about application metrics https://jitsu.com/docs/other-features/application-metrics # metrics: @@ -126,6 +131,7 @@ destinations: # type: varchar(256) #SQL type # column_type: varchar(256) encode zstd # max_columns: 100 # Optional. The limit of the count of columns. +# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. # table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Optional. Default value constant is 'events'. Template for extracting table name # ### BigQuery https://jitsu.com/docs/destinations-configuration/bigquery @@ -138,6 +144,7 @@ destinations: # key_file: /home/eventnative/data/config/bqkey.json # or json string of key e.g. "{"service_account":...}" # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. +# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. # table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name ### Postgres https://jitsu.com/docs/destinations-configuration/postgres @@ -156,6 +163,7 @@ destinations: # connect_timeout: 300 # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. +# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. # table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name # #Required for Postgres users recognition feature. # primary_key_fields: @@ -207,6 +215,7 @@ destinations: # endpoint: #Optional. Default value is AWS s3 endpoint. If you use DigitalOcean spaces or others - specify your endpoint # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. +# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. # table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Template will be used for file naming ### Snowflake https://jitsu.com/docs/destinations-configuration/snowflake diff --git a/server/config/destination_configs.go b/server/config/destination_configs.go index fba4fd67f..f12216124 100644 --- a/server/config/destination_configs.go +++ b/server/config/destination_configs.go @@ -98,6 +98,7 @@ type DataLayout struct { //Deprecated Mappings *Mapping `mapstructure:"mappings" json:"mappings,omitempty" yaml:"mappings,omitempty"` MaxColumns int `mapstructure:"max_columns" json:"max_columns,omitempty" yaml:"max_columns,omitempty"` + ErrorRetryPeriod int `mapstructure:"error_retry_period_hours" json:"error_retry_period_hours,omitempty" yaml:"error_retry_period_hours,omitempty"` TableNameTemplate string `mapstructure:"table_name_template" json:"table_name_template,omitempty" yaml:"table_name_template,omitempty"` PrimaryKeyFields []string `mapstructure:"primary_key_fields" json:"primary_key_fields,omitempty" yaml:"primary_key_fields,omitempty"` UniqueIDField string `mapstructure:"unique_id_field" json:"unique_id_field,omitempty" yaml:"unique_id_field,omitempty"` diff --git a/server/events/native_queue.go b/server/events/native_queue.go index f6229ebe9..52cc4d973 100644 --- a/server/events/native_queue.go +++ b/server/events/native_queue.go @@ -27,11 +27,14 @@ type NativeQueue struct { identifier string queue queue.Queue + streamingRetryDelay int + errorRetryPeriod int + metricsReporter internal.MetricReporter closed chan struct{} } -func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue) (Queue, error) { +func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) { var metricsReporter internal.MetricReporter if underlyingQueue.Type() == queue.RedisType { metricsReporter = &internal.SharedQueueMetricReporter{} @@ -42,12 +45,14 @@ func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue que metricsReporter.SetMetrics(subsystem, identifier, int(underlyingQueue.Size()), int(underlyingQueue.BufferSize())) nq := &NativeQueue{ - queue: underlyingQueue, - namespace: namespace, - subsystem: subsystem, - identifier: identifier, - metricsReporter: metricsReporter, - closed: make(chan struct{}, 1), + queue: underlyingQueue, + namespace: namespace, + subsystem: subsystem, + identifier: identifier, + streamingRetryDelay: streamingRetryDelay, + errorRetryPeriod: errorRetryPeriod, + metricsReporter: metricsReporter, + closed: make(chan struct{}, 1), } safego.Run(nq.startMonitor) @@ -71,13 +76,16 @@ func (q *NativeQueue) startMonitor() { } func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string) { - q.ConsumeTimed(f, timestamp.Now().UTC(), tokenID) + initial := timestamp.Now().UTC() + finish := initial.Add(time.Duration(q.errorRetryPeriod) * time.Hour) + q.ConsumeTimed(f, initial, finish, tokenID) } -func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string) { +func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) { te := &TimedEvent{ Payload: payload, - DequeuedTime: t, + DequeuedTime: dequeuedTime, + FinishTime: finishTime, TokenID: tokenID, } @@ -89,24 +97,28 @@ func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, q.metricsReporter.EnqueuedEvent(q.subsystem, q.identifier) } -func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, error) { +func (q *NativeQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) { ite, err := q.queue.Pop() if err != nil { if err == queue.ErrQueueClosed { - return nil, time.Time{}, "", ErrQueueClosed + return nil, time.Time{}, time.Time{}, "", ErrQueueClosed } - return nil, time.Time{}, "", err + return nil, time.Time{}, time.Time{}, "", err } q.metricsReporter.DequeuedEvent(q.subsystem, q.identifier) te, ok := ite.(*TimedEvent) if !ok { - return nil, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite) + return nil, time.Time{}, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite) } - return te.Payload, te.DequeuedTime, te.TokenID, nil + return te.Payload, te.DequeuedTime, te.FinishTime, te.TokenID, nil +} + +func (q *NativeQueue) GetDelay() int { + return q.streamingRetryDelay } //Close closes underlying queue diff --git a/server/events/queue_factory.go b/server/events/queue_factory.go index 58b05e2f7..21212b9e5 100644 --- a/server/events/queue_factory.go +++ b/server/events/queue_factory.go @@ -14,6 +14,7 @@ import ( type TimedEvent struct { Payload map[string]interface{} DequeuedTime time.Time + FinishTime time.Time TokenID string } @@ -27,19 +28,20 @@ func (d *DummyQueue) Close() error { func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string) { } -func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) { +func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) { } -func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, error) { - return nil, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue") +func (d *DummyQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) { + return nil, time.Time{}, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue") } //Queue is an events queue. Possible implementations (dque, leveldbqueue, native) type Queue interface { io.Closer Consume(f map[string]interface{}, tokenID string) - ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) - DequeueBlock() (Event, time.Time, string, error) + ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) + DequeueBlock() (Event, time.Time, time.Time, string, error) + GetDelay() int } type QueueFactory struct { @@ -51,7 +53,7 @@ func NewQueueFactory(redisPool *meta.RedisPool, redisReadTimeout time.Duration) return &QueueFactory{redisPool: redisPool, redisReadTimeout: redisReadTimeout} } -func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, error) { +func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) { var underlyingQueue queue.Queue if qf.redisPool != nil { logging.Infof("[%s] initializing redis events queue", identifier) @@ -60,7 +62,7 @@ func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, logging.Infof("[%s] initializing inmemory events queue", identifier) underlyingQueue = queue.NewInMemory(1_000_000) } - return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue) + return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue, streamingRetryDelay, errorRetryPeriod) } func (qf *QueueFactory) CreateHTTPQueue(identifier string, serializationModelBuilder func() interface{}) queue.Queue { diff --git a/server/main.go b/server/main.go index 053670468..35319340c 100644 --- a/server/main.go +++ b/server/main.go @@ -7,7 +7,6 @@ import ( "errors" "flag" "fmt" - "github.com/jitsucom/jitsu/server/script" "math/rand" "net/http" "os" @@ -19,9 +18,6 @@ import ( "syscall" "time" - "github.com/jitsucom/jitsu/server/script/node" - "github.com/jitsucom/jitsu/server/templates" - "github.com/gin-gonic/gin/binding" "github.com/jitsucom/jitsu/server/airbyte" "github.com/jitsucom/jitsu/server/appconfig" @@ -49,12 +45,15 @@ import ( "github.com/jitsucom/jitsu/server/safego" "github.com/jitsucom/jitsu/server/scheduling" "github.com/jitsucom/jitsu/server/schema" + "github.com/jitsucom/jitsu/server/script" + "github.com/jitsucom/jitsu/server/script/node" "github.com/jitsucom/jitsu/server/singer" "github.com/jitsucom/jitsu/server/sources" "github.com/jitsucom/jitsu/server/storages" "github.com/jitsucom/jitsu/server/synchronization" "github.com/jitsucom/jitsu/server/system" "github.com/jitsucom/jitsu/server/telemetry" + "github.com/jitsucom/jitsu/server/templates" "github.com/jitsucom/jitsu/server/timestamp" "github.com/jitsucom/jitsu/server/users" "github.com/jitsucom/jitsu/server/uuid" diff --git a/server/storages/factory.go b/server/storages/factory.go index bf42ffee4..fe247eb65 100644 --- a/server/storages/factory.go +++ b/server/storages/factory.go @@ -158,6 +158,8 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina pkFields := map[string]bool{} maxColumns := f.maxColumns + streamingRetryDelay := appconfig.Instance.StreamingRetryDelay + errorRetryPeriod := appconfig.Instance.ErrorRetryPeriod uniqueIDField := appconfig.Instance.GlobalUniqueIDField if destination.DataLayout != nil { for _, field := range destination.DataLayout.PrimaryKeyFields { @@ -167,6 +169,10 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina maxColumns = destination.DataLayout.MaxColumns logging.Infof("[%s] uses max_columns setting: %d", destinationID, maxColumns) } + if destination.DataLayout.ErrorRetryPeriod > 0 { + errorRetryPeriod = destination.DataLayout.ErrorRetryPeriod + logging.Infof("[%s] uses error_retry_period_hours setting: %d", destinationID, errorRetryPeriod) + } if destination.DataLayout.UniqueIDField != "" { uniqueIDField = identifiers.NewUniqueID(destination.DataLayout.UniqueIDField) } @@ -185,7 +191,7 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina var eventQueue events.Queue if destination.Mode != SynchronousMode { - eventQueue, err = f.eventsQueueFactory.CreateEventsQueue(destination.Type, destinationID) + eventQueue, err = f.eventsQueueFactory.CreateEventsQueue(destination.Type, destinationID, streamingRetryDelay, errorRetryPeriod) if err != nil { return nil, nil, err } diff --git a/server/storages/mock_factory.go b/server/storages/mock_factory.go index 829a51f5e..97f2293c3 100644 --- a/server/storages/mock_factory.go +++ b/server/storages/mock_factory.go @@ -54,7 +54,7 @@ func (mf *MockFactory) Create(id string, destination config.DestinationConfig) ( var eventQueue events.Queue if destination.Mode == StreamMode { qf := events.NewQueueFactory(nil, 0) - eventQueue, _ = qf.CreateEventsQueue(destination.Type, id) + eventQueue, _ = qf.CreateEventsQueue(destination.Type, id, appconfig.Instance.StreamingRetryDelay, appconfig.Instance.ErrorRetryPeriod) } return &testProxyMock{mode: destination.Mode}, eventQueue, nil } diff --git a/server/storages/streaming.go b/server/storages/streaming.go index 7cb2f13a5..7327eaee1 100644 --- a/server/storages/streaming.go +++ b/server/storages/streaming.go @@ -1,6 +1,10 @@ package storages import ( + "fmt" + "math/rand" + "time" + "github.com/jitsucom/jitsu/server/adapters" "github.com/jitsucom/jitsu/server/appconfig" "github.com/jitsucom/jitsu/server/errorj" @@ -11,8 +15,6 @@ import ( "github.com/jitsucom/jitsu/server/timestamp" "github.com/jitsucom/jitsu/server/utils" "go.uber.org/atomic" - "math/rand" - "time" ) //StreamingStorage supports Insert operation @@ -62,7 +64,7 @@ func (sw *StreamingWorker) start() { break } - fact, dequeuedTime, tokenID, err := sw.eventQueue.DequeueBlock() + fact, dequeuedTime, finishTime, tokenID, err := sw.eventQueue.DequeueBlock() if err != nil { if err == events.ErrQueueClosed && sw.closed.Load() { continue @@ -74,7 +76,7 @@ func (sw *StreamingWorker) start() { //dequeued event was from retry call and retry timeout hasn't come if timestamp.Now().Before(dequeuedTime) { - sw.eventQueue.ConsumeTimed(fact, dequeuedTime, tokenID) + sw.eventQueue.ConsumeTimed(fact, dequeuedTime, finishTime, tokenID) continue } _, recognizedEvent := fact[schema.JitsuUserRecognizedEvent] @@ -83,6 +85,9 @@ func (sw *StreamingWorker) start() { continue } + // Configuration parameter expected in minues + seconds_delay := 60 * sw.eventQueue.GetDelay() + //is used in writing counters/metrics/events cache preliminaryEventContext := &adapters.EventContext{ CacheDisabled: sw.streamingStorage.IsCachingDisabled(), @@ -133,50 +138,39 @@ func (sw *StreamingWorker) start() { Table: table, RecognizedEvent: recognizedEvent, } + + var actionName string + var processedError error + var failedMessage string if recognizedEvent { - if updateErr := sw.streamingStorage.Update(eventContext); updateErr != nil { - err := errorj.Decorate(updateErr, "failed to update event"). - WithProperty(errorj.DestinationID, sw.streamingStorage.ID()). - WithProperty(errorj.DestinationType, sw.streamingStorage.Type()) - - var retryInfoInLog string - retry := IsConnectionError(err) - if retry { - retryInfoInLog = "connection problem. event will be re-updated after 20 seconds\n" - } - if errorj.IsSystemError(err) { - logging.SystemErrorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) - } else { - logging.Errorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) - } - - if retry { - //retry - sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(20*time.Second), tokenID) - } - } + actionName = "updated" + failedMessage = "failed to update event" + processedError = sw.streamingStorage.Update(eventContext) } else { - if insertErr := sw.streamingStorage.Insert(eventContext); insertErr != nil { - err := errorj.Decorate(insertErr, "failed to insert event"). - WithProperty(errorj.DestinationID, sw.streamingStorage.ID()). - WithProperty(errorj.DestinationType, sw.streamingStorage.Type()) - - var retryInfoInLog string - retry := IsConnectionError(err) - if retry { - retryInfoInLog = "connection problem. event will be re-inserted after 20 seconds\n" - } - if errorj.IsSystemError(err) { - logging.SystemErrorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) - } else { - logging.Errorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) - } - - if retry { - //retry - sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(20*time.Second), tokenID) - } - } + actionName = "inserted" + failedMessage = "failed to insert event" + processedError = sw.streamingStorage.Insert(eventContext) + } + + if processedError == nil { + continue + } + + err := errorj.Decorate(processedError, failedMessage). + WithProperty(errorj.DestinationID, sw.streamingStorage.ID()). + WithProperty(errorj.DestinationType, sw.streamingStorage.Type()) + + retryInfoInLog := fmt.Sprintf("event will be re-%s after %d seconds\n", actionName, seconds_delay) + if errorj.IsSystemError(err) { + logging.SystemErrorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) + } else { + logging.Errorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString()) + } + + // Retry event if it is not late + retryTime := timestamp.Now().Add(time.Duration(seconds_delay) * time.Second) + if retryTime.Before(finishTime) { + sw.eventQueue.ConsumeTimed(fact, retryTime, finishTime, tokenID) } } } From f90c0da2e60041b014750e99b8719a21bf50e907 Mon Sep 17 00:00:00 2001 From: Maksim Lebedev Date: Sat, 17 Sep 2022 19:47:07 +0400 Subject: [PATCH 2/4] Logging retired events --- server/adapters/http.go | 23 ++++++----- server/logevents/factory.go | 17 +++++++- server/storages/abstract.go | 76 ++++++++++++++++++------------------ server/storages/streaming.go | 9 ++++- server/storages/sync.go | 11 ++++-- 5 files changed, 78 insertions(+), 58 deletions(-) diff --git a/server/adapters/http.go b/server/adapters/http.go index 8182f5531..9f84e2ab2 100644 --- a/server/adapters/http.go +++ b/server/adapters/http.go @@ -4,6 +4,11 @@ import ( "bytes" "encoding/json" "fmt" + "io/ioutil" + "math" + "net/http" + "time" + "github.com/jitsucom/jitsu/server/events" "github.com/jitsucom/jitsu/server/logging" "github.com/jitsucom/jitsu/server/queue" @@ -11,10 +16,6 @@ import ( "github.com/jitsucom/jitsu/server/timestamp" "github.com/panjf2000/ants/v2" "go.uber.org/atomic" - "io/ioutil" - "math" - "net/http" - "time" ) //HTTPAdapterConfiguration is a dto for creating HTTPAdapter @@ -26,7 +27,7 @@ type HTTPAdapterConfiguration struct { QueueFactory *events.QueueFactory PoolWorkers int DebugLogger *logging.QueryLogger - ErrorHandler func(fallback bool, eventContext *EventContext, err error) + ErrorHandler func(eventContext *EventContext, err error) SuccessHandler func(eventContext *EventContext) } @@ -51,7 +52,7 @@ type HTTPAdapter struct { debugLogger *logging.QueryLogger httpReqFactory HTTPRequestFactory - errorHandler func(fallback bool, eventContext *EventContext, err error) + errorHandler func(eventContext *EventContext, err error) successHandler func(eventContext *EventContext) destinationID string @@ -122,7 +123,7 @@ func (h *HTTPAdapter) startObserver() { if timestamp.Now().UTC().Before(retryableRequest.DequeuedTime) { if err := h.queue.AddRequest(retryableRequest); err != nil { logging.SystemErrorf("[%s] Error enqueueing HTTP request after dequeuing: %v", h.destinationID, err) - h.errorHandler(true, retryableRequest.EventContext, err) + h.errorHandler(retryableRequest.EventContext, err) } continue @@ -134,7 +135,7 @@ func (h *HTTPAdapter) startObserver() { if err := h.queue.AddRequest(retryableRequest); err != nil { logging.SystemErrorf("[%s] Error enqueueing HTTP request after invoking: %v", h.destinationID, err) - h.errorHandler(true, retryableRequest.EventContext, err) + h.errorHandler(retryableRequest.EventContext, err) } } } else { @@ -196,9 +197,7 @@ func (h *HTTPAdapter) doRetry(retryableRequest *RetryableRequest, sendErr error) retryableRequest.DequeuedTime = timestamp.Now().UTC().Add(delay) if err := h.queue.AddRequest(retryableRequest); err != nil { logging.SystemErrorf("[%s] Error enqueueing HTTP request after sending: %v", h.destinationID, err) - h.errorHandler(true, retryableRequest.EventContext, sendErr) - } else { - h.errorHandler(false, retryableRequest.EventContext, sendErr) + h.errorHandler(retryableRequest.EventContext, sendErr) } return } @@ -207,7 +206,7 @@ func (h *HTTPAdapter) doRetry(retryableRequest *RetryableRequest, sendErr error) headersJSON, _ := json.Marshal(retryableRequest.Request.Headers) logging.Errorf("[%s] Error sending HTTP request URL: [%s] Method: [%s] Body: [%s] Headers: [%s]: %v", h.destinationID, retryableRequest.Request.URL, retryableRequest.Request.Method, string(retryableRequest.Request.Body), headersJSON, sendErr) - h.errorHandler(true, retryableRequest.EventContext, sendErr) + h.errorHandler(retryableRequest.EventContext, sendErr) } func (h *HTTPAdapter) doRequest(req *Request) error { diff --git a/server/logevents/factory.go b/server/logevents/factory.go index c505df873..3a5f6df35 100644 --- a/server/logevents/factory.go +++ b/server/logevents/factory.go @@ -1,15 +1,17 @@ package logevents import ( - "github.com/jitsucom/jitsu/server/logging" "io" "path" + + "github.com/jitsucom/jitsu/server/logging" ) const ( ArchiveDir = "archive" FailedDir = "failed" IncomingDir = "incoming" + RetireDir = "retired" ) type Factory struct { @@ -101,6 +103,19 @@ func (f *Factory) CreateSQLQueryLogger(destinationName string) *logging.QueryLog return logging.NewQueryLogger(destinationName, f.ddlLogsWriter, f.queryLogsWriter) } +func (f *Factory) CreateRetireLogger(destinationName string) logging.ObjectLogger { + retireWriter := logging.NewRollingWriter(&logging.Config{ + FileName: "retire.dst=" + destinationName, + FileDir: path.Join(f.logEventPath, RetireDir), + RotationMin: f.logRotationMin, + RotateOnClose: true, + }) + if f.asyncLoggers { + return NewAsyncLogger(retireWriter, false, f.asyncLoggerPoolSize) + } + return NewSyncLogger(retireWriter, false) +} + func (f *Factory) CreateStreamingArchiveLogger(destinationName string) logging.ObjectLogger { archiveWriter := logging.NewRollingWriter(&logging.Config{ FileName: "streaming-archive.dst=" + destinationName, diff --git a/server/storages/abstract.go b/server/storages/abstract.go index c6fd17524..7d943f350 100644 --- a/server/storages/abstract.go +++ b/server/storages/abstract.go @@ -2,26 +2,24 @@ package storages import ( "fmt" - "github.com/jitsucom/jitsu/server/appconfig" - "github.com/jitsucom/jitsu/server/enrichment" - "github.com/jitsucom/jitsu/server/errorj" - "github.com/jitsucom/jitsu/server/timestamp" - "github.com/jitsucom/jitsu/server/typing" "math/rand" - "github.com/jitsucom/jitsu/server/config" - "github.com/jitsucom/jitsu/server/logging" - - "github.com/jitsucom/jitsu/server/identifiers" - "github.com/hashicorp/go-multierror" "github.com/jitsucom/jitsu/server/adapters" + "github.com/jitsucom/jitsu/server/appconfig" "github.com/jitsucom/jitsu/server/caching" + "github.com/jitsucom/jitsu/server/config" "github.com/jitsucom/jitsu/server/counters" + "github.com/jitsucom/jitsu/server/enrichment" + "github.com/jitsucom/jitsu/server/errorj" "github.com/jitsucom/jitsu/server/events" + "github.com/jitsucom/jitsu/server/identifiers" + "github.com/jitsucom/jitsu/server/logging" "github.com/jitsucom/jitsu/server/metrics" "github.com/jitsucom/jitsu/server/schema" "github.com/jitsucom/jitsu/server/telemetry" + "github.com/jitsucom/jitsu/server/timestamp" + "github.com/jitsucom/jitsu/server/typing" ) //Abstract is an Abstract destination storage @@ -30,7 +28,6 @@ import ( type Abstract struct { implementation Storage destinationID string - fallbackLogger logging.ObjectLogger eventsCache *caching.EventsCache processor *schema.Processor @@ -44,7 +41,9 @@ type Abstract struct { streamingWorker *StreamingWorker - archiveLogger logging.ObjectLogger + archiveLogger logging.ObjectLogger + fallbackLogger logging.ObjectLogger + retireLogger logging.ObjectLogger } //ID returns destination ID @@ -77,7 +76,7 @@ func (a *Abstract) DryRun(payload events.Event) ([][]adapters.TableField, error) } //ErrorEvent writes error to metrics/counters/telemetry/events cache -func (a *Abstract) ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error) { +func (a *Abstract) ErrorEvent(eventCtx *adapters.EventContext, err error) { metrics.ErrorTokenEvent(eventCtx.TokenID, a.Processor().DestinationType(), a.destinationID) counters.ErrorPushDestinationEvents(a.destinationID, 1) telemetry.Error(eventCtx.TokenID, a.destinationID, eventCtx.Src, "", 1) @@ -85,13 +84,11 @@ func (a *Abstract) ErrorEvent(fallback bool, eventCtx *adapters.EventContext, er //cache a.eventsCache.Error(eventCtx.CacheDisabled, a.ID(), eventCtx.GetSerializedOriginalEvent(), err.Error()) - if fallback { - a.Fallback(&events.FailedEvent{ - Event: []byte(eventCtx.RawEvent.Serialize()), - Error: err.Error(), - EventID: eventCtx.EventID, - }) - } + a.Fallback(&events.FailedEvent{ + Event: []byte(eventCtx.RawEvent.Serialize()), + Error: err.Error(), + EventID: eventCtx.EventID, + }) } //SuccessEvent writes success to metrics/counters/telemetry/events cache @@ -102,6 +99,9 @@ func (a *Abstract) SuccessEvent(eventCtx *adapters.EventContext) { //cache a.eventsCache.Succeed(eventCtx) + + //archive + a.archiveLogger.Consume(eventCtx.RawEvent, eventCtx.TokenID) } //SkipEvent writes skip to metrics/counters/telemetry and error to events cache @@ -120,16 +120,20 @@ func (a *Abstract) Fallback(failedEvents ...*events.FailedEvent) { } } +// Retire logs event with error to retire logger +func (a *Abstract) RetireEvent(failedEvent *events.Event) { + a.retireLogger.ConsumeAny(failedEvent) +} + //Insert ensures table and sends input event to Destination (with 1 retry if error) func (a *Abstract) Insert(eventContext *adapters.EventContext) (insertErr error) { defer func() { if !eventContext.RecognizedEvent { //metrics/counters/cache/fallback - a.AccountResult(eventContext, insertErr) - - //archive - if insertErr == nil { - a.archiveLogger.Consume(eventContext.RawEvent, eventContext.TokenID) + if insertErr != nil { + a.ErrorEvent(eventContext, insertErr) + } else { + a.SuccessEvent(eventContext) } } }() @@ -276,19 +280,6 @@ func (a *Abstract) retryInsert(sqlAdapter adapters.SQLAdapter, tableHelper *Tabl return nil } -//AccountResult checks input error and calls ErrorEvent or SuccessEvent -func (a *Abstract) AccountResult(eventContext *adapters.EventContext, err error) { - if err != nil { - if IsConnectionError(err) { - a.ErrorEvent(false, eventContext, err) - } else { - a.ErrorEvent(true, eventContext, err) - } - } else { - a.SuccessEvent(eventContext) - } -} - //Clean removes all records from storage func (a *Abstract) Clean(tableName string) error { return nil @@ -300,6 +291,11 @@ func (a *Abstract) close() (multiErr error) { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing streaming worker: %v", a.ID(), err)) } } + if a.retireLogger != nil { + if err := a.retireLogger.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing retire logger: %v", a.ID(), err)) + } + } if a.fallbackLogger != nil { if err := a.fallbackLogger.Close(); err != nil { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing fallback logger: %v", a.ID(), err)) @@ -340,12 +336,14 @@ func (a *Abstract) Init(config *Config, impl Storage, preinstalledJavaScript str } func (a *Abstract) Start(config *Config) error { - a.fallbackLogger = config.loggerFactory.CreateFailedLogger(config.destinationID) a.archiveLogger = config.loggerFactory.CreateStreamingArchiveLogger(config.destinationID) + a.fallbackLogger = config.loggerFactory.CreateFailedLogger(config.destinationID) + a.retireLogger = config.loggerFactory.CreateRetireLogger(config.destinationID) if a.streamingWorker != nil { a.streamingWorker.start() } + return nil } diff --git a/server/storages/streaming.go b/server/storages/streaming.go index 7327eaee1..ba9a991a1 100644 --- a/server/storages/streaming.go +++ b/server/storages/streaming.go @@ -24,12 +24,15 @@ type StreamingStorage interface { Insert(eventContext *adapters.EventContext) (err error) Update(eventContext *adapters.EventContext) (err error) + //SuccessEvent writes metrics/counters/events cache, etc SuccessEvent(eventCtx *adapters.EventContext) //ErrorEvent writes metrics/counters/events cache, etc - ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error) + ErrorEvent(eventCtx *adapters.EventContext, err error) //SkipEvent writes metrics/counters/events cache, etc SkipEvent(eventCtx *adapters.EventContext, err error) + //RetireEvent writes metrics/counters/events cache, etc + RetireEvent(eventContext *events.Event) } //StreamingWorker reads events from queue and using events.StreamingStorage writes them @@ -109,7 +112,7 @@ func (sw *StreamingWorker) start() { sw.streamingStorage.SkipEvent(preliminaryEventContext, err) } else { logging.Errorf("[%s] Unable to process object %s: %v", sw.streamingStorage.ID(), fact.DebugString(), err) - sw.streamingStorage.ErrorEvent(true, preliminaryEventContext, err) + sw.streamingStorage.ErrorEvent(preliminaryEventContext, err) } continue @@ -171,6 +174,8 @@ func (sw *StreamingWorker) start() { retryTime := timestamp.Now().Add(time.Duration(seconds_delay) * time.Second) if retryTime.Before(finishTime) { sw.eventQueue.ConsumeTimed(fact, retryTime, finishTime, tokenID) + } else { + sw.streamingStorage.RetireEvent(&fact) } } } diff --git a/server/storages/sync.go b/server/storages/sync.go index c81ef3ca7..d975eba92 100644 --- a/server/storages/sync.go +++ b/server/storages/sync.go @@ -1,6 +1,8 @@ package storages import ( + "math/rand" + "github.com/jitsucom/jitsu/server/adapters" "github.com/jitsucom/jitsu/server/appconfig" "github.com/jitsucom/jitsu/server/errorj" @@ -9,18 +11,18 @@ import ( "github.com/jitsucom/jitsu/server/schema" "github.com/jitsucom/jitsu/server/utils" "go.uber.org/atomic" - "math/rand" ) //SyncStorage supports ProcessEvent synchronous operation type SyncStorage interface { Storage + //ProcessEvent process event in sync fashion. Return resulting object immediately ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error) //SuccessEvent writes metrics/counters/events cache, etc SuccessEvent(eventCtx *adapters.EventContext) //ErrorEvent writes metrics/counters/events cache, etc - ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error) + ErrorEvent(eventCtx *adapters.EventContext, err error) //SkipEvent writes metrics/counters/events cache, etc SkipEvent(eventCtx *adapters.EventContext, err error) } @@ -77,7 +79,7 @@ func (sw *SyncWorker) ProcessEvent(fact events.Event, tokenID string) []map[stri sw.syncStorage.SkipEvent(preliminaryEventContext, err) } else { logging.Errorf("[%s] Unable to process object %s: %v", sw.syncStorage.ID(), fact.DebugString(), err) - sw.syncStorage.ErrorEvent(true, preliminaryEventContext, err) + sw.syncStorage.ErrorEvent(preliminaryEventContext, err) } return nil @@ -118,7 +120,8 @@ func (sw *SyncWorker) ProcessEvent(fact events.Event, tokenID string) []map[stri } else { logging.Errorf("%+v\norigin event: %s", err, flattenObject.DebugString()) } - sw.syncStorage.ErrorEvent(true, eventContext, err) + + sw.syncStorage.ErrorEvent(eventContext, err) } else { results = append(results, result) sw.syncStorage.SuccessEvent(eventContext) From 2dd2654148f233c6838765de68442b1a43271292 Mon Sep 17 00:00:00 2001 From: Maksim Lebedev Date: Sun, 18 Sep 2022 15:15:44 +0400 Subject: [PATCH 3/4] Retire old files at batch uploading --- server/logevents/factory.go | 77 +++++++------------------ server/logfiles/uploader.go | 107 ++++++++++++++++++++++++----------- server/main.go | 2 +- server/storages/abstract.go | 16 +++--- server/storages/streaming.go | 6 +- 5 files changed, 108 insertions(+), 100 deletions(-) diff --git a/server/logevents/factory.go b/server/logevents/factory.go index 3a5f6df35..d2d7acafd 100644 --- a/server/logevents/factory.go +++ b/server/logevents/factory.go @@ -11,7 +11,7 @@ const ( ArchiveDir = "archive" FailedDir = "failed" IncomingDir = "incoming" - RetireDir = "retired" + RetiredDir = "retired" ) type Factory struct { @@ -25,8 +25,8 @@ type Factory struct { queryLogsWriter io.Writer } -func NewFactory(logEventPath string, logRotationMin int64, showInServer bool, ddlLogsWriter io.Writer, queryLogsWriter io.Writer, - asyncLoggers bool, asyncLoggerPoolSize int) *Factory { +func NewFactory(logEventPath string, logRotationMin int64, showInServer bool, + ddlLogsWriter io.Writer, queryLogsWriter io.Writer, asyncLoggers bool, asyncLoggerPoolSize int) *Factory { if asyncLoggers { var defaultValueMsg string if asyncLoggerPoolSize == 0 { @@ -71,74 +71,41 @@ func (f *Factory) NewFactoryWithQueryLogsWriter(overriddenQueryLogsWriter io.Wri } } -func (f *Factory) CreateIncomingLogger(tokenID string) logging.ObjectLogger { - eventLogWriter := logging.NewRollingWriter(&logging.Config{ - FileName: "incoming.tok=" + tokenID, - FileDir: path.Join(f.logEventPath, IncomingDir), - RotationMin: f.logRotationMin, - RotateOnClose: true, - }) - - if f.asyncLoggers { - return NewAsyncLogger(eventLogWriter, f.showInServer, f.asyncLoggerPoolSize) - } - return NewSyncLogger(eventLogWriter, f.showInServer) +func (f *Factory) CreateSQLQueryLogger(destinationName string) *logging.QueryLogger { + return logging.NewQueryLogger(destinationName, f.ddlLogsWriter, f.queryLogsWriter) } -func (f *Factory) CreateFailedLogger(destinationName string) logging.ObjectLogger { - failedEventWriter := logging.NewRollingWriter(&logging.Config{ - FileName: "failed.dst=" + destinationName, - FileDir: path.Join(f.logEventPath, FailedDir), - RotationMin: f.logRotationMin, - RotateOnClose: true, - }) - - if f.asyncLoggers { - return NewAsyncLogger(failedEventWriter, false, f.asyncLoggerPoolSize) - } - return NewSyncLogger(failedEventWriter, false) +func (f *Factory) CreateIncomingLogger(tokenID string) logging.ObjectLogger { + return f.createLogger(IncomingDir, "incoming.tok="+tokenID, f.showInServer) } -func (f *Factory) CreateSQLQueryLogger(destinationName string) *logging.QueryLogger { - return logging.NewQueryLogger(destinationName, f.ddlLogsWriter, f.queryLogsWriter) +func (f *Factory) CreateFailedLogger(destinationName string) logging.ObjectLogger { + return f.createLogger(FailedDir, "failed.dst="+destinationName, false) } -func (f *Factory) CreateRetireLogger(destinationName string) logging.ObjectLogger { - retireWriter := logging.NewRollingWriter(&logging.Config{ - FileName: "retire.dst=" + destinationName, - FileDir: path.Join(f.logEventPath, RetireDir), - RotationMin: f.logRotationMin, - RotateOnClose: true, - }) - if f.asyncLoggers { - return NewAsyncLogger(retireWriter, false, f.asyncLoggerPoolSize) - } - return NewSyncLogger(retireWriter, false) +func (f *Factory) CreateRetiredLogger(destinationName string) logging.ObjectLogger { + return f.createLogger(RetiredDir, "retired.dst="+destinationName, false) } func (f *Factory) CreateStreamingArchiveLogger(destinationName string) logging.ObjectLogger { - archiveWriter := logging.NewRollingWriter(&logging.Config{ - FileName: "streaming-archive.dst=" + destinationName, - FileDir: path.Join(f.logEventPath, ArchiveDir), - RotationMin: f.logRotationMin, - RotateOnClose: true, - }) - if f.asyncLoggers { - return NewAsyncLogger(archiveWriter, false, f.asyncLoggerPoolSize) - } - return NewSyncLogger(archiveWriter, false) + return f.createLogger(ArchiveDir, "streaming-archive.dst="+destinationName, false) } func (f *Factory) CreateWriteAheadLogger() logging.ObjectLogger { - walWriter := logging.NewRollingWriter(&logging.Config{ - FileName: "write-ahead-log", - FileDir: path.Join(f.logEventPath, IncomingDir), + return f.createLogger(IncomingDir, "write-ahead-log", false) +} + +func (f *Factory) createLogger(subDir, fileName string, showInGlobalLogger bool) logging.ObjectLogger { + logWriter := logging.NewRollingWriter(&logging.Config{ + FileName: fileName, + FileDir: path.Join(f.logEventPath, subDir), RotationMin: f.logRotationMin, RotateOnClose: true, }) if f.asyncLoggers { - return NewAsyncLogger(walWriter, false, f.asyncLoggerPoolSize) + return NewAsyncLogger(logWriter, showInGlobalLogger, f.asyncLoggerPoolSize) } - return NewSyncLogger(walWriter, false) + + return NewSyncLogger(logWriter, showInGlobalLogger) } diff --git a/server/logfiles/uploader.go b/server/logfiles/uploader.go index 5ae78c1dd..9135830b5 100644 --- a/server/logfiles/uploader.go +++ b/server/logfiles/uploader.go @@ -1,6 +1,7 @@ package logfiles import ( + "fmt" "io/ioutil" "os" "path" @@ -33,16 +34,19 @@ type PeriodicUploader struct { logIncomingEventPath string fileMask string uploadEvery time.Duration + errorRetryPeriod int archiver *Archiver + retirer *Archiver statusManager *StatusManager destinationService *destinations.Service } // NewUploader returns new configured PeriodicUploader instance -func NewUploader(logEventPath, fileMask string, uploadEveryMin int, destinationService *destinations.Service) (*PeriodicUploader, error) { - logIncomingEventPath := path.Join(logEventPath, logevents.IncomingDir) +func NewUploader(logEventPath, fileMask string, uploadEveryMin, errorRetryPeriod int, destinationService *destinations.Service) (*PeriodicUploader, error) { logArchiveEventPath := path.Join(logEventPath, logevents.ArchiveDir) + logIncomingEventPath := path.Join(logEventPath, logevents.IncomingDir) + logRetiredEventPath := path.Join(logEventPath, logevents.RetiredDir) statusManager, err := NewStatusManager(logIncomingEventPath) if err != nil { return nil, err @@ -51,7 +55,9 @@ func NewUploader(logEventPath, fileMask string, uploadEveryMin int, destinationS logIncomingEventPath: logIncomingEventPath, fileMask: path.Join(logIncomingEventPath, fileMask), uploadEvery: time.Duration(uploadEveryMin) * time.Minute, + errorRetryPeriod: errorRetryPeriod, archiver: NewArchiver(logIncomingEventPath, logArchiveEventPath), + retirer: NewArchiver(logIncomingEventPath, logRetiredEventPath), statusManager: statusManager, destinationService: destinationService, }, nil @@ -82,14 +88,9 @@ func (u *PeriodicUploader) Start() { for _, filePath := range files { fileName := filepath.Base(filePath) - regexResult := DateExtractRegexp.FindStringSubmatch(fileName) - if len(regexResult) != 2 { - logging.SystemErrorf("Error processing file %s. Malformed name", filePath) - continue - } - fileDate, err := time.Parse("2006-01-02T15-04-05", regexResult[1]) + fileDate, err := parseFileTime(fileName) if err != nil { - logging.SystemErrorf("Error processing file %s. Cant parse file date: %s", filePath, fileDate) + logging.SystemErrorf("Error processing file [%s]: %v", fileName, err) continue } @@ -99,13 +100,12 @@ func (u *PeriodicUploader) Start() { } //get token from filename - regexResult = logging.TokenIDExtractRegexp.FindStringSubmatch(fileName) - if len(regexResult) != 2 { - logging.SystemErrorf("Error processing file %s. Malformed name", filePath) + tokenID, err := parseFileToken(fileName) + if err != nil { + logging.SystemErrorf("Error processing file [%s]: %v", err) continue } - tokenID := regexResult[1] storageProxies := u.destinationService.GetBatchStorages(tokenID) if len(storageProxies) == 0 { logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, tokenID) @@ -159,8 +159,7 @@ func (u *PeriodicUploader) Start() { resultPerTable, failedEvents, skippedEvents, err := storage.Store(fileName, objects, alreadyUploadedTables, needCopyEvent) if !skippedEvents.IsEmpty() { - metrics.SkipTokenEvents(tokenID, storage.Type(), storage.ID(), len(skippedEvents.Events)) - counters.SkipPushDestinationEvents(storage.ID(), int64(len(skippedEvents.Events))) + u.skipEvent(tokenID, storage.Type(), storage.ID(), len(skippedEvents.Events)) } if err != nil { @@ -173,12 +172,7 @@ func (u *PeriodicUploader) Start() { eventsSrc[events.ExtractSrc(obj)]++ } - errRowsCount := len(objects) - metrics.ErrorTokenEvents(tokenID, storage.Type(), storage.ID(), errRowsCount) - counters.ErrorPushDestinationEvents(storage.ID(), int64(errRowsCount)) - - telemetry.PushedErrorsPerSrc(tokenID, storage.ID(), eventsSrc) - + u.errorEvent(tokenID, storage.Type(), storage.ID(), len(objects), eventsSrc) continue } @@ -193,24 +187,19 @@ func (u *PeriodicUploader) Start() { }) } storage.Fallback(parsingFailedEvents...) - telemetry.PushedErrorsPerSrc(tokenID, storage.ID(), map[string]int{parsingErrSrc: len(parsingErrors)}) + u.errorEvent(tokenID, storage.Type(), storage.ID(), len(parsingErrors), map[string]int{parsingErrSrc: len(parsingErrors)}) } //events that are failed to be processed if !failedEvents.IsEmpty() { storage.Fallback(failedEvents.Events...) - metrics.ErrorTokenEvents(tokenID, storage.Type(), storage.ID(), len(failedEvents.Events)) - counters.ErrorPushDestinationEvents(storage.ID(), int64(len(failedEvents.Events))) - telemetry.PushedErrorsPerSrc(tokenID, storage.ID(), failedEvents.Src) + u.errorEvent(tokenID, storage.Type(), storage.ID(), len(failedEvents.Events), failedEvents.Src) } for tableName, result := range resultPerTable { if result.Err != nil { archiveFile = false logging.Errorf("[%s] Error storing table %s from file %s: %v", storage.ID(), tableName, filePath, result.Err) - metrics.ErrorTokenEvents(tokenID, storage.Type(), storage.ID(), result.RowsCount) - counters.ErrorPushDestinationEvents(storage.ID(), int64(result.RowsCount)) - - telemetry.PushedErrorsPerSrc(tokenID, storage.ID(), result.EventsSrc) + u.errorEvent(tokenID, storage.Type(), storage.ID(), result.RowsCount, result.EventsSrc) } else { pHandles := storageProxy.GetPostHandleDestinations() if pHandles != nil && result.RowsCount > 0 { @@ -223,10 +212,8 @@ func (u *PeriodicUploader) Start() { mp[storage.ID()] = true } } - metrics.SuccessTokenEvents(tokenID, storage.Type(), storage.ID(), result.RowsCount) - counters.SuccessPushDestinationEvents(storage.ID(), int64(result.RowsCount)) - telemetry.PushedEventsPerSrc(tokenID, storage.ID(), result.EventsSrc) + u.successEvent(tokenID, storage.Type(), storage.ID(), result.RowsCount, result.EventsSrc) } u.statusManager.UpdateStatus(fileName, storage.ID(), tableName, result.Err) @@ -240,15 +227,46 @@ func (u *PeriodicUploader) Start() { } else { u.statusManager.CleanUp(fileName) } + } else { + // If file exist more than errorRetryPeriod hours it should be retired + if timestamp.Now().Sub(fileDate) > time.Duration(u.errorRetryPeriod)*time.Hour { + logging.Infof("Retired file [%s]. File is more than %d hours old: %s", filePath, u.errorRetryPeriod, fileDate) + err := u.retirer.Archive(fileName) + if err != nil { + logging.SystemErrorf("Error retiring [%s] file: %v", filePath, err) + } else { + u.statusManager.CleanUp(fileName) + } + } } } + u.postHandle(startTime, timestamp.Now(), postHandlesMap) time.Sleep(u.uploadEvery - time.Since(startTime)) - } }) } +// errorEvent writes metrics/counters/events +func (u *PeriodicUploader) errorEvent(tokenID, storageType, storageID string, errorsCount int, result map[string]int) { + metrics.ErrorTokenEvents(tokenID, storageType, storageID, errorsCount) + counters.ErrorPushDestinationEvents(storageID, int64(errorsCount)) + telemetry.PushedErrorsPerSrc(tokenID, storageID, result) +} + +// skipEvent writes metrics/counters/events +func (u *PeriodicUploader) skipEvent(tokenID, storageType, storageID string, skipCount int) { + metrics.SkipTokenEvents(tokenID, storageType, storageID, skipCount) + counters.SkipPushDestinationEvents(storageID, int64(skipCount)) +} + +// successEvent writes metrics/counters/events +func (u *PeriodicUploader) successEvent(tokenID, storageType, storageID string, rowsCount int, result map[string]int) { + metrics.SuccessTokenEvents(tokenID, storageType, storageID, rowsCount) + counters.SuccessPushDestinationEvents(storageID, int64(rowsCount)) + telemetry.PushedEventsPerSrc(tokenID, storageID, result) +} + func (u *PeriodicUploader) postHandle(start, end time.Time, postHandlesMap map[string]map[string]bool) { for phID, destsMap := range postHandlesMap { dests := make([]string, 0, len(destsMap)) @@ -268,5 +286,28 @@ func (u *PeriodicUploader) postHandle(start, end time.Time, postHandlesMap map[s } logging.Infof("Successful run of %v triggered postHandle destination: %s", dests, phID) } +} + +func parseFileTime(fileName string) (time.Time, error) { + regexResult := DateExtractRegexp.FindStringSubmatch(fileName) + if len(regexResult) != 2 { + return time.Time{}, fmt.Errorf("Malformed name") + } + + fileDate, err := time.Parse("2006-01-02T15-04-05", regexResult[1]) + if err != nil { + return time.Time{}, fmt.Errorf("Cannot parse file date: %s", regexResult[1]) + } + + return fileDate, nil +} + +func parseFileToken(fileName string) (string, error) { + regexResult := logging.TokenIDExtractRegexp.FindStringSubmatch(fileName) + if len(regexResult) != 2 { + return "", fmt.Errorf("Malformed name") + } + token := regexResult[1] + return token, nil } diff --git a/server/main.go b/server/main.go index 35319340c..1c8db1f47 100644 --- a/server/main.go +++ b/server/main.go @@ -457,7 +457,7 @@ func main() { //for now use the same interval as for log rotation uploaderRunInterval := viper.GetInt("log.rotation_min") //Uploader must read event logger directory - uploader, err := logfiles.NewUploader(logEventPath, uploaderFileMask, uploaderRunInterval, destinationsService) + uploader, err := logfiles.NewUploader(logEventPath, uploaderFileMask, uploaderRunInterval, appconfig.Instance.ErrorRetryPeriod, destinationsService) if err != nil { logging.Fatal("Error while creating file uploader", err) } diff --git a/server/storages/abstract.go b/server/storages/abstract.go index 7d943f350..ff7a09df1 100644 --- a/server/storages/abstract.go +++ b/server/storages/abstract.go @@ -43,7 +43,7 @@ type Abstract struct { archiveLogger logging.ObjectLogger fallbackLogger logging.ObjectLogger - retireLogger logging.ObjectLogger + retiredLogger logging.ObjectLogger } //ID returns destination ID @@ -120,9 +120,9 @@ func (a *Abstract) Fallback(failedEvents ...*events.FailedEvent) { } } -// Retire logs event with error to retire logger -func (a *Abstract) RetireEvent(failedEvent *events.Event) { - a.retireLogger.ConsumeAny(failedEvent) +// RetiredEvent logs event with error to retired logger +func (a *Abstract) RetiredEvent(retiredEvent *adapters.EventContext) { + a.retiredLogger.ConsumeAny(retiredEvent.RawEvent) } //Insert ensures table and sends input event to Destination (with 1 retry if error) @@ -291,9 +291,9 @@ func (a *Abstract) close() (multiErr error) { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing streaming worker: %v", a.ID(), err)) } } - if a.retireLogger != nil { - if err := a.retireLogger.Close(); err != nil { - multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing retire logger: %v", a.ID(), err)) + if a.retiredLogger != nil { + if err := a.retiredLogger.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing retired logger: %v", a.ID(), err)) } } if a.fallbackLogger != nil { @@ -338,7 +338,7 @@ func (a *Abstract) Init(config *Config, impl Storage, preinstalledJavaScript str func (a *Abstract) Start(config *Config) error { a.archiveLogger = config.loggerFactory.CreateStreamingArchiveLogger(config.destinationID) a.fallbackLogger = config.loggerFactory.CreateFailedLogger(config.destinationID) - a.retireLogger = config.loggerFactory.CreateRetireLogger(config.destinationID) + a.retiredLogger = config.loggerFactory.CreateRetiredLogger(config.destinationID) if a.streamingWorker != nil { a.streamingWorker.start() diff --git a/server/storages/streaming.go b/server/storages/streaming.go index ba9a991a1..b407aecd2 100644 --- a/server/storages/streaming.go +++ b/server/storages/streaming.go @@ -31,8 +31,8 @@ type StreamingStorage interface { ErrorEvent(eventCtx *adapters.EventContext, err error) //SkipEvent writes metrics/counters/events cache, etc SkipEvent(eventCtx *adapters.EventContext, err error) - //RetireEvent writes metrics/counters/events cache, etc - RetireEvent(eventContext *events.Event) + //RetiredEvent writes metrics/counters/events cache, etc + RetiredEvent(eventCtx *adapters.EventContext) } //StreamingWorker reads events from queue and using events.StreamingStorage writes them @@ -175,7 +175,7 @@ func (sw *StreamingWorker) start() { if retryTime.Before(finishTime) { sw.eventQueue.ConsumeTimed(fact, retryTime, finishTime, tokenID) } else { - sw.streamingStorage.RetireEvent(&fact) + sw.streamingStorage.RetiredEvent(eventContext) } } } From 32f8af4dc1832ad0b67d558ea9706f889514c29a Mon Sep 17 00:00:00 2001 From: Maksim Lebedev Date: Sun, 18 Sep 2022 18:21:40 +0400 Subject: [PATCH 4/4] Documentation --- .../configuration/error-handling.mdx | 38 +++++++++++++++++++ server/appconfig/config.template.yaml | 13 ++++--- server/logfiles/uploader.go | 2 +- 3 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 documentation/configuration/error-handling.mdx diff --git a/documentation/configuration/error-handling.mdx b/documentation/configuration/error-handling.mdx new file mode 100644 index 000000000..c267597fe --- /dev/null +++ b/documentation/configuration/error-handling.mdx @@ -0,0 +1,38 @@ +# Error Handling and Retries + +**Jitsu** supports sending events and re-sending them at failures. + +## Configuration + +```yaml +server: + ... + streaming_retry_delay_minutes: 1 # Optional. Default value is 1 + error_retry_period_hours: 24 # Optional. Default value is 24 +log: + ... + rotation_min: 5 # Optional. Default value is 5 +... +destinations: + ... + destination_id: + ... + data_layout: + ... + # Positive value overrides server option for destination + error_retry_period_hours: 0 # Optional. Default value is 0 +``` + +## Batch mode + +**Jitsu** receives a lot of events. All of them are saved into a file system. +Every `log.rotation_min` minutes **Jitsu** parses saved files with events and tries to send them to destinations. +If an event was sent correctly it will be archived into a special folder with successful events. Otherwise, **Jitsu** tries to resend it at the next uploading iteration. +If an event cannot be sent for any reason during `server.error_retry_period_hours` hours **Jitsu** stops resending an event and archives it into a special folder with unsuccessful events. + +## Streaming mode + +**Jitsu** receives an event and tries to send it to destinations. +If an event was sent correctly it will be archived into a special folder with successful events. Otherwise, **Jitsu** tries to resend it at the next uploading iteration that happens every `server.streaming_retry_delay_minutes` minutes. +If an event cannot be sent for any reason during `server.error_retry_period_hours` hours **Jitsu** stops resending an event and archives it into a special folder with unsuccessful events. +Any destination supports local `destinations.destination_id.data_layout.error_retry_period_hours` option that overrides global `server.error_retry_period_hours` option. diff --git a/server/appconfig/config.template.yaml b/server/appconfig/config.template.yaml index d972d3a08..f572c4589 100644 --- a/server/appconfig/config.template.yaml +++ b/server/appconfig/config.template.yaml @@ -32,9 +32,10 @@ server: ### Sources reloading. If 'sources' key is http or file:/// URL than it will be reloaded every sources_reload_sec #sources_reload_sec: 1 #Optional. Default value is 1. - # Interval to retry failed event. Optional. Default value is 1 + # Error handling and retries https://jitsu.com/docs/configuration/error-handling + # Interval to retry failed events. Optional. Default value is 1 streaming_retry_delay_minutes: 1 - # Clear failed event after retry period. Optional. Default value is 24 + # Stop resending failed events after retry period. Optional. Default value is 24 error_retry_period_hours: 24 ### Application metrics @@ -131,7 +132,7 @@ destinations: # type: varchar(256) #SQL type # column_type: varchar(256) encode zstd # max_columns: 100 # Optional. The limit of the count of columns. -# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. +# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option # table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Optional. Default value constant is 'events'. Template for extracting table name # ### BigQuery https://jitsu.com/docs/destinations-configuration/bigquery @@ -144,7 +145,7 @@ destinations: # key_file: /home/eventnative/data/config/bqkey.json # or json string of key e.g. "{"service_account":...}" # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. -# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. +# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option # table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name ### Postgres https://jitsu.com/docs/destinations-configuration/postgres @@ -163,7 +164,7 @@ destinations: # connect_timeout: 300 # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. -# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. +# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option # table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name # #Required for Postgres users recognition feature. # primary_key_fields: @@ -215,7 +216,7 @@ destinations: # endpoint: #Optional. Default value is AWS s3 endpoint. If you use DigitalOcean spaces or others - specify your endpoint # data_layout: # max_columns: 100 # Optional. The limit of the count of columns. -# error_retry_period_hours: 24 # Optional. Clear failed event after retry period. +# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option # table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Template will be used for file naming ### Snowflake https://jitsu.com/docs/destinations-configuration/snowflake diff --git a/server/logfiles/uploader.go b/server/logfiles/uploader.go index 9135830b5..8a8234b6d 100644 --- a/server/logfiles/uploader.go +++ b/server/logfiles/uploader.go @@ -102,7 +102,7 @@ func (u *PeriodicUploader) Start() { //get token from filename tokenID, err := parseFileToken(fileName) if err != nil { - logging.SystemErrorf("Error processing file [%s]: %v", err) + logging.SystemErrorf("Error processing file [%s]: %v", fileName, err) continue }