Skip to content

Commit

Permalink
Add log stored status explicitly
Browse files Browse the repository at this point in the history
- adding log stored stored status explictly in the Log object
improves the detection for partial or no storage of logs
- it might help mitigate the race condition between pruning
the runs and storing the logs.

Signed-off-by: Avinal Kumar <avinal@redhat.com>

Signed-off-by: Khurram Baig <kbaig@redhat.com>
  • Loading branch information
avinal authored and khrm committed Jun 6, 2024
1 parent 0a020d9 commit f2f1a44
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 90 deletions.
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"

"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

type fileStream struct {
Expand All @@ -20,7 +20,7 @@ type fileStream struct {
}

// NewFileStream returns a LogStreamer that streams directly from a log file on local disk.
func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewFileStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand All @@ -42,7 +42,7 @@ func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config
}

func (*fileStream) Type() string {
return string(v1alpha2.FileLogType)
return string(v1alpha3.FileLogType)
}

// WriteTo reads the contents of the TaskRun log file and writes them to the provided writer, such
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"os"

server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"

"gocloud.dev/blob/gcsblob"
"gocloud.dev/gcp"
Expand All @@ -41,7 +41,7 @@ type gcsStream struct {
}

// NewGCSStream returns a log streamer for the GCS storage type.
func NewGCSStream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewGCSStream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func getGCSClient(ctx context.Context, cfg *server.Config) (*gcp.HTTPClient, err
}

func (*gcsStream) Type() string {
return string(v1alpha2.GCSLogType)
return string(v1alpha3.GCSLogType)
}

