Skip to content

Commit

Permalink
Merge pull request #79 from negbie/master
Browse files Browse the repository at this point in the history
Correlate inverse RTCP flow too
  • Loading branch information
negbie committed Oct 15, 2018
2 parents 9379482 + 9359a71 commit 9c64df0
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 30 deletions.
72 changes: 46 additions & 26 deletions decoder/correlator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,37 +88,57 @@ func cacheSDPIPPort(payload []byte) {
// First it will look inside the longlive RTCPCache with the ssrc as key.
// If it can't find a value it will look inside the shortlive SDPCache with (SDPIP+RTCPPort) as key.
// If it finds a value inside the SDPCache it will add it to the RTCPCache with the ssrc as key.
func correlateRTCP(srcIP net.IP, srcPort uint16, payload []byte) ([]byte, []byte, byte) {
srcIPString := srcIP.String()
srcPortString := strconv.Itoa(int(srcPort))
keySDP := []byte(srcIPString + srcPortString)
func correlateRTCP(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, payload []byte) ([]byte, []byte) {

keyRTCP, jsonRTCP, info := protos.ParseRTCP(payload)
if info != "" {
logp.Debug("rtcp", "ssrc=%d, srcIP=%s, srcPort=%s, %s", keyRTCP, srcIPString, srcPortString, info)
logp.Debug("rtcp", "ssrc=%x, srcIP=%s, srcPort=%d, dstIP=%s, dstPort=%d, %s",
keyRTCP, srcIP, srcPort, dstIP, dstPort, info)
if jsonRTCP == nil {
return nil, nil, 0
return nil, nil
}
}

if corrID, err := RTCPCache.Get(keyRTCP); err == nil && keyRTCP != nil {
logp.Debug("rtcp", "Found '%d:%s' in RTCPCache srcIP=%s, srcPort=%s, payload=%s", keyRTCP, corrID, srcIPString, srcPortString, jsonRTCP)
return jsonRTCP, corrID, 5
} else if corrID, err := SDPCache.Get(keySDP); err == nil {
logp.Debug("rtcp", "Found '%s:%s' in SDPCache srcIP=%s, srcPort=%s, payload=%s", keySDP, corrID, srcIPString, srcPortString, jsonRTCP)
logp.Debug("rtcp", "Found '%x:%s' in RTCPCache srcIP=%s, srcPort=%d, dstIP=%s, dstPort=%d, payload=%s",
keyRTCP, corrID, srcIP, srcPort, dstIP, dstPort, jsonRTCP)
return jsonRTCP, corrID
}

srcIPString := srcIP.String()
srcPortString := strconv.Itoa(int(srcPort))
srcKey := []byte(srcIPString + srcPortString)
if corrID, err := SDPCache.Get(srcKey); err == nil {
logp.Debug("rtcp", "Found '%s:%s' in SDPCache srcIP=%s, srcPort=%s, payload=%s",
srcKey, corrID, srcIPString, srcPortString, jsonRTCP)
err = RTCPCache.Set(keyRTCP, corrID, 21600)
if err != nil {
logp.Warn("%v", err)
return nil, nil
}
return jsonRTCP, corrID
}

dstIPString := dstIP.String()
dstPortString := strconv.Itoa(int(dstPort))
dstKey := []byte(dstIPString + dstPortString)
if corrID, err := SDPCache.Get(dstKey); err == nil {
logp.Debug("rtcp", "Found '%s:%s' in SDPCache dstIP=%s, dstPort=%s, payload=%s",
dstKey, corrID, dstIPString, dstPortString, jsonRTCP)
err = RTCPCache.Set(keyRTCP, corrID, 21600)
if err != nil {
logp.Warn("%v", err)
return nil, nil, 0
return nil, nil
}
return jsonRTCP, corrID, 5
return jsonRTCP, corrID
}

logp.Debug("rtcp", "No correlationID for srcIP=%s, srcPort=%s, payload=%s", srcIPString, srcPortString, jsonRTCP)
return nil, nil, 0
logp.Debug("rtcp", "No correlationID for srcIP=%s, srcPort=%s, dstIP=%s, dstPort=%s, payload=%s",
srcIPString, srcPortString, dstIPString, dstPortString, jsonRTCP)
return nil, nil
}

func correlateLOG(payload []byte) ([]byte, []byte, byte) {
func correlateLOG(payload []byte) ([]byte, []byte) {
var callID []byte
if posID := bytes.Index(payload, []byte("ID=")); posID > 0 {
restID := payload[posID:]
Expand All @@ -129,11 +149,11 @@ func correlateLOG(payload []byte) ([]byte, []byte, byte) {
callID = restID[3:]
} else {
logp.Debug("log", "No end or fishy Call-ID in '%s'", restID)
return nil, nil, 0
return nil, nil
}
if callID != nil {
logp.Debug("log", "Found CallID: %s in Logline: '%s'", callID, payload)
return payload, callID, 100
return payload, callID

}
} else if posID := bytes.Index(payload, []byte(": [")); posID > 0 {
Expand All @@ -144,21 +164,21 @@ func correlateLOG(payload []byte) ([]byte, []byte, byte) {
callID = restID[len(": ["):posRestID]
} else {
logp.Debug("log", "No end or fishy Call-ID in '%s'", restID)
return nil, nil, 0
return nil, nil
}
if len(callID) > 4 && len(callID) < 80 {
logp.Debug("log", "Found CallID: %s in Logline: '%s'", callID, payload)
return payload, callID, 100
return payload, callID
}
}
return nil, nil, 0
return nil, nil
}

func correlateNG(payload []byte) ([]byte, []byte, byte) {
func correlateNG(payload []byte) ([]byte, []byte) {
cookie, rawNG, err := unmarshalNG(payload)
if err != nil {
logp.Warn("%v", err)
return nil, nil, 0
return nil, nil
}
switch rawTypes := rawNG.(type) {
case map[string]interface{}:
Expand All @@ -168,22 +188,22 @@ func correlateNG(payload []byte) ([]byte, []byte, byte) {
err = SIPCache.Set(cookie, callid, 10)
if err != nil {
logp.Warn("%v", err)
return nil, nil, 0
return nil, nil
}
}

if rawMapKey == "SSRC" {
data, err := json.Marshal(&rawMapValue)
if err != nil {
logp.Warn("%v", err)
return nil, nil, 0
return nil, nil
}
if corrID, err := SIPCache.Get(cookie); err == nil {
logp.Debug("ng", "Found CallID: %s and QOS stats: %s", string(corrID), string(data))
return data, corrID, 100
return data, corrID
}
}
}
}
return nil, nil, 0
return nil, nil
}
9 changes: 6 additions & 3 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,16 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l

if config.Cfg.Mode == "SIPLOG" {
if udp.DstPort == 514 {
pkt.Payload, pkt.CID, pkt.ProtoType = correlateLOG(udp.Payload)
pkt.Payload, pkt.CID = correlateLOG(udp.Payload)
if pkt.Payload != nil && pkt.CID != nil {
pkt.ProtoType = 100
PacketQueue <- pkt
}
return
} else if udp.SrcPort == 2223 || udp.DstPort == 2223 {
pkt.Payload, pkt.CID, pkt.ProtoType = correlateNG(udp.Payload)
pkt.Payload, pkt.CID = correlateNG(udp.Payload)
if pkt.Payload != nil {
pkt.ProtoType = 100
PacketQueue <- pkt
}
return
Expand All @@ -310,8 +312,9 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l
cacheSDPIPPort(udp.Payload)
if (udp.Payload[0]&0xc0)>>6 == 2 {
if (udp.Payload[1] == 200 || udp.Payload[1] == 201 || udp.Payload[1] == 207) && udp.SrcPort%2 != 0 && udp.DstPort%2 != 0 {
pkt.Payload, pkt.CID, pkt.ProtoType = correlateRTCP(pkt.SrcIP, pkt.SrcPort, udp.Payload)
pkt.Payload, pkt.CID = correlateRTCP(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, udp.Payload)
if pkt.Payload != nil {
pkt.ProtoType = 5
atomic.AddUint64(&d.rtcpCount, 1)
PacketQueue <- pkt
return
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
//_ "github.com/mkevac/debugcharts"
)

const version = "heplify 1.41"
const version = "heplify 1.45"

func init() {

Expand Down

0 comments on commit 9c64df0

Please sign in to comment.