Skip to content

Commit

Permalink
add teardown procedure for eventstore service
Browse files Browse the repository at this point in the history
  • Loading branch information
negrel committed Apr 4, 2024
1 parent 18ba095 commit 5aaa20d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/server/full/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/server/ingestion/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 52 additions & 18 deletions pkg/services/eventstore/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,47 @@ import (
"github.com/prismelabs/analytics/pkg/clickhouse"
"github.com/prismelabs/analytics/pkg/config"
"github.com/prismelabs/analytics/pkg/event"
"github.com/prismelabs/analytics/pkg/services/teardown"
"github.com/rs/zerolog"
)

// ProvideClickhouseService is a wire provider for a clickhouse based event
// storage service.
func ProvideClickhouseService(ch clickhouse.Ch, logger zerolog.Logger) Service {
func ProvideClickhouseService(
ch clickhouse.Ch,
logger zerolog.Logger,
teardownService teardown.Service,
) Service {
maxBatchSize := config.ParseUintEnvOrDefault("PRISME_EVENTSTORE_MAX_BATCH_SIZE", 4096, 64)
maxBatchTimeout := config.ParseDurationEnvOrDefault("PRISME_EVENTSTORE_MAX_BATCH_TIMEOUT", 1*time.Minute)
batchDone := make(chan struct{})

logger = logger.With().
Str("service", "eventstore").
Str("service_impl", "clickhouse").
Uint64("max_batch_size", maxBatchSize).
Stringer("max_batch_timeout", maxBatchTimeout).
Logger()

// Create context for batch loops.
ctx, cancel := context.WithCancel(context.Background())

// Cancel them on teardown.
teardownService.RegisterProcedure(func() error {
logger.Info().Msg("cancelling event batch loops...")
cancel()
// Wait for last batch to be sent.
<-batchDone
<-batchDone
logger.Info().Msg("event batch loops cancelled.")
return nil
})

service := &ClickhouseService{
logger: logger,
conn: ch.Conn,
maxBatchSize: config.ParseUintEnvOrDefault("PRISME_EVENTSTORE_MAX_BATCH_SIZE", 4096, 64),
maxBatchTimeout: config.ParseDurationEnvOrDefault("PRISME_EVENTSTORE_MAX_BATCH_TIMEOUT", 1*time.Minute),
maxBatchSize: maxBatchSize,
maxBatchTimeout: maxBatchTimeout,
}
service.pageViewRingBuf = ringo.NewWaiter(
ringo.NewManyToOne(
Expand All @@ -28,6 +58,7 @@ func ProvideClickhouseService(ch clickhouse.Ch, logger zerolog.Logger) Service {
service.logger.Warn().Msg("pageview events ring buffer collision detected, consider increasing PRISME_EVENTSTORE_MAX_BATCH_SIZE")
})),
),
ringo.WithWaiterContext[*event.PageView](ctx),
)
service.customEventRingBuf = ringo.NewWaiter(
ringo.NewManyToOne(
Expand All @@ -36,20 +67,14 @@ func ProvideClickhouseService(ch clickhouse.Ch, logger zerolog.Logger) Service {
service.logger.Warn().Msg("custom events ring buffer collision detected, consider increasing PRISME_EVENTSTORE_MAX_BATCH_SIZE")
})),
),
ringo.WithWaiterContext[*event.Custom](ctx),
)

logger = logger.With().
Str("service", "eventstore").
Str("service_impl", "clickhouse").
Uint64("max_batch_size", service.maxBatchSize).
Stringer("max_batch_timeout", service.maxBatchTimeout).
Logger()
go service.batchPageViewLoop(batchDone)
go service.batchCustomEventLoop(batchDone)

logger.Info().Msg("clickhouse based event store configured")

go service.batchPageViewLoop()
go service.batchCustomEventLoop()

return service
}

Expand All @@ -74,7 +99,7 @@ func (cs *ClickhouseService) StoreCustom(_ context.Context, ev *event.Custom) er
return nil
}

func (cs *ClickhouseService) batchPageViewLoop() {
func (cs *ClickhouseService) batchPageViewLoop(batchDone chan<- struct{}) {
var batch driver.Batch
var err error
batchCreationDate := time.Now()
Expand All @@ -95,18 +120,22 @@ func (cs *ClickhouseService) batchPageViewLoop() {
batchCreationDate = time.Now()
}

// Wait for next event.
ev, done, dropped := cs.pageViewRingBuf.Next()
// Ring buffer context was cancelled.
if done {
cs.logger.Info().Msg("page view ring buffer done, terminating batch loop...")
cs.logger.Info().Msg("page view ring buffer done, sending last batch...")
cs.sendBatch(batch)
cs.logger.Info().Msg("last batch of page view events sent.")
batchDone <- struct{}{}
return
}
if dropped > 0 {
cs.logger.Info().Int("dropped", dropped).Msg("pageview events dropped")
}

cs.logger.Debug().Any("pageview_event", ev).Msg("appending pageview event to batch...")

// Append to batch.
cs.logger.Debug().Any("pageview_event", ev).Msg("appending pageview event to batch...")
err = batch.Append(
ev.Timestamp,
ev.PageUri.Host(),
Expand All @@ -128,7 +157,7 @@ func (cs *ClickhouseService) batchPageViewLoop() {
}
}

func (cs *ClickhouseService) batchCustomEventLoop() {
func (cs *ClickhouseService) batchCustomEventLoop(batchDone chan<- struct{}) {
var batch driver.Batch
var err error
batchCreationDate := time.Now()
Expand All @@ -149,9 +178,14 @@ func (cs *ClickhouseService) batchCustomEventLoop() {
batchCreationDate = time.Now()
}

// Wait for next event.
ev, done, dropped := cs.customEventRingBuf.Next()
// Ring buffer context was cancelled.
if done {
cs.logger.Info().Msg("custom ring buffer done, terminating batch loop...")
cs.logger.Info().Msg("custom ring buffer done, sending last batch...")
cs.sendBatch(batch)
cs.logger.Info().Msg("last batch of custom events sent.")
batchDone <- struct{}{}
return
}
if dropped > 0 {
Expand Down

0 comments on commit 5aaa20d

Please sign in to comment.