Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add stored status explicitly for logs #704

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"

"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

type fileStream struct {
Expand All @@ -20,7 +20,7 @@ type fileStream struct {
}

// NewFileStream returns a LogStreamer that streams directly from a log file on local disk.
func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewFileStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand All @@ -42,7 +42,7 @@ func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config
}

func (*fileStream) Type() string {
return string(v1alpha2.FileLogType)
return string(v1alpha3.FileLogType)
}

// WriteTo reads the contents of the TaskRun log file and writes them to the provided writer, such
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"os"

server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"

"gocloud.dev/blob/gcsblob"
"gocloud.dev/gcp"
Expand All @@ -41,7 +41,7 @@ type gcsStream struct {
}

// NewGCSStream returns a log streamer for the GCS storage type.
func NewGCSStream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewGCSStream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func getGCSClient(ctx context.Context, cfg *server.Config) (*gcp.HTTPClient, err
}

func (*gcsStream) Type() string {
return string(v1alpha2.GCSLogType)
return string(v1alpha3.GCSLogType)
}

func (gcs *gcsStream) WriteTo(w io.Writer) (int64, error) {
Expand Down
22 changes: 11 additions & 11 deletions pkg/api/server/v1alpha2/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -64,21 +64,21 @@ type Stream interface {
//
// NewStream may mutate the Log object's status, to provide implementation information
// for reading and writing files.
func NewStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
switch log.Spec.Type {
case v1alpha2.FileLogType:
case v1alpha3.FileLogType:
return NewFileStream(ctx, log, config)
case v1alpha2.S3LogType:
case v1alpha3.S3LogType:
return NewS3Stream(ctx, log, config)
case v1alpha2.GCSLogType:
case v1alpha3.GCSLogType:
return NewGCSStream(ctx, log, config)
}
return nil, fmt.Errorf("log streamer type %s is not supported", log.Spec.Type)
}

// ToStorage converts log record to marshaled json bytes
func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
if len(record.GetData().Value) > 0 {
err := json.Unmarshal(record.GetData().Value, log)
if err != nil {
Expand All @@ -88,7 +88,7 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log.Default()

if log.Spec.Type == "" {
log.Spec.Type = v1alpha2.LogType(config.LOGS_TYPE)
log.Spec.Type = v1alpha3.LogType(config.LOGS_TYPE)
if len(log.Spec.Type) == 0 {
return nil, fmt.Errorf("failed to set up log storage type to spec")
}
Expand All @@ -100,11 +100,11 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
// First one is a new log streamer created by log record.
// Second one is log API resource retrieved from log record.
// Third argument is an error.
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha2.Log, error) {
if record.Type != v1alpha2.LogRecordType {
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha3.Log, error) {
if record.Type != v1alpha3.LogRecordType {
return nil, nil, fmt.Errorf("record type %s cannot stream logs", record.Type)
}
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
err := json.Unmarshal(record.Data, log)
if err != nil {
return nil, nil, fmt.Errorf("could not decode Log record: %w", err)
Expand All @@ -115,7 +115,7 @@ func ToStream(ctx context.Context, record *db.Record, config *config.Config) (St

// FilePath returns file path to store log. This file path can be
// path in the real file system or virtual value depending on storage type.
func FilePath(log *v1alpha2.Log) (string, error) {
func FilePath(log *v1alpha3.Log) (string, error) {
filePath := filepath.Join(log.GetNamespace(), string(log.GetUID()), log.Name)
if filePath == "" {
return "", fmt.Errorf("invalid file path")
Expand Down
26 changes: 13 additions & 13 deletions pkg/api/server/v1alpha2/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/internal/jsonutil"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestFilePath(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
Expand Down Expand Up @@ -112,13 +112,13 @@ func TestParseName(t *testing.T) {
}

func TestToStorage(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we not have at least one v1alpha2 test case to validate that behavior?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests won't work because we no longer have v1alpha2, to simulate, we have to retain the v1alpha2 types as well.

ObjectMeta: v1.ObjectMeta{
Name: "test-taskrun-log",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Name: "test-taskrun",
},
},
Expand All @@ -133,7 +133,7 @@ func TestToStorage(t *testing.T) {
rec := &pb.Record{
Name: "test-log",
Data: &pb.Any{
Type: v1alpha2.LogRecordType,
Type: v1alpha3.LogRecordType,
Value: want,
},
}
Expand Down Expand Up @@ -183,24 +183,24 @@ func TestToStream(t *testing.T) {
ResultName: "push-main",
Name: "taskrun-compile-log",
ID: "a",
Type: v1alpha2.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha2.Log{
Type: v1alpha3.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
UID: "test-uid",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Namespace: "app",
Name: "taskrun-compile",
},
},
}),
},
want: &mockStream{
streamType: string(v1alpha2.FileLogType),
streamType: string(v1alpha3.FileLogType),
},
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

const (
Expand Down Expand Up @@ -49,7 +49,7 @@ type s3Stream struct {
}

// NewS3Stream returns a log streamer for the S3 log storage type.
func NewS3Stream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewS3Stream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -130,7 +130,7 @@ func initConfig(ctx context.Context, cfg *server.Config) (*s3.Client, error) {
}

func (*s3Stream) Type() string {
return string(v1alpha2.S3LogType)
return string(v1alpha3.S3LogType)
}

func (s3s *s3Stream) WriteTo(w io.Writer) (n int64, err error) {
Expand Down
35 changes: 23 additions & 12 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/tektoncd/results/pkg/api/server/v1alpha2/auth"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/record"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/logs"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -47,7 +47,7 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
return err
}
// Check if the input record is referenced in any logs record in the result
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
s.logger.Error(err)
Expand All @@ -60,9 +60,20 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
s.logger.Error(err)
return status.Error(codes.Internal, "Error streaming log")
}
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")

// Handle v1alpha2 and earlier differently from v1alpha3 until v1alpha2 and earlier are deprecated
if "results.tekton.dev/v1alpha3" == object.APIVersion {
if !object.Status.IsStored || object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
} else {
// For v1alpha2 checking log size is the best way to ensure if logs are stored
// this is however susceptible to race condition
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
}

writer := logs.NewBufferedHTTPWriter(srv, req.GetName(), s.config.LOGS_BUFFER_SIZE)
Expand Down Expand Up @@ -95,7 +106,7 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
var name, parent, resultName, recordName string
var bytesWritten int64
var rec *db.Record
var object *v1alpha2.Log
var object *v1alpha3.Log
var stream log.Stream
defer func() {
if stream != nil {
Expand Down Expand Up @@ -164,7 +175,7 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
}
}

func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha2.Log, written int64, returnErr error) error {
func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha3.Log, written int64, returnErr error) error {
// When the srv reaches the end, srv.Recv() returns an io.EOF error
// Therefore we should not return io.EOF if it is received in this function.
// Otherwise, we should return the original error and not mask any subsequent errors handling cleanup/return.
Expand All @@ -175,9 +186,9 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
}
apiRec := record.ToAPI(rec)
apiRec.UpdateTime = timestamppb.Now()
if written > 0 {
log.Status.Size = written
}
log.Status.Size = written
log.Status.IsStored = returnErr == io.EOF

data, err := json.Marshal(log)
if err != nil {
if !isNilOrEOF(returnErr) {
Expand Down Expand Up @@ -294,7 +305,7 @@ func (s *Server) getFilteredPaginatedSortedLogRecords(ctx context.Context, paren
for len(rec) < pageSize {
batchSize := batcher.Next()
dbrecords := make([]*db.Record, 0, batchSize)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha2.LogRecordType)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha3.LogRecordType)
q = q.Where("id > ?", start)
// Specifying `-` allows users to read Records across Results.
// See https://google.aip.dev/159 for more details.
Expand Down Expand Up @@ -364,7 +375,7 @@ func (s *Server) DeleteLog(ctx context.Context, req *pb.DeleteLogRequest) (*empt
return &empty.Empty{}, err
}
// Check if the input record is referenced in any logs record
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
return &empty.Empty{}, err
Expand Down
Loading