func (gcs *gcsStream) WriteTo(w io.Writer) (int64, error) {
Expand Down
22 changes: 11 additions & 11 deletions pkg/api/server/v1alpha2/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -64,21 +64,21 @@ type Stream interface {
//
// NewStream may mutate the Log object's status, to provide implementation information
// for reading and writing files.
func NewStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
switch log.Spec.Type {
case v1alpha2.FileLogType:
case v1alpha3.FileLogType:
return NewFileStream(ctx, log, config)
case v1alpha2.S3LogType:
case v1alpha3.S3LogType:
return NewS3Stream(ctx, log, config)
case v1alpha2.GCSLogType:
case v1alpha3.GCSLogType:
return NewGCSStream(ctx, log, config)
}
return nil, fmt.Errorf("log streamer type %s is not supported", log.Spec.Type)
}

// ToStorage converts log record to marshaled json bytes
func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
if len(record.GetData().Value) > 0 {
err := json.Unmarshal(record.GetData().Value, log)
if err != nil {
Expand All @@ -88,7 +88,7 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log.Default()

if log.Spec.Type == "" {
log.Spec.Type = v1alpha2.LogType(config.LOGS_TYPE)
log.Spec.Type = v1alpha3.LogType(config.LOGS_TYPE)
if len(log.Spec.Type) == 0 {
return nil, fmt.Errorf("failed to set up log storage type to spec")
}
Expand All @@ -100,11 +100,11 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
// First one is a new log streamer created by log record.
// Second one is log API resource retrieved from log record.
// Third argument is an error.
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha2.Log, error) {
if record.Type != v1alpha2.LogRecordType {
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha3.Log, error) {
if record.Type != v1alpha3.LogRecordType && record.Type != v1alpha3.LogRecordTypeV2 {
return nil, nil, fmt.Errorf("record type %s cannot stream logs", record.Type)
}
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
err := json.Unmarshal(record.Data, log)
if err != nil {
return nil, nil, fmt.Errorf("could not decode Log record: %w", err)
Expand All @@ -115,7 +115,7 @@ func ToStream(ctx context.Context, record *db.Record, config *config.Config) (St

// FilePath returns file path to store log. This file path can be
// path in the real file system or virtual value depending on storage type.
func FilePath(log *v1alpha2.Log) (string, error) {
func FilePath(log *v1alpha3.Log) (string, error) {
filePath := filepath.Join(log.GetNamespace(), string(log.GetUID()), log.Name)
if filePath == "" {
return "", fmt.Errorf("invalid file path")
Expand Down
26 changes: 13 additions & 13 deletions pkg/api/server/v1alpha2/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/internal/jsonutil"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestFilePath(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
Expand Down Expand Up @@ -112,13 +112,13 @@ func TestParseName(t *testing.T) {
}

func TestToStorage(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-taskrun-log",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Name: "test-taskrun",
},
},
Expand All @@ -133,7 +133,7 @@ func TestToStorage(t *testing.T) {
rec := &pb.Record{
Name: "test-log",
Data: &pb.Any{
Type: v1alpha2.LogRecordType,
Type: v1alpha3.LogRecordType,
Value: want,
},
}
Expand Down Expand Up @@ -183,24 +183,24 @@ func TestToStream(t *testing.T) {
ResultName: "push-main",
Name: "taskrun-compile-log",
ID: "a",
Type: v1alpha2.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha2.Log{
Type: v1alpha3.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
UID: "test-uid",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Namespace: "app",
Name: "taskrun-compile",
},
},
}),
},
want: &mockStream{
streamType: string(v1alpha2.FileLogType),
streamType: string(v1alpha3.FileLogType),
},
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

const (
Expand Down Expand Up @@ -49,7 +49,7 @@ type s3Stream struct {
}

// NewS3Stream returns a log streamer for the S3 log storage type.
func NewS3Stream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewS3Stream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -130,7 +130,7 @@ func initConfig(ctx context.Context, cfg *server.Config) (*s3.Client, error) {
}

func (*s3Stream) Type() string {
return string(v1alpha2.S3LogType)
return string(v1alpha3.S3LogType)
}

func (s3s *s3Stream) WriteTo(w io.Writer) (n int64, err error) {
Expand Down
36 changes: 25 additions & 11 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/tektoncd/results/pkg/api/server/v1alpha2/auth"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/record"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/logs"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -47,7 +47,7 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
return err
}
// Check if the input record is referenced in any logs record in the result
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType && rec.Type != v1alpha3.LogRecordTypeV2 {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
s.logger.Error(err)
Expand All @@ -60,9 +60,20 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
s.logger.Error(err)
return status.Error(codes.Internal, "Error streaming log")
}
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")

// Handle v1alpha2 and earlier differently from v1alpha3 until v1alpha2 and earlier are deprecated
if "results.tekton.dev/v1alpha3" == object.APIVersion {
if !object.Status.IsStored || object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
} else {
// For v1alpha2 checking log size is the best way to ensure if logs are stored
// this is however susceptible to race condition
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
}

writer := logs.NewBufferedHTTPWriter(srv, req.GetName(), s.config.LOGS_BUFFER_SIZE)
Expand Down Expand Up @@ -95,7 +106,7 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
var name, parent, resultName, recordName string
var bytesWritten int64
var rec *db.Record
var object *v1alpha2.Log
var object *v1alpha3.Log
var stream log.Stream
// fyi we cannot defer the flush call in case we need to return the error
// but instead we pass the stream into handleError to preserve the behavior of
Expand Down Expand Up @@ -162,7 +173,7 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
}
}

func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha2.Log, written int64, stream log.Stream, returnErr error) error {
func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha3.Log, written int64, stream log.Stream, returnErr error) error {
// When the srv reaches the end, srv.Recv() returns an io.EOF error
// Therefore we should not return io.EOF if it is received in this function.
// Otherwise, we should return the original error and not mask any subsequent errors handling cleanup/return.
Expand All @@ -184,9 +195,12 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
}
apiRec := record.ToAPI(rec)
apiRec.UpdateTime = timestamppb.Now()
if written > 0 {
log.Status.Size = written
log.Status.Size = written
log.Status.IsStored = returnErr == io.EOF
if returnErr != nil && returnErr != io.EOF {
log.Status.ErrorOnStoreMsg = returnErr.Error()
}

data, err := json.Marshal(log)
if err != nil {
if stream != nil {
Expand Down Expand Up @@ -327,7 +341,7 @@ func (s *Server) getFilteredPaginatedSortedLogRecords(ctx context.Context, paren
for len(rec) < pageSize {
batchSize := batcher.Next()
dbrecords := make([]*db.Record, 0, batchSize)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha2.LogRecordType)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha3.LogRecordType)
q = q.Where("id > ?", start)
// Specifying `-` allows users to read Records across Results.
// See https://google.aip.dev/159 for more details.
Expand Down Expand Up @@ -397,7 +411,7 @@ func (s *Server) DeleteLog(ctx context.Context, req *pb.DeleteLogRequest) (*empt
return &empty.Empty{}, err
}
// Check if the input record is referenced in any logs record
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
return &empty.Empty{}, err
Expand Down

0 comments on commit f2f1a44

Please sign in to comment.