Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving retry logic #964

Open
wants to merge 5 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/appconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type AppConfig struct {

EmptyGIFPixelOnexOne []byte

StreamingRetryDelay int
ErrorRetryPeriod int

UaResolver useragent.Resolver
AuthorizationService *authorization.Service

Expand Down Expand Up @@ -93,6 +96,8 @@ func setDefaultParams(containerized bool) {
viper.SetDefault("server.max_columns", 100)
viper.SetDefault("server.max_event_size", 51200)
viper.SetDefault("server.configurator_urn", "/configurator")
viper.SetDefault("server.streaming_retry_delay_minutes", 1)
viper.SetDefault("server.error_retry_period_hours", 24)
//unique IDs
viper.SetDefault("server.fields_configuration.unique_id_field", "/eventn_ctx/event_id||/eventn_ctx_event_id||/event_id")
viper.SetDefault("server.fields_configuration.user_agent_path", "/eventn_ctx/user_agent||/user_agent")
Expand Down Expand Up @@ -402,6 +407,18 @@ func Init(containerized bool, dockerHubID string) error {
enrichWithHTTPContext := viper.GetBool("server.event_enrichment.http_context")
appConfig.EnrichWithHTTPContext = enrichWithHTTPContext

appConfig.StreamingRetryDelay = viper.GetInt("server.streaming_retry_delay_minutes")
if appConfig.StreamingRetryDelay < 0 {
return fmt.Errorf("server.streaming_retry_delay_minutes expects to be positive value")
}
logging.Infof("server.streaming_retry_delay_minutes: %d", appConfig.StreamingRetryDelay)

appConfig.ErrorRetryPeriod = viper.GetInt("server.error_retry_period_hours")
if appConfig.ErrorRetryPeriod < 0 {
return fmt.Errorf("server.error_retry_period_hours expects to be positive value")
}
logging.Infof("server.error_retry_period_hours: %d", appConfig.ErrorRetryPeriod)

Instance = &appConfig
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions server/appconfig/config.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ server:
### Sources reloading. If 'sources' key is an http or file:/// URL then it will be reloaded every sources_reload_sec
#sources_reload_sec: 1 #Optional. Default value is 1.

# Delay in minutes between retries of a failed event. Optional. Default value is 1
streaming_retry_delay_minutes: 1
# Period in hours during which a failed event is retried; once it has elapsed the failed event is cleared. Optional. Default value is 24
Alessar marked this conversation as resolved.
Show resolved Hide resolved
error_retry_period_hours: 24

### Application metrics
### At present only Prometheus is supported. Read more about application metrics https://jitsu.com/docs/other-features/application-metrics
# metrics:
Expand Down Expand Up @@ -126,6 +131,7 @@ destinations:
# type: varchar(256) #SQL type
# column_type: varchar(256) encode zstd
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 24 # Optional. Clear failed event after retry period.
# table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Optional. Default value constant is 'events'. Template for extracting table name
#
### BigQuery https://jitsu.com/docs/destinations-configuration/bigquery
Expand All @@ -138,6 +144,7 @@ destinations:
# key_file: /home/eventnative/data/config/bqkey.json # or json string of key e.g. "{"service_account":...}"
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 24 # Optional. Clear failed event after retry period.
# table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name

### Postgres https://jitsu.com/docs/destinations-configuration/postgres
Expand All @@ -156,6 +163,7 @@ destinations:
# connect_timeout: 300
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 24 # Optional. Clear failed event after retry period.
# table_name_template: 'my_events' #Optional. Default value constant is 'events'. Template for extracting table name
# #Required for Postgres users recognition feature.
# primary_key_fields:
Expand Down Expand Up @@ -207,6 +215,7 @@ destinations:
# endpoint: #Optional. Default value is AWS s3 endpoint. If you use DigitalOcean spaces or others - specify your endpoint
# data_layout:
# max_columns: 100 # Optional. The limit of the count of columns.
# error_retry_period_hours: 24 # Optional. Clear failed event after retry period.
# table_name_template: '{{.event_type}}_{{._timestamp.Format "2006_01"}}' #Template will be used for file naming

### Snowflake https://jitsu.com/docs/destinations-configuration/snowflake
Expand Down
1 change: 1 addition & 0 deletions server/config/destination_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type DataLayout struct {
//Deprecated
Mappings *Mapping `mapstructure:"mappings" json:"mappings,omitempty" yaml:"mappings,omitempty"`
MaxColumns int `mapstructure:"max_columns" json:"max_columns,omitempty" yaml:"max_columns,omitempty"`
ErrorRetryPeriod int `mapstructure:"error_retry_period_hours" json:"error_retry_period_hours,omitempty" yaml:"error_retry_period_hours,omitempty"`
TableNameTemplate string `mapstructure:"table_name_template" json:"table_name_template,omitempty" yaml:"table_name_template,omitempty"`
PrimaryKeyFields []string `mapstructure:"primary_key_fields" json:"primary_key_fields,omitempty" yaml:"primary_key_fields,omitempty"`
UniqueIDField string `mapstructure:"unique_id_field" json:"unique_id_field,omitempty" yaml:"unique_id_field,omitempty"`
Expand Down
42 changes: 27 additions & 15 deletions server/events/native_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ type NativeQueue struct {
identifier string
queue queue.Queue

streamingRetryDelay int
errorRetryPeriod int

metricsReporter internal.MetricReporter
closed chan struct{}
}

func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue) (Queue, error) {
func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) {
var metricsReporter internal.MetricReporter
if underlyingQueue.Type() == queue.RedisType {
metricsReporter = &internal.SharedQueueMetricReporter{}
Expand All @@ -42,12 +45,14 @@ func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue que
metricsReporter.SetMetrics(subsystem, identifier, int(underlyingQueue.Size()), int(underlyingQueue.BufferSize()))

nq := &NativeQueue{
queue: underlyingQueue,
namespace: namespace,
subsystem: subsystem,
identifier: identifier,
metricsReporter: metricsReporter,
closed: make(chan struct{}, 1),
queue: underlyingQueue,
namespace: namespace,
subsystem: subsystem,
identifier: identifier,
streamingRetryDelay: streamingRetryDelay,
errorRetryPeriod: errorRetryPeriod,
metricsReporter: metricsReporter,
closed: make(chan struct{}, 1),
}

safego.Run(nq.startMonitor)
Expand All @@ -71,13 +76,16 @@ func (q *NativeQueue) startMonitor() {
}

func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string) {
q.ConsumeTimed(f, timestamp.Now().UTC(), tokenID)
initial := timestamp.Now().UTC()
finish := initial.Add(time.Duration(q.errorRetryPeriod) * time.Hour)
q.ConsumeTimed(f, initial, finish, tokenID)
}

func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string) {
func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) {
te := &TimedEvent{
Payload: payload,
DequeuedTime: t,
DequeuedTime: dequeuedTime,
FinishTime: finishTime,
TokenID: tokenID,
}

Expand All @@ -89,24 +97,28 @@ func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time,
q.metricsReporter.EnqueuedEvent(q.subsystem, q.identifier)
}

func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, error) {
func (q *NativeQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) {
ite, err := q.queue.Pop()
if err != nil {
if err == queue.ErrQueueClosed {
return nil, time.Time{}, "", ErrQueueClosed
return nil, time.Time{}, time.Time{}, "", ErrQueueClosed
}

return nil, time.Time{}, "", err
return nil, time.Time{}, time.Time{}, "", err
}

q.metricsReporter.DequeuedEvent(q.subsystem, q.identifier)

te, ok := ite.(*TimedEvent)
if !ok {
return nil, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
return nil, time.Time{}, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
}

return te.Payload, te.DequeuedTime, te.TokenID, nil
return te.Payload, te.DequeuedTime, te.FinishTime, te.TokenID, nil
}

//GetDelay returns the streamingRetryDelay value that was passed to NewNativeQueue
//NOTE(review): presumably expressed in minutes (the value originates from server.streaming_retry_delay_minutes) - confirm against callers
func (q *NativeQueue) GetDelay() int {
return q.streamingRetryDelay
}

//Close closes underlying queue
Expand Down
16 changes: 9 additions & 7 deletions server/events/queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
//TimedEvent is the DTO that NativeQueue puts into and reads back from the underlying queue
type TimedEvent struct {
//Payload is the event body
Payload map[string]interface{}
//DequeuedTime is the timestamp given to ConsumeTimed; NativeQueue.Consume sets it to the current UTC time
DequeuedTime time.Time
//FinishTime is the second timestamp given to ConsumeTimed; NativeQueue.Consume sets it to the enqueue time plus the error retry period (hours)
//NOTE(review): presumably the deadline after which the event is no longer retried - confirm against the consumer of DequeueBlock
FinishTime time.Time
//TokenID is the token identifier passed along with the event
TokenID string
}

Expand All @@ -27,19 +28,20 @@ func (d *DummyQueue) Close() error {
//Consume is a no-op: DummyQueue ignores both the event and the token ID
func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string) {
}

func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) {
func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string) {
}

func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, error) {
return nil, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue")
func (d *DummyQueue) DequeueBlock() (Event, time.Time, time.Time, string, error) {
return nil, time.Time{}, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue")
}

//Queue is an events queue. Possible implementations (dque, leveldbqueue, native)
type Queue interface {
io.Closer
Consume(f map[string]interface{}, tokenID string)
ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)
DequeueBlock() (Event, time.Time, string, error)
ConsumeTimed(f map[string]interface{}, dequeuedTime, finishTime time.Time, tokenID string)
DequeueBlock() (Event, time.Time, time.Time, string, error)
GetDelay() int
}

type QueueFactory struct {
Expand All @@ -51,7 +53,7 @@ func NewQueueFactory(redisPool *meta.RedisPool, redisReadTimeout time.Duration)
return &QueueFactory{redisPool: redisPool, redisReadTimeout: redisReadTimeout}
}

func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, error) {
func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string, streamingRetryDelay int, errorRetryPeriod int) (Queue, error) {
var underlyingQueue queue.Queue
if qf.redisPool != nil {
logging.Infof("[%s] initializing redis events queue", identifier)
Expand All @@ -60,7 +62,7 @@ func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue,
logging.Infof("[%s] initializing inmemory events queue", identifier)
underlyingQueue = queue.NewInMemory(1_000_000)
}
return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue)
return NewNativeQueue(queue.DestinationNamespace, subsystem, identifier, underlyingQueue, streamingRetryDelay, errorRetryPeriod)
}

