Skip to content

Commit

Permalink
chore: some more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 14, 2023
1 parent db9bf6f commit f46d21e
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 293 deletions.
18 changes: 7 additions & 11 deletions services/notifier/model/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,20 @@ type Job struct {
LastExecTime time.Time
}

type JobMetadata json.RawMessage

type PublishRequest struct {
Payloads []json.RawMessage
PayloadMetadata json.RawMessage
JobType JobType
Priority int
Payloads []json.RawMessage
UploadSchema json.RawMessage // Hack to support merging schema with the payload at the postgres level
JobType JobType
Priority int
}

type PublishResponse struct {
Jobs []Job
JobMetadata JobMetadata
Err error
Jobs []Job
Err error
}

type ClaimJob struct {
Job *Job
JobMetadata JobMetadata
Job *Job
}

type ClaimJobResponse struct {
Expand Down
24 changes: 11 additions & 13 deletions services/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type notifierRepo interface {
PendingByBatchID(context.Context, string) (int64, error)
DeleteByBatchID(context.Context, string) error
OrphanJobIDs(context.Context, int) ([]int64, error)
GetByBatchID(context.Context, string) ([]model.Job, model.JobMetadata, error)
Claim(context.Context, string) (*model.Job, model.JobMetadata, error)
GetByBatchID(context.Context, string) ([]model.Job, error)
Claim(context.Context, string) (*model.Job, error)
OnClaimFailed(context.Context, *model.Job, error, int) error
OnClaimSuccess(context.Context, *model.Job, json.RawMessage) error
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (n *Notifier) trackBatch(
continue
}

jobs, jobMetadata, err := n.repo.GetByBatchID(ctx, batchID)
jobs, err := n.repo.GetByBatchID(ctx, batchID)
if err != nil {
onUpdate(&model.PublishResponse{
Err: fmt.Errorf("could not get jobs for batch: %s: %w", batchID, err),
Expand All @@ -370,8 +370,7 @@ func (n *Notifier) trackBatch(
n.logger.Infof("Completed processing all files in batch: %s", batchID)

onUpdate(&model.PublishResponse{
Jobs: jobs,
JobMetadata: jobMetadata,
Jobs: jobs,
})
return nil
}
Expand Down Expand Up @@ -403,7 +402,7 @@ func (n *Notifier) Subscribe(
pollSleep := time.Duration(0)

for {
job, metadata, err := n.claim(ctx, workerId)
job, err := n.claim(ctx, workerId)
if err != nil {
var pqErr *pq.Error

Expand All @@ -419,8 +418,7 @@ func (n *Notifier) Subscribe(
pollSleep = nextPollInterval(pollSleep)
} else {
jobsCh <- &model.ClaimJob{
Job: job,
JobMetadata: metadata,
Job: job,
}

pollSleep = time.Duration(0)
Expand All @@ -442,25 +440,25 @@ func (n *Notifier) Subscribe(
func (n *Notifier) claim(
ctx context.Context,
workerID string,
) (*model.Job, model.JobMetadata, error) {
) (*model.Job, error) {
claimStartTime := n.now()

claimedJob, metadata, err := n.repo.Claim(ctx, workerID)
claimedJob, err := n.repo.Claim(ctx, workerID)
if err == sql.ErrNoRows {
return nil, nil, fmt.Errorf("no jobs found: %w", err)
return nil, fmt.Errorf("no jobs found: %w", err)
}
if err != nil {
n.stats.claimFailedTime.Since(claimStartTime)
n.stats.claimFailed.Increment()

return nil, nil, fmt.Errorf("claiming job: %w", err)
return nil, fmt.Errorf("claiming job: %w", err)
}

n.stats.claimLag.SendTiming(n.now().Sub(claimedJob.CreatedAt))
n.stats.claimSucceededTime.Since(claimStartTime)
n.stats.claimSucceeded.Increment()

return claimedJob, metadata, nil
return claimedJob, nil
}

// UpdateClaim updates the notifier with the claimResponse
Expand Down
42 changes: 22 additions & 20 deletions services/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func TestNotifier(t *testing.T) {
json.RawMessage(`{"id":"4"}`),
json.RawMessage(`{"id":"5"}`),
},
JobType: model.JobTypeUpload,
PayloadMetadata: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
JobType: model.JobTypeUpload,
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
Priority: 50,
}

statsStore := memstats.New()
Expand Down Expand Up @@ -149,8 +149,14 @@ func TestNotifier(t *testing.T) {
for i := 0; i < totalJobs; i++ {
job := <-collectResponses
require.NoError(t, job.Err)
require.EqualValues(t, job.JobMetadata, model.JobMetadata(publishRequest.PayloadMetadata))
require.Len(t, job.Jobs, len(publishRequest.Payloads))

successfulJobs := lo.Filter(job.Jobs, func(item model.Job, index int) bool {
return item.Error == nil
})
for _, j := range successfulJobs {
require.EqualValues(t, j.Payload, json.RawMessage(`{"test": "payload"}`))
}
responses = append(responses, job.Jobs...)
}

Expand Down Expand Up @@ -183,7 +189,6 @@ func TestNotifier(t *testing.T) {
"queueName": "pg_notifier_queue",
}).LastValue(), totalJobs*len(publishRequest.Payloads))
})

t.Run("many publish jobs", func(t *testing.T) {
t.Parallel()

Expand All @@ -203,10 +208,10 @@ func TestNotifier(t *testing.T) {
}

publishRequest := &model.PublishRequest{
Payloads: payloads,
JobType: model.JobTypeUpload,
PayloadMetadata: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
Payloads: payloads,
JobType: model.JobTypeUpload,
UploadSchema: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
}

c := config.New()
Expand Down Expand Up @@ -274,7 +279,6 @@ func TestNotifier(t *testing.T) {
})
require.NoError(t, g.Wait())
})

t.Run("bigger batches and many subscribers", func(t *testing.T) {
t.Parallel()

Expand All @@ -294,10 +298,10 @@ func TestNotifier(t *testing.T) {
}

publishRequest := &model.PublishRequest{
Payloads: payloads,
JobType: model.JobTypeUpload,
PayloadMetadata: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
Payloads: payloads,
JobType: model.JobTypeUpload,
UploadSchema: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
}

c := config.New()
Expand Down Expand Up @@ -365,7 +369,6 @@ func TestNotifier(t *testing.T) {
})
require.NoError(t, g.Wait())
})

t.Run("round robin pickup and maintenance workers", func(t *testing.T) {
t.Parallel()

Expand All @@ -385,10 +388,10 @@ func TestNotifier(t *testing.T) {
}

publishRequest := &model.PublishRequest{
Payloads: payloads,
JobType: model.JobTypeUpload,
PayloadMetadata: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
Payloads: payloads,
JobType: model.JobTypeUpload,
UploadSchema: json.RawMessage(`{"mid": "1"}`),
Priority: 50,
}

c := config.New()
Expand Down Expand Up @@ -477,7 +480,6 @@ func TestNotifier(t *testing.T) {
"queueName": "pg_notifier_queue",
}).LastValue(), claimedWorkers.Load()-1)
})

t.Run("env vars", func(t *testing.T) {
t.Parallel()

Expand Down
36 changes: 1 addition & 35 deletions services/notifier/repo/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package repo

import (
"context"
"database/sql"
"fmt"
"time"

Expand All @@ -27,10 +26,6 @@ const (
updated_at,
last_exec_time
`

notifierMetadataTableName = "pg_notifier_queue_metadata"

topic = "warehouse/v1"
)

type Opt func(*Notifier)
Expand Down Expand Up @@ -64,31 +59,7 @@ func (n *Notifier) ResetForWorkspace(
ctx context.Context,
workspaceIdentifier string,
) error {
txn, err := n.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("reset: begin transaction: %w", err)
}
defer func() {
if err != nil {
_ = txn.Rollback()
return
}
}()

_, err = txn.ExecContext(ctx, `
DELETE FROM `+notifierMetadataTableName+`
WHERE batch_id IN (
SELECT DISTINCT batch_id FROM `+notifierTableName+`
WHERE workspace = $1
);
`,
workspaceIdentifier,
)
if err != nil {
return fmt.Errorf("reset: delete metadata for workspace %s: %w", workspaceIdentifier, err)
}

_, err = txn.ExecContext(ctx, `
_, err := n.db.ExecContext(ctx, `
DELETE FROM `+notifierTableName+`
WHERE workspace = $1;
`,
Expand All @@ -97,10 +68,5 @@ func (n *Notifier) ResetForWorkspace(
if err != nil {
return fmt.Errorf("reset: delete for workspace %s: %w", workspaceIdentifier, err)
}

if err = txn.Commit(); err != nil {
return fmt.Errorf("reset: commit: %w", err)
}

return nil
}
Loading

0 comments on commit f46d21e

Please sign in to comment.