/
ul.go
155 lines (142 loc) · 5.43 KB
/
ul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Package service implements the S6a gRPC proxy service, which sends AIR and ULR messages over a diameter connection,
// waits (blocks) for the diameter AIAs and ULAs, and returns their RPC representation.
package service
import (
"log"
"time"
"github.com/fiorix/go-diameter/diam"
"github.com/fiorix/go-diameter/diam/avp"
"github.com/fiorix/go-diameter/diam/datatype"
"github.com/fiorix/go-diameter/diam/dict"
"github.com/fiorix/go-diameter/diam/sm/smpeer"
"github.com/fiorix/go-diameter/examples/s6a_proxy/protos"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// sendULR builds an S6a Update-Location-Request for the given Session ID (sid)
// and writes it to the proxy's current diameter connection (s.conn).
//
// Destination-Host and Destination-Realm are taken from the peer metadata
// found in the connection's context; User-Name and Visited-PLMN-Id come from req.
//
// It returns an Internal error when the peer metadata is unavailable, a
// DataLoss error when writing the message to the connection fails, and nil
// on success.
func (s *s6aProxy) sendULR(sid string, req *protos.UpdateLocationRequest) error {
	c := s.conn
	meta, ok := smpeer.FromContext(c.Context())
	if !ok {
		return Errorf(codes.Internal, "peer metadata unavailable for ULR")
	}

	m := diam.NewRequest(diam.UpdateLocation, diam.TGPP_S6A_APP_ID, dict.Default)

	// Base protocol AVPs (no vendor ID).
	m.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(sid))
	m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(s.cfg.Host))
	m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(s.cfg.Realm))
	m.NewAVP(avp.DestinationRealm, avp.Mbit, 0, meta.OriginRealm)
	m.NewAVP(avp.DestinationHost, avp.Mbit, 0, meta.OriginHost)
	m.NewAVP(avp.UserName, avp.Mbit, 0, datatype.UTF8String(req.UserName))
	m.NewAVP(avp.AuthSessionState, avp.Mbit, 0, datatype.Enumerated(1))

	// 3GPP vendor-specific AVPs. The V bit must accompany a non-zero vendor ID:
	// without it the Vendor-ID field is not part of the encoded AVP header, so
	// RAT-Type would go out as an (unknown) base protocol AVP.
	m.NewAVP(avp.RATType, avp.Vbit|avp.Mbit, VENDOR_3GPP, datatype.Enumerated(ULR_RAT_TYPE))
	m.NewAVP(avp.ULRFlags, avp.Vbit|avp.Mbit, VENDOR_3GPP, datatype.Unsigned32(ULR_FLAGS))
	m.NewAVP(avp.VisitedPLMNID, avp.Vbit|avp.Mbit, VENDOR_3GPP, datatype.OctetString(req.VisitedPlmn))

	if _, err := m.WriteTo(c); err != nil {
		// DataLoss tells the caller that the write itself failed.
		return Error(codes.DataLoss, err)
	}
	return nil
}
// handleULA returns the diameter handler for S6a Update-Location-Answers.
//
// The handler decodes the incoming message into a ULA, looks up the channel
// registered in s.sessions under the answer's Session ID, removes that entry
// and sends the decoded answer on the channel. Messages that fail to decode
// or whose Session ID is not registered are logged and dropped.
func handleULA(s *s6aProxy) diam.HandlerFunc {
	return func(c diam.Conn, m *diam.Message) {
		answer := new(ULA)
		if err := m.Unmarshal(answer); err != nil {
			log.Printf("ULA Unmarshal failed for remote %s & message %s: %s", c.RemoteAddr(), m, err)
			return
		}

		// Claim the session while holding the lock; removing the entry here
		// means this handler is the only one that will send on the channel.
		s.sessionsMu.Lock()
		waiter, found := s.sessions[answer.SessionID]
		if found {
			delete(s.sessions, answer.SessionID)
		}
		s.sessionsMu.Unlock()

		if !found {
			log.Printf("ULA SessionID %s not found. Message: %s, Remote: %s", answer.SessionID, m, c.RemoteAddr())
			return
		}

		// Deliver outside the lock so the send cannot stall other handlers
		// that need the sessions map.
		waiter <- answer
	}
}
// UpdateLocationImpl sends a ULR (Code 316) over the diameter connection,
// waits (blocks) for the matching ULA and returns its RPC representation.
//
// The returned answer is never nil. It is accompanied by a non-nil error when:
//   - req is nil (InvalidArgument);
//   - no connection could be acquired (Unavailable);
//   - the ULR could not be sent, after retrying on DataLoss (the sendULR error);
//   - the session channel was closed before an answer arrived (Aborted);
//   - the value received on the session channel is not a *ULA (Internal);
//   - no answer arrived within TIMEOUT_SECONDS (DeadlineExceeded);
//   - the ULA's base Result-Code translates to an error
//     (see TranslateBaseDiamResultCode); the answer is still fully populated.
func (s *s6aProxy) UpdateLocationImpl(req *protos.UpdateLocationRequest) (*protos.UpdateLocationAnswer, error) {
	res := &protos.UpdateLocationAnswer{}
	if req == nil {
		return res, Errorf(codes.InvalidArgument, "Nil UL Request")
	}

	sid := genSID()
	// Capacity 1: the goroutine delivering the answer can always complete its
	// send, even if this call has already timed out and stopped receiving.
	// With an unbuffered channel that sender would block forever.
	ch := make(chan interface{}, 1)
	s.updateSession(sid, ch)

	var (
		err error
		c   diam.Conn
	)
	for retries := MAX_DIAM_RETRIES; retries >= 0; retries-- {
		c, err = s.acquireConnection()
		if err != nil {
			s.releaseConnection()
			s.cleanupSession(sid)
			log.Printf("Cannot connect to %s://%s; %v", s.cfg.Protocol, s.cfg.HssAddr, err)
			return res, Error(codes.Unavailable, err)
		}
		err = s.sendULR(sid, req)
		s.releaseConnection() // we can unlock reader after send
		if err == nil {
			break
		}
		log.Printf("Error sending ULR with SID %s: %v", sid, err)
		// Only a DataLoss status (the write to the connection failed) is
		// worth a retry on a fresh connection; anything else is final.
		st, ok := status.FromError(err)
		if !ok || st == nil || st.Code() != codes.DataLoss {
			break
		}
		s.cleanupConn(c)
	}
	if err != nil {
		s.cleanupSession(sid)
		return res, err
	}

	// An explicit timer (instead of time.After) is released as soon as we
	// return rather than lingering until it fires.
	timer := time.NewTimer(time.Second * TIMEOUT_SECONDS)
	defer timer.Stop()

	select {
	case resp, open := <-ch:
		if !open {
			err = Errorf(codes.Aborted, "ULR for Session ID: %s is canceled", sid)
			break
		}
		ula, ok := resp.(*ULA)
		if !ok {
			err = Errorf(codes.Internal, "Invalid Response Type: %T, ULA expected.", resp)
			break
		}

		err = TranslateBaseDiamResultCode(ula.ResultCode)
		res.ErrorCode = protos.ErrorCode(ula.ExperimentalResult.ExperimentalResultCode)
		res.DefaultContextId = ula.SubscriptionData.APNConfigurationProfile.ContextIdentifier
		res.TotalAmbr = &protos.UpdateLocationAnswer_AggregatedMaximumBitrate{
			MaxBandwidthUl: ula.SubscriptionData.AMBR.MaxRequestedBandwidthUL,
			MaxBandwidthDl: ula.SubscriptionData.AMBR.MaxRequestedBandwidthDL,
		}
		// The indicator and pre-emption AVPs are mapped to booleans: value 0 -> true.
		res.AllApnsIncluded =
			ula.SubscriptionData.APNConfigurationProfile.AllAPNConfigurationsIncludedIndicator == 0
		for _, apnCfg := range ula.SubscriptionData.APNConfigurationProfile.APNConfigs {
			res.Apn = append(
				res.Apn,
				&protos.UpdateLocationAnswer_APNConfiguration{
					ContextId:        apnCfg.ContextIdentifier,
					ServiceSelection: apnCfg.ServiceSelection,
					QosProfile: &protos.UpdateLocationAnswer_APNConfiguration_QoSProfile{
						ClassId:                 apnCfg.EPSSubscribedQoSProfile.QoSClassIdentifier,
						PriorityLevel:           apnCfg.EPSSubscribedQoSProfile.AllocationRetentionPriority.PriorityLevel,
						PreemptionCapability:    apnCfg.EPSSubscribedQoSProfile.AllocationRetentionPriority.PreemptionCapability == 0,
						PreemptionVulnerability: apnCfg.EPSSubscribedQoSProfile.AllocationRetentionPriority.PreemptionVulnerability == 0,
					},
					Ambr: &protos.UpdateLocationAnswer_AggregatedMaximumBitrate{
						MaxBandwidthUl: apnCfg.AMBR.MaxRequestedBandwidthUL,
						MaxBandwidthDl: apnCfg.AMBR.MaxRequestedBandwidthDL,
					},
				})
		}
		// No cleanupSession here, matching the original success path.
		return res, err

	case <-timer.C:
		err = Errorf(codes.DeadlineExceeded, "ULR Timed Out for Session ID: %s", sid)
	}

	s.cleanupSession(sid)
	return res, err
}