Skip to content

Commit

Permalink
Merge b351c41 into bf7b631
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzodonini committed Aug 29, 2020
2 parents bf7b631 + b351c41 commit 9f13df0
Show file tree
Hide file tree
Showing 15 changed files with 1,243 additions and 204 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ docker run -it -p 8887:8887 --rm --name central-system ldonini/ocpp1.6-central-s

You can also run it directly using docker-compose:
```sh
docker-compose -f example/1.6/docker-compose.yml up central_system
docker-compose -f example/1.6/docker-compose.yml up central-system
```

#### TLS
Expand Down Expand Up @@ -293,7 +293,7 @@ You need to specify the host, on which the central system is running, in order f

You can also run it directly using docker-compose:
```sh
docker-compose -f example/1.6/docker-compose.yml up charge_point
docker-compose -f example/1.6/docker-compose.yml up charge-point
```

#### TLS
Expand All @@ -304,7 +304,7 @@ Feel free to use the utility script `cd example/1.6 && ./create-test-certificate

Then run the following:
```
docker-compose -f example/1.6/docker-compose.tls.yml up charge_point
docker-compose -f example/1.6/docker-compose.tls.yml up charge-point
```

## OCPP 2.0 Usage
Expand Down
39 changes: 4 additions & 35 deletions example/1.6/cp/charge_point_sim.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,6 @@ func setupTlsChargePoint(chargePointID string) ocpp16.ChargePoint {
return ocpp16.NewChargePoint(chargePointID, nil, client)
}

// Used for scheduling requests that are not generated from the main thread.
// OCPP is a request-response protocol and doesn't support multiple simultaneous requests.
//
// By scheduling a request, it can later on be sent safely from the main thread.
func scheduleAsyncRequest(asyncRequest func()) {
asyncRequestChan <- asyncRequest
}

// Wait is used for simulating idle time, while being able to process trigger requests.
//
// If trigger requests (or other asynchronous requests) are fired while waiting, the request will be sent.
// This mechanism prevents errors when attempting to send a request while another one is still pending.
func wait(d time.Duration) {
t := time.NewTimer(d)
for {
select {
case req, ok := <-asyncRequestChan:
if !ok {
return
}
req()
case <-t.C:
log.Debugf("finished simulated wait")
return
}
}
}

// exampleRoutine simulates a charge point flow, where
func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHandler) {
dummyClientIdTag := "12345"
Expand All @@ -105,7 +77,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan
// Notify connector status
updateStatus(stateHandler, 0, core.ChargePointStatusAvailable)
// Wait for some time ...
wait(5 * time.Second)
time.Sleep(5 * time.Second)
// Simulate charging for connector 1
authConf, err := chargePoint.Authorize(dummyClientIdTag)
checkError(err)
Expand All @@ -125,7 +97,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan
if !ok {
sampleInterval = 5
}
wait(time.Second * time.Duration(sampleInterval))
time.Sleep(time.Second * time.Duration(sampleInterval))
stateHandler.meterValue += 10
sampledValue := types.SampledValue{Value: fmt.Sprintf("%v", stateHandler.meterValue), Unit: types.UnitOfMeasureWh, Format: types.ValueFormatRaw, Measurand: types.MeasurandEnergyActiveExportRegister, Context: types.ReadingContextSamplePeriodic, Location: types.LocationOutlet}
meterValue := types.MeterValue{Timestamp: types.NewDateTime(time.Now()), SampledValue: []types.SampledValue{sampledValue}}
Expand All @@ -147,7 +119,7 @@ func exampleRoutine(chargePoint ocpp16.ChargePoint, stateHandler *ChargePointHan
// Update connector status
updateStatus(stateHandler, chargingConnector, core.ChargePointStatusAvailable)
// Wait for some time ...
wait(5 * time.Minute)
time.Sleep(5 * time.Minute)
}

// Start function
Expand Down Expand Up @@ -193,13 +165,10 @@ func main() {
// Connects to central system
err := chargePoint.Start(csUrl)
if err != nil {
log.Println(err)
log.Errorln(err)
} else {
log.Infof("connected to central system at %v", csUrl)
// Setup channel for asynchronous requests (e.g. triggers)
asyncRequestChan = make(chan func(), 5)
exampleRoutine(chargePoint, handler)
close(asyncRequestChan)
// Disconnect
chargePoint.Stop()
log.Infof("disconnected from central system")
Expand Down
32 changes: 11 additions & 21 deletions example/1.6/cp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ type ChargePointHandler struct {
localAuthListVersion int
}

var asyncRequestChan chan func()

var chargePoint ocpp16.ChargePoint

func (handler *ChargePointHandler) isValidConnectorID(ID int) bool {
Expand Down Expand Up @@ -215,24 +213,22 @@ func (handler *ChargePointHandler) OnTriggerMessage(request *remotetrigger.Trigg
break
case firmware.DiagnosticsStatusNotificationFeatureName:
// Schedule diagnostics status notification request
fn := func() {
go func() {
_, e := chargePoint.DiagnosticsStatusNotification(firmware.DiagnosticsStatusIdle)
checkError(e)
logDefault(firmware.DiagnosticsStatusNotificationFeatureName).Info("diagnostics status notified")
}
scheduleAsyncRequest(fn)
}()
status = remotetrigger.TriggerMessageStatusAccepted
case firmware.FirmwareStatusNotificationFeatureName:
//TODO: schedule firmware status notification message
break
case core.HeartbeatFeatureName:
// Schedule heartbeat request
fn := func() {
go func() {
conf, e := chargePoint.Heartbeat()
checkError(e)
logDefault(core.HeartbeatFeatureName).Infof("clock synchronized: %v", conf.CurrentTime.FormatTimestamp())
}
scheduleAsyncRequest(fn)
}()
status = remotetrigger.TriggerMessageStatusAccepted
case core.MeterValuesFeatureName:
//TODO: schedule meter values message
Expand All @@ -245,16 +241,15 @@ func (handler *ChargePointHandler) OnTriggerMessage(request *remotetrigger.Trigg
return remotetrigger.NewTriggerMessageConfirmation(remotetrigger.TriggerMessageStatusRejected), nil
}
// Schedule status notification request
fn := func() {
go func() {
status := handler.status
if c, ok := handler.connectors[connectorID]; ok {
status = c.status
}
statusConfirmation, err := chargePoint.StatusNotification(connectorID, handler.errorCode, status)
checkError(err)
logDefault(statusConfirmation.GetFeatureName()).Infof("status for connector %v sent: %v", connectorID, status)
}
scheduleAsyncRequest(fn)
}()
status = remotetrigger.TriggerMessageStatusAccepted
default:
return remotetrigger.NewTriggerMessageConfirmation(remotetrigger.TriggerMessageStatusNotImplemented), nil
Expand Down Expand Up @@ -348,24 +343,19 @@ func updateFirmwareStatus(status firmware.FirmwareStatus, props ...func(request
}

func updateFirmware(location string, retrieveDate *types.DateTime, retries int, retryInterval int) {
fn := func(status firmware.FirmwareStatus) func() {
return func() {
updateFirmwareStatus(status)
}
}
scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloading))
updateFirmwareStatus(firmware.FirmwareStatusDownloading)
err := downloadFile("/tmp/out.bin", location)
if err != nil {
logDefault(firmware.UpdateFirmwareFeatureName).Errorf("error while downloading file %v", err)
scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloadFailed))
updateFirmwareStatus(firmware.FirmwareStatusDownloadFailed)
return
}
scheduleAsyncRequest(fn(firmware.FirmwareStatusDownloaded))
updateFirmwareStatus(firmware.FirmwareStatusDownloaded)
// Simulate installation
scheduleAsyncRequest(fn(firmware.FirmwareStatusInstalling))
updateFirmwareStatus(firmware.FirmwareStatusInstalling)
time.Sleep(time.Second * 5)
// Notify completion
scheduleAsyncRequest(fn(firmware.FirmwareStatusInstalled))
updateFirmwareStatus(firmware.FirmwareStatusInstalled)
}

func downloadFile(filepath string, url string) error {
Expand Down
4 changes: 2 additions & 2 deletions ocpp1.6/v16.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewChargePoint(id string, dispatcher *ocppj.Client, client ws.WsClient) Cha
}
})
if dispatcher == nil {
dispatcher = ocppj.NewClient(id, client, core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile)
dispatcher = ocppj.NewClient(id, client, ocppj.NewFIFOClientQueue(0), core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile)
}
cp := chargePoint{client: dispatcher, confirmationHandler: make(chan ocpp.Response), errorHandler: make(chan error)}
cp.client.SetResponseHandler(func(confirmation ocpp.Response, requestId string) {
Expand Down Expand Up @@ -249,7 +249,7 @@ func NewCentralSystem(dispatcher *ocppj.Server, server ws.WsServer) CentralSyste
}
server.AddSupportedSubprotocol(types.V16Subprotocol)
if dispatcher == nil {
dispatcher = ocppj.NewServer(server, core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile)
dispatcher = ocppj.NewServer(server, ocppj.NewFIFOQueueMap(0), core.Profile, localauth.Profile, firmware.Profile, reservation.Profile, remotetrigger.Profile, smartcharging.Profile)
}
cs := centralSystem{
server: dispatcher,
Expand Down
25 changes: 21 additions & 4 deletions ocpp1.6_test/ocpp16_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (websocketClient *MockWebsocketClient) Write(data []byte) error {
func (websocketClient *MockWebsocketClient) AddOption(option interface{}) {
}

// Default queue capacity
const queueCapacity = 10

// ---------------------- MOCK FEATURE ----------------------
const (
MockFeatureName = "Mock"
Expand Down Expand Up @@ -554,14 +557,24 @@ func testUnsupportedRequestFromCentralSystem(suite *OcppV16TestSuite, request oc
suite.centralSystem.Start(8887, "somePath")
err := suite.chargePoint.Start(wsUrl)
assert.Nil(t, err)
// Run request test
// Run request test, expecting an error
err = suite.centralSystem.SendRequestAsync(wsId, request, func(confirmation ocpp.Response, err error) {
t.Fail()
})
assert.Error(t, err)
assert.Equal(t, expectedError, err.Error())
// Run response test
// Add mocked request in queue and mark as pending, otherwise response will be ignored
mockCall, _ := suite.ocppjCentralSystem.CreateCall(request)
mockCall.UniqueId = messageId
jsonMessage, _ := mockCall.MarshalJSON()
requestBundle := ocppj.RequestBundle{
Call: mockCall,
Data: jsonMessage,
}
q := suite.serverRequestMap.GetOrCreate(wsId)
_ = q.Push(requestBundle)
suite.ocppjCentralSystem.AddPendingRequest(messageId, request)
// Run response test
err = suite.mockWsClient.MessageHandler([]byte(requestJson))
assert.Nil(t, err)
}
Expand Down Expand Up @@ -603,6 +616,8 @@ type OcppV16TestSuite struct {
chargePoint ocpp16.ChargePoint
centralSystem ocpp16.CentralSystem
messageIdGenerator TestRandomIdGenerator
clientRequestQueue ocppj.RequestQueue
serverRequestMap ocppj.ServerQueueMap
}

type TestRandomIdGenerator struct {
Expand All @@ -626,8 +641,10 @@ func (suite *OcppV16TestSuite) SetupTest() {
mockServer := MockWebsocketServer{}
suite.mockWsClient = &mockClient
suite.mockWsServer = &mockServer
suite.ocppjChargePoint = ocppj.NewClient("test_id", suite.mockWsClient, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile)
suite.ocppjCentralSystem = ocppj.NewServer(suite.mockWsServer, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile)
suite.clientRequestQueue = ocppj.NewFIFOClientQueue(queueCapacity)
suite.serverRequestMap = ocppj.NewFIFOQueueMap(queueCapacity)
suite.ocppjChargePoint = ocppj.NewClient("test_id", suite.mockWsClient, suite.clientRequestQueue, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile)
suite.ocppjCentralSystem = ocppj.NewServer(suite.mockWsServer, suite.serverRequestMap, coreProfile, localAuthListProfile, firmwareProfile, reservationProfile, remoteTriggerProfile, smartChargingProfile)
suite.chargePoint = ocpp16.NewChargePoint("test_id", suite.ocppjChargePoint, suite.mockWsClient)
suite.centralSystem = ocpp16.NewCentralSystem(suite.ocppjCentralSystem, suite.mockWsServer)
suite.messageIdGenerator = TestRandomIdGenerator{generator: func() string {
Expand Down
4 changes: 2 additions & 2 deletions ocpp2.0/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewChargingStation(id string, dispatcher *ocppj.Client, client ws.WsClient)
}
})
if dispatcher == nil {
dispatcher = ocppj.NewClient(id, client, authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile)
dispatcher = ocppj.NewClient(id, client, ocppj.NewFIFOClientQueue(0), authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile)
}
cs := chargingStation{client: dispatcher, responseHandler: make(chan ocpp.Response), errorHandler: make(chan error)}
cs.client.SetResponseHandler(func(confirmation ocpp.Response, requestId string) {
Expand Down Expand Up @@ -314,7 +314,7 @@ func NewCSMS(dispatcher *ocppj.Server, server ws.WsServer) CSMS {
}
server.AddSupportedSubprotocol(types.V2Subprotocol)
if dispatcher == nil {
dispatcher = ocppj.NewServer(server, authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile)
dispatcher = ocppj.NewServer(server, ocppj.NewFIFOQueueMap(0), authorization.Profile, availability.Profile, data.Profile, diagnostics.Profile, display.Profile, firmware.Profile, iso15118.Profile, localauth.Profile, meter.Profile, provisioning.Profile, remotecontrol.Profile, reservation.Profile, security.Profile, smartcharging.Profile, tariffcost.Profile, transactions.Profile)
}
cs := csms{
server: dispatcher,
Expand Down
25 changes: 21 additions & 4 deletions ocpp2.0_test/ocpp2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (websocketClient *MockWebsocketClient) Write(data []byte) error {
func (websocketClient *MockWebsocketClient) AddOption(option interface{}) {
}

// Default queue capacity
const queueCapacity = 10

// ---------------------- MOCK FEATURE ----------------------
const (
MockFeatureName = "Mock"
Expand Down Expand Up @@ -751,14 +754,24 @@ func testUnsupportedRequestFromCentralSystem(suite *OcppV2TestSuite, request ocp
suite.csms.Start(8887, "somePath")
err := suite.chargingStation.Start(wsUrl)
require.Nil(t, err)
// Run request test
// Run request test, expecting an error
err = suite.csms.SendRequestAsync(wsId, request, func(response ocpp.Response, err error) {
t.Fail()
})
require.Error(t, err)
assert.Equal(t, expectedError, err.Error())
// Run response test
// Add mocked request in queue and mark as pending, otherwise response will be ignored
mockCall, _ := suite.ocppjServer.CreateCall(request)
mockCall.UniqueId = messageId
jsonMessage, _ := mockCall.MarshalJSON()
requestBundle := ocppj.RequestBundle{
Call: mockCall,
Data: jsonMessage,
}
q := suite.serverRequestMap.GetOrCreate(wsId)
_ = q.Push(requestBundle)
suite.ocppjServer.AddPendingRequest(messageId, request)
// Run response test
err = suite.mockWsClient.MessageHandler([]byte(requestJson))
assert.Nil(t, err)
}
Expand Down Expand Up @@ -791,6 +804,8 @@ type OcppV2TestSuite struct {
chargingStation ocpp2.ChargingStation
csms ocpp2.CSMS
messageIdGenerator TestRandomIdGenerator
clientRequestQueue ocppj.RequestQueue
serverRequestMap ocppj.ServerQueueMap
}

type TestRandomIdGenerator struct {
Expand Down Expand Up @@ -825,8 +840,10 @@ func (suite *OcppV2TestSuite) SetupTest() {
mockServer := MockWebsocketServer{}
suite.mockWsClient = &mockClient
suite.mockWsServer = &mockServer
suite.ocppjClient = ocppj.NewClient("test_id", suite.mockWsClient, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile)
suite.ocppjServer = ocppj.NewServer(suite.mockWsServer, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile)
suite.clientRequestQueue = ocppj.NewFIFOClientQueue(queueCapacity)
suite.serverRequestMap = ocppj.NewFIFOQueueMap(queueCapacity)
suite.ocppjClient = ocppj.NewClient("test_id", suite.mockWsClient, suite.clientRequestQueue, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile)
suite.ocppjServer = ocppj.NewServer(suite.mockWsServer, suite.serverRequestMap, securityProfile, provisioningProfile, authProfile, availabilityProfile, reservationProfile, diagnosticsProfile, dataProfile, displayProfile, firmwareProfile, isoProfile, localAuthProfile, meterProfile, remoteProfile, smartChargingProfile, tariffProfile, transactionsProfile)
suite.chargingStation = ocpp2.NewChargingStation("test_id", suite.ocppjClient, suite.mockWsClient)
suite.csms = ocpp2.NewCSMS(suite.ocppjServer, suite.mockWsServer)
suite.messageIdGenerator = TestRandomIdGenerator{generator: func() string {
Expand Down

0 comments on commit 9f13df0

Please sign in to comment.