func (qf *QueueFactory) CreateHTTPQueue(identifier string, serializationModelBuilder func() interface{}) queue.Queue {
Expand Down
7 changes: 3 additions & 4 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"flag"
"fmt"
"github.com/jitsucom/jitsu/server/script"
"math/rand"
"net/http"
"os"
Expand All @@ -19,9 +18,6 @@ import (
"syscall"
"time"

"github.com/jitsucom/jitsu/server/script/node"
"github.com/jitsucom/jitsu/server/templates"

"github.com/gin-gonic/gin/binding"
"github.com/jitsucom/jitsu/server/airbyte"
"github.com/jitsucom/jitsu/server/appconfig"
Expand Down Expand Up @@ -49,12 +45,15 @@ import (
"github.com/jitsucom/jitsu/server/safego"
"github.com/jitsucom/jitsu/server/scheduling"
"github.com/jitsucom/jitsu/server/schema"
"github.com/jitsucom/jitsu/server/script"
"github.com/jitsucom/jitsu/server/script/node"
"github.com/jitsucom/jitsu/server/singer"
"github.com/jitsucom/jitsu/server/sources"
"github.com/jitsucom/jitsu/server/storages"
"github.com/jitsucom/jitsu/server/synchronization"
"github.com/jitsucom/jitsu/server/system"
"github.com/jitsucom/jitsu/server/telemetry"
"github.com/jitsucom/jitsu/server/templates"
"github.com/jitsucom/jitsu/server/timestamp"
"github.com/jitsucom/jitsu/server/users"
"github.com/jitsucom/jitsu/server/uuid"
Expand Down
8 changes: 7 additions & 1 deletion server/storages/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina

pkFields := map[string]bool{}
maxColumns := f.maxColumns
streamingRetryDelay := appconfig.Instance.StreamingRetryDelay
errorRetryPeriod := appconfig.Instance.ErrorRetryPeriod
uniqueIDField := appconfig.Instance.GlobalUniqueIDField
if destination.DataLayout != nil {
for _, field := range destination.DataLayout.PrimaryKeyFields {
Expand All @@ -167,6 +169,10 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina
maxColumns = destination.DataLayout.MaxColumns
logging.Infof("[%s] uses max_columns setting: %d", destinationID, maxColumns)
}
if destination.DataLayout.ErrorRetryPeriod > 0 {
errorRetryPeriod = destination.DataLayout.ErrorRetryPeriod
logging.Infof("[%s] uses error_retry_period_hours setting: %d", destinationID, errorRetryPeriod)
}
if destination.DataLayout.UniqueIDField != "" {
uniqueIDField = identifiers.NewUniqueID(destination.DataLayout.UniqueIDField)
}
Expand All @@ -185,7 +191,7 @@ func (f *FactoryImpl) Configure(destinationID string, destination config.Destina

var eventQueue events.Queue
if destination.Mode != SynchronousMode {
eventQueue, err = f.eventsQueueFactory.CreateEventsQueue(destination.Type, destinationID)
eventQueue, err = f.eventsQueueFactory.CreateEventsQueue(destination.Type, destinationID, streamingRetryDelay, errorRetryPeriod)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/storages/mock_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (mf *MockFactory) Create(id string, destination config.DestinationConfig) (
var eventQueue events.Queue
if destination.Mode == StreamMode {
qf := events.NewQueueFactory(nil, 0)
eventQueue, _ = qf.CreateEventsQueue(destination.Type, id)
eventQueue, _ = qf.CreateEventsQueue(destination.Type, id, appconfig.Instance.StreamingRetryDelay, appconfig.Instance.ErrorRetryPeriod)
}
return &testProxyMock{mode: destination.Mode}, eventQueue, nil
}
Expand Down
Loading