Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving retry logic #964

Open
wants to merge 5 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 38 additions & 0 deletions documentation/configuration/error-handling.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Error Handling and Retries

**Jitsu** supports sending events and re-sending them on failure.

## Configuration

```yaml
server:
...
streaming_retry_delay_minutes: 1 # Optional. Default value is 1
error_retry_period_hours: 24 # Optional. Default value is 24
log:
...
rotation_min: 5 # Optional. Default value is 5
...
destinations:
...
destination_id:
...
data_layout:
...
# Positive value overrides server option for destination
error_retry_period_hours: 0 # Optional. Default value is 0
```

## Batch mode

**Jitsu** receives a lot of events. All of them are saved to the file system.
Every `log.rotation_min` minutes **Jitsu** parses saved files with events and tries to send them to destinations.
If an event is sent successfully, it is archived into a special folder for successful events. Otherwise, **Jitsu** tries to resend it on the next upload iteration.
If an event cannot be sent for any reason within `server.error_retry_period_hours` hours, **Jitsu** stops resending the event and archives it into a special folder for unsuccessful events.

## Streaming mode

**Jitsu** receives an event and tries to send it to destinations.
If an event is sent successfully, it is archived into a special folder for successful events. Otherwise, **Jitsu** tries to resend it on the next retry iteration, which happens every `server.streaming_retry_delay_minutes` minutes.
If an event cannot be sent for any reason within `server.error_retry_period_hours` hours, **Jitsu** stops resending the event and archives it into a special folder for unsuccessful events.
Every destination supports a local `destinations.destination_id.data_layout.error_retry_period_hours` option that overrides the global `server.error_retry_period_hours` option.
23 changes: 11 additions & 12 deletions server/adapters/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"time"

"github.com/jitsucom/jitsu/server/events"
"github.com/jitsucom/jitsu/server/logging"
"github.com/jitsucom/jitsu/server/queue"
"github.com/jitsucom/jitsu/server/safego"
"github.com/jitsucom/jitsu/server/timestamp"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"io/ioutil"
"math"
"net/http"
"time"
)

//HTTPAdapterConfiguration is a dto for creating HTTPAdapter
Expand All @@ -26,7 +27,7 @@ type HTTPAdapterConfiguration struct {
QueueFactory *events.QueueFactory
PoolWorkers int
DebugLogger *logging.QueryLogger
ErrorHandler func(fallback bool, eventContext *EventContext, err error)
ErrorHandler func(eventContext *EventContext, err error)
SuccessHandler func(eventContext *EventContext)
}

