Skip to content

Commit

Permalink
Elide getSubscribers* => ProcessSubscribersInState
Browse files Browse the repository at this point in the history
Just like processScanOutput from the previous commit, it turned out that
getSubscribersInState wasn't as useful anymore. The final
ProcessSubscribersInState implementation isn't that long or complex at
all.

Perhaps I could've started with this abstraction and implementation, but
programming and design is a process. Sometimes you have to wander to
learn where you're eventually going.
  • Loading branch information
mbland committed May 8, 2023
1 parent af0f7a4 commit f3c9814
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 92 deletions.
47 changes: 11 additions & 36 deletions db/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,51 +331,26 @@ func (db *DynamoDb) Delete(ctx context.Context, email string) (err error) {
}

func (db *DynamoDb) ProcessSubscribersInState(
ctx context.Context, status SubscriberStatus, processor SubscriberProcessor,
ctx context.Context, status SubscriberStatus, sp SubscriberProcessor,
) (err error) {
var subs []*Subscriber
var next dbAttributes
index := string(status)
input := &dynamodb.ScanInput{TableName: &db.TableName, IndexName: &index}
var output *dynamodb.ScanOutput

for {
subs, next, err = db.getSubscribersInState(ctx, status, next)

if err != nil {
if output, err = db.Client.Scan(ctx, input); err != nil {
err = fmt.Errorf("failed to get %s subscribers: %s", status, err)
return
}
for _, sub := range subs {
if !processor.Process(sub) {
for _, item := range output.Items {
var sub *Subscriber
if sub, err = parseSubscriber(item); err != nil || !sp.Process(sub) {
return
}
}
if next == nil {
if output.LastEvaluatedKey == nil {
return
}
input.ExclusiveStartKey = output.LastEvaluatedKey
}
}

func (db *DynamoDb) getSubscribersInState(
ctx context.Context, state SubscriberStatus, startKey dbAttributes,
) (subs []*Subscriber, nextStartKey dbAttributes, err error) {
indexName := string(state)
input := &dynamodb.ScanInput{
TableName: &db.TableName,
IndexName: &indexName,
ExclusiveStartKey: startKey,
}
var output *dynamodb.ScanOutput

if output, err = db.Client.Scan(ctx, input); err != nil {
err = fmt.Errorf("failed to get %s subscribers: %s", state, err)
return
}

nextStartKey = output.LastEvaluatedKey
subs = make([]*Subscriber, len(output.Items))
errs := make([]error, len(subs))

for i, item := range output.Items {
subs[i], errs[i] = parseSubscriber(item)
}
err = errors.Join(errs...)
return
}
79 changes: 23 additions & 56 deletions db/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,57 +282,6 @@ func setupDbWithSubscribers() (dyndb *DynamoDb, client *TestDynamoDbClient) {
return
}

func TestGetSubscribersInState(t *testing.T) {
ctx := context.Background()

t.Run("Succeeds", func(t *testing.T) {
dyndb, _ := setupDbWithSubscribers()

subs, next, err := dyndb.getSubscribersInState(
ctx, SubscriberVerified, nil,
)

assert.NilError(t, err)
assert.Assert(t, is.Nil(next))
assert.DeepEqual(t, testVerifiedSubscribers, subs)
})

t.Run("ReturnsErrorIfScanFails", func(t *testing.T) {
dyndb, client := setupDbWithSubscribers()
client.scanErr = errors.New("scanning error")

subs, next, err := dyndb.getSubscribersInState(
ctx, SubscriberVerified, nil,
)

assert.Assert(t, is.Nil(subs))
assert.Assert(t, is.Nil(next))
expectedErr := "failed to get verified subscribers: scanning error"
assert.ErrorContains(t, err, expectedErr)
})

t.Run("ReturnsErrorIfParsingSubscribersFails", func(t *testing.T) {
dyndb, client := setupDbWithSubscribers()
status := SubscriberVerified
client.addSubscriberRecord(dbAttributes{
"email": &dbString{Value: "bad-uid@foo.com"},
"uid": &dbString{Value: "not a uid"},
string(status): toDynamoDbTimestamp(testTimestamp),
})

subs, _, err := dyndb.getSubscribersInState(
ctx, SubscriberVerified, nil,
)

expectedSubscribers := append(testVerifiedSubscribers, nil)
assert.DeepEqual(t, expectedSubscribers, subs)

expectedErr := "failed to parse subscriber: " +
"failed to parse 'uid' from: "
assert.ErrorContains(t, err, expectedErr)
})
}

func TestProcessSubscribersInState(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -387,12 +336,30 @@ func TestProcessSubscribersInState(t *testing.T) {
})
})

t.Run("ReturnsGetSubscribersError", func(t *testing.T) {
dynDb, client, _, f := setup()
client.scanErr = errors.New("scanning error")
t.Run("ReturnsError", func(t *testing.T) {
t.Run("IfScanFails", func(t *testing.T) {
dynDb, client, _, f := setup()
client.scanErr = errors.New("scanning error")

err := dynDb.ProcessSubscribersInState(ctx, SubscriberVerified, f)
err := dynDb.ProcessSubscribersInState(ctx, SubscriberVerified, f)

assert.ErrorContains(t, err, "scanning error")
assert.ErrorContains(t, err, "scanning error")
})

t.Run("IfParseSubscriberFails", func(t *testing.T) {
dynDb, client, _, f := setup()
status := SubscriberVerified
client.addSubscriberRecord(dbAttributes{
"email": &dbString{Value: "bad-uid@foo.com"},
"uid": &dbString{Value: "not a uid"},
string(status): toDynamoDbTimestamp(testTimestamp),
})

err := dynDb.ProcessSubscribersInState(ctx, SubscriberVerified, f)

expectedErr := "failed to parse subscriber: " +
"failed to parse 'uid' from: "
assert.ErrorContains(t, err, expectedErr)
})
})
}

0 comments on commit f3c9814

Please sign in to comment.