Skip to content

Commit

Permalink
fix: daemon service client memory leak (#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Sep 7, 2022
1 parent bfe9669 commit 5f5b2df
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 1 deletion.
6 changes: 6 additions & 0 deletions controllers/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
if err != nil {
return true, err
}
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err := daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
return true, err
Expand Down Expand Up @@ -634,5 +637,8 @@ func (r *pipelineReconciler) safeToDelete(ctx context.Context, pl *dfv1.Pipeline
if err != nil {
return false, err
}
defer func() {
_ = daemonClient.Close()
}()
return daemonClient.IsDrained(ctx, pl.Name)
}
3 changes: 3 additions & 0 deletions controllers/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
if err != nil {
return fmt.Errorf("failed to get daemon service client for pipeline %s, %w", pl.Name, err)
}
defer func() {
_ = dClient.Close()
}()
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
Expand Down
11 changes: 10 additions & 1 deletion pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type DaemonClient struct {
client daemon.DaemonServiceClient
conn *grpc.ClientConn
}

func NewDaemonServiceClient(address string) (*DaemonClient, error) {
Expand All @@ -23,7 +24,15 @@ func NewDaemonServiceClient(address string) (*DaemonClient, error) {
return nil, err
}
daemonClient := daemon.NewDaemonServiceClient(conn)
return &DaemonClient{client: daemonClient}, nil
return &DaemonClient{conn: conn, client: daemonClient}, nil
}

// Close function closes the gRPC connection, it has to be called after a daemon client has finished all its jobs.
func (dc *DaemonClient) Close() error {
if dc.conn != nil {
return dc.conn.Close()
}
return nil
}

func (dc *DaemonClient) IsDrained(ctx context.Context, pipeline string) (bool, error) {
Expand Down
12 changes: 12 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (h *handler) ListPipelineEdges(c *gin.Context) {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.ListPipelineBuffers(context.Background(), pipeline)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
Expand All @@ -248,6 +251,9 @@ func (h *handler) GetPipelineEdge(c *gin.Context) {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
// Assume edge is the buffer name
i, err := client.GetPipelineBuffer(context.Background(), pipeline, c.Param("edge"))
if err != nil {
Expand All @@ -267,6 +273,9 @@ func (h *handler) GetVertexMetrics(c *gin.Context) {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.GetVertexMetrics(context.Background(), pipeline, vertex)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
Expand All @@ -285,6 +294,9 @@ func (h *handler) GetVertexWatermark(c *gin.Context) {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.GetVertexWatermark(context.Background(), pipeline, vertex)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
Expand Down
6 changes: 6 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() {
// Test Daemon service with gRPC
client, err := daemonclient.NewDaemonServiceClient("localhost:1234")
assert.NoError(s.T(), err)
defer func() {
_ = client.Close()
}()
buffers, err := client.ListPipelineBuffers(context.Background(), pipelineName)
assert.NoError(s.T(), err)
assert.Equal(s.T(), 2, len(buffers))
Expand Down Expand Up @@ -212,6 +215,9 @@ func (s *FunctionalSuite) TestWatermarkEnabled() {
// Test Daemon service with gRPC
client, err := daemonclient.NewDaemonServiceClient("localhost:1234")
assert.NoError(s.T(), err)
defer func() {
_ = client.Close()
}()
buffers, err := client.ListPipelineBuffers(context.Background(), pipelineName)
assert.NoError(s.T(), err)
assert.Equal(s.T(), 5, len(buffers))
Expand Down

0 comments on commit 5f5b2df

Please sign in to comment.