Skip to content

Commit

Permalink
Merge pull request #6 from miladosos/exercise-1
Browse files Browse the repository at this point in the history
[BUGFIX] Closing broker
  • Loading branch information
miladosos committed Apr 2, 2019
2 parents 9206bab + 7b56f67 commit b1f61a8
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions internal/broker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ type publishResponse struct {
}

func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
if c.isCanceled(ctx) {
if isCanceledContext(ctx) {
return nil, ctx.Err()
}

if !c.start() {
if !c.startWorking() {
return nil, ubroker.ErrClosed
}
defer c.working.Done()
Expand All @@ -90,11 +90,11 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
}

func (c *core) Acknowledge(ctx context.Context, id int) error {
if c.isCanceled(ctx) {
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.start() {
if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()
Expand All @@ -118,11 +118,11 @@ func (c *core) Acknowledge(ctx context.Context, id int) error {
}

func (c *core) ReQueue(ctx context.Context, id int) error {
if c.isCanceled(ctx) {
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.start() {
if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()
Expand All @@ -146,11 +146,11 @@ func (c *core) ReQueue(ctx context.Context, id int) error {
}

func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
if c.isCanceled(ctx) {
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.start() {
if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()
Expand All @@ -172,7 +172,7 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error {

func (c *core) Close() error {
if !c.startClosing() {
return nil
return errors.New("can not close channel, closing in progress")
}
c.working.Wait()
close(c.closed)
Expand Down Expand Up @@ -222,7 +222,7 @@ func (c *core) startDelivery() {
}
}

func (c *core) start() bool {
func (c *core) startWorking() bool {
c.mutex.Lock()
defer c.mutex.Unlock()

Expand All @@ -246,11 +246,9 @@ func (c *core) startClosing() bool {
close(c.closing)
return true
}
c.working.Add(1)
return true
}

func (c *core) isCanceled(ctx context.Context) bool {
func isCanceledContext(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
Expand Down

0 comments on commit b1f61a8

Please sign in to comment.