Skip to content

Commit

Permalink
Merge pull request #396 from flaviocirillo/development
Browse files Browse the repository at this point in the history
Extended the bug fix for array of attribute values also to ngsiv1 not…
  • Loading branch information
smartfog committed Nov 29, 2022
2 parents 3ba9930 + 8fbb58f commit d4c7a88
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 45 deletions.
20 changes: 15 additions & 5 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"io/ioutil"
"net/http"

"github.com/ant0ine/go-json-rest/rest"
Expand Down Expand Up @@ -88,17 +89,26 @@ func (tb *ThinBroker) NGSIV1_QueryContext(w rest.ResponseWriter, r *rest.Request
}

func (tb *ThinBroker) NGSIV1_NotifyContext(w rest.ResponseWriter, r *rest.Request) {

content, _ := ioutil.ReadAll(r.Body)
r.Body.Close()
// DEBUG.Println(string(content))

notifyCtxReq := NotifyContextRequest{}
err := r.DecodeJsonPayload(&notifyCtxReq)
if err != nil {
rest.Error(w, err.Error(), http.StatusInternalServerError)
return
}
notifyCtxReq.ParseNotifyContextRequest_HybridNGSI_NGSILD(content)

// err := r.DecodeJsonPayload(&notifyCtxReq)
// if err != nil {
// rest.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }

// send out the response
notifyCtxResp := NotifyContextResponse{}
w.WriteJson(&notifyCtxResp)

//DEBUG.Println(notifyCtxReq)

// inform its subscribers
for _, ctxResp := range notifyCtxReq.ContextResponses {
go tb.notifySubscribers(&ctxResp.ContextElement, "", false)
Expand Down
3 changes: 3 additions & 0 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ func (tb *ThinBroker) UpdateContext2RemoteSite(ctxElem *ContextElement, updateAc

func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator string, checkSelectedAttributes bool) {
eid := ctxElem.Entity.ID

//DEBUG.Println(ctxElem)

tb.e2sub_lock.RLock()
defer tb.e2sub_lock.RUnlock()
subscriberList := tb.entityId2Subcriptions[eid]
Expand Down
4 changes: 1 addition & 3 deletions broker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func postNotifyContext(ctxElems []ContextElement, subscriptionId string, URL string, DestinationBrokerType string, tenant string, httpsCfg *HTTPS) error {
INFO.Println("destionation protocol: ", DestinationBrokerType)
INFO.Println("destination protocol: ", DestinationBrokerType)

switch DestinationBrokerType {
case "NGSI-LD":
Expand Down Expand Up @@ -173,8 +173,6 @@ func postNGSILDUpsert(ctxElems []ContextElement, subscriptionId string, URL stri
return err
}

INFO.Println(string(body))

brokerURL := URL + "/ngsi-ld/v1/entityOperations/upsert"
req, err := http.NewRequest("POST", brokerURL, bytes.NewBuffer(body))
req.Header.Add("Content-Type", "application/json")
Expand Down
118 changes: 82 additions & 36 deletions common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,8 +995,7 @@ func (updateCtxReq *UpdateContextRequest) ReadFromNGSILD(ngsildEntities []map[st
return counter
}

func (queryCtxResp *QueryContextResponse) parseQueryContextResponse_HybridNGSI_NGSILD(queryResponseBody []byte) {

func parseContextElementResponses_HybridNGSI_NGSILD(body []byte, messageType string) interface{} {
type InternalContextAttribute struct {
Name string `json:"name"`
Type string `json:"type,omitempty"`
Expand Down Expand Up @@ -1025,47 +1024,67 @@ func (queryCtxResp *QueryContextResponse) parseQueryContextResponse_HybridNGSI_N
ErrorCode StatusCode `json:"errorCode,omitempty"`
}

queryCtxRespInternal := InternalQueryContextResponseObject{}
_ = json.Unmarshal(queryResponseBody, &queryCtxRespInternal)
// DEBUG.Println(queryCtxRespInternal)
// err := json.Unmarshal(queryResponseBody, &queryCtxRespInternal)
// if err != nil {
// ERROR.Println(err)
// // return
// }

parsedQueryCtxResp := QueryContextResponse{}
_ = json.Unmarshal(queryResponseBody, &parsedQueryCtxResp)
// DEBUG.Println(parsedQueryCtxResp)
// err = json.Unmarshal(queryResponseBody, &parsedQueryCtxResp)
// if err != nil {
// ERROR.Println(err)
// // return
// }

// DEBUG.Println(queryCtxRespInternal)
// for _, ctxResp := range queryCtxResp.ContextResponses {
// for _, contextAttribute := range ctxResp.ContextElement.Attributes {
// if contextAttribute.Type == "object" && contextAttribute.Value == nil {

// }
// }
// }

//return (string)queryCtxResp1

queryCtxResp.ErrorCode = queryCtxRespInternal.ErrorCode
type InternatlnotifyContextRequest struct {
SubscriptionId string `json:"subscriptionId"`
Originator string `json:"originator"`
ContextResponses []InternalContextElementResponse `json:"contextResponses,omitempty"`
}

var queryCtxResp QueryContextResponse
var notifyCtxReq NotifyContextRequest

var internalContextResponses []InternalContextElementResponse
var parsedContextResponses []ContextElementResponse

switch messageType {
case "QueryContextResponse":

queryCtxResp = QueryContextResponse{}

queryCtxRespInternal := InternalQueryContextResponseObject{}
_ = json.Unmarshal(body, &queryCtxRespInternal)

internalContextResponses = queryCtxRespInternal.ContextResponses

parsedQueryCtxResp := QueryContextResponse{}
_ = json.Unmarshal(body, &parsedQueryCtxResp)

parsedContextResponses = parsedQueryCtxResp.ContextResponses

queryCtxResp.ErrorCode = queryCtxRespInternal.ErrorCode

case "NotifyContextRequest":
notifyCtxReq = NotifyContextRequest{}

notifyCtxReqInternal := InternatlnotifyContextRequest{}
_ = json.Unmarshal(body, &notifyCtxReqInternal)

internalContextResponses = notifyCtxReqInternal.ContextResponses

parsedNotifyCtxReq := NotifyContextRequest{}
_ = json.Unmarshal(body, &parsedNotifyCtxReq)

parsedContextResponses = parsedNotifyCtxReq.ContextResponses

notifyCtxReq.Originator = notifyCtxReqInternal.Originator
notifyCtxReq.SubscriptionId = notifyCtxReqInternal.SubscriptionId

default:
fmt.Println("unknown")
return nil
}

contextResponses := make([]ContextElementResponse, 0)

for ctxResp_index, ctxRespInternal := range queryCtxRespInternal.ContextResponses {
for ctxResp_index, ctxRespInternal := range internalContextResponses {
ctxResp := ContextElementResponse{}

ctxResp.StatusCode = ctxRespInternal.StatusCode

ctxElement := ContextElement{}
ctxElement.Entity = ctxRespInternal.ContextElement.Entity
//ctxElement.ID = ctxRespInternal.ContextElement.Entity.ID
ctxElement.Entity = (parsedQueryCtxResp.ContextResponses)[ctxResp_index].ContextElement.Entity
ctxElement.Entity = parsedContextResponses[ctxResp_index].ContextElement.Entity
//ctxElement.Type = ctxRespInternal.ContextElement.Type
//ctxElement.IsPattern = ctxRespInternal.ContextElement.IsPattern
ctxElement.Metadata = ctxRespInternal.ContextElement.Metadata
Expand All @@ -1081,7 +1100,7 @@ func (queryCtxResp *QueryContextResponse) parseQueryContextResponse_HybridNGSI_N
if attributeInternal.Value != nil {
ctxAttribute.Value = attributeInternal.Value
} else {
ctxAttribute.Value = ((parsedQueryCtxResp.ContextResponses)[ctxResp_index].ContextElement.Attributes)[attribute_index].Value
ctxAttribute.Value = (parsedContextResponses[ctxResp_index].ContextElement.Attributes)[attribute_index].Value
}

ctxElement.Attributes = append(ctxElement.Attributes, ctxAttribute)
Expand All @@ -1092,7 +1111,34 @@ func (queryCtxResp *QueryContextResponse) parseQueryContextResponse_HybridNGSI_N
contextResponses = append(contextResponses, ctxResp)
}

queryCtxResp.ContextResponses = contextResponses
switch messageType {
case "QueryContextResponse":

queryCtxResp.ContextResponses = contextResponses
return queryCtxResp

case "NotifyContextRequest":
notifyCtxReq.ContextResponses = contextResponses
return notifyCtxReq

default:
fmt.Println("unknown")
return nil
}

}

func (queryCtxResp *QueryContextResponse) ParseQueryContextResponse_HybridNGSI_NGSILD(queryResponseBody []byte) {
parsedQueryCtxResp := parseContextElementResponses_HybridNGSI_NGSILD(queryResponseBody, "QueryContextResponse").(QueryContextResponse)
queryCtxResp.ErrorCode = parsedQueryCtxResp.ErrorCode
queryCtxResp.ContextResponses = parsedQueryCtxResp.ContextResponses
}

func (notifyCtxReq *NotifyContextRequest) ParseNotifyContextRequest_HybridNGSI_NGSILD(notifyRequestBody []byte) {
parsedNotifyContextRequest := parseContextElementResponses_HybridNGSI_NGSILD(notifyRequestBody, "NotifyContextRequest").(NotifyContextRequest)
notifyCtxReq.SubscriptionId = parsedNotifyContextRequest.SubscriptionId
notifyCtxReq.Originator = parsedNotifyContextRequest.Originator
notifyCtxReq.ContextResponses = parsedNotifyContextRequest.ContextResponses
}

func (updateCtxReq *UpdateContextRequest) ReadFromNGSIv2(ngsiv2Entity map[string]interface{}) int {
Expand Down
2 changes: 1 addition & 1 deletion common/ngsi/ngsiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (nc *NGSI10Client) InternalQueryContext(query *QueryContextRequest) ([]Cont
// }

queryCtxResp := QueryContextResponse{}
queryCtxResp.parseQueryContextResponse_HybridNGSI_NGSILD(text)
queryCtxResp.ParseQueryContextResponse_HybridNGSI_NGSILD(text)

ctxElements := make([]ContextElement, 0)
for _, contextElementResponse := range queryCtxResp.ContextResponses {
Expand Down

0 comments on commit d4c7a88

Please sign in to comment.