Skip to content

Commit

Permalink
do not lose flush error on server side update log
Browse files Browse the repository at this point in the history
rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero authored and tekton-robot committed May 21, 2024
1 parent 067091e commit 0a020d9
Showing 1 changed file with 50 additions and 17 deletions.
67 changes: 50 additions & 17 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,9 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
var rec *db.Record
var object *v1alpha2.Log
var stream log.Stream
defer func() {
if stream != nil {
if err := stream.Flush(); err != nil {
s.logger.Error(err)
}
}
}()
// fyi we cannot defer the flush call in case we need to return the error
// but instead we pass the stream into handleError to preserve the behavior of
// calling Flush regardless when we previously called Flush via defer
for {
// the underlying grpc stream RecvMsg method blocks until this receives a message or it is done,
// with the client now setting a context deadline, if a timeout occurs, that should make this done/canceled; let's check to confirm
Expand All @@ -116,61 +112,74 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
recv, err := srv.Recv()
// If we reach the end of the srv, we receive an io.EOF error
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}
// Ensure that we are receiving logs for the same record
if name == "" {
name = recv.GetName()
s.logger.Debugf("receiving logs for %s", name)
parent, resultName, recordName, err = log.ParseName(name)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}

if err := s.auth.Check(srv.Context(), parent, auth.ResourceLogs, auth.PermissionUpdate); err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
if err = s.auth.Check(srv.Context(), parent, auth.ResourceLogs, auth.PermissionUpdate); err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}
}
if name != recv.GetName() {
err := fmt.Errorf("cannot put logs for multiple records in the same server")
err = fmt.Errorf("cannot put logs for multiple records in the same server")
return s.handleReturn(srv,
rec,
object,
bytesWritten,
stream,
err)
}

if rec == nil {
rec, err = getRecord(s.db.WithContext(srv.Context()), parent, resultName, recordName)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}
}

if stream == nil {
stream, object, err = log.ToStream(srv.Context(), rec, s.config)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}
}

buffer := bytes.NewBuffer(recv.GetData())
written, err := stream.ReadFrom(buffer)
var written int64
written, err = stream.ReadFrom(buffer)
bytesWritten += written

if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
}
}
}

func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha2.Log, written int64, returnErr error) error {
func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha2.Log, written int64, stream log.Stream, returnErr error) error {
// When the srv reaches the end, srv.Recv() returns an io.EOF error
// Therefore we should not return io.EOF if it is received in this function.
// Otherwise, we should return the original error and not mask any subsequent errors handling cleanup/return.

returnErrorStr := ""
if returnErr != nil {
returnErrorStr = returnErr.Error()
}

// If no database record or Log, return the original error
if rec == nil || log == nil {
if stream != nil {
if flushErr := stream.Flush(); flushErr != nil {
s.logger.Error(flushErr)
return fmt.Errorf("got flush error %s with returnErr: %s", flushErr.Error(), returnErrorStr)
}
}
return returnErr
}
apiRec := record.ToAPI(rec)
Expand All @@ -180,6 +189,12 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
}
data, err := json.Marshal(log)
if err != nil {
if stream != nil {
if flushErr := stream.Flush(); flushErr != nil {
s.logger.Error(flushErr)
return fmt.Errorf("got flush error %s with returnErr: %s", flushErr.Error(), returnErrorStr)
}
}
if !isNilOrEOF(returnErr) {
return returnErr
}
Expand All @@ -196,19 +211,37 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
})

if err != nil {
if stream != nil {
if flushErr := stream.Flush(); flushErr != nil {
s.logger.Error(flushErr)
return fmt.Errorf("got flush error %s with returnErr: %s", flushErr.Error(), returnErrorStr)
}
}
if !isNilOrEOF(returnErr) {
return returnErr
}
return err
}

if returnErr == io.EOF {
if stream != nil {
if flushErr := stream.Flush(); flushErr != nil {
s.logger.Error(flushErr)
return flushErr
}
}
s.logger.Debugf("received %d bytes for %s", written, apiRec.GetName())
return srv.SendAndClose(&pb.LogSummary{
Record: apiRec.Name,
BytesReceived: written,
})
}
if stream != nil {
if flushErr := stream.Flush(); flushErr != nil {
s.logger.Error(flushErr)
return fmt.Errorf("got flush error %s with returnErr: %s", flushErr.Error(), returnErrorStr)
}
}
return returnErr
}

Expand Down

0 comments on commit 0a020d9

Please sign in to comment.