Skip to content

Commit

Permalink
fix struct of PCRptMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
trustywolf committed Feb 18, 2023
1 parent ae39b4f commit 55d7b07
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 91 deletions.
206 changes: 130 additions & 76 deletions pkg/packet/pcep/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func NewKeepaliveMessage() (*KeepaliveMessage, error) {
return m, nil
}

// PCRpt Message
type PCRptMessage struct {
type StateReport struct {
SrpObject *SrpObject
LspObject *LspObject
EroObject *EroObject
Expand All @@ -147,65 +146,8 @@ type PCRptMessage struct {
VendorInformationObject *VendorInformationObject
}

func (m *PCRptMessage) DecodeFromBytes(messageBody []uint8) error {
// TODO: Supports multiple <state-report>'s stacked PCRpt Message.
// https://datatracker.ietf.org/doc/html/rfc8231#section-6.1
// Currently, when more than 2 <state-report> come in, One object has multiple object information.
var commonObjectHeader CommonObjectHeader
if err := commonObjectHeader.DecodeFromBytes(messageBody); err != nil {
return err
}

switch commonObjectHeader.ObjectClass {
case OC_BANDWIDTH:
bandwidthObject := &BandwidthObject{}
if err := bandwidthObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
m.BandwidthObjects = append(m.BandwidthObjects, bandwidthObject)
case OC_METRIC:
metricObject := &MetricObject{}
if err := metricObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
m.MetricObjects = append(m.MetricObjects, metricObject)
case OC_ERO:
if err := m.EroObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_LSPA:
if err := m.LspaObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_LSP:
if err := m.LspObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_SRP:
if err := m.SrpObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_ASSOCIATION:
if err := m.AssociationObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_VENDOR_INFORMATION:
if err := m.VendorInformationObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
default:
}

if int(commonObjectHeader.ObjectLength) < len(messageBody) {
if err := m.DecodeFromBytes(messageBody[commonObjectHeader.ObjectLength:]); err != nil {
return err
}
}
return nil
}

func NewPCRptMessage() *PCRptMessage {
m := &PCRptMessage{
func NewStateReport() (*StateReport, error) {
sr := &StateReport{
SrpObject: &SrpObject{},
LspObject: &LspObject{},
EroObject: &EroObject{},
Expand All @@ -215,30 +157,128 @@ func NewPCRptMessage() *PCRptMessage {
AssociationObject: &AssociationObject{},
VendorInformationObject: &VendorInformationObject{},
}
return m
return sr, nil
}

func (m *PCRptMessage) ToSRPolicy(pcc PccType) table.SRPolicy {
func (sr *StateReport) ToSRPolicy(pcc PccType) table.SRPolicy {
srPolicy := table.SRPolicy{
PlspId: m.LspObject.PlspId,
Name: m.LspObject.Name,
PlspId: sr.LspObject.PlspId,
Name: sr.LspObject.Name,
SegmentList: []table.Segment{},
SrcAddr: m.LspObject.SrcAddr,
DstAddr: m.LspObject.DstAddr,
SrcAddr: sr.LspObject.SrcAddr,
DstAddr: sr.LspObject.DstAddr,
}
if pcc == CISCO_LEGACY {
srPolicy.Color = m.VendorInformationObject.Color()
srPolicy.Preference = m.VendorInformationObject.Preference()
srPolicy.Color = sr.VendorInformationObject.Color()
srPolicy.Preference = sr.VendorInformationObject.Preference()
} else {
srPolicy.Color = m.AssociationObject.Color()
srPolicy.Preference = m.AssociationObject.Preference()
srPolicy.Color = sr.AssociationObject.Color()
srPolicy.Preference = sr.AssociationObject.Preference()
}

srPolicy.SegmentList = m.EroObject.ToSegmentList()
srPolicy.SegmentList = sr.EroObject.ToSegmentList()

return srPolicy
}

// PCRpt Message
type PCRptMessage struct {
StateReports []*StateReport
}

func (m *PCRptMessage) DecodeFromBytes(messageBody []uint8) error {
// To determine the delimitation of StateReports from the order of object classes
var previousOC uint8
var sr *StateReport
for len(messageBody) != 0 {
var commonObjectHeader CommonObjectHeader
if err := commonObjectHeader.DecodeFromBytes(messageBody); err != nil {
return err
}
switch commonObjectHeader.ObjectClass {
case OC_BANDWIDTH:
previousOC = OC_BANDWIDTH
bandwidthObject := &BandwidthObject{}
if err := bandwidthObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
sr.BandwidthObjects = append(sr.BandwidthObjects, bandwidthObject)
case OC_METRIC:
previousOC = OC_METRIC
metricObject := &MetricObject{}
if err := metricObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
sr.MetricObjects = append(sr.MetricObjects, metricObject)
case OC_ERO:
previousOC = OC_METRIC
if err := sr.EroObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_LSPA:
previousOC = OC_LSPA
if err := sr.LspaObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_LSP:
if previousOC != OC_SRP {
// If sr is not zero value, this StateReport is already updated.
var err error
if sr != nil {
m.StateReports = append(m.StateReports, sr)
}
sr, err = NewStateReport()
if err != nil {
return err
}
}

previousOC = OC_LSP
if err := sr.LspObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_SRP:
// If sr is not zero value, this StateReport is already updated.
var err error
if sr != nil {
m.StateReports = append(m.StateReports, sr)
}
sr, err = NewStateReport()
if err != nil {
return err
}

previousOC = OC_SRP
if err = sr.SrpObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_ASSOCIATION:
previousOC = OC_ASSOCIATION
if err := sr.AssociationObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
case OC_VENDOR_INFORMATION:
previousOC = OC_VENDOR_INFORMATION
if err := sr.VendorInformationObject.DecodeFromBytes(messageBody[COMMON_OBJECT_HEADER_LENGTH:commonObjectHeader.ObjectLength]); err != nil {
return err
}
default:
}
messageBody = messageBody[commonObjectHeader.ObjectLength:]
}
if sr != nil {
m.StateReports = append(m.StateReports, sr)
}
return nil
}

func NewPCRptMessage() *PCRptMessage {
m := &PCRptMessage{
StateReports: []*StateReport{},
}
return m
}

// PCInitiate Message
type PCInitiateMessage struct {
SrpObject *SrpObject
Expand All @@ -254,15 +294,22 @@ func (m *PCInitiateMessage) Serialize() ([]uint8, error) {
if err != nil {
return nil, err
}
endpointsObjectLength, err := m.EndpointsObject.getByteLength()
if err != nil {
return nil, err
}
pcinitiateMessageLength := COMMON_HEADER_LENGTH +
m.SrpObject.getByteLength() +
m.LspObject.getByteLength() +
m.EndpointsObject.getByteLength() +
endpointsObjectLength +
eroObjectLength

byteSrpObject := m.SrpObject.Serialize()
byteLspObject := m.LspObject.Serialize()
byteEndpointsObject := m.EndpointsObject.Serialize()
byteEndpointsObject, err := m.EndpointsObject.Serialize()
if err != nil {
return nil, err
}
byteEroObject, err := m.EroObject.Serialize()
if err != nil {
return nil, err
Expand All @@ -272,8 +319,15 @@ func (m *PCInitiateMessage) Serialize() ([]uint8, error) {
byteAssociationObject := []uint8{}

if m.AssociationObject != nil {
byteAssociationObject = append(byteAssociationObject, m.AssociationObject.Serialize()...)
pcinitiateMessageLength += m.AssociationObject.getByteLength()
byteAssociationObject, err = m.AssociationObject.Serialize()
if err != nil {
return nil, err
}
associationObjectLength, err := m.AssociationObject.getByteLength()
if err != nil {
return nil, err
}
pcinitiateMessageLength += associationObjectLength
}
if m.VendorInformationObject != nil {
byteVendorInformationObject = append(byteVendorInformationObject, m.VendorInformationObject.Serialize()...)
Expand Down
33 changes: 18 additions & 15 deletions pkg/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,24 +179,27 @@ func (ss *Session) ReceivePcepMessage() error {
if err := pcrptMessage.DecodeFromBytes(bytePcrptMessageBody); err != nil {
return err
}
if pcrptMessage.LspObject.SFlag {
// During LSP state synchronization (RFC8231 5.6)
srPolicy := pcrptMessage.ToSRPolicy(ss.pccType)
ss.logger.Info("Synchronize SR Policy information", zap.String("session", ss.peerAddr.String()), zap.Any("SRPolicy", srPolicy), zap.Any("Message", pcrptMessage))
go ss.RegisterSRPolicy(srPolicy)
} else if !pcrptMessage.LspObject.SFlag {
if pcrptMessage.LspObject.PlspId == 0 {
// End of synchronization (RFC8231 5.6)
ss.logger.Info("Finish PCRpt state synchronization", zap.String("session", ss.peerAddr.String()))
ss.isSynced = true
} else if pcrptMessage.SrpObject.SrpId != 0 {
// Response to PCInitiate/PCUpdate (RFC8231 7.2)
srPolicy := pcrptMessage.ToSRPolicy(ss.pccType)
ss.logger.Info("Finish Stateful PCE request", zap.String("session", ss.peerAddr.String()), zap.Uint32("srpId", pcrptMessage.SrpObject.SrpId))
for _, sr := range pcrptMessage.StateReports {
if sr.LspObject.SFlag {
// During LSP state synchronization (RFC8231 5.6)
srPolicy := sr.ToSRPolicy(ss.pccType)
ss.logger.Info("Synchronize SR Policy information", zap.String("session", ss.peerAddr.String()), zap.Any("SRPolicy", srPolicy), zap.Any("Message", pcrptMessage))
go ss.RegisterSRPolicy(srPolicy)
} else if !sr.LspObject.SFlag {
if sr.LspObject.PlspId == 0 {
// End of synchronization (RFC8231 5.6)
ss.logger.Info("Finish PCRpt state synchronization", zap.String("session", ss.peerAddr.String()))
ss.isSynced = true
} else if sr.SrpObject.SrpId != 0 {
// Response to PCInitiate/PCUpdate (RFC8231 7.2)
srPolicy := sr.ToSRPolicy(ss.pccType)
ss.logger.Info("Finish Stateful PCE request", zap.String("session", ss.peerAddr.String()), zap.Uint32("srpId", sr.SrpObject.SrpId))
go ss.RegisterSRPolicy(srPolicy)
}
// TODO: Need to implementation of PCUpdate for Passive stateful PCE
}
// TODO: Need to implementation of PCUpdate for Passive stateful PCE
}

case pcep.MT_ERROR:
ss.logger.Info("Received PCErr", zap.String("session", ss.peerAddr.String()))
// TODO: Display error details
Expand Down

0 comments on commit 55d7b07

Please sign in to comment.