Expand All @@ -51,7 +52,7 @@ type HTTPAdapter struct {
debugLogger *logging.QueryLogger
httpReqFactory HTTPRequestFactory

errorHandler func(fallback bool, eventContext *EventContext, err error)
errorHandler func(eventContext *EventContext, err error)
successHandler func(eventContext *EventContext)

destinationID string
Expand Down Expand Up @@ -122,7 +123,7 @@ func (h *HTTPAdapter) startObserver() {
if timestamp.Now().UTC().Before(retryableRequest.DequeuedTime) {
if err := h.queue.AddRequest(retryableRequest); err != nil {
logging.SystemErrorf("[%s] Error enqueueing HTTP request after dequeuing: %v", h.destinationID, err)
h.errorHandler(true, retryableRequest.EventContext, err)
h.errorHandler(retryableRequest.EventContext, err)
}

continue
Expand All @@ -134,7 +135,7 @@ func (h *HTTPAdapter) startObserver() {

if err := h.queue.AddRequest(retryableRequest); err != nil {
logging.SystemErrorf("[%s] Error enqueueing HTTP request after invoking: %v", h.destinationID, err)
h.errorHandler(true, retryableRequest.EventContext, err)
h.errorHandler(retryableRequest.EventContext, err)
}
}
} else {
Expand Down Expand Up @@ -196,9 +197,7 @@ func (h *HTTPAdapter) doRetry(retryableRequest *RetryableRequest, sendErr error)
retryableRequest.DequeuedTime = timestamp.Now().UTC().Add(delay)
if err := h.queue.AddRequest(retryableRequest); err != nil {
logging.SystemErrorf("[%s] Error enqueueing HTTP request after sending: %v", h.destinationID, err)
h.errorHandler(true, retryableRequest.EventContext, sendErr)
} else {
h.errorHandler(false, retryableRequest.EventContext, sendErr)
h.errorHandler(retryableRequest.EventContext, sendErr)
}
return
}
Expand All @@ -207,7 +206,7 @@ func (h *HTTPAdapter) doRetry(retryableRequest *RetryableRequest, sendErr error)
headersJSON, _ := json.Marshal(retryableRequest.Request.Headers)
logging.Errorf("[%s] Error sending HTTP request URL: [%s] Method: [%s] Body: [%s] Headers: [%s]: %v", h.destinationID, retryableRequest.Request.URL, retryableRequest.Request.Method, string(retryableRequest.Request.Body), headersJSON, sendErr)

h.errorHandler(true, retryableRequest.EventContext, sendErr)
h.errorHandler(retryableRequest.EventContext, sendErr)
}

func (h *HTTPAdapter) doRequest(req *Request) error {
Expand Down
17 changes: 17 additions & 0 deletions server/appconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type AppConfig struct {

EmptyGIFPixelOnexOne []byte

StreamingRetryDelay int
ErrorRetryPeriod int

UaResolver useragent.Resolver
AuthorizationService *authorization.Service

Expand Down Expand Up @@ -93,6 +96,8 @@ func setDefaultParams(containerized bool) {
viper.SetDefault("server.max_columns", 100)
viper.SetDefault("server.max_event_size", 51200)
viper.SetDefault("server.configurator_urn", "/configurator")
viper.SetDefault("server.streaming_retry_delay_minutes", 1)
viper.SetDefault("server.error_retry_period_hours", 24)
//unique IDs
viper.SetDefault("server.fields_configuration.unique_id_field", "/eventn_ctx/event_id||/eventn_ctx_event_id||/event_id")
viper.SetDefault("server.fields_configuration.user_agent_path", "/eventn_ctx/user_agent||/user_agent")
Expand Down Expand Up @@ -406,6 +411,18 @@ func Init(containerized bool, dockerHubID string) error {
enrichWithHTTPContext := viper.GetBool("server.event_enrichment.http_context")
appConfig.EnrichWithHTTPContext = enrichWithHTTPContext

appConfig.StreamingRetryDelay = viper.GetInt("server.streaming_retry_delay_minutes")
if appConfig.StreamingRetryDelay < 0 {
return fmt.Errorf("server.streaming_retry_delay_minutes expects to be positive value")
}
logging.Infof("server.streaming_retry_delay_minutes: %d", appConfig.StreamingRetryDelay)

appConfig.ErrorRetryPeriod = viper.GetInt("server.error_retry_period_hours")
if appConfig.ErrorRetryPeriod < 0 {
return fmt.Errorf("server.error_retry_period_hours expects to be positive value")
}
logging.Infof("server.error_retry_period_hours: %d", appConfig.ErrorRetryPeriod)

Instance = &appConfig
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions server/appconfig/config.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ server:
### Sources reloading. If 'sources' key is http or file:/// URL than it will be reloaded every sources_reload_sec
#sources_reload_sec: 1 #Optional. Default value is 1.

# Error handling and retries https://jitsu.com/docs/configuration/error-handling
# Interval to retry failed events. Optional. Default value is 1
streaming_retry_delay_minutes: 1
# Stop resending failed events after retry period. Optional. Default value is 24
error_retry_period_hours: 24

### Application metrics
### At present only Prometheus is supported. Read more about application metrics https://jitsu.com/docs/other-features/application-metrics
# metrics:
Expand Down Expand Up @@ -126,6 +132,7 @@ destinations:
# type: varchar(256) #SQL type
# column_type: varchar(256) encode zstd
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option
# table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Optional. Default value constant is 'events'. Template for extracting table name
#
### BigQuery https://jitsu.com/docs/destinations-configuration/bigquery
Expand All @@ -138,6 +145,7 @@ destinations:
# key_file: /home/eventnative/data/config/bqkey.json # or json string of key e.g. "{"service_account":...}"
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option
# table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name

### Postgres https://jitsu.com/docs/destinations-configuration/postgres
Expand All @@ -156,6 +164,7 @@ destinations:
# connect_timeout: 300
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option
# table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name
# #Required for Postgres users recognition feature.
# primary_key_fields:
Expand Down Expand Up @@ -207,6 +216,7 @@ destinations:
# endpoint: #Optional. Default value is AWS s3 endpoint. If you use DigitalOcean spaces or others - specify your endpoint
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 0 # Optional. Default value is 0. Positive value overrides server option
# table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Template will be used for file naming

### Snowflake https://jitsu.com/docs/destinations-configuration/snowflake
Expand Down
1 change: 1 addition & 0 deletions server/config/destination_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type DataLayout struct {
//Deprecated
Mappings *Mapping `mapstructure:"mappings" json:"mappings,omitempty" yaml:"mappings,omitempty"`
MaxColumns int `mapstructure:"max_columns" json:"max_columns,omitempty" yaml:"max_columns,omitempty"`
ErrorRetryPeriod int `mapstructure:"error_retry_period_hours" json:"error_retry_period_hours,omitempty" yaml:"error_retry_period_hours,omitempty"`
TableNameTemplate string `mapstructure:"table_name_template" json:"table_name_template,omitempty" yaml:"table_name_template,omitempty"`
PrimaryKeyFields []string `mapstructure:"primary_key_fields" json:"primary_key_fields,omitempty" yaml:"primary_key_fields,omitempty"`
UniqueIDField string `mapstructure:"unique_id_field" json:"unique_id_field,omitempty" yaml:"unique_id_field,omitempty"`
Expand Down
42 changes: 27 additions & 15 deletions server/events/native_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ type NativeQueue struct {
identifier string
queue queue.Queue

streamingRetryDelay int
errorRetryPeriod int

metricsReporter internal.MetricReporter
closed chan struct{}
}

func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue) (Queue, error) {
func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) {
var metricsReporter internal.MetricReporter
if underlyingQueue.Type() == queue.RedisType {
metricsReporter = &internal.SharedQueueMetricReporter{}
Expand All @@ -42,12 +45,14 @@ func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue que
metricsReporter.SetMetrics(subsystem, identifier, int(underlyingQueue.Size()), int(underlyingQueue.BufferSize()))

nq := &NativeQueue{
queue: underlyingQueue,
namespace: namespace,
subsystem: subsystem,
identifier: identifier,
metricsReporter: metricsReporter,
closed: make(chan struct{}, 1),
queue: underlyingQueue,
namespace: namespace,
subsystem: subsystem,
identifier: identifier,
streamingRetryDelay: streamingRetryDelay,
errorRetryPeriod: errorRetryPeriod,
metricsReporter: metricsReporter,
closed: make(chan struct{}, 1),
}

safego.Run(nq.startMonitor)
Expand All @@ -71,13 +76,16 @@ func (q *NativeQueue) startMonitor() {
}

func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string) {
q.ConsumeTimed(f, timestamp.Now().UTC(), tokenID)
initial := timestamp.Now().UTC()
finish := initial.Add(time.Duration(q.errorRetryPeriod) * time.Hour)
q.ConsumeTimed(f, initial, finish, tokenID)
}

func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string) {
func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) {
te := &TimedEvent{
Payload: payload,
DequeuedTime: t,
DequeuedTime: dequeuedTime,
FinishTime: finishTime,
TokenID: tokenID,
}

Expand All @@ -89,24 +97,28 @@ func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time,
q.metricsReporter.EnqueuedEvent(q.subsystem, q.identifier)
}

func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, error) {
func (q *NativeQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) {
ite, err := q.queue.Pop()
if err != nil {
if err == queue.ErrQueueClosed {
return nil, time.Time{}, "", ErrQueueClosed
return nil, time.Time{}, time.Time{}, "", ErrQueueClosed
}

return nil, time.Time{}, "", err
return nil, time.Time{}, time.Time{}, "", err
}

q.metricsReporter.DequeuedEvent(q.subsystem, q.identifier)

te, ok := ite.(*TimedEvent)
if !ok {
return nil, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
return nil, time.Time{}, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
}

return te.Payload, te.DequeuedTime, te.TokenID, nil
return te.Payload, te.DequeuedTime, te.FinishTime, te.TokenID, nil
}

func (q *NativeQueue) GetDelay() int {
return q.streamingRetryDelay
}

//Close closes underlying queue
Expand Down
16 changes: 9 additions & 7 deletions server/events/queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type TimedEvent struct {
Payload map[string]interface{}
DequeuedTime time.Time
FinishTime time.Time
TokenID string
}

Expand All @@ -27,19 +28,20 @@ func (d *DummyQueue) Close() error {
func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string) {
}

func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) {
func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) {
}

func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, error) {
return nil, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue")
func (d *DummyQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) {
return nil, time.Time{}, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue")
}

//Queue is an events queue. Possible implementations (dque, leveldbqueue, native)
type Queue interface {
io.Closer
Consume(f map[string]interface{}, tokenID string)
ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)
DequeueBlock() (Event, time.Time, string, error)
ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string)
DequeueBlock() (Event, time.Time, time.Time, string, error)
GetDelay() int
}

type QueueFactory struct {
Expand All @@ -51,7 +53,7 @@ func NewQueueFactory(redisPool *meta.RedisPool, redisReadTimeout time.Duration)
return &QueueFactory{redisPool: redisPool, redisReadTimeout: redisReadTimeout}
}

func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, error) {
func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) {
var underlyingQueue queue.Queue
if qf.redisPool != nil {
logging.Infof("[%s] initializing redis events queue", identifier)
Expand All @@ -60,7 +62,7 @@ func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue,
logging.Infof("[%s] initializing inmemory events queue", identifier)
underlyingQueue = queue.NewInMemory(1_000_000)
}
return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue)
return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue, streamingRetryDelay, errorRetryPeriod)
}

func (qf *QueueFactory) CreateHTTPQueue(identifier string, serializationModelBuilder func() interface{}) queue.Queue {
Expand Down