Skip to content

Commit

Permalink
Fix change stream on Plc4x-client.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed Jun 21, 2024
1 parent 0101151 commit df653de
Showing 1 changed file with 142 additions and 117 deletions.
259 changes: 142 additions & 117 deletions src/plc4x-client/plc4x-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func commandDelivered(collectionCommands *mongo.Collection, ID primitive.ObjectI
}

// process commands from change stream, forward commands
func iterateChangeStream(routineCtx context.Context, waitGroup *sync.WaitGroup, stream *mongo.ChangeStream, protCon *protocolConnection, collectionCommands *mongo.Collection) {
func iterateChangeStream(routineCtx context.Context, waitGroup *sync.WaitGroup, stream *mongo.ChangeStream, protConns []*protocolConnection, collectionCommands *mongo.Collection) {
defer stream.Close(routineCtx)
defer waitGroup.Done()
for stream.Next(routineCtx) {
Expand All @@ -213,107 +213,110 @@ func iterateChangeStream(routineCtx context.Context, waitGroup *sync.WaitGroup,
continue
}

if insDoc.OperationType == "insert" && insDoc.FullDocument.ProtocolSourceConnectionNumber == protCon.ProtocolConnectionNumber {
log.Printf("Commands - Command received on connection %d, %s %f", insDoc.FullDocument.ProtocolSourceConnectionNumber, insDoc.FullDocument.Tag, insDoc.FullDocument.Value)
for _, protCon := range protConns {

// test for time expired, if too old command (> 10s) then cancel it
if time.Since(insDoc.FullDocument.TimeTag) > 10*time.Second {
log.Println("Commands - Command expired ", time.Since(insDoc.FullDocument.TimeTag))
// write cancel to the command in mongo
commandCancel(collectionCommands, insDoc.FullDocument.ID, "expired")
continue
}

// All is ok, so send command to I104M UPD
if insDoc.OperationType == "insert" && insDoc.FullDocument.ProtocolSourceConnectionNumber == protCon.ProtocolConnectionNumber {
log.Printf("Commands - Command received on connection %d, %s %f", insDoc.FullDocument.ProtocolSourceConnectionNumber, insDoc.FullDocument.Tag, insDoc.FullDocument.Value)

buf := new(bytes.Buffer)
var cmdSig uint32 = 0x4b4b4b4b
err := binary.Write(buf, binary.LittleEndian, cmdSig)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var addr uint32 = uint32(insDoc.FullDocument.ProtocolSourceObjectAddress)
err = binary.Write(buf, binary.LittleEndian, addr)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var tiType uint32 = uint32(insDoc.FullDocument.ProtocolSourceASDU)
err = binary.Write(buf, binary.LittleEndian, tiType)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var value uint32 = uint32(insDoc.FullDocument.Value)
err = binary.Write(buf, binary.LittleEndian, value)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var sbo uint32 = 0
if insDoc.FullDocument.ProtocolSourceCommandUseSBO {
sbo = 1
}
err = binary.Write(buf, binary.LittleEndian, sbo)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var qu uint32 = uint32(insDoc.FullDocument.ProtocolSourceCommandDuration)
err = binary.Write(buf, binary.LittleEndian, qu)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var ca uint32 = uint32(insDoc.FullDocument.ProtocolSourceCommonAddress)
err = binary.Write(buf, binary.LittleEndian, ca)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
// test for time expired, if too old command (> 10s) then cancel it
if time.Since(insDoc.FullDocument.TimeTag) > 10*time.Second {
log.Println("Commands - Command expired ", time.Since(insDoc.FullDocument.TimeTag))
// write cancel to the command in mongo
commandCancel(collectionCommands, insDoc.FullDocument.ID, "expired")
continue
}

errMsg := ""
ok := false
for i, ipAddressDest := range protCon.IPAddresses {
// All is ok, so send command to I104M UPD

if i >= 2 { // only send to the first 2 addresses
break
buf := new(bytes.Buffer)
var cmdSig uint32 = 0x4b4b4b4b
err := binary.Write(buf, binary.LittleEndian, cmdSig)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}

if strings.TrimSpace(ipAddressDest) == "" {
errMsg = "no IP destination"
var addr uint32 = uint32(insDoc.FullDocument.ProtocolSourceObjectAddress)
err = binary.Write(buf, binary.LittleEndian, addr)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
//udpAddr, err := net.ResolveUDPAddr("udp", ipAddressDest)
//if err != nil {
// errMsg = "IP address error"
// log.Println("Commands - Error on IP: ", err)
// continue
//}
//_, err = UdpConn.WriteToUDP(buf.Bytes(), udpAddr)
//if err != nil {
// errMsg = "UDP send error"
// log.Println("Commands - Error on IP: ", err)
// continue
//}
// success delivering command
log.Println("Commands - Command sent to: ", ipAddressDest)
ok = true
// log.Println(buf.Bytes())
}
if ok {
commandDelivered(collectionCommands, insDoc.FullDocument.ID)
} else {
commandCancel(collectionCommands, insDoc.FullDocument.ID, errMsg)
log.Println("Commands - Command canceled!")
var tiType uint32 = uint32(insDoc.FullDocument.ProtocolSourceASDU)
err = binary.Write(buf, binary.LittleEndian, tiType)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var value uint32 = uint32(insDoc.FullDocument.Value)
err = binary.Write(buf, binary.LittleEndian, value)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var sbo uint32 = 0
if insDoc.FullDocument.ProtocolSourceCommandUseSBO {
sbo = 1
}
err = binary.Write(buf, binary.LittleEndian, sbo)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var qu uint32 = uint32(insDoc.FullDocument.ProtocolSourceCommandDuration)
err = binary.Write(buf, binary.LittleEndian, qu)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}
var ca uint32 = uint32(insDoc.FullDocument.ProtocolSourceCommonAddress)
err = binary.Write(buf, binary.LittleEndian, ca)
if err != nil {
commandCancel(collectionCommands, insDoc.FullDocument.ID, "udp buffer write error")
log.Println("Commands - binary.Write failed:", err)
continue
}

errMsg := ""
ok := false
for i, ipAddressDest := range protCon.IPAddresses {

if i >= 2 { // only send to the first 2 addresses
break
}

if strings.TrimSpace(ipAddressDest) == "" {
errMsg = "no IP destination"
continue
}
//udpAddr, err := net.ResolveUDPAddr("udp", ipAddressDest)
//if err != nil {
// errMsg = "IP address error"
// log.Println("Commands - Error on IP: ", err)
// continue
//}
//_, err = UdpConn.WriteToUDP(buf.Bytes(), udpAddr)
//if err != nil {
// errMsg = "UDP send error"
// log.Println("Commands - Error on IP: ", err)
// continue
//}
// success delivering command
log.Println("Commands - Command sent to: ", ipAddressDest)
ok = true
// log.Println(buf.Bytes())
}
if ok {
commandDelivered(collectionCommands, insDoc.FullDocument.ID)
} else {
commandCancel(collectionCommands, insDoc.FullDocument.ID, errMsg)
log.Println("Commands - Command canceled!")
}
}
}
}
Expand Down Expand Up @@ -428,6 +431,7 @@ func main() {
var err error
var collectionInstances, collectionConnections, collectionCommands *mongo.Collection
var csCommands *mongo.ChangeStream
someConnectionHasCommandsEnabled := false

instanceNumber := 1
if os.Getenv("JS_PLC4X_INSTANCE") != "" {
Expand Down Expand Up @@ -506,13 +510,14 @@ func main() {
checkFatalError(err)
log.Print("Mongodb - Connected to server.")

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
csCommands, err = collectionCommands.Watch(context.TODO(), mongo.Pipeline{bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: "insert"},
},
},
}})
}}, opts)
checkFatalError(err)
defer csCommands.Close(context.TODO())

