diff --git a/README.md b/README.md index daacc1d0..249a1b10 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,7 @@ docker run -it -p 8887:8887 --rm --name central-system ldonini/ocpp1.6-central-s You can also run it directly using docker-compose: ```sh -docker-compose -f example/1.6/docker-compose.yml up central_system +docker-compose -f example/1.6/docker-compose.yml up central-system ``` #### TLS @@ -293,7 +293,7 @@ You need to specify the host, on which the central system is running, in order f You can also run it directly using docker-compose: ```sh -docker-compose -f example/1.6/docker-compose.yml up charge_point +docker-compose -f example/1.6/docker-compose.yml up charge-point ``` #### TLS @@ -304,7 +304,7 @@ Feel free to use the utility script `cd example/1.6 && ./create-test-certificate Then run the following: ``` -docker-compose -f example/1.6/docker-compose.tls.yml up charge_point +docker-compose -f example/1.6/docker-compose.tls.yml up charge-point ``` ## OCPP 2.0 Usage diff --git a/example/1.6/cp/charge_point_sim.go b/example/1.6/cp/charge_point_sim.go index 79ad6875..6dbe166e 100644 --- a/example/1.6/cp/charge_point_sim.go +++ b/example/1.6/cp/charge_point_sim.go @@ -66,34 +66,6 @@ func setupTlsChargePoint(chargePointID string) ocpp16.ChargePoint { return ocpp16.NewChargePoint(chargePointID, nil, client) } -// Used for scheduling requests that are not generated from the main thread. -// OCPP is a request-response protocol and doesn't support multiple simultaneous requests. -// -// By scheduling a request, it can later on be sent safely from the main thread. -func scheduleAsyncRequest(asyncRequest func()) { - asyncRequestChan <- asyncRequest -} - -// Wait is used for simulating idle time, while being able to process trigger requests. -// -// If trigger requests (or other asynchronous requests) are fired while waiting, the request will be sent. -// This mechanism prevents errors when attempting to send a request while another one is still pending. -func wait(d time.Duration) { - t := time.NewTimer(d) - for { - select { - case req, ok := <-asyncRequestChan: - if !ok { - return - } - req() - case <-t.C: - log.Debugf("finished simulated wait") - return - } - } -} - // exampleRoutine simulates a charge point flow, where func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHandler) { dummyClientIdTag := "12345" @@ -105,7 +77,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan // Notify connector status updateStatus(stateHandler, 0, core.ChargePointStatusAvailable) // Wait for some time ... - wait(5 * time.Second) + time.Sleep(5 * time.Second) // Simulate charging for connector 1 authConf, err := chargePoint.Authorize(dummyClientIdTag) checkError(err) @@ -125,7 +97,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan if !ok { sampleInterval = 5 } - wait(time.Second * time.Duration(sampleInterval)) + time.Sleep(time.Second * time.Duration(sampleInterval)) stateHandler.meterValue += 10 sampledValue := types.SampledValue{Value: fmt.Sprintf("%v", stateHandler.meterValue), Unit: types.UnitOfMeasureWh, Format: types.ValueFormatRaw, Measurand: types.MeasurandEnergyActiveExportRegister, Context: types.ReadingContextSamplePeriodic, Location: types.LocationOutlet} meterValue := types.MeterValue{Timestamp: types.NewDateTime(time.Now()), SampledValue: []types.SampledValue{sampledValue}} @@ -147,7 +119,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan // Update connector status updateStatus(stateHandler, chargingConnector, core.ChargePointStatusAvailable) // Wait for some time ... - wait(5 * time.Minute) + time.Sleep(5 * time.Minute) } // Start function @@ -193,13 +165,10 @@ func main() { // Connects to central system err := chargePoint.Start(csUrl) if err != nil { - log.Println(err) + log.Errorln(err) } else { log.Infof("connected to central system at %v", csUrl) - // Setup channel for asynchronous requests (e.g. triggers) - asyncRequestChan = make(chan func(), 5) exampleRoutine(chargePoint, handler) - close(asyncRequestChan) // Disconnect chargePoint.Stop() log.Infof("disconnected from central system") diff --git a/example/1.6/cp/handler.go b/example/1.6/cp/handler.go index 147ab8e1..91b88b09 100644 --- a/example/1.6/cp/handler.go +++ b/example/1.6/cp/handler.go @@ -37,8 +37,6 @@ type ChargePointHandler struct { localAuthListVersion int } -var asyncRequestChan chan func() - var chargePoint ocpp16.ChargePoint func (handler *ChargePointHandler) isValidConnectorID(ID int) bool { @@ -215,24 +213,22 @@ func (handler *ChargePointHandler) OnTriggerMessage(request *remotetrigger.Trigg break case firmware.DiagnosticsStatusNotificationFeatureName: // Schedule diagnostics status notification request - fn := func() { + go func() { _, e := chargePoint.DiagnosticsStatusNotification(firmware.DiagnosticsStatusIdle) checkError(e) logDefault(firmware.DiagnosticsStatusNotificationFeatureName).Info("diagnostics status notified") - } - scheduleAsyncRequest(fn) + }() status = remotetrigger.TriggerMessageStatusAccepted case firmware.FirmwareStatusNotificationFeatureName: //TODO: schedule firmware status notification message break case core.HeartbeatFeatureName: // Schedule heartbeat request - fn := func() { + go func() { conf, e := chargePoint.Heartbeat() checkError(e) logDefault(core.HeartbeatFeatureName).Infof("clock synchronized: %v", conf.CurrentTime.FormatTimestamp()) - } - scheduleAsyncRequest(fn) + }() status = remotetrigger.TriggerMessageStatusAccepted case core.MeterValuesFeatureName: //TODO: schedule meter values message @@ -245,7 +241,7 @@ func (handler *ChargePointHandler) OnTriggerMessage(request *remotetrigger.Trigg return remotetrigger.NewTriggerMessageConfirmation(remotetrigger.TriggerMessageStatusRejected), nil } // Schedule status notification request - fn := func() { + go func() { status := handler.status if c, ok := handler.connectors[connectorID]; ok { status = c.status @@ -253,8 +249,7 @@ func (handler *ChargePointHandler) OnTriggerMessage(request *remotetrigger.Trigg statusConfirmation, err := chargePoint.StatusNotification(connectorID, handler.errorCode, status) checkError(err) logDefault(statusConfirmation.GetFeatureName()).Infof("status for connector %v sent: %v", connectorID, status) - } - scheduleAsyncRequest(fn) + }() status = remotetrigger.TriggerMessageStatusAccepted default: return remotetrigger.NewTriggerMessageConfirmation(remotetrigger.TriggerMessageStatusNotImplemented), nil @@ -348,24 +343,19 @@ func updateFirmwareStatus(status firmware.FirmwareStatus, props ...func(request } func updateFirmware(location string, retrieveDate *types.DateTime, retries int, retryInterval int) { - fn := func(status firmware.FirmwareStatus) func() { - return func() { - updateFirmwareStatus(status) - } - } - scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloading)) + updateFirmwareStatus(firmware.FirmwareStatusDownloading) err := downloadFile("/tmp/out.bin", location) if err != nil { logDefault(firmware.UpdateFirmwareFeatureName).Errorf("error while downloading file %v", err) - scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloadFailed)) + updateFirmwareStatus(firmware.FirmwareStatusDownloadFailed) return } - scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloaded)) + updateFirmwareStatus(firmware.FirmwareStatusDownloaded) // Simulate installation - scheduleAsyncRequest(fn(firmware.FirmwareStatusInstalling)) + updateFirmwareStatus(firmware.FirmwareStatusInstalling) time.Sleep(time.Second * 5) // Notify completion - scheduleAsyncRequest(fn(firmware.FirmwareStatusInstalled)) + updateFirmwareStatus(firmware.FirmwareStatusInstalled) } func downloadFile(filepath string, url string) error { diff --git a/ocpp1.6/v16.go b/ocpp1.6/v16.go index df6b17b4..9dd26a38 100644 --- a/ocpp1.6/v16.go +++ b/ocpp1.6/v16.go @@ -131,7 +131,7 @@ func NewChargePoint(id string, dispatcher *ocppj.Client, client ws.WsClient) Cha } }) if dispatcher == nil { - dispatcher = ocppj.NewClient(id, client, core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile) + dispatcher = ocppj.NewClient(id, client, ocppj.NewFIFOClientQueue(0), core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile) } cp := chargePoint{client: dispatcher, confirmationHandler: make(chan ocpp.Response), errorHandler: make(chan error)} cp.client.SetResponseHandler(func(confirmation ocpp.Response, requestId string) { @@ -249,7 +249,7 @@ func NewCentralSystem(dispatcher *ocppj.Server, server ws.WsServer) CentralSyste } server.AddSupportedSubprotocol(types.V16Subprotocol) if dispatcher == nil { - dispatcher = ocppj.NewServer(server, core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile) + dispatcher = ocppj.NewServer(server, ocppj.NewFIFOQueueMap(0), core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile) } cs := centralSystem{ server: dispatcher, diff --git a/ocpp1.6_test/ocpp16_test.go b/ocpp1.6_test/ocpp16_test.go index 77acdf39..63a411fc 100644 --- a/ocpp1.6_test/ocpp16_test.go +++ b/ocpp1.6_test/ocpp16_test.go @@ -103,6 +103,9 @@ func (websocketClient *MockWebsocketClient) Write(data []byte) error { func (websocketClient *MockWebsocketClient) AddOption(option interface{}) { } +// Default queue capacity +const queueCapacity = 10 + // ---------------------- MOCK FEATURE ---------------------- const ( MockFeatureName = "Mock" @@ -554,14 +557,24 @@ func testUnsupportedRequestFromCentralSystem(suite *OcppV16TestSuite, request oc suite.centralSystem.Start(8887, "somePath") err := suite.chargePoint.Start(wsUrl) assert.Nil(t, err) - // Run request test + // Run request test, expecting an error err = suite.centralSystem.SendRequestAsync(wsId, request, func(confirmation ocpp.Response, err error) { t.Fail() }) assert.Error(t, err) assert.Equal(t, expectedError, err.Error()) - // Run response test + // Add mocked request in queue and mark as pending, otherwise response will be ignored + mockCall, _ := suite.ocppjCentralSystem.CreateCall(request) + mockCall.UniqueId = messageId + jsonMessage, _ := mockCall.MarshalJSON() + requestBundle := ocppj.RequestBundle{ + Call: mockCall, + Data: jsonMessage, + } + q := suite.serverRequestMap.GetOrCreate(wsId) + _ = q.Push(requestBundle) suite.ocppjCentralSystem.AddPendingRequest(messageId, request) + // Run response test err = suite.mockWsClient.MessageHandler([]byte(requestJson)) assert.Nil(t, err) } @@ -603,6 +616,8 @@ type OcppV16TestSuite struct { chargePoint ocpp16.ChargePoint centralSystem ocpp16.CentralSystem messageIdGenerator TestRandomIdGenerator + clientRequestQueue ocppj.RequestQueue + serverRequestMap ocppj.ServerQueueMap } type TestRandomIdGenerator struct { @@ -626,8 +641,10 @@ func (suite *OcppV16TestSuite) SetupTest() { mockServer := MockWebsocketServer{} suite.mockWsClient = &mockClient suite.mockWsServer = &mockServer - suite.ocppjChargePoint = ocppj.NewClient("test_id", suite.mockWsClient, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile) - suite.ocppjCentralSystem = ocppj.NewServer(suite.mockWsServer, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile) + suite.clientRequestQueue = ocppj.NewFIFOClientQueue(queueCapacity) + suite.serverRequestMap = ocppj.NewFIFOQueueMap(queueCapacity) + suite.ocppjChargePoint = ocppj.NewClient("test_id", suite.mockWsClient, suite.clientRequestQueue, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile) + suite.ocppjCentralSystem = ocppj.NewServer(suite.mockWsServer, suite.serverRequestMap, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile) suite.chargePoint = ocpp16.NewChargePoint("test_id", suite.ocppjChargePoint, suite.mockWsClient) suite.centralSystem = ocpp16.NewCentralSystem(suite.ocppjCentralSystem, suite.mockWsServer) suite.messageIdGenerator = TestRandomIdGenerator{generator: func() string { diff --git a/ocpp2.0/v2.go b/ocpp2.0/v2.go index a133006f..a97fdeb4 100644 --- a/ocpp2.0/v2.go +++ b/ocpp2.0/v2.go @@ -160,7 +160,7 @@ func NewChargingStation(id string, dispatcher *ocppj.Client, client ws.WsClient) } }) if dispatcher == nil { - dispatcher = ocppj.NewClient(id, client, authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile) + dispatcher = ocppj.NewClient(id, client, ocppj.NewFIFOClientQueue(0), authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile) } cs := chargingStation{client: dispatcher, responseHandler: make(chan ocpp.Response), errorHandler: make(chan error)} cs.client.SetResponseHandler(func(confirmation ocpp.Response, requestId string) { @@ -314,7 +314,7 @@ func NewCSMS(dispatcher *ocppj.Server, server ws.WsServer) CSMS { } server.AddSupportedSubprotocol(types.V2Subprotocol) if dispatcher == nil { - dispatcher = ocppj.NewServer(server, authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile) + dispatcher = ocppj.NewServer(server, ocppj.NewFIFOQueueMap(0), authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile) } cs := csms{ server: dispatcher, diff --git a/ocpp2.0_test/ocpp2_test.go b/ocpp2.0_test/ocpp2_test.go index bfc80fcf..f03a02a3 100644 --- a/ocpp2.0_test/ocpp2_test.go +++ b/ocpp2.0_test/ocpp2_test.go @@ -117,6 +117,9 @@ func (websocketClient *MockWebsocketClient) Write(data []byte) error { func (websocketClient *MockWebsocketClient) AddOption(option interface{}) { } +// Default queue capacity +const queueCapacity = 10 + // ---------------------- MOCK FEATURE ---------------------- const ( MockFeatureName = "Mock" @@ -751,14 +754,24 @@ func testUnsupportedRequestFromCentralSystem(suite *OcppV2TestSuite, request ocp suite.csms.Start(8887, "somePath") err := suite.chargingStation.Start(wsUrl) require.Nil(t, err) - // Run request test + // Run request test, expecting an error err = suite.csms.SendRequestAsync(wsId, request, func(response ocpp.Response, err error) { t.Fail() }) require.Error(t, err) assert.Equal(t, expectedError, err.Error()) - // Run response test + // Add mocked request in queue and mark as pending, otherwise response will be ignored + mockCall, _ := suite.ocppjServer.CreateCall(request) + mockCall.UniqueId = messageId + jsonMessage, _ := mockCall.MarshalJSON() + requestBundle := ocppj.RequestBundle{ + Call: mockCall, + Data: jsonMessage, + } + q := suite.serverRequestMap.GetOrCreate(wsId) + _ = q.Push(requestBundle) suite.ocppjServer.AddPendingRequest(messageId, request) + // Run response test err = suite.mockWsClient.MessageHandler([]byte(requestJson)) assert.Nil(t, err) } @@ -791,6 +804,8 @@ type OcppV2TestSuite struct { chargingStation ocpp2.ChargingStation csms ocpp2.CSMS messageIdGenerator TestRandomIdGenerator + clientRequestQueue ocppj.RequestQueue + serverRequestMap ocppj.ServerQueueMap } type TestRandomIdGenerator struct { @@ -825,8 +840,10 @@ func (suite *OcppV2TestSuite) SetupTest() { mockServer := MockWebsocketServer{} suite.mockWsClient = &mockClient suite.mockWsServer = &mockServer - suite.ocppjClient = ocppj.NewClient("test_id", suite.mockWsClient, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile) - suite.ocppjServer = ocppj.NewServer(suite.mockWsServer, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile) + suite.clientRequestQueue = ocppj.NewFIFOClientQueue(queueCapacity) + suite.serverRequestMap = ocppj.NewFIFOQueueMap(queueCapacity) + suite.ocppjClient = ocppj.NewClient("test_id", suite.mockWsClient, suite.clientRequestQueue, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile) + suite.ocppjServer = ocppj.NewServer(suite.mockWsServer, suite.serverRequestMap, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile) suite.chargingStation = ocpp2.NewChargingStation("test_id", suite.ocppjClient, suite.mockWsClient) suite.csms = ocpp2.NewCSMS(suite.ocppjServer, suite.mockWsServer) suite.messageIdGenerator = TestRandomIdGenerator{generator: func() string { diff --git a/ocppj/central_system_test.go b/ocppj/central_system_test.go index 5d9d10c7..39c83d26 100644 --- a/ocppj/central_system_test.go +++ b/ocppj/central_system_test.go @@ -5,14 +5,21 @@ import ( "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" "github.com/lorenzodonini/ocpp-go/ocppj" + "github.com/lorenzodonini/ocpp-go/ws" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "strconv" + "sync" + "time" ) // SendRequest func (suite *OcppJTestSuite) TestCentralSystemSendRequest() { mockChargePointId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mockChargePointId, mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") mockRequest := newMockRequest("mockValue") err := suite.centralSystem.SendRequest(mockChargePointId, mockRequest) assert.Nil(suite.T(), err) @@ -20,30 +27,39 @@ func (suite *OcppJTestSuite) TestCentralSystemSendRequest() { func (suite *OcppJTestSuite) TestCentralSystemSendInvalidRequest() { mockChargePointId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mockChargePointId, mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") mockRequest := newMockRequest("") err := suite.centralSystem.SendRequest(mockChargePointId, mockRequest) assert.NotNil(suite.T(), err) } -func (suite *OcppJTestSuite) TestCentralSystemSendRequestPending() { - mockChargePointId := "1234" - suite.mockServer.On("Write", mockChargePointId, mock.Anything).Return(nil) - mockRequest := newMockRequest("mockValue") - err := suite.centralSystem.SendRequest(mockChargePointId, mockRequest) - assert.Nil(suite.T(), err) - err = suite.centralSystem.SendRequest(mockChargePointId, mockRequest) - assert.NotNil(suite.T(), err) -} - func (suite *OcppJTestSuite) TestCentralSystemSendRequestFailed() { + t := suite.T() mockChargePointId := "1234" - suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(errors.New("networkError")) + var callID string + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(errors.New("networkError")).Run(func(args mock.Arguments) { + clientID := args.String(0) + q, ok := suite.serverRequestMap.Get(clientID) + require.True(t, ok) + require.False(t, q.IsEmpty()) + req := q.Peek().(ocppj.RequestBundle) + callID = req.Call.GetUniqueId() + _, ok = suite.centralSystem.GetPendingRequest(callID) + // Before anything is returned, the request must still be pending + assert.True(t, ok) + }) + suite.centralSystem.Start(8887, "/{ws}") mockRequest := newMockRequest("mockValue") err := suite.centralSystem.SendRequest(mockChargePointId, mockRequest) - assert.NotNil(suite.T(), err) - assert.Equal(suite.T(), "networkError", err.Error()) - //TODO: assert that pending request was removed + //TODO: currently the network error is not returned by SendRequest, but is only generated internally + assert.Nil(t, err) + // Assert that pending request was removed + time.Sleep(500 * time.Millisecond) + _, ok := suite.centralSystem.GetPendingRequest(callID) + assert.False(t, ok) } // SendResponse @@ -51,7 +67,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendConfirmation() { t := suite.T() mockChargePointId := "0101" mockUniqueId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") mockConfirmation := newMockConfirmation("mockValue") err := suite.centralSystem.SendResponse(mockChargePointId, mockUniqueId, mockConfirmation) assert.Nil(t, err) @@ -61,7 +79,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendInvalidConfirmation() { t := suite.T() mockChargePointId := "0101" mockUniqueId := "6789" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") mockConfirmation := newMockConfirmation("") // This is allowed. Endpoint doesn't keep track of incoming requests, but only outgoing ones err := suite.centralSystem.SendResponse(mockChargePointId, mockUniqueId, mockConfirmation) @@ -72,7 +92,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendConfirmationFailed() { t := suite.T() mockChargePointId := "0101" mockUniqueId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(errors.New("networkError")) + suite.centralSystem.Start(8887, "/{ws}") mockConfirmation := newMockConfirmation("mockValue") err := suite.centralSystem.SendResponse(mockChargePointId, mockUniqueId, mockConfirmation) assert.NotNil(t, err) @@ -85,7 +107,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendError() { mockChargePointId := "0101" mockUniqueId := "1234" mockDescription := "mockDescription" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") err := suite.centralSystem.SendError(mockChargePointId, mockUniqueId, ocppj.GenericError, mockDescription, nil) assert.Nil(t, err) } @@ -95,7 +119,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendInvalidError() { mockChargePointId := "0101" mockUniqueId := "6789" mockDescription := "mockDescription" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + suite.centralSystem.Start(8887, "/{ws}") err := suite.centralSystem.SendError(mockChargePointId, mockUniqueId, "InvalidErrorCode", mockDescription, nil) assert.NotNil(t, err) } @@ -104,7 +130,9 @@ func (suite *OcppJTestSuite) TestCentralSystemSendErrorFailed() { t := suite.T() mockChargePointId := "0101" mockUniqueId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(errors.New("networkError")) + suite.centralSystem.Start(8887, "/{ws}") mockConfirmation := newMockConfirmation("mockValue") err := suite.centralSystem.SendResponse(mockChargePointId, mockUniqueId, mockConfirmation) assert.NotNil(t, err) @@ -144,14 +172,15 @@ func (suite *OcppJTestSuite) TestCentralSystemConfirmationHandler() { assert.Equal(t, mockUniqueId, requestId) assert.NotNil(t, confirmation) }) - suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return().Run(func(args mock.Arguments) { - // Simulate charge point message - channel := NewMockWebSocket(mockChargePointId) - err := suite.mockServer.MessageHandler(channel, []byte(mockConfirmation)) - assert.Nil(t, err) - }) - suite.centralSystem.AddPendingRequest(mockUniqueId, mockRequest) + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + // Start central system suite.centralSystem.Start(8887, "somePath") + // Set mocked request in queue and mark as pending + addMockPendingRequest(suite, mockRequest, mockUniqueId, mockChargePointId) + // Simulate charge point message + channel := NewMockWebSocket(mockChargePointId) + err := suite.mockServer.MessageHandler(channel, []byte(mockConfirmation)) + assert.Nil(t, err) } func (suite *OcppJTestSuite) TestCentralSystemErrorHandler() { @@ -163,7 +192,6 @@ func (suite *OcppJTestSuite) TestCentralSystemErrorHandler() { mockValue := "someValue" mockErrorDetails := make(map[string]interface{}) mockErrorDetails["details"] = "someValue" - mockRequest := newMockRequest("testValue") mockError := fmt.Sprintf(`[4,"%v","%v","%v",{"details":"%v"}]`, mockUniqueId, mockErrorCode, mockErrorDescription, mockValue) suite.centralSystem.SetErrorHandler(func(chargePointId string, err *ocpp.Error, details interface{}) { @@ -173,12 +201,248 @@ func (suite *OcppJTestSuite) TestCentralSystemErrorHandler() { assert.Equal(t, mockErrorDescription, err.Description) assert.Equal(t, mockErrorDetails, details) }) - suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return().Run(func(args mock.Arguments) { - // Simulate charge point message - channel := NewMockWebSocket(mockChargePointId) - err := suite.mockServer.MessageHandler(channel, []byte(mockError)) - assert.Nil(t, err) - }) - suite.centralSystem.AddPendingRequest(mockUniqueId, mockRequest) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + // Start central system suite.centralSystem.Start(8887, "somePath") + // Set mocked request in queue and mark as pending + addMockPendingRequest(suite, mockRequest, mockUniqueId, mockChargePointId) + // Simulate charge point message + channel := NewMockWebSocket(mockChargePointId) + err := suite.mockServer.MessageHandler(channel, []byte(mockError)) + assert.Nil(t, err) +} + +func addMockPendingRequest(suite *OcppJTestSuite, mockRequest ocpp.Request, mockUniqueID string, mockChargePointID string) { + mockCall, _ := suite.centralSystem.CreateCall(mockRequest) + mockCall.UniqueId = mockUniqueID + jsonMessage, _ := mockCall.MarshalJSON() + requestBundle := ocppj.RequestBundle{ + Call: mockCall, + Data: jsonMessage, + } + q := suite.serverRequestMap.GetOrCreate(mockChargePointID) + _ = q.Push(requestBundle) + suite.centralSystem.AddPendingRequest(mockUniqueID, mockRequest) +} + +// ----------------- Queue processing tests ----------------- + +func (suite *OcppJTestSuite) TestServerEnqueueRequest() { + t := suite.T() + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + // Start normally + suite.centralSystem.Start(8887, "/{ws}") + req := newMockRequest("somevalue") + mockChargePointId := "1234" + err := suite.centralSystem.SendRequest(mockChargePointId, req) + require.Nil(t, err) + time.Sleep(500 * time.Millisecond) + // Message was sent, but element should still be in queue + q, ok := suite.serverRequestMap.Get(mockChargePointId) + require.True(t, ok) + assert.False(t, q.IsEmpty()) + assert.Equal(t, 1, q.Size()) + // Analyze enqueued bundle + peeked := q.Peek() + require.NotNil(t, peeked) + bundle, ok := peeked.(ocppj.RequestBundle) + require.True(t, ok) + require.NotNil(t, bundle) + assert.Equal(t, req.GetFeatureName(), bundle.Call.Action) + marshaled, err := bundle.Call.MarshalJSON() + require.Nil(t, err) + assert.Equal(t, marshaled, bundle.Data) +} + +func (suite *OcppJTestSuite) TestEnqueueMultipleRequests() { + t := suite.T() + messagesToQueue := 5 + sentMessages := 0 + mockChargePointId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Run(func(args mock.Arguments) { + sentMessages += 1 + }).Return(nil) + // Start normally + suite.centralSystem.Start(8887, "/{ws}") + for i := 0; i < messagesToQueue; i++ { + req := newMockRequest(fmt.Sprintf("request-%v", i)) + err := suite.centralSystem.SendRequest(mockChargePointId, req) + require.Nil(t, err) + } + time.Sleep(500 * time.Millisecond) + // Only one message was sent, but all elements should still be in queue + assert.Equal(t, 1, sentMessages) + q, ok := suite.serverRequestMap.Get(mockChargePointId) + require.True(t, ok) + assert.False(t, q.IsEmpty()) + assert.Equal(t, messagesToQueue, q.Size()) + // Analyze enqueued bundle + var i = 0 + for !q.IsEmpty() { + popped := q.Pop() + require.NotNil(t, popped) + bundle, ok := popped.(ocppj.RequestBundle) + require.True(t, ok) + require.NotNil(t, bundle) + assert.Equal(t, MockFeatureName, bundle.Call.Action) + i++ + } + assert.Equal(t, messagesToQueue, i) +} + +func (suite *OcppJTestSuite) TestRequestQueueFull() { + t := suite.T() + messagesToQueue := queueCapacity + mockChargePointId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil) + // Start normally + suite.centralSystem.Start(8887, "/{ws}") + for i := 0; i < messagesToQueue; i++ { + req := newMockRequest(fmt.Sprintf("request-%v", i)) + err := suite.centralSystem.SendRequest(mockChargePointId, req) + require.Nil(t, err) + } + // Queue is now full. Trying to send an additional message should throw an error + req := newMockRequest("full") + err := suite.centralSystem.SendRequest(mockChargePointId, req) + require.NotNil(t, err) + assert.Equal(t, fmt.Sprintf("request queue for client %v is full, cannot send new request", mockChargePointId), err.Error()) +} + +func (suite *OcppJTestSuite) TestParallelRequests() { + t := suite.T() + messagesToQueue := 10 + sentMessages := 0 + mockChargePointId := "1234" + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Run(func(args mock.Arguments) { + sentMessages += 1 + }).Return(nil) + // Start normally + suite.centralSystem.Start(8887, "/{ws}") + for i := 0; i < messagesToQueue; i++ { + go func() { + req := newMockRequest(fmt.Sprintf("someReq")) + err := suite.centralSystem.SendRequest(mockChargePointId, req) + require.Nil(t, err) + }() + } + time.Sleep(1000 * time.Millisecond) + // Only one message was sent, but all elements should still be in queue + q, ok := suite.serverRequestMap.Get(mockChargePointId) + require.True(t, ok) + assert.False(t, q.IsEmpty()) + assert.Equal(t, messagesToQueue, q.Size()) + assert.Equal(t, 1, sentMessages) +} + +// TestRequestFlow tests a typical flow with multiple request-responses, sent to different clients. +// +// Requests are sent concurrently and a response to each message is sent from the mocked client endpoint. +// Both CallResult and CallError messages are returned to test all message types. +func (suite *OcppJTestSuite) TestServerRequestFlow() { + t := suite.T() + var mutex sync.Mutex + messagesToQueue := 10 + processedMessages := 0 + mockChargePoint1 := "cp1" + mockChargePoint2 := "cp2" + mockChargePoints := map[string]ws.Channel{ + mockChargePoint1: NewMockWebSocket(mockChargePoint1), + mockChargePoint2: NewMockWebSocket(mockChargePoint2), + } + type triggerData struct { + clientID string + call *ocppj.Call + } + sendResponseTrigger := make(chan triggerData, 1) + suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil) + suite.mockServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Run(func(args mock.Arguments) { + wsID := args.String(0) + data := args.Get(1).([]byte) + call := ParseCall(&suite.centralSystem.Endpoint, string(data), t) + require.NotNil(t, call) + sendResponseTrigger <- triggerData{clientID: wsID, call: call} + }).Return(nil) + // Mocked response generator + var wg sync.WaitGroup + wg.Add(messagesToQueue * 2) + go func() { + for { + d, ok := <-sendResponseTrigger + if !ok { + // Test completed, quitting + return + } + // Get original request to generate meaningful response + call := d.call + q, ok := suite.serverRequestMap.Get(d.clientID) + require.True(t, ok) + assert.False(t, q.IsEmpty()) + peeked := q.Peek() + bundle, _ := peeked.(ocppj.RequestBundle) + require.NotNil(t, bundle) + assert.Equal(t, call.UniqueId, bundle.Call.UniqueId) + req, _ := call.Payload.(*MockRequest) + // Send response back to server + var data []byte + var err error + v, _ := strconv.Atoi(req.MockValue) + if v%2 == 0 { + // Send CallResult + resp := newMockConfirmation("someResp") + res, err := suite.centralSystem.CreateCallResult(resp, call.GetUniqueId()) + require.Nil(t, err) + data, err = res.MarshalJSON() + require.Nil(t, err) + } else { + // Send CallError + res := suite.centralSystem.CreateCallError(call.GetUniqueId(), ocppj.GenericError, fmt.Sprintf("error-%v", req.MockValue), nil) + data, err = res.MarshalJSON() + require.Nil(t, err) + } + fmt.Printf("sending mocked response to message %v\n", call.GetUniqueId()) + wsChannel := mockChargePoints[d.clientID] + err = suite.mockServer.MessageHandler(wsChannel, data) // Triggers ocppMessageHandler + require.Nil(t, err) + // Make sure the top queue element was popped + mutex.Lock() + processedMessages += 1 + peeked = q.Peek() + if peeked != nil { + bundle, _ := peeked.(ocppj.RequestBundle) + require.NotNil(t, bundle) + assert.NotEqual(t, call.UniqueId, bundle.Call.UniqueId) + } + mutex.Unlock() + wg.Done() + } + }() + // Start server normally + suite.centralSystem.Start(8887, "/{ws}") + for i := 0; i < messagesToQueue*2; i++ { + // Select a source client + var chargePointTarget string + if i%2 == 0 { + chargePointTarget = mockChargePoint1 + } else { + chargePointTarget = mockChargePoint2 + } + go func(j int, clientID string) { + req := newMockRequest(fmt.Sprintf("%v", j)) + err := suite.centralSystem.SendRequest(clientID, req) + require.Nil(t, err) + }(i, chargePointTarget) + } + // Wait for processing to complete + wg.Wait() + close(sendResponseTrigger) + q, _ := suite.serverRequestMap.Get(mockChargePoint1) + assert.True(t, q.IsEmpty()) + q, _ = suite.serverRequestMap.Get(mockChargePoint2) + assert.True(t, q.IsEmpty()) } diff --git a/ocppj/charge_point_test.go b/ocppj/charge_point_test.go index 77dfa1f5..18954e4c 100644 --- a/ocppj/charge_point_test.go +++ b/ocppj/charge_point_test.go @@ -7,9 +7,14 @@ import ( "github.com/lorenzodonini/ocpp-go/ocppj" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "strconv" + "sync" + "time" ) -// Start +// ----------------- Start tests ----------------- + func (suite *OcppJTestSuite) TestChargePointStart() { suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) err := suite.chargePoint.Start("someUrl") @@ -22,9 +27,22 @@ func (suite *OcppJTestSuite) TestChargePointStartFailed() { assert.NotNil(suite.T(), err) } -// SendRequest +func (suite *OcppJTestSuite) TestNotStartedError() { + t := suite.T() + // Start normally + req := newMockRequest("somevalue") + err := suite.chargePoint.SendRequest(req) + require.NotNil(t, err) + assert.Equal(t, "ocppj client is not started, couldn't send request", err.Error()) + require.True(t, suite.clientRequestQueue.IsEmpty()) +} + +// ----------------- SendRequest tests ----------------- + func (suite *OcppJTestSuite) TestChargePointSendRequest() { suite.mockClient.On("Write", mock.Anything).Return(nil) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + _ = suite.chargePoint.Start("someUrl") mockRequest := newMockRequest("mockValue") err := suite.chargePoint.SendRequest(mockRequest) assert.Nil(suite.T(), err) @@ -32,35 +50,46 @@ func (suite *OcppJTestSuite) TestChargePointSendRequest() { func (suite *OcppJTestSuite) TestChargePointSendInvalidRequest() { suite.mockClient.On("Write", mock.Anything).Return(nil) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + _ = suite.chargePoint.Start("someUrl") mockRequest := newMockRequest("") err := suite.chargePoint.SendRequest(mockRequest) assert.NotNil(suite.T(), err) } -func (suite *OcppJTestSuite) TestChargePointSendRequestPending() { - suite.mockClient.On("Write", mock.Anything).Return(nil) - mockRequest := newMockRequest("mockValue") - err := suite.chargePoint.SendRequest(mockRequest) - assert.Nil(suite.T(), err) - err = suite.chargePoint.SendRequest(mockRequest) - assert.NotNil(suite.T(), err) -} - func (suite *OcppJTestSuite) TestChargePointSendRequestFailed() { - suite.mockClient.On("Write", mock.Anything).Return(errors.New("networkError")) + t := suite.T() + var callID string + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Return(errors.New("networkError")).Run(func(args mock.Arguments) { + require.False(t, suite.clientRequestQueue.IsEmpty()) + req := suite.clientRequestQueue.Peek().(ocppj.RequestBundle) + callID = req.Call.GetUniqueId() + _, ok := suite.chargePoint.GetPendingRequest(callID) + // Before anything is returned, the request must still be pending + assert.True(t, ok) + }) + _ = suite.chargePoint.Start("someUrl") mockRequest := newMockRequest("mockValue") err := suite.chargePoint.SendRequest(mockRequest) - assert.NotNil(suite.T(), err) - assert.Equal(suite.T(), "networkError", err.Error()) - //TODO: assert that pending request was removed + //TODO: currently the network error is not returned by SendRequest, but is only generated internally + assert.Nil(t, err) + // Assert that pending request was removed + time.Sleep(500 * time.Millisecond) + _, ok := suite.chargePoint.GetPendingRequest(callID) + assert.False(t, ok) } -// SendResponse +// ----------------- SendResponse tests ----------------- + func (suite *OcppJTestSuite) TestChargePointSendConfirmation() { t := suite.T() mockUniqueId := "1234" suite.mockClient.On("Write", mock.Anything).Return(nil) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + _ = suite.chargePoint.Start("someUrl") mockConfirmation := newMockConfirmation("mockValue") + // This is allowed. Endpoint doesn't keep track of incoming requests, but only outgoing ones err := suite.chargePoint.SendResponse(mockUniqueId, mockConfirmation) assert.Nil(t, err) } @@ -69,6 +98,8 @@ func (suite *OcppJTestSuite) TestChargePointSendInvalidConfirmation() { t := suite.T() mockUniqueId := "6789" suite.mockClient.On("Write", mock.Anything).Return(nil) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + _ = suite.chargePoint.Start("someUrl") mockConfirmation := newMockConfirmation("") // This is allowed. Endpoint doesn't keep track of incoming requests, but only outgoing ones err := suite.chargePoint.SendResponse(mockUniqueId, mockConfirmation) @@ -79,13 +110,16 @@ func (suite *OcppJTestSuite) TestChargePointSendConfirmationFailed() { t := suite.T() mockUniqueId := "1234" suite.mockClient.On("Write", mock.Anything).Return(errors.New("networkError")) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + _ = suite.chargePoint.Start("someUrl") mockConfirmation := newMockConfirmation("mockValue") err := suite.chargePoint.SendResponse(mockUniqueId, mockConfirmation) assert.NotNil(t, err) assert.Equal(t, "networkError", err.Error()) } -// SendError +// ----------------- SendError tests ----------------- + func (suite *OcppJTestSuite) TestChargePointSendError() { t := suite.T() mockUniqueId := "1234" @@ -114,7 +148,8 @@ func (suite *OcppJTestSuite) TestChargePointSendErrorFailed() { assert.Equal(t, "networkError", err.Error()) } -// Call Handlers +// ----------------- Call Handlers tests ----------------- + func (suite *OcppJTestSuite) TestChargePointCallHandler() { t := suite.T() mockUniqueId := "5678" @@ -144,14 +179,13 @@ func (suite *OcppJTestSuite) TestChargePointCallResultHandler() { assert.Equal(t, mockUniqueId, requestId) assert.NotNil(t, confirmation) }) - suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil).Run(func(args mock.Arguments) { - // Simulate central system message - err := suite.mockClient.MessageHandler([]byte(mockConfirmation)) - assert.Nil(t, err) - }) - suite.chargePoint.AddPendingRequest(mockUniqueId, mockRequest) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.chargePoint.AddPendingRequest(mockUniqueId, mockRequest) // Manually add a pending request, so that response is not rejected err := suite.chargePoint.Start("somePath") assert.Nil(t, err) + // Simulate central system message + err = suite.mockClient.MessageHandler([]byte(mockConfirmation)) + assert.Nil(t, err) } func (suite *OcppJTestSuite) TestChargePointCallErrorHandler() { @@ -171,12 +205,202 @@ func (suite *OcppJTestSuite) TestChargePointCallErrorHandler() { assert.Equal(t, mockErrorDescription, err.Description) assert.Equal(t, mockErrorDetails, details) }) - suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil).Run(func(args mock.Arguments) { - // Simulate central system message - err := suite.mockClient.MessageHandler([]byte(mockError)) - assert.Nil(t, err) - }) - suite.chargePoint.AddPendingRequest(mockUniqueId, mockRequest) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.chargePoint.AddPendingRequest(mockUniqueId, mockRequest) // Manually add a pending request, so that response is not rejected err := suite.chargePoint.Start("someUrl") assert.Nil(t, err) + // Simulate central system message + err = suite.mockClient.MessageHandler([]byte(mockError)) + assert.Nil(t, err) +} + +// ----------------- Queue processing tests ----------------- + +func (suite *OcppJTestSuite) TestClientEnqueueRequest() { + t := suite.T() + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Return(nil) + // Start normally + err := suite.chargePoint.Start("someUrl") + require.Nil(t, err) + req := newMockRequest("somevalue") + err = suite.chargePoint.SendRequest(req) + require.Nil(t, err) + time.Sleep(500 * time.Millisecond) + // Message was sent, but element should still be in queue + require.False(t, suite.clientRequestQueue.IsEmpty()) + assert.Equal(t, 1, suite.clientRequestQueue.Size()) + // Analyze enqueued bundle + peeked := suite.clientRequestQueue.Peek() + require.NotNil(t, peeked) + bundle, ok := peeked.(ocppj.RequestBundle) + require.True(t, ok) + require.NotNil(t, bundle) + assert.Equal(t, req.GetFeatureName(), bundle.Call.Action) + marshaled, err := bundle.Call.MarshalJSON() + require.Nil(t, err) + assert.Equal(t, marshaled, bundle.Data) +} + +func (suite *OcppJTestSuite) TestClientEnqueueMultipleRequests() { + t := suite.T() + messagesToQueue := 5 + sentMessages := 0 + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Run(func(args mock.Arguments) { + sentMessages += 1 + }).Return(nil) + // Start normally + err := suite.chargePoint.Start("someUrl") + require.Nil(t, err) + for i := 0; i < messagesToQueue; i++ { + req := newMockRequest(fmt.Sprintf("request-%v", i)) + err = suite.chargePoint.SendRequest(req) + require.Nil(t, err) + } + time.Sleep(500 * time.Millisecond) + // Only one message was sent, but all elements should still be in queue + assert.Equal(t, 1, sentMessages) + require.False(t, suite.clientRequestQueue.IsEmpty()) + assert.Equal(t, messagesToQueue, suite.clientRequestQueue.Size()) + // Analyze enqueued bundle + var i = 0 + for !suite.clientRequestQueue.IsEmpty() { + popped := suite.clientRequestQueue.Pop() + require.NotNil(t, popped) + bundle, ok := popped.(ocppj.RequestBundle) + require.True(t, ok) + require.NotNil(t, bundle) + assert.Equal(t, MockFeatureName, bundle.Call.Action) + i++ + } + assert.Equal(t, messagesToQueue, i) } + +func (suite *OcppJTestSuite) TestClientRequestQueueFull() { + t := suite.T() + messagesToQueue := queueCapacity + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Return(nil) + // Start normally + err := suite.chargePoint.Start("someUrl") + require.Nil(t, err) + for i := 0; i < messagesToQueue; i++ { + req := newMockRequest(fmt.Sprintf("request-%v", i)) + err = suite.chargePoint.SendRequest(req) + require.Nil(t, err) + } + // Queue is now full. Trying to send an additional message should throw an error + req := newMockRequest("full") + err = suite.chargePoint.SendRequest(req) + require.NotNil(t, err) + assert.Equal(t, "request queue is full, cannot push new element", err.Error()) +} + +func (suite *OcppJTestSuite) TestClientParallelRequests() { + t := suite.T() + messagesToQueue := 10 + sentMessages := 0 + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Run(func(args mock.Arguments) { + sentMessages += 1 + }).Return(nil) + // Start normally + err := suite.chargePoint.Start("someUrl") + require.Nil(t, err) + for i := 0; i < messagesToQueue; i++ { + go func() { + req := newMockRequest(fmt.Sprintf("someReq")) + err = suite.chargePoint.SendRequest(req) + require.Nil(t, err) + }() + } + time.Sleep(1000 * time.Millisecond) + // Only one message was sent, but all element should still be in queue + require.False(t, suite.clientRequestQueue.IsEmpty()) + assert.Equal(t, messagesToQueue, suite.clientRequestQueue.Size()) +} + +// TestClientRequestFlow tests a typical flow with multiple request-responses. +// +// Requests are sent concurrently and a response to each message is sent from the mocked server endpoint. +// Both CallResult and CallError messages are returned to test all message types. +func (suite *OcppJTestSuite) TestClientRequestFlow() { + t := suite.T() + var mutex sync.Mutex + messagesToQueue := 10 + processedMessages := 0 + sendResponseTrigger := make(chan *ocppj.Call, 1) + suite.mockClient.On("Start", mock.AnythingOfType("string")).Return(nil) + suite.mockClient.On("Write", mock.Anything).Run(func(args mock.Arguments) { + data := args.Get(0).([]byte) + call := ParseCall(&suite.chargePoint.Endpoint, string(data), t) + require.NotNil(t, call) + sendResponseTrigger <- call + }).Return(nil) + // Mocked response generator + var wg sync.WaitGroup + wg.Add(messagesToQueue) + go func() { + for { + call, ok := <-sendResponseTrigger + if !ok { + // Test completed, quitting + return + } + // Get original request to generate meaningful response + peeked := suite.clientRequestQueue.Peek() + bundle, _ := peeked.(ocppj.RequestBundle) + require.NotNil(t, bundle) + assert.Equal(t, call.UniqueId, bundle.Call.UniqueId) + req, _ := call.Payload.(*MockRequest) + // Send response back to client + var data []byte + var err error + v, _ := strconv.Atoi(req.MockValue) + if v%2 == 0 { + // Send CallResult + resp := newMockConfirmation("someResp") + res, err := suite.chargePoint.CreateCallResult(resp, call.GetUniqueId()) + require.Nil(t, err) + data, err = res.MarshalJSON() + require.Nil(t, err) + } else { + // Send CallError + res := suite.chargePoint.CreateCallError(call.GetUniqueId(), ocppj.GenericError, fmt.Sprintf("error-%v", req.MockValue), nil) + data, err = res.MarshalJSON() + require.Nil(t, err) + } + fmt.Printf("sending mocked response to message %v\n", call.GetUniqueId()) + err = suite.mockClient.MessageHandler(data) // Triggers ocppMessageHandler + require.Nil(t, err) + // Make sure the top queue element was popped + mutex.Lock() + processedMessages += 1 + peeked = suite.clientRequestQueue.Peek() + if peeked != nil { + bundle, _ := peeked.(ocppj.RequestBundle) + require.NotNil(t, bundle) + assert.NotEqual(t, call.UniqueId, bundle.Call.UniqueId) + } + mutex.Unlock() + wg.Done() + } + }() + // Start client normally + err := suite.chargePoint.Start("someUrl") + require.Nil(t, err) + for i := 0; i < messagesToQueue; i++ { + go func(j int) { + req := newMockRequest(fmt.Sprintf("%v", j)) + err = suite.chargePoint.SendRequest(req) + require.Nil(t, err) + }(i) + } + // Wait for processing to complete + wg.Wait() + close(sendResponseTrigger) + assert.True(t, suite.clientRequestQueue.IsEmpty()) +} + +//TODO: test retransmission diff --git a/ocppj/client.go b/ocppj/client.go index 05fe4426..b63e38ba 100644 --- a/ocppj/client.go +++ b/ocppj/client.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" "github.com/lorenzodonini/ocpp-go/ws" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -12,25 +11,27 @@ import ( // During message exchange, the two roles may be reversed (depending on the message direction), but a client struct remains associated to a charge point/charging station. type Client struct { Endpoint - client ws.WsClient - Id string - requestHandler func(request ocpp.Request, requestId string, action string) - responseHandler func(response ocpp.Response, requestId string) - errorHandler func(err *ocpp.Error, details interface{}) - hasPendingRequest bool + client ws.WsClient + Id string + requestHandler func(request ocpp.Request, requestId string, action string) + responseHandler func(response ocpp.Response, requestId string) + errorHandler func(err *ocpp.Error, details interface{}) + requestQueue RequestQueue + requestChannel chan bool + readyForDispatch chan bool } // Creates a new Client endpoint. -// Requires a unique client ID, a websocket client and a list of profiles (optional). -func NewClient(id string, wsClient ws.WsClient, profiles ...*ocpp.Profile) *Client { +// Requires a unique client ID, a websocket client, a request queue struct and a list of supported profiles (optional). +func NewClient(id string, wsClient ws.WsClient, requestQueue RequestQueue, profiles ...*ocpp.Profile) *Client { endpoint := Endpoint{pendingRequests: map[string]ocpp.Request{}} for _, profile := range profiles { endpoint.AddProfile(profile) } if wsClient != nil { - return &Client{Endpoint: endpoint, client: wsClient, Id: id, hasPendingRequest: false} + return &Client{Endpoint: endpoint, client: wsClient, Id: id, requestQueue: requestQueue, readyForDispatch: make(chan bool)} } else { - return &Client{Endpoint: endpoint, client: ws.NewClient(), Id: id, hasPendingRequest: false} + return &Client{Endpoint: endpoint, client: ws.NewClient(), Id: id, requestQueue: requestQueue, readyForDispatch: make(chan bool)} } } @@ -63,7 +64,12 @@ func (c *Client) Start(serverURL string) error { c.client.SetMessageHandler(c.ocppMessageHandler) // Connect & run fullUrl := fmt.Sprintf("%v/%v", serverURL, c.Id) - return c.client.Start(fullUrl) + err := c.client.Start(fullUrl) + if err == nil { + c.requestChannel = make(chan bool, 1) + go c.requestPump() + } + return err } // Stops the client. @@ -71,45 +77,110 @@ func (c *Client) Start(serverURL string) error { func (c *Client) Stop() { c.client.Stop() c.clearPendingRequests() - c.hasPendingRequest = false + close(c.requestChannel) } // Sends an OCPP Request to the server. +// The protocol is based on request-response and cannot send multiple messages concurrently. +// To guarantee this, outgoing messages are added to a queue and processed sequentially. // // Returns an error in the following cases: // -// - message validation fails (request is malformed) +// - the client wasn't started // -// - another request is already pending +// - message validation fails (request is malformed) // // - the endpoint doesn't support the feature // -// - a network error occurred +// - the output queue is full func (c *Client) SendRequest(request ocpp.Request) error { + if c.requestChannel == nil { + return fmt.Errorf("ocppj client is not started, couldn't send request") + } err := Validate.Struct(request) if err != nil { return err } - if c.hasPendingRequest { - // Cannot send. Protocol is based on request-response - return errors.Errorf("There already is a pending request. Cannot send a further one before receiving a response first") - } - call, err := c.CreateCall(request.(ocpp.Request)) + call, err := c.CreateCall(request) // This doesn't add a pending request yet if err != nil { return err } jsonMessage, err := call.MarshalJSON() if err != nil { + // TODO: delete pending request on endpoint struct return err } - c.hasPendingRequest = true - err = c.client.Write([]byte(jsonMessage)) + // Will not send right away. Queuing message and let it be processed by dedicated requestPump routine + if err := c.requestQueue.Push(RequestBundle{Call: call, Data: jsonMessage}); err != nil { + log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err) + return err + } + log.Debugf("enqueued request %v - %v", call.UniqueId, call.Action) + c.requestChannel <- true + return nil +} + +// requestPump processes new outgoing requests and makes sure they are enqueued correctly. +// This method is executed by a dedicated coroutine as soon as the client is started and runs indefinitely. +func (c *Client) requestPump() { + rdy := true // Ready to transmit at the beginning + for { + select { + case _, ok := <-c.requestChannel: + // Enqueue new request + if !ok { + log.Infof("stopped processing requests") + c.requestQueue.Init() + c.requestChannel = nil + return + } + case rdy = <-c.readyForDispatch: + } + // Only dispatch request if able to send and request queue isn't empty + if rdy && !c.requestQueue.IsEmpty() { + c.dispatchNextRequest() + rdy = false + } + } +} + +func (c *Client) dispatchNextRequest() { + // Get first element in queue + el := c.requestQueue.Peek() + bundle, _ := el.(RequestBundle) + jsonMessage := bundle.Data + c.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload) + + err := c.client.Write(jsonMessage) if err != nil { - // Clear pending request - c.DeletePendingRequest(call.GetUniqueId()) - c.hasPendingRequest = false + log.Errorf("error while sending message: %v", err) + //TODO: handle retransmission instead of removing pending request + c.DeletePendingRequest(bundle.Call.GetUniqueId()) + c.completePendingRequest(bundle.Call.GetUniqueId()) + if c.errorHandler != nil { + c.errorHandler(ocpp.NewError(GenericError, err.Error(), bundle.Call.GetUniqueId()), err) + } + } else { + // Transmitted correctly + log.Debugf("sent request %v: %v", bundle.Call.UniqueId, string(jsonMessage)) } - return err +} + +func (c *Client) completePendingRequest(requestId string) { + el := c.requestQueue.Peek() + if el == nil { + log.Errorf("attempting to pop front of queue, but queue is empty") + return + } + bundle, _ := el.(RequestBundle) + if bundle.Call.UniqueId != requestId { + log.Fatalf("internal state mismatch: received response for %v but expected response for %v", requestId, bundle.Call.UniqueId) + return + } + c.requestQueue.Pop() + log.Debugf("removed request %v from front of queue", bundle.Call.UniqueId) + // Signal that next message in queue may be sent + c.readyForDispatch <- true } // Sends an OCPP Response to the server. @@ -178,12 +249,18 @@ func (c *Client) ocppMessageHandler(data []byte) error { c.requestHandler(call.Payload, call.UniqueId, call.Action) case CALL_RESULT: callResult := message.(*CallResult) - c.hasPendingRequest = false - c.responseHandler(callResult.Payload, callResult.UniqueId) + c.DeletePendingRequest(callResult.GetUniqueId()) + c.completePendingRequest(callResult.UniqueId) // Remove current request from queue and send next one + if c.responseHandler != nil { + c.responseHandler(callResult.Payload, callResult.UniqueId) + } case CALL_ERROR: - callError := message.(*CallError) - c.hasPendingRequest = false - c.errorHandler(ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails) + callError := message.(*CallError) // Remove current request from queue and send next one + c.DeletePendingRequest(callError.GetUniqueId()) + c.completePendingRequest(callError.UniqueId) + if c.errorHandler != nil { + c.errorHandler(ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails) + } } return nil } diff --git a/ocppj/ocppj.go b/ocppj/ocppj.go index 2c593ca0..5c192670 100644 --- a/ocppj/ocppj.go +++ b/ocppj/ocppj.go @@ -10,6 +10,7 @@ import ( "gopkg.in/go-playground/validator.v9" "math/rand" "reflect" + "sync" ) // The validator, used for validating incoming/outgoing OCPP messages. @@ -225,6 +226,7 @@ func errorFromValidation(validationErrors validator.ValidationErrors, messageId type Endpoint struct { Profiles []*ocpp.Profile pendingRequests map[string]ocpp.Request + mutex sync.Mutex } // Adds support for a new profile on the endpoint. @@ -257,22 +259,30 @@ func (endpoint *Endpoint) GetProfileForFeature(featureName string) (*ocpp.Profil // Sets a Request as pending on the endpoint. Requests are considered pending until a response was received. // The function expects a message unique ID and the Request. func (endpoint *Endpoint) AddPendingRequest(id string, request ocpp.Request) { + endpoint.mutex.Lock() + defer endpoint.mutex.Unlock() endpoint.pendingRequests[id] = request } // Retrieves a pending Request, using the message ID. // If no request for the passed message ID is found, a false flag is returned. func (endpoint *Endpoint) GetPendingRequest(id string) (ocpp.Request, bool) { + endpoint.mutex.Lock() + defer endpoint.mutex.Unlock() request, ok := endpoint.pendingRequests[id] return request, ok } // Deletes a pending Request from the endpoint, using the message ID. func (endpoint *Endpoint) DeletePendingRequest(id string) { + endpoint.mutex.Lock() + defer endpoint.mutex.Unlock() delete(endpoint.pendingRequests, id) } func (endpoint *Endpoint) clearPendingRequests() { + endpoint.mutex.Lock() + defer endpoint.mutex.Unlock() endpoint.pendingRequests = map[string]ocpp.Request{} } @@ -347,7 +357,7 @@ func (endpoint *Endpoint) ParseMessage(arr []interface{}) (Message, *ocpp.Error) } return &call, nil } else if typeId == CALL_RESULT { - request, ok := endpoint.pendingRequests[uniqueId] + request, ok := endpoint.GetPendingRequest(uniqueId) if !ok { log.Printf("No previous request %v sent. Discarding response message", uniqueId) return nil, nil @@ -362,13 +372,17 @@ func (endpoint *Endpoint) ParseMessage(arr []interface{}) (Message, *ocpp.Error) UniqueId: uniqueId, Payload: confirmation, } - endpoint.DeletePendingRequest(callResult.GetUniqueId()) err = Validate.Struct(callResult) if err != nil { return nil, errorFromValidation(err.(validator.ValidationErrors), uniqueId) } return &callResult, nil } else if typeId == CALL_ERROR { + _, ok := endpoint.GetPendingRequest(uniqueId) + if !ok { + log.Printf("No previous request %v sent. Discarding error message", uniqueId) + return nil, nil + } if len(arr) < 4 { return nil, ocpp.NewError(FormationViolation, "Invalid Call Error message. Expected array length >= 4", uniqueId) } @@ -385,7 +399,6 @@ func (endpoint *Endpoint) ParseMessage(arr []interface{}) (Message, *ocpp.Error) ErrorDescription: arr[3].(string), ErrorDetails: details, } - endpoint.DeletePendingRequest(callError.GetUniqueId()) err := Validate.Struct(callError) if err != nil { return nil, errorFromValidation(err.(validator.ValidationErrors), uniqueId) @@ -398,12 +411,15 @@ func (endpoint *Endpoint) ParseMessage(arr []interface{}) (Message, *ocpp.Error) // Creates a Call message, given an OCPP request. A unique ID for the message is automatically generated. // Returns an error in case the request's feature is not supported on this endpoint. +// +// The created call is not automatically scheduled for transmission and is not added to the list of pending requests. func (endpoint *Endpoint) CreateCall(request ocpp.Request) (*Call, error) { action := request.GetFeatureName() profile, _ := endpoint.GetProfileForFeature(action) if profile == nil { return nil, errors2.Errorf("Couldn't create Call for unsupported action %v", action) } + // TODO: handle collisions? uniqueId := messageIdGenerator() call := Call{ MessageTypeId: CALL, @@ -415,11 +431,11 @@ func (endpoint *Endpoint) CreateCall(request ocpp.Request) (*Call, error) { if err != nil { return nil, err } - endpoint.AddPendingRequest(uniqueId, request) return &call, nil } // Creates a CallResult message, given an OCPP response and the message's unique ID. +// // Returns an error in case the response's feature is not supported on this endpoint. func (endpoint *Endpoint) CreateCallResult(confirmation ocpp.Response, uniqueId string) (*CallResult, error) { action := confirmation.GetFeatureName() diff --git a/ocppj/ocppj_test.go b/ocppj/ocppj_test.go index 4a5ab566..6fb4d91d 100644 --- a/ocppj/ocppj_test.go +++ b/ocppj/ocppj_test.go @@ -27,6 +27,7 @@ func NewMockWebSocket(id string) MockWebSocket { } // ---------------------- MOCK WEBSOCKET SERVER ---------------------- + type MockWebsocketServer struct { mock.Mock ws.WsServer @@ -68,6 +69,7 @@ func (websocketServer *MockWebsocketServer) NewClient(websocketId string, client } // ---------------------- MOCK WEBSOCKET CLIENT ---------------------- + type MockWebsocketClient struct { mock.Mock ws.WsClient @@ -143,6 +145,7 @@ func newMockConfirmation(value string) *MockConfirmation { } // ---------------------- COMMON UTILITY METHODS ---------------------- + func NewWebsocketServer(t *testing.T, onMessage func(data []byte) ([]byte, error)) *ws.Server { wsServer := ws.Server{} wsServer.SetMessageHandler(func(ws ws.Channel, data []byte) error { @@ -244,12 +247,15 @@ func init() { } // ---------------------- TESTS ---------------------- + type OcppJTestSuite struct { suite.Suite - chargePoint *ocppj.Client - centralSystem *ocppj.Server - mockServer *MockWebsocketServer - mockClient *MockWebsocketClient + chargePoint *ocppj.Client + centralSystem *ocppj.Server + mockServer *MockWebsocketServer + mockClient *MockWebsocketClient + clientRequestQueue ocppj.RequestQueue + serverRequestMap ocppj.ServerQueueMap } func (suite *OcppJTestSuite) SetupTest() { @@ -258,11 +264,12 @@ func (suite *OcppJTestSuite) SetupTest() { mockServer := MockWebsocketServer{} suite.mockClient = &mockClient suite.mockServer = &mockServer - suite.chargePoint = ocppj.NewClient("mock_id", suite.mockClient, mockProfile) - suite.centralSystem = ocppj.NewServer(suite.mockServer, mockProfile) + suite.clientRequestQueue = ocppj.NewFIFOClientQueue(queueCapacity) + suite.chargePoint = ocppj.NewClient("mock_id", suite.mockClient, suite.clientRequestQueue, mockProfile) + suite.serverRequestMap = ocppj.NewFIFOQueueMap(queueCapacity) + suite.centralSystem = ocppj.NewServer(suite.mockServer, suite.serverRequestMap, mockProfile) } -// Protocol functions test func (suite *OcppJTestSuite) TestGetProfile() { t := suite.T() profile, ok := suite.chargePoint.GetProfile("mock") @@ -364,11 +371,10 @@ func (suite *OcppJTestSuite) TestCreateCall() { assert.True(t, ok) assert.NotNil(t, message) assert.Equal(t, mockValue, message.MockValue) - // Check that request was stored as pending request + // Check that request was not yet stored as pending request pendingRequest, exists := suite.chargePoint.GetPendingRequest(call.UniqueId) - assert.True(t, exists) - assert.NotNil(t, pendingRequest) - suite.chargePoint.DeletePendingRequest(call.UniqueId) + assert.False(t, exists) + assert.Nil(t, pendingRequest) } func (suite *OcppJTestSuite) TestCreateCallResult() { @@ -513,10 +519,12 @@ func (suite *OcppJTestSuite) TestParseMessageInvalidCallError() { t := suite.T() mockMessage := make([]interface{}, 3) messageId := "12345" + pendingRequest := newMockRequest("request") // Test invalid message length mockMessage[0] = float64(ocppj.CALL_ERROR) // Message Type ID mockMessage[1] = messageId // Unique ID mockMessage[2] = ocppj.GenericError + suite.chargePoint.AddPendingRequest(messageId, pendingRequest) // Manually add a pending request, so that response is not rejected message, protoErr := suite.chargePoint.ParseMessage(mockMessage) assert.Nil(t, message) assert.NotNil(t, protoErr) @@ -559,7 +567,7 @@ func (suite *OcppJTestSuite) TestParseMessageInvalidConfirmation() { mockMessage[0] = float64(ocppj.CALL_RESULT) // Message Type ID mockMessage[1] = messageId // Unique ID mockMessage[2] = mockConfirmation - suite.chargePoint.AddPendingRequest(messageId, pendingRequest) + suite.chargePoint.AddPendingRequest(messageId, pendingRequest) // Manually add a pending request, so that response is not rejected message, protoErr := suite.chargePoint.ParseMessage(mockMessage) assert.Nil(t, message) assert.NotNil(t, protoErr) @@ -567,7 +575,7 @@ func (suite *OcppJTestSuite) TestParseMessageInvalidConfirmation() { assert.Equal(t, ocppj.OccurrenceConstraintViolation, protoErr.Code) // Test invalid request -> max constraint wrong mockConfirmation.MockValue = "min" - suite.chargePoint.AddPendingRequest(messageId, pendingRequest) + suite.chargePoint.AddPendingRequest(messageId, pendingRequest) // Manually add a pending request, so that responses are not rejected message, protoErr = suite.chargePoint.ParseMessage(mockMessage) assert.Nil(t, message) assert.NotNil(t, protoErr) @@ -601,6 +609,8 @@ func (suite *OcppJTestSuite) TestParseCall() { //TODO: implement further ocpp-j protocol tests -func TestOcppJ(t *testing.T) { +func TestMockOcppJ(t *testing.T) { suite.Run(t, new(OcppJTestSuite)) + suite.Run(t, new(ClientQueueTestSuite)) + suite.Run(t, new(ServerQueueMapTestSuite)) } diff --git a/ocppj/queue.go b/ocppj/queue.go new file mode 100644 index 00000000..7e066acf --- /dev/null +++ b/ocppj/queue.go @@ -0,0 +1,176 @@ +package ocppj + +import ( + "container/list" + "errors" + "sync" +) + +// RequestBundle is a convenience struct for passing a call object struct and the +// raw byte data into the queue containing outgoing requests. +type RequestBundle struct { + Call *Call + Data []byte +} + +// RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface. +// +// A RequestQueue is used by ocppj client and server to manage outgoing requests. +// The underlying data structure must be thread-safe, since different goroutines may access it at the same time. +type RequestQueue interface { + // Init puts the queue in its initial state. May be used for initial setup or clearing. + Init() + // Push appends the given element at the end of the queue. + // Returns an error if the operation failed (e.g. the queue is full). + Push(element interface{}) error + // Peek returns the first element of the queue, without removing it from the data structure. + Peek() interface{} + // Pop returns the first element of the queue, removing it from the queue. + Pop() interface{} + // Size returns the current size of the queue. + Size() int + // IsFull returns true if the queue is currently full, false otherwise. + IsFull() bool + // IsEmpty returns true if the queue is currently empty, false otherwise. + IsEmpty() bool +} + +// FIFOClientQueue is a default queue implementation for OCPP-J clients. +type FIFOClientQueue struct { + requestQueue *list.List + capacity int + mutex sync.Mutex +} + +func (q *FIFOClientQueue) Init() { + q.requestQueue = q.requestQueue.Init() +} + +func (q *FIFOClientQueue) Push(element interface{}) error { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.requestQueue.Len() >= q.capacity && q.capacity > 0 { + return errors.New("request queue is full, cannot push new element") + } + q.requestQueue.PushBack(element) + return nil +} + +func (q *FIFOClientQueue) Peek() interface{} { + q.mutex.Lock() + defer q.mutex.Unlock() + el := q.requestQueue.Front() + if el == nil { + return nil + } + return el.Value +} + +func (q *FIFOClientQueue) Pop() interface{} { + q.mutex.Lock() + defer q.mutex.Unlock() + result := q.requestQueue.Front() + if result != nil { + return q.requestQueue.Remove(result) + } + return nil +} + +func (q *FIFOClientQueue) Size() int { + q.mutex.Lock() + defer q.mutex.Unlock() + return q.requestQueue.Len() +} + +func (q *FIFOClientQueue) IsFull() bool { + q.mutex.Lock() + defer q.mutex.Unlock() + return q.requestQueue.Len() >= q.capacity && q.capacity > 0 +} + +func (q *FIFOClientQueue) IsEmpty() bool { + q.mutex.Lock() + defer q.mutex.Unlock() + return q.requestQueue.Len() == 0 +} + +// NewFIFOClientQueue creates a new FIFOClientQueue with the given capacity. +// +// A FIFOQueue is backed by a linked list, and the capacity represents the maximum capacity of the queue. +// Passing capacity = 0 will create a queue without a maximum capacity. +// The capacity cannot change after creation. +func NewFIFOClientQueue(capacity int) *FIFOClientQueue { + return &FIFOClientQueue{ + requestQueue: list.New(), + capacity: capacity, + } +} + +// ServerQueueMap defines the interface for managing client request queues. +// +// An OCPP-J server may serve multiple clients at the same time, so it will need to provide a queue for each client. +type ServerQueueMap interface { + // Get retrieves the queue associated to a specific clientID. + // If no such element exists, the returned flag will be false. + Get(clientID string) (RequestQueue, bool) + // GetOrCreate retrieves the queue associated to a specific clientID. + // If no such element exists, it is created, added to the map and returned. + GetOrCreate(clientID string) RequestQueue + // Remove deletes the queue associated to a specific clientID. + // If no such element exists, nothing happens. + Remove(clientID string) + // Add inserts a new RequestQueue into the map structure. + // If such element already exists, it will be replaced with the new queue. + Add(clientID string, queue RequestQueue) +} + +// FIFOQueueMap is a default implementation of ServerQueueMap for OCPP-J servers. +// +// A FIFOQueueMap is backed by a map[string]RequestQueue. +// When calling the GetOrCreate function, if no entry for a key was found in the map, +// a new RequestQueue with the given capacity will be created. +type FIFOQueueMap struct { + data map[string]RequestQueue + queueCapacity int + mutex sync.Mutex +} + +func (f *FIFOQueueMap) Get(clientID string) (RequestQueue, bool) { + f.mutex.Lock() + defer f.mutex.Unlock() + q, ok := f.data[clientID] + return q, ok +} + +func (f *FIFOQueueMap) GetOrCreate(clientID string) RequestQueue { + f.mutex.Lock() + defer f.mutex.Unlock() + var q RequestQueue + var ok bool + q, ok = f.data[clientID] + if !ok { + q = NewFIFOClientQueue(f.queueCapacity) + f.data[clientID] = q + } + return q +} + +func (f *FIFOQueueMap) Remove(clientID string) { + f.mutex.Lock() + defer f.mutex.Unlock() + delete(f.data, clientID) +} + +func (f *FIFOQueueMap) Add(clientID string, queue RequestQueue) { + f.mutex.Lock() + defer f.mutex.Unlock() + f.data[clientID] = queue +} + +// NewFIFOQueueMap creates a new FIFOQueueMap, which will automatically create queues with the specified capacity. +// +// Passing capacity = 0 will generate queues without a maximum capacity. +// The capacity cannot change after creation. +func NewFIFOQueueMap(clientQueueCapacity int) *FIFOQueueMap { + return &FIFOQueueMap{data: map[string]RequestQueue{}, queueCapacity: clientQueueCapacity} +} diff --git a/ocppj/queue_test.go b/ocppj/queue_test.go new file mode 100644 index 00000000..c12f9853 --- /dev/null +++ b/ocppj/queue_test.go @@ -0,0 +1,173 @@ +package ocppj_test + +import ( + "github.com/lorenzodonini/ocpp-go/ocppj" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +const queueCapacity = 10 + +type ClientQueueTestSuite struct { + suite.Suite + queue ocppj.RequestQueue +} + +func (suite *ClientQueueTestSuite) SetupTest() { + suite.queue = ocppj.NewFIFOClientQueue(queueCapacity) +} + +func (suite *ClientQueueTestSuite) TestQueueEmpty() { + t := suite.T() + empty := suite.queue.IsEmpty() + assert.True(t, empty) +} + +func (suite *ClientQueueTestSuite) TestPushElement() { + t := suite.T() + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + assert.False(t, suite.queue.IsEmpty()) + assert.False(t, suite.queue.IsFull()) + assert.Equal(t, 1, suite.queue.Size()) +} + +func (suite *ClientQueueTestSuite) TestQueueSize() { + t := suite.T() + for i := 0; i < queueCapacity; i++ { + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + assert.False(t, suite.queue.IsEmpty()) + assert.Equal(t, i+1, suite.queue.Size()) + } +} + +func (suite *ClientQueueTestSuite) TestQueueFull() { + t := suite.T() + for i := 0; i < queueCapacity+2; i++ { + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + if i < queueCapacity { + require.Nil(t, err) + if i < queueCapacity-1 { + assert.False(t, suite.queue.IsFull()) + } else { + assert.True(t, suite.queue.IsFull()) + } + } else { + require.NotNil(t, err) + assert.True(t, suite.queue.IsFull()) + } + } +} + +func (suite *ClientQueueTestSuite) TestPeekElement() { + t := suite.T() + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + el := suite.queue.Peek() + require.NotNil(t, el) + peeked, ok := el.(*MockRequest) + require.True(t, ok) + require.NotNil(t, peeked) + assert.Equal(t, req.MockValue, peeked.MockValue) + assert.False(t, suite.queue.IsEmpty()) + assert.False(t, suite.queue.IsFull()) + assert.Equal(t, 1, suite.queue.Size()) +} + +func (suite *ClientQueueTestSuite) TestPopElement() { + t := suite.T() + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + el := suite.queue.Pop() + require.NotNil(t, el) + popped, ok := el.(*MockRequest) + require.True(t, ok) + require.NotNil(t, popped) + assert.Equal(t, req.MockValue, popped.MockValue) + assert.True(t, suite.queue.IsEmpty()) + assert.False(t, suite.queue.IsFull()) +} + +func (suite *ClientQueueTestSuite) TestQueueNoCapacity() { + t := suite.T() + suite.queue = ocppj.NewFIFOClientQueue(0) + for i := 0; i < 50; i++ { + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + } + assert.False(t, suite.queue.IsFull()) +} + +func (suite *ClientQueueTestSuite) TestQueueClear() { + t := suite.T() + for i := 0; i < queueCapacity; i++ { + req := newMockRequest("somevalue") + err := suite.queue.Push(req) + require.Nil(t, err) + } + assert.True(t, suite.queue.IsFull()) + suite.queue.Init() + assert.True(t, suite.queue.IsEmpty()) + assert.Equal(t, 0, suite.queue.Size()) +} + +type ServerQueueMapTestSuite struct { + suite.Suite + queueMap ocppj.ServerQueueMap +} + +func (suite *ServerQueueMapTestSuite) SetupTest() { + suite.queueMap = ocppj.NewFIFOQueueMap(queueCapacity) +} + +func (suite *ServerQueueMapTestSuite) TestAddElement() { + t := suite.T() + q := ocppj.NewFIFOClientQueue(0) + el := "element1" + _ = q.Push(el) + id := "test" + suite.queueMap.Add(id, q) + + retrieved, ok := suite.queueMap.Get(id) + require.True(t, ok) + require.NotNil(t, retrieved) + assert.False(t, retrieved.IsEmpty()) + assert.Equal(t, 1, retrieved.Size()) + assert.Equal(t, el, retrieved.Peek()) +} + +func (suite *ServerQueueMapTestSuite) TestGetOrCreate() { + t := suite.T() + el := "element1" + id := "test" + q, ok := suite.queueMap.Get(id) + require.False(t, ok) + require.Nil(t, q) + q = suite.queueMap.GetOrCreate(id) + require.NotNil(t, q) + _ = q.Push(el) + // Verify consistency + q, ok = suite.queueMap.Get(id) + require.True(t, ok) + assert.Equal(t, 1, q.Size()) + assert.Equal(t, el, q.Peek()) +} + +func (suite *ServerQueueMapTestSuite) TestRemove() { + t := suite.T() + id := "test" + q := suite.queueMap.GetOrCreate(id) + require.NotNil(t, q) + suite.queueMap.Remove(id) + q, ok := suite.queueMap.Get(id) + assert.False(t, ok) + assert.Nil(t, q) +} diff --git a/ocppj/server.go b/ocppj/server.go index 52ad8ee9..013460b0 100644 --- a/ocppj/server.go +++ b/ocppj/server.go @@ -1,9 +1,9 @@ package ocppj import ( + "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" "github.com/lorenzodonini/ocpp-go/ws" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -17,20 +17,23 @@ type Server struct { requestHandler func(clientID string, request ocpp.Request, requestId string, action string) responseHandler func(clientID string, response ocpp.Response, requestId string) errorHandler func(clientID string, err *ocpp.Error, details interface{}) - clientPendingMessages map[string]string + requestQueueMap ServerQueueMap + readyForDispatch chan string + requestChannel chan string + pendingRequests map[string]string } // Creates a new Server endpoint. // Requires a a websocket server and a list of profiles (optional). -func NewServer(wsServer ws.WsServer, profiles ...*ocpp.Profile) *Server { +func NewServer(wsServer ws.WsServer, requestMap ServerQueueMap, profiles ...*ocpp.Profile) *Server { endpoint := Endpoint{pendingRequests: map[string]ocpp.Request{}} for _, profile := range profiles { endpoint.AddProfile(profile) } if wsServer != nil { - return &Server{Endpoint: endpoint, server: wsServer, clientPendingMessages: map[string]string{}} + return &Server{Endpoint: endpoint, server: wsServer, requestQueueMap: requestMap, readyForDispatch: make(chan string, 1), pendingRequests: map[string]string{}} } else { - return &Server{Endpoint: endpoint, server: &ws.Server{}, clientPendingMessages: map[string]string{}} + return &Server{Endpoint: endpoint, server: &ws.Server{}, requestQueueMap: requestMap, readyForDispatch: make(chan string, 1), pendingRequests: map[string]string{}} } } @@ -73,12 +76,16 @@ func (s *Server) Start(listenPort int, listenPath string) { } }) s.server.SetDisconnectedClientHandler(func(ws ws.Channel) { - delete(s.clientPendingMessages, ws.GetID()) + //TODO: handle reconnection and don't delete request queue + s.requestQueueMap.Remove(ws.GetID()) + s.requestChannel <- ws.GetID() if s.disconnectedClientHandler != nil { s.disconnectedClientHandler(ws.GetID()) } }) s.server.SetMessageHandler(s.ocppMessageHandler) + s.requestChannel = make(chan string, 1) + go s.requestPump() // Serve & run // TODO: return error? s.server.Start(listenPort, listenPath) @@ -95,23 +102,21 @@ func (s *Server) Stop() { // // Returns an error in the following cases: // -// - message validation fails (request is malformed) +// - the server wasn't started // -// - another request for that client is already pending +// - message validation fails (request is malformed) // // - the endpoint doesn't support the feature // -// - a network error occurred +// - the output queue is full func (s *Server) SendRequest(clientID string, request ocpp.Request) error { + if s.requestChannel == nil { + return fmt.Errorf("ocppj server is not started, couldn't send request") + } err := Validate.Struct(request) if err != nil { return err } - req, ok := s.clientPendingMessages[clientID] - if ok { - // Cannot send. Protocol is based on request-response - return errors.Errorf("There already is a pending request %v for client %v. Cannot send a further one before receiving a response first", req, clientID) - } call, err := s.CreateCall(request.(ocpp.Request)) if err != nil { return err @@ -120,14 +125,111 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error { if err != nil { return err } - s.clientPendingMessages[clientID] = call.UniqueId - err = s.server.Write(clientID, []byte(jsonMessage)) + // Will not send right away. Queuing message and let it be processed by dedicated requestPump routine + q := s.requestQueueMap.GetOrCreate(clientID) + if q.IsFull() { + return fmt.Errorf("request queue for client %v is full, cannot send new request", clientID) + } + err = q.Push(RequestBundle{call, jsonMessage}) + if err != nil { + return err + } + log.Debugf("enqueued request %v - %v for client %v", call.UniqueId, call.Action, clientID) + // Notify requestPump that a new request for ClientID is pending + s.requestChannel <- clientID + return nil +} + +// requestPump processes new outgoing requests for each client and makes sure they are processed sequentially. +// This method is executed by a dedicated coroutine as soon as the server is started and runs indefinitely. +func (s *Server) requestPump() { + var clientID string + var ok bool + var rdy bool + var clientQueue RequestQueue + clientReadyMap := map[string]bool{} // Empty at the beginning + for { + select { + case clientID, ok = <-s.requestChannel: + // Check if channel was closed + if !ok { + log.Infof("stopped processing requests") + s.clearPendingRequests() + s.requestChannel = nil + return + } + clientQueue, ok = s.requestQueueMap.Get(clientID) + // Check whether there is a request queue for the specified client + if !ok { + // No client queue found, deleting the ready flag + delete(clientReadyMap, clientID) + break + } + // Check whether can transmit to client + rdy, ok = clientReadyMap[clientID] + if !ok { + // First request sent to client. Setting ready flag to true + rdy = true + clientReadyMap[clientID] = rdy + } + case clientID = <-s.readyForDispatch: + // Client can now transmit again + rdy = true + clientReadyMap[clientID] = rdy + clientQueue, _ = s.requestQueueMap.Get(clientID) + } + // Only dispatch request if able to send and request queue isn't empty + if rdy && !clientQueue.IsEmpty() { + s.dispatchNextRequest(clientID) + // Update ready state + rdy = false + clientReadyMap[clientID] = rdy + } + } +} + +func (s *Server) dispatchNextRequest(clientID string) { + // Get first element in queue + q, _ := s.requestQueueMap.Get(clientID) + el := q.Peek() + bundle, _ := el.(RequestBundle) + jsonMessage := bundle.Data + callID := bundle.Call.GetUniqueId() + s.AddPendingRequest(callID, bundle.Call.Payload) + s.pendingRequests[clientID] = callID + err := s.server.Write(clientID, jsonMessage) if err != nil { - // Clear pending request - s.DeletePendingRequest(call.GetUniqueId()) - delete(s.clientPendingMessages, clientID) + log.Errorf("error while sending message: %v", err) + //TODO: handle retransmission instead of removing pending request + s.completePendingRequest(clientID, callID) + if s.errorHandler != nil { + s.errorHandler(clientID, ocpp.NewError(GenericError, err.Error(), callID), err) + } + } else { + // Transmitted correctly + log.Debugf("sent request %v to client %v: %v", callID, clientID, string(jsonMessage)) } - return err +} + +func (s *Server) completePendingRequest(clientID string, requestID string) { + q, _ := s.requestQueueMap.Get(clientID) + el := q.Peek() + if el == nil { + log.Errorf("attempting to pop front of queue, but queue is empty") + return + } + bundle, _ := el.(RequestBundle) + callID := bundle.Call.GetUniqueId() + if callID != requestID { + log.Fatalf("internal state mismatch: received response for %v but expected response for %v", requestID, callID) + return + } + q.Pop() + delete(s.pendingRequests, clientID) + s.DeletePendingRequest(callID) + log.Debugf("removed request %v from front of queue", callID) + // Signal that next message in queue may be sent + s.readyForDispatch <- clientID } // Sends an OCPP Response to a client, identified by the clientID parameter. @@ -196,12 +298,16 @@ func (s *Server) ocppMessageHandler(wsChannel ws.Channel, data []byte) error { s.requestHandler(wsChannel.GetID(), call.Payload, call.UniqueId, call.Action) case CALL_RESULT: callResult := message.(*CallResult) - delete(s.clientPendingMessages, wsChannel.GetID()) - s.responseHandler(wsChannel.GetID(), callResult.Payload, callResult.UniqueId) + s.completePendingRequest(wsChannel.GetID(), callResult.GetUniqueId()) // Remove current request from queue and send next one + if s.responseHandler != nil { + s.responseHandler(wsChannel.GetID(), callResult.Payload, callResult.UniqueId) + } case CALL_ERROR: callError := message.(*CallError) - delete(s.clientPendingMessages, wsChannel.GetID()) - s.errorHandler(wsChannel.GetID(), ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails) + s.completePendingRequest(wsChannel.GetID(), callError.GetUniqueId()) // Remove current request from queue and send next one + if s.errorHandler != nil { + s.errorHandler(wsChannel.GetID(), ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails) + } } return nil }