Skip to content

Commit

Permalink
[TASK] TRK-474 Store Click information in Event Storage - added mutex…
Browse files Browse the repository at this point in the history
… around Ticker to avoid data race. (#101)
  • Loading branch information
tsknadaj committed Jul 12, 2022
1 parent fc28473 commit b907dcc
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
uses: ./.github/actions/external/go-test
with:
org_token: ${{ secrets.GH_TOKEN }}
lint: true
test: true
vet: true
vet: true
race: false
1 change: 0 additions & 1 deletion kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ func (s *Source) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.Consum
// - s.buf is full (but not draining, since pumps are off)
// - we have consumed a message and are attempting to send it to s.buf
case <-s.done:
break
}
}

Expand Down
2 changes: 1 addition & 1 deletion metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *metastore) PullAll() (map[Processor]Metaitems, error) {

// Make sure no marks are happening on the old metadata
s.procMu.Lock()
s.procMu.Unlock()
s.procMu.Unlock() //lint:ignore SA2001 syncpoint

return oldMeta, nil
}
Expand Down
5 changes: 5 additions & 0 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type timedSupervisor struct {
t *time.Ticker
commits uint32
running uint32
mutex sync.Mutex
}

// NewTimedSupervisor returns a supervisor that commits automatically.
Expand Down Expand Up @@ -241,7 +242,9 @@ func (s *timedSupervisor) Start() error {
return ErrAlreadyRunning
}

s.mutex.Lock()
s.t = time.NewTicker(s.d)
s.mutex.Unlock()

go func() {
for range s.t.C {
Expand All @@ -267,6 +270,8 @@ func (s *timedSupervisor) Close() error {
return ErrNotRunning
}

s.mutex.Lock()
defer s.mutex.Unlock()
s.t.Stop()

return s.inner.Close()
Expand Down
4 changes: 1 addition & 3 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ func nodesConnected(roots []Node) bool {
var visit []Node
connections := 0

for _, node := range roots {
visit = append(visit, node)
}
visit = append(visit, roots...)

for len(visit) > 0 {
var n Node
Expand Down

0 comments on commit b907dcc

Please sign in to comment.