Expand Down Expand Up @@ -556,18 +561,14 @@ func main() {
continue
}

if protocolConn.CommandsEnabled {
someConnectionHasCommandsEnabled = true
}

// log connection info
log.Printf("Instance:%d Connection:%d %s", protocolConn.ProtocolDriverInstanceNumber, protocolConn.ProtocolConnectionNumber, protocolConn.Name)
log.Printf("Server endpoint URL: %s", protocolConn.EndpointURLs[0])

var waitGroup sync.WaitGroup
if protocolConn.CommandsEnabled {
waitGroup.Add(1)
routineCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go iterateChangeStream(routineCtx, &waitGroup, csCommands, protocolConn, collectionCommands)
}

// Get a connection to a remote PLC
connectionRequestChanel := driverManager.GetConnection(protocolConn.EndpointURLs[0])
// connectionRequestChanel := driverManager.GetConnection("opcua://opcua.demo-this.com:51210/UA/SampleServer?discovery=true&SecurityPolicy=None")
Expand Down Expand Up @@ -621,24 +622,48 @@ func main() {
log.Printf("error preparing read-request: %s", connectionResult.GetErr().Error())
return
}
}

// Execute a read-request
readResponseChanel := protocolConns[0].ReadRequest.Execute()
execRead := func() {
log.Println(protocolConn.Name + ": integrity read...")

// Execute a read-request
readResponseChanel := protocolConn.ReadRequest.Execute()

// Wait for the response to finish
readRequestResult := <-readResponseChanel
if readRequestResult.GetErr() != nil {
log.Printf("error executing read-request: %s", readRequestResult.GetErr().Error())
return
// Wait for the response to finish
readRequestResult := <-readResponseChanel
if readRequestResult.GetErr() != nil {
log.Printf(protocolConn.Name+": error executing read-request: %s", readRequestResult.GetErr().Error())
return
}

// Do something with the response
value1 := readRequestResult.GetResponse().GetValue("field1")
log.Printf(protocolConn.Name+": Result field1: %d\n", value1.GetInt16())
value5 := readRequestResult.GetResponse().GetValue("field5")
log.Printf(protocolConn.Name+": Result field5: %d\n", value5.GetInt16())
}
go func() {
execRead()
for range time.Tick(time.Millisecond * 100) {
execRead()
}
}()
}

// Do something with the response
value1 := readRequestResult.GetResponse().GetValue("field1")
log.Printf("\n\nResult field1: %d\n", value1.GetInt16())
value5 := readRequestResult.GetResponse().GetValue("field5")
log.Printf("\n\nResult field5: %d\n", value5.GetInt16())
var waitGroup sync.WaitGroup
if someConnectionHasCommandsEnabled {
waitGroup.Add(1)
routineCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go iterateChangeStream(routineCtx, &waitGroup, csCommands, protocolConns, collectionCommands)
}

// wait forever
for {
time.Sleep(10000)
}

// Make sure the connection is closed at the end
defer protocolConns[0].PlcConn.Close()

}

0 comments on commit df653de

Please sign in to comment.