Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log stored status explicitly #760

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"

"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

type fileStream struct {
Expand All @@ -20,7 +20,7 @@ type fileStream struct {
}

// NewFileStream returns a LogStreamer that streams directly from a log file on local disk.
func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewFileStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand All @@ -42,7 +42,7 @@ func NewFileStream(ctx context.Context, log *v1alpha2.Log, config *config.Config
}

func (*fileStream) Type() string {
return string(v1alpha2.FileLogType)
return string(v1alpha3.FileLogType)
}

// WriteTo reads the contents of the TaskRun log file and writes them to the provided writer, such
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"os"

server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"

"gocloud.dev/blob/gcsblob"
"gocloud.dev/gcp"
Expand All @@ -41,7 +41,7 @@ type gcsStream struct {
}

// NewGCSStream returns a log streamer for the GCS storage type.
func NewGCSStream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewGCSStream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func getGCSClient(ctx context.Context, cfg *server.Config) (*gcp.HTTPClient, err
}

func (*gcsStream) Type() string {
return string(v1alpha2.GCSLogType)
return string(v1alpha3.GCSLogType)
}

func (gcs *gcsStream) WriteTo(w io.Writer) (int64, error) {
Expand Down
22 changes: 11 additions & 11 deletions pkg/api/server/v1alpha2/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -64,21 +64,21 @@ type Stream interface {
//
// NewStream may mutate the Log object's status, to provide implementation information
// for reading and writing files.
func NewStream(ctx context.Context, log *v1alpha2.Log, config *config.Config) (Stream, error) {
func NewStream(ctx context.Context, log *v1alpha3.Log, config *config.Config) (Stream, error) {
switch log.Spec.Type {
case v1alpha2.FileLogType:
case v1alpha3.FileLogType:
return NewFileStream(ctx, log, config)
case v1alpha2.S3LogType:
case v1alpha3.S3LogType:
return NewS3Stream(ctx, log, config)
case v1alpha2.GCSLogType:
case v1alpha3.GCSLogType:
return NewGCSStream(ctx, log, config)
}
return nil, fmt.Errorf("log streamer type %s is not supported", log.Spec.Type)
}

// ToStorage converts log record to marshaled json bytes
func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
if len(record.GetData().Value) > 0 {
err := json.Unmarshal(record.GetData().Value, log)
if err != nil {
Expand All @@ -88,7 +88,7 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
log.Default()

if log.Spec.Type == "" {
log.Spec.Type = v1alpha2.LogType(config.LOGS_TYPE)
log.Spec.Type = v1alpha3.LogType(config.LOGS_TYPE)
if len(log.Spec.Type) == 0 {
return nil, fmt.Errorf("failed to set up log storage type to spec")
}
Expand All @@ -100,11 +100,11 @@ func ToStorage(record *pb.Record, config *config.Config) ([]byte, error) {
// First one is a new log streamer created by log record.
// Second one is log API resource retrieved from log record.
// Third argument is an error.
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha2.Log, error) {
if record.Type != v1alpha2.LogRecordType {
func ToStream(ctx context.Context, record *db.Record, config *config.Config) (Stream, *v1alpha3.Log, error) {
if record.Type != v1alpha3.LogRecordType && record.Type != v1alpha3.LogRecordTypeV2 {
return nil, nil, fmt.Errorf("record type %s cannot stream logs", record.Type)
}
log := &v1alpha2.Log{}
log := &v1alpha3.Log{}
err := json.Unmarshal(record.Data, log)
if err != nil {
return nil, nil, fmt.Errorf("could not decode Log record: %w", err)
Expand All @@ -115,7 +115,7 @@ func ToStream(ctx context.Context, record *db.Record, config *config.Config) (St

// FilePath returns file path to store log. This file path can be
// path in the real file system or virtual value depending on storage type.
func FilePath(log *v1alpha2.Log) (string, error) {
func FilePath(log *v1alpha3.Log) (string, error) {
filePath := filepath.Join(log.GetNamespace(), string(log.GetUID()), log.Name)
if filePath == "" {
return "", fmt.Errorf("invalid file path")
Expand Down
26 changes: 13 additions & 13 deletions pkg/api/server/v1alpha2/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (

"github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/api/server/db"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/internal/jsonutil"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestFilePath(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
Expand Down Expand Up @@ -112,13 +112,13 @@ func TestParseName(t *testing.T) {
}

func TestToStorage(t *testing.T) {
log := &v1alpha2.Log{
log := &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-taskrun-log",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Name: "test-taskrun",
},
},
Expand All @@ -133,7 +133,7 @@ func TestToStorage(t *testing.T) {
rec := &pb.Record{
Name: "test-log",
Data: &pb.Any{
Type: v1alpha2.LogRecordType,
Type: v1alpha3.LogRecordType,
Value: want,
},
}
Expand Down Expand Up @@ -183,24 +183,24 @@ func TestToStream(t *testing.T) {
ResultName: "push-main",
Name: "taskrun-compile-log",
ID: "a",
Type: v1alpha2.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha2.Log{
Type: v1alpha3.LogRecordType,
Data: jsonutil.AnyBytes(t, &v1alpha3.Log{
ObjectMeta: v1.ObjectMeta{
Name: "test-log",
Namespace: "test",
UID: "test-uid",
},
Spec: v1alpha2.LogSpec{
Type: v1alpha2.FileLogType,
Resource: v1alpha2.Resource{
Spec: v1alpha3.LogSpec{
Type: v1alpha3.FileLogType,
Resource: v1alpha3.Resource{
Namespace: "app",
Name: "taskrun-compile",
},
},
}),
},
want: &mockStream{
streamType: string(v1alpha2.FileLogType),
streamType: string(v1alpha3.FileLogType),
},
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/server/v1alpha2/log/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
server "github.com/tektoncd/results/pkg/api/server/config"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
)

const (
Expand Down Expand Up @@ -49,7 +49,7 @@ type s3Stream struct {
}

// NewS3Stream returns a log streamer for the S3 log storage type.
func NewS3Stream(ctx context.Context, log *v1alpha2.Log, config *server.Config) (Stream, error) {
func NewS3Stream(ctx context.Context, log *v1alpha3.Log, config *server.Config) (Stream, error) {
if log.Status.Path == "" {
filePath, err := FilePath(log)
if err != nil {
Expand Down Expand Up @@ -130,7 +130,7 @@ func initConfig(ctx context.Context, cfg *server.Config) (*s3.Client, error) {
}

func (*s3Stream) Type() string {
return string(v1alpha2.S3LogType)
return string(v1alpha3.S3LogType)
}

func (s3s *s3Stream) WriteTo(w io.Writer) (n int64, err error) {
Expand Down
52 changes: 34 additions & 18 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/tektoncd/results/pkg/api/server/v1alpha2/auth"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/record"
"github.com/tektoncd/results/pkg/apis/v1alpha2"
"github.com/tektoncd/results/pkg/apis/v1alpha3"
"github.com/tektoncd/results/pkg/logs"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -47,7 +47,7 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
return err
}
// Check if the input record is referenced in any logs record in the result
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType && rec.Type != v1alpha3.LogRecordTypeV2 {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
s.logger.Error(err)
Expand All @@ -60,9 +60,20 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
s.logger.Error(err)
return status.Error(codes.Internal, "Error streaming log")
}
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")

// Handle v1alpha2 and earlier differently from v1alpha3 until v1alpha2 and earlier are deprecated
if "results.tekton.dev/v1alpha3" == object.APIVersion {
if !object.Status.IsStored || object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
} else {
// For v1alpha2 checking log size is the best way to ensure if logs are stored
// this is however susceptible to race condition
if object.Status.Size == 0 {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
}

writer := logs.NewBufferedHTTPWriter(srv, req.GetName(), s.config.LOGS_BUFFER_SIZE)
Expand Down Expand Up @@ -95,7 +106,7 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
var name, parent, resultName, recordName string
var bytesWritten int64
var rec *db.Record
var object *v1alpha2.Log
var object *v1alpha3.Log
var stream log.Stream
// fyi we cannot defer the flush call in case we need to return the error
// but instead we pass the stream into handleError to preserve the behavior of
Expand All @@ -112,19 +123,19 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
recv, err := srv.Recv()
// If we reach the end of the srv, we receive an io.EOF error
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, true)
}
// Ensure that we are receiving logs for the same record
if name == "" {
name = recv.GetName()
s.logger.Debugf("receiving logs for %s", name)
parent, resultName, recordName, err = log.ParseName(name)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, true)
}

if err = s.auth.Check(srv.Context(), parent, auth.ResourceLogs, auth.PermissionUpdate); err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, false)
}
}
if name != recv.GetName() {
Expand All @@ -134,20 +145,21 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
object,
bytesWritten,
stream,
err)
err,
false)
}

if rec == nil {
rec, err = getRecord(s.db.WithContext(srv.Context()), parent, resultName, recordName)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, true)
}
}

if stream == nil {
stream, object, err = log.ToStream(srv.Context(), rec, s.config)
if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, false)
}
}

Expand All @@ -157,12 +169,12 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
bytesWritten += written

if err != nil {
return s.handleReturn(srv, rec, object, bytesWritten, stream, err)
return s.handleReturn(srv, rec, object, bytesWritten, stream, err, true)
}
}
}

func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha2.Log, written int64, stream log.Stream, returnErr error) error {
func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *v1alpha3.Log, written int64, stream log.Stream, returnErr error, isRetryableErr bool) error {
// When the srv reaches the end, srv.Recv() returns an io.EOF error
// Therefore we should not return io.EOF if it is received in this function.
// Otherwise, we should return the original error and not mask any subsequent errors handling cleanup/return.
Expand All @@ -184,9 +196,13 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
}
apiRec := record.ToAPI(rec)
apiRec.UpdateTime = timestamppb.Now()
if written > 0 {
log.Status.Size = written
log.Status.Size = written
log.Status.IsStored = returnErr == io.EOF
if returnErr != nil && returnErr != io.EOF {
log.Status.ErrorOnStoreMsg = returnErr.Error()
log.Status.IsRetryableErr = isRetryableErr
}

data, err := json.Marshal(log)
if err != nil {
if stream != nil {
Expand Down Expand Up @@ -327,7 +343,7 @@ func (s *Server) getFilteredPaginatedSortedLogRecords(ctx context.Context, paren
for len(rec) < pageSize {
batchSize := batcher.Next()
dbrecords := make([]*db.Record, 0, batchSize)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha2.LogRecordType)
q := s.db.WithContext(ctx).Where("type = ?", v1alpha3.LogRecordType)
q = q.Where("id > ?", start)
// Specifying `-` allows users to read Records across Results.
// See https://google.aip.dev/159 for more details.
Expand Down Expand Up @@ -397,7 +413,7 @@ func (s *Server) DeleteLog(ctx context.Context, req *pb.DeleteLogRequest) (*empt
return &empty.Empty{}, err
}
// Check if the input record is referenced in any logs record
if rec.Type != v1alpha2.LogRecordType {
if rec.Type != v1alpha3.LogRecordType {
rec, err = getLogRecord(s.db, parent, res, name)
if err != nil {
return &empty.Empty{}, err
Expand Down
Loading