Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew7234 committed Sep 6, 2023
1 parent 998a926 commit c0986a3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
6 changes: 3 additions & 3 deletions analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (a *itemBasedAnalyzer[Item]) processBatch(ctx context.Context) (int, error)

batches := make([]*storage.QueryBatch, 0, len(items))

for _, item := range items {
for _, it := range items {
// Redeclare `item` for unclobbered use within goroutine.
workItem := item
item := it
batch := &storage.QueryBatch{}
batches = append(batches, batch)
group.Go(func() error {
return a.processor.ProcessItem(groupCtx, batch, workItem)
return a.processor.ProcessItem(groupCtx, batch, item)
})
}

Expand Down
8 changes: 7 additions & 1 deletion analyzer/item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type mockProcessor struct {
// this is distinct from whether the db updates returned by ProcessItem were
// successfully applied to the database.
processedItems map[uint64]struct{}
lock sync.Mutex
}

var _ item.ItemProcessor[*mockItem] = (*mockProcessor)(nil)
Expand All @@ -86,7 +87,8 @@ func (p *mockProcessor) ProcessItem(ctx context.Context, batch *storage.QueryBat
return fmt.Errorf("error processing item %d", item.id)
}
batch.Queue(testItemInsert, item.id)
fmt.Printf("processing item %d", item.id)
p.lock.Lock()
defer p.lock.Unlock()
p.processedItems[item.id] = struct{}{}
return nil
}
Expand Down Expand Up @@ -162,6 +164,7 @@ func TestItemBasedAnalyzerAllItems(t *testing.T) {
nextBatch: 0,
workQueue: workQueue,
processedItems: make(map[uint64]struct{}),
lock: sync.Mutex{},
}
analyzer := setupAnalyzer(t, db, p, testItemBasedConfig)

Expand Down Expand Up @@ -216,6 +219,7 @@ func TestItemBasedAnalyzerStopOnEmpty(t *testing.T) {
nextBatch: 0,
workQueue: workQueue,
processedItems: make(map[uint64]struct{}),
lock: sync.Mutex{},
}
analyzer := setupAnalyzer(t, db, p, testItemBasedConfig)

Expand Down Expand Up @@ -258,6 +262,7 @@ func TestItemBasedAnalyzerBadItem(t *testing.T) {
nextBatch: 0,
workQueue: workQueue,
processedItems: make(map[uint64]struct{}),
lock: sync.Mutex{},
}
analyzer := setupAnalyzer(t, db, p, testItemBasedConfig)

Expand Down Expand Up @@ -310,6 +315,7 @@ func TestItemBasedAnalyzerFailingBatch(t *testing.T) {
nextBatch: 0,
workQueue: workQueue,
processedItems: make(map[uint64]struct{}),
lock: sync.Mutex{},
}
analyzer := setupAnalyzer(t, db, p, testItemBasedConfig)

Expand Down

0 comments on commit c0986a3

Please sign in to comment.