Skip to content

Commit

Permalink
Fix concurrent map writes in "ocpp1.6".centralSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbeaumont authored and lorenzodonini committed Apr 7, 2021
1 parent 5a351e0 commit 40a8792
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions ocpp1.6/central_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocpp16

import (
"fmt"
"sync"

"github.com/lorenzodonini/ocpp-go/ocpp"
"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
Expand All @@ -22,6 +23,7 @@ type centralSystem struct {
reservationHandler reservation.CentralSystemHandler
remoteTriggerHandler remotetrigger.CentralSystemHandler
smartChargingHandler smartcharging.CentralSystemHandler
callbacksMutex sync.RWMutex
callbacks map[string]func(confirmation ocpp.Response, err error)
errC chan error
}
Expand Down Expand Up @@ -371,10 +373,11 @@ func (cs *centralSystem) SendRequestAsync(clientId string, request ocpp.Request,
default:
return fmt.Errorf("unsupported action %v on central system, cannot send request", featureName)
}
cs.callbacks[clientId] = callback
err := cs.server.SendRequest(clientId, request)
if err != nil {
delete(cs.callbacks, clientId)

cs.insertCallback(clientId, callback)

if err := cs.server.SendRequest(clientId, request); err != nil {
_, _ = cs.popCallback(clientId)
return err
}
return nil
Expand Down Expand Up @@ -499,9 +502,25 @@ func (cs *centralSystem) handleIncomingRequest(chargePointId string, request ocp
}()
}

func (cs *centralSystem) insertCallback(chargePointId string, callback func(confirmation ocpp.Response, err error)) {
cs.callbacksMutex.Lock()
defer cs.callbacksMutex.Unlock()

cs.callbacks[chargePointId] = callback
}

func (cs *centralSystem) popCallback(chargePointId string) (func(confirmation ocpp.Response, err error), bool) {
cs.callbacksMutex.Lock()
defer cs.callbacksMutex.Unlock()

callback, ok := cs.callbacks[chargePointId]
delete(cs.callbacks, chargePointId)

return callback, ok
}

func (cs *centralSystem) handleIncomingConfirmation(chargePointId string, confirmation ocpp.Response, requestId string) {
if callback, ok := cs.callbacks[chargePointId]; ok {
delete(cs.callbacks, chargePointId)
if callback, ok := cs.popCallback(chargePointId); ok {
callback(confirmation, nil)
} else {
err := fmt.Errorf("no handler available for call of type %v from client %s for request %s", confirmation.GetFeatureName(), chargePointId, requestId)
Expand All @@ -510,8 +529,7 @@ func (cs *centralSystem) handleIncomingConfirmation(chargePointId string, confir
}

func (cs *centralSystem) handleIncomingError(chargePointId string, err *ocpp.Error, details interface{}) {
if callback, ok := cs.callbacks[chargePointId]; ok {
delete(cs.callbacks, chargePointId)
if callback, ok := cs.popCallback(chargePointId); ok {
callback(nil, err)
} else {
err := fmt.Errorf("no handler available for call error %w from client %s", err, chargePointId)
Expand Down

0 comments on commit 40a8792

Please sign in to comment.