Skip to content

Commit

Permalink
bugfix: prevent memory leak caused by time.After (#65)
Browse files Browse the repository at this point in the history
* bugfix: prevent memory leak caused by time.After

* fix golangci error

* update actions yaml
  • Loading branch information
ianchen0119 committed Apr 21, 2023
1 parent 405b2e0 commit 055371c
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 14 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.4.0
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version-file: go.mod
cache: true
cache-dependency-path: go.sum
- name: golangci-lint
uses: golangci/golangci-lint-action@v2.5.2
uses: golangci/golangci-lint-action@v3
with:
version: v1.47.3
working-directory: pkg/pfcpsim
Expand Down
1 change: 1 addition & 0 deletions cmd/pfcpctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func main() {
parser := flags.NewNamedParser(path.Base(os.Args[0]),
flags.HelpFlag|flags.PassDoubleDash|flags.PassAfterNonOption)
_, err := parser.AddGroup("Global Options", "", &config.GlobalOptions)

if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/pfcpsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func main() {
optHelp := getopt.BoolLong("help", 0, "Help")

getopt.Parse()

if *optHelp {
getopt.Usage()
os.Exit(0)
Expand Down
7 changes: 1 addition & 6 deletions internal/pfcpctl/commands/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ var conn *grpc.ClientConn

func connect() pb.PFCPSimClient {
// Create an insecure gRPC Channel
var err error
conn, err = grpc.Dial(config.GlobalConfig.Server, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(config.GlobalConfig.Server, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Error dialing %v: %v", config.GlobalConfig.Server, err)
}

return pb.NewPFCPSimClient(conn)
}

func validateArgs(args *commonArgs) {

}

func disconnect() {
if conn != nil {
conn.Close()
Expand Down
3 changes: 3 additions & 0 deletions internal/pfcpctl/commands/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func RegisterServiceCommands(parser *flags.Parser) {

func (c *configureRemoteAddresses) Execute(args []string) error {
client := connect()

defer disconnect()

res, err := client.Configure(context.Background(), &pb.ConfigureRequest{
Expand All @@ -48,6 +49,7 @@ func (c *configureRemoteAddresses) Execute(args []string) error {

func (c *associate) Execute(args []string) error {
client := connect()

defer disconnect()

res, err := client.Associate(context.Background(), &pb.EmptyRequest{})
Expand All @@ -62,6 +64,7 @@ func (c *associate) Execute(args []string) error {

func (c *disassociate) Execute(args []string) error {
client := connect()

defer disconnect()

res, err := client.Disassociate(context.Background(), &pb.EmptyRequest{})
Expand Down
3 changes: 3 additions & 0 deletions internal/pfcpctl/commands/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *sessionCreate) Execute(args []string) error {
}

client := connect()

defer disconnect()

s.Args.validate()
Expand All @@ -90,6 +91,7 @@ func (s *sessionCreate) Execute(args []string) error {

func (s *sessionModify) Execute(args []string) error {
client := connect()

defer disconnect()

s.Args.validate()
Expand All @@ -115,6 +117,7 @@ func (s *sessionModify) Execute(args []string) error {

func (s *sessionDelete) Execute(args []string) error {
client := connect()

defer disconnect()

s.Args.validate()
Expand Down
2 changes: 2 additions & 0 deletions internal/pfcpsim/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func parseAppFilter(filter string) (string, uint8, uint32, error) {
proto, ipNetAddr, portRange, action, precedence := result[0], result[1], result[2], result[3], result[4]

var gateStatus uint8

switch action {
case "allow":
gateStatus = ie.GateStatusOpen
Expand Down Expand Up @@ -155,6 +156,7 @@ func parseAppFilter(filter string) (string, uint8, uint32, error) {
if lowerPort > upperPort {
return "", 0, 0, pfcpsim.NewInvalidFormatError("Port range. Lower port is greater than upper port")
}

return fmt.Sprintf(sdfFilterFormatWPort, proto, ipNetAddr, lowerPort, upperPort), gateStatus, precedenceUint, nil
} else {
return fmt.Sprintf(sdfFilterFormatWOPort, proto, ipNetAddr), gateStatus, precedenceUint, nil
Expand Down
10 changes: 9 additions & 1 deletion internal/pfcpsim/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (P pfcpSimService) Configure(ctx context.Context, request *pb.ConfigureRequ
if net.ParseIP(request.UpfN3Address) == nil {
errMsg := fmt.Sprintf("Error while parsing UPF N3 address: %v", request.UpfN3Address)
log.Error(errMsg)

return &pb.Response{}, status.Error(codes.Aborted, errMsg)
}
// remotePeerAddress is validated in pfcpsim
Expand All @@ -74,6 +75,7 @@ func (P pfcpSimService) Associate(ctx context.Context, empty *pb.EmptyRequest) (
if err := connectPFCPSim(); err != nil {
errMsg := fmt.Sprintf("Could not connect to remote peer :%v", err)
log.Error(errMsg)

return &pb.Response{}, status.Error(codes.Aborted, errMsg)
}
}
Expand Down Expand Up @@ -127,6 +129,7 @@ func (P pfcpSimService) CreateSession(ctx context.Context, request *pb.CreateSes
if err != nil {
errMsg := fmt.Sprintf(" Could not parse Address Pool: %v", err)
log.Error(errMsg)

return &pb.Response{}, status.Error(codes.Aborted, errMsg)
}

Expand Down Expand Up @@ -255,6 +258,7 @@ func (P pfcpSimService) CreateSession(ctx context.Context, request *pb.CreateSes
if err != nil {
return &pb.Response{}, status.Error(codes.Internal, err.Error())
}

insertSession(i, sess)
}

Expand All @@ -280,6 +284,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes
if len(activeSessions) < count {
err := pfcpsim.NewNotEnoughSessionsError()
log.Error(err)

return &pb.Response{}, status.Error(codes.Aborted, err.Error())
}

Expand Down Expand Up @@ -308,7 +313,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes
teid = 0 // When buffering, TEID = 0.
}

for _, _ = range request.AppFilters {
for range request.AppFilters {
downlinkFAR := session.NewFARBuilder().
WithID(ID). // Same FARID that was generated in create sessions
WithMethod(session.Update).
Expand All @@ -327,6 +332,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes
if !ok {
errMsg := fmt.Sprintf("Could not retrieve session with index %v", i)
log.Error(errMsg)

return &pb.Response{}, status.Error(codes.Internal, errMsg)
}

Expand Down Expand Up @@ -356,6 +362,7 @@ func (P pfcpSimService) DeleteSession(ctx context.Context, request *pb.DeleteSes
if len(activeSessions) < count {
err := pfcpsim.NewNotEnoughSessionsError()
log.Error(err)

return &pb.Response{}, status.Error(codes.Aborted, err.Error())
}

Expand All @@ -364,6 +371,7 @@ func (P pfcpSimService) DeleteSession(ctx context.Context, request *pb.DeleteSes
if !ok {
errMsg := "Session was nil. Check baseID"
log.Error(errMsg)

return &pb.Response{}, status.Error(codes.Aborted, errMsg)
}

Expand Down
26 changes: 21 additions & 5 deletions pkg/pfcpsim/pfcpsim.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,27 @@ func (c *PFCPClient) PeekNextHeartbeatResponse() (*message.HeartbeatResponse, er
// It's a blocking operation, which is timed out after c.responseTimeout period (5 seconds by default).
// Use SetPFCPResponseTimeout() to configure a custom timeout.
func (c *PFCPClient) PeekNextResponse() (message.Message, error) {
select {
case msg := <-c.recvChan:
return msg, nil
case <-time.After(c.responseTimeout):
return nil, NewTimeoutExpiredError()
var resMsg message.Message

var err error

delay := time.NewTimer(c.responseTimeout)

for {
select {
case msg := <-c.recvChan:
if !delay.Stop() {
<-delay.C
}

resMsg = msg
case <-delay.C:
if resMsg == nil {
err = NewTimeoutExpiredError()
}

return resMsg, err
}
}
}

Expand Down

0 comments on commit 055371c

Please sign in to comment.