Skip to content

Commit 00c0b06

Browse files
authored
support/datastore: add helpers to get latest and oldest ledger sequence (#5805)
1 parent a2f3853 commit 00c0b06

File tree

12 files changed

+819
-42
lines changed

12 files changed

+819
-42
lines changed

ingest/producer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func TestBSBProducerFnConfigError(t *testing.T) {
116116
}
117117
mockDataStore.On("GetFile", mock.Anything, ".config.json").
118118
Return(io.NopCloser(bytes.NewReader(configManifestJSON(t))), nil).Once()
119-
mockDataStore.On("ListFilePaths", mock.Anything, "", 0).Return(nil, nil)
119+
mockDataStore.On("ListFilePaths", mock.Anything, datastore.ListFileOptions{}).Return(nil, nil)
120120

121121
datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) {
122122
return mockDataStore, nil
@@ -136,7 +136,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) {
136136
mockDataStore := new(datastore.MockDataStore)
137137
mockDataStore.On("GetFile", mock.Anything, ".config.json").
138138
Return(io.NopCloser(bytes.NewReader(configManifestJSON(t))), nil).Once()
139-
mockDataStore.On("ListFilePaths", mock.Anything, "", 0).Return(nil, nil)
139+
mockDataStore.On("ListFilePaths", mock.Anything, datastore.ListFileOptions{}).Return(nil, nil)
140140

141141
appCallback := func(lcm xdr.LedgerCloseMeta) error {
142142
return nil
@@ -168,7 +168,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) {
168168
// since buffer is multi-worker async, it may get to this on other worker, but not deterministic,
169169
// don't assert on it
170170
mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zst").Return(makeSingleLCMBatch(3), nil).Maybe()
171-
mockDataStore.On("ListFilePaths", mock.Anything, "", 0).Return(nil, nil)
171+
mockDataStore.On("ListFilePaths", mock.Anything, datastore.ListFileOptions{}).Return(nil, nil)
172172

173173
appCallback := func(lcm xdr.LedgerCloseMeta) error {
174174
return nil
@@ -232,7 +232,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datast
232232

233233
mockDataStore.On("GetFile", mock.Anything, ".config.json").
234234
Return(io.NopCloser(bytes.NewReader(configJSON)), nil).Once()
235-
mockDataStore.On("ListFilePaths", mock.Anything, "", 0).Return(nil, nil)
235+
mockDataStore.On("ListFilePaths", mock.Anything, datastore.ListFileOptions{}).Return(nil, nil)
236236

237237
partition := partitionSize - 1
238238
for i := start; i <= end; i++ {

services/galexie/internal/app_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestValidateExistingFileExtension(t *testing.T) {
158158
for _, tc := range testCases {
159159
t.Run(tc.name, func(t *testing.T) {
160160
ds := new(datastore.MockDataStore)
161-
ds.On("ListFilePaths", context.Background(), "", 0).Return(tc.files, tc.getExtError)
161+
ds.On("ListFilePaths", context.Background(), datastore.ListFileOptions{}).Return(tc.files, tc.getExtError)
162162

163163
actualErr := validateExistingFileExtension(context.Background(), ds)
164164

support/datastore/configure.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func LoadSchema(ctx context.Context, dataStore DataStore, cfg DataStoreConfig) (
187187
var ErrNoLedgerFiles = errors.New("no ledger files found")
188188

189189
func GetLedgerFileExtension(ctx context.Context, dataStore DataStore) (string, error) {
190-
files, err := dataStore.ListFilePaths(ctx, "", 0)
190+
files, err := dataStore.ListFilePaths(ctx, ListFileOptions{})
191191
if err != nil {
192192
return "", fmt.Errorf("failed to list ledger files: %w", err)
193193
}

support/datastore/configure_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func TestLoadSchema(t *testing.T) {
193193
t.Run("Manifest found and valid", func(t *testing.T) {
194194
mockOS := new(MockDataStore)
195195
mockOS.On("GetFile", ctx, manifestFilename).Return(io.NopCloser(bytes.NewReader(validManifestBytes)), nil).Once()
196-
mockOS.On("ListFilePaths", ctx, "", 0).Return(nil, nil)
196+
mockOS.On("ListFilePaths", ctx, ListFileOptions{}).Return(nil, nil)
197197
schema, err := LoadSchema(ctx, mockOS, defaultCfg)
198198
require.NoError(t, err)
199199
require.NotNil(t, schema)
@@ -206,7 +206,7 @@ func TestLoadSchema(t *testing.T) {
206206
t.Run("Manifest not found", func(t *testing.T) {
207207
mockOS := new(MockDataStore)
208208
mockOS.On("GetFile", ctx, manifestFilename).Return(nil, os.ErrNotExist).Once()
209-
mockOS.On("ListFilePaths", ctx, "", 0).Return(nil, nil)
209+
mockOS.On("ListFilePaths", ctx, ListFileOptions{}).Return(nil, nil)
210210

211211
schema, err := LoadSchema(ctx, mockOS, defaultCfg)
212212
require.NoError(t, err)
@@ -219,7 +219,7 @@ func TestLoadSchema(t *testing.T) {
219219
t.Run("Manifest found but invalid JSON", func(t *testing.T) {
220220
mockOS := new(MockDataStore)
221221
mockOS.On("GetFile", ctx, manifestFilename).Return(io.NopCloser(bytes.NewReader([]byte(`{"invalid": "json"`))), nil).Once()
222-
mockOS.On("ListFilePaths", ctx, "", 0).Return(nil, nil)
222+
mockOS.On("ListFilePaths", ctx, ListFileOptions{}).Return(nil, nil)
223223

224224
schema, err := LoadSchema(ctx, mockOS, defaultCfg)
225225
require.Error(t, err)
@@ -237,7 +237,7 @@ func TestLoadSchema(t *testing.T) {
237237
require.NoError(t, err)
238238

239239
mockOS.On("GetFile", ctx, manifestFilename).Return(io.NopCloser(bytes.NewReader(invalidManifestBytes)), nil).Once()
240-
mockOS.On("ListFilePaths", ctx, "", 0).Return(nil, nil)
240+
mockOS.On("ListFilePaths", ctx, ListFileOptions{}).Return(nil, nil)
241241

242242
schema, err := LoadSchema(ctx, mockOS, defaultCfg)
243243
require.Error(t, err)
@@ -249,7 +249,7 @@ func TestLoadSchema(t *testing.T) {
249249
t.Run("Manifest not found, and incomplete config", func(t *testing.T) {
250250
mockOS := new(MockDataStore)
251251
mockOS.On("GetFile", ctx, manifestFilename).Return(nil, os.ErrNotExist).Once()
252-
mockOS.On("ListFilePaths", ctx, "", 0).Return(nil, nil)
252+
mockOS.On("ListFilePaths", ctx, ListFileOptions{}).Return(nil, nil)
253253

254254
schema, err := LoadSchema(ctx, mockOS, DataStoreConfig{})
255255
require.Error(t, err)
@@ -311,7 +311,7 @@ func TestGetLedgerFileExtension(t *testing.T) {
311311
for _, tt := range cases {
312312
t.Run(tt.name, func(t *testing.T) {
313313
ds := new(MockDataStore)
314-
ds.On("ListFilePaths", mock.Anything, "", 0).Return(tt.files, tt.listErr).Once()
314+
ds.On("ListFilePaths", mock.Anything, ListFileOptions{}).Return(tt.files, tt.listErr).Once()
315315

316316
ext, err := GetLedgerFileExtension(context.Background(), ds)
317317
require.Equal(t, tt.ExpectedExt, ext)

support/datastore/datastore.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ type DataStoreConfig struct {
2323

2424
const listFilePathsMaxLimit = 1000
2525

26+
// ListFileOptions controls how ListFilePaths enumerates objects.
27+
type ListFileOptions struct {
28+
// Prefix filters the results to only include keys that start with this string.
29+
Prefix string
30+
31+
// StartAfter specifies the key from which to begin listing. The returned keys will be
32+
// lexicographically greater than this value.
33+
StartAfter string
34+
35+
// Limit restricts the number of keys returned. A value of 0 will use the default limit,
36+
// and any value above listFilePathsMaxLimit will be automatically capped.
37+
Limit int
38+
}
39+
2640
// DataStore defines an interface for interacting with data storage
2741
type DataStore interface {
2842
GetFileMetadata(ctx context.Context, path string) (map[string]string, error)
@@ -32,7 +46,7 @@ type DataStore interface {
3246
PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error)
3347
Exists(ctx context.Context, path string) (bool, error)
3448
Size(ctx context.Context, path string) (int64, error)
35-
ListFilePaths(ctx context.Context, prefix string, limit int) ([]string, error)
49+
ListFilePaths(ctx context.Context, options ListFileOptions) ([]string, error)
3650
Close() error
3751
}
3852

support/datastore/gcs.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,32 +203,41 @@ func (b GCSDataStore) putFile(ctx context.Context, filePath string, in io.Writer
203203
}
204204

205205
// ListFilePaths lists up to 'limit' file paths under the provided prefix.
206-
// Returned paths are absolute within the datastore (including the given prefix)
206+
// Returned paths are relative to the bucket prefix
207207
// and ordered lexicographically ascending as provided by the backend.
208208
// If limit <= 0, implementations default to a cap of 1,000; values > 1,000 are capped to 1,000.
209-
func (b GCSDataStore) ListFilePaths(ctx context.Context, prefix string, limit int) ([]string, error) {
209+
func (b GCSDataStore) ListFilePaths(ctx context.Context, options ListFileOptions) ([]string, error) {
210210
var fullPrefix string
211211

212212
// When options.Prefix is empty, ensure the base prefix ends with a slash (e.g., "a/b/")
213213
// so the query returns only objects within that directory, not similarly named paths like "a/b-1".
214-
if prefix == "" {
214+
if options.Prefix == "" {
215215
fullPrefix = b.prefix
216216
if !strings.HasSuffix(fullPrefix, "/") {
217217
fullPrefix += "/"
218218
}
219219
} else {
220220
// Join the caller-provided prefix with the datastore prefix
221-
fullPrefix = path.Join(b.prefix, prefix)
221+
fullPrefix = path.Join(b.prefix, options.Prefix)
222+
}
223+
224+
var StartAfter string
225+
if options.StartAfter != "" {
226+
StartAfter = path.Join(b.prefix, options.StartAfter)
227+
}
228+
229+
query := &storage.Query{
230+
Prefix: fullPrefix,
231+
StartOffset: StartAfter, // inclusive in GCS; we normalize to exclusive below
222232
}
223233

224-
query := &storage.Query{Prefix: fullPrefix}
225234
// Only request the object name to minimize payload
226235
query.SetAttrSelection([]string{"Name"})
227236
it := b.bucket.Objects(ctx, query)
228237

229238
keys := make([]string, 0)
230239
// Enforce an effective cap of 1000 total results and default to 1000 if <= 0
231-
remaining := limit
240+
remaining := options.Limit
232241
if remaining <= 0 || remaining > listFilePathsMaxLimit {
233242
remaining = listFilePathsMaxLimit
234243
}
@@ -243,8 +252,17 @@ func (b GCSDataStore) ListFilePaths(ctx context.Context, prefix string, limit in
243252
if err != nil {
244253
return nil, err
245254
}
246-
// Return full path (including the configured prefix)
247-
keys = append(keys, attrs.Name)
255+
256+
// GCS StartOffset is inclusive, so if the key is the same as StartAfter,
257+
// skip it so results begin strictly after that key.
258+
if attrs.Name == StartAfter {
259+
continue
260+
}
261+
262+
// Trim the configured prefix and any leading slash before appending
263+
relative := strings.TrimPrefix(attrs.Name, b.prefix+"/")
264+
keys = append(keys, relative)
265+
248266
remaining--
249267
}
250268
return keys, nil

support/datastore/gcs_test.go

Lines changed: 159 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,10 +431,10 @@ func TestGCSListFilePaths(t *testing.T) {
431431
require.NoError(t, err)
432432
t.Cleanup(func() { _ = store.Close() })
433433

434-
paths, err := store.ListFilePaths(context.Background(), "", 2)
434+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{Limit: 2})
435435
require.NoError(t, err)
436436

437-
require.Equal(t, []string{"objects/testnet/a", "objects/testnet/b"}, paths)
437+
require.Equal(t, []string{"a", "b"}, paths)
438438
}
439439

440440
func TestGCSListFilePaths_WithPrefix(t *testing.T) {
@@ -458,9 +458,9 @@ func TestGCSListFilePaths_WithPrefix(t *testing.T) {
458458
require.NoError(t, err)
459459
t.Cleanup(func() { _ = store.Close() })
460460

461-
paths, err := store.ListFilePaths(context.Background(), "a", 10)
461+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{Prefix: "a", Limit: 10})
462462
require.NoError(t, err)
463-
require.Equal(t, []string{"objects/testnet/a/x", "objects/testnet/a/y"}, paths)
463+
require.Equal(t, []string{"a/x", "a/y"}, paths)
464464
}
465465

466466
func TestGCSListFilePaths_LimitDefaultAndCap(t *testing.T) {
@@ -478,11 +478,164 @@ func TestGCSListFilePaths_LimitDefaultAndCap(t *testing.T) {
478478
require.NoError(t, err)
479479
t.Cleanup(func() { _ = store.Close() })
480480

481-
paths, err := store.ListFilePaths(context.Background(), "", 0)
481+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{})
482482
require.NoError(t, err)
483483
require.Equal(t, 1000, len(paths))
484484

485-
paths, err = store.ListFilePaths(context.Background(), "", 5000)
485+
paths, err = store.ListFilePaths(context.Background(), ListFileOptions{Limit: 5000})
486486
require.NoError(t, err)
487487
require.Equal(t, 1000, len(paths))
488488
}
489+
490+
func TestGCSListFilePaths_StartAfter(t *testing.T) {
491+
t.Run("basic start-after (no Prefix)", func(t *testing.T) {
492+
objects := make([]fakestorage.Object, 0, 11)
493+
for i := 0; i < 10; i++ {
494+
objects = append(objects, fakestorage.Object{
495+
ObjectAttrs: fakestorage.ObjectAttrs{
496+
BucketName: "test-bucket",
497+
Name: fmt.Sprintf("objects/testnet/%04d", i)},
498+
Content: []byte("x"),
499+
})
500+
}
501+
// decoy outside the prefix directory style
502+
objects = append(objects, fakestorage.Object{
503+
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0000"},
504+
Content: []byte("x"),
505+
})
506+
507+
server := fakestorage.NewServer(objects)
508+
defer server.Stop()
509+
510+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
511+
require.NoError(t, err)
512+
t.Cleanup(func() { _ = store.Close() })
513+
514+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
515+
StartAfter: "0005",
516+
})
517+
require.NoError(t, err)
518+
require.Equal(t, []string{"0006", "0007", "0008", "0009"}, paths,
519+
"should start strictly after 0005 and trim prefix")
520+
})
521+
522+
t.Run("with Prefix directory and start-after inside it", func(t *testing.T) {
523+
objects := []fakestorage.Object{
524+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/a/0001"}},
525+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/a/0002"}},
526+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/b/0002"}}, // different subdir
527+
// decoy outside the prefix directory style
528+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0002"}},
529+
}
530+
server := fakestorage.NewServer(objects)
531+
defer server.Stop()
532+
533+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
534+
require.NoError(t, err)
535+
t.Cleanup(func() { _ = store.Close() })
536+
537+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
538+
Prefix: "a/",
539+
StartAfter: "a/0001",
540+
})
541+
require.NoError(t, err)
542+
require.Equal(t, []string{"a/0002"}, paths)
543+
})
544+
545+
t.Run("start-after equals last key -> empty", func(t *testing.T) {
546+
objects := []fakestorage.Object{
547+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0000"}},
548+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0001"}},
549+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0002"}},
550+
// decoy outside the prefix directory style
551+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0002"}},
552+
}
553+
server := fakestorage.NewServer(objects)
554+
defer server.Stop()
555+
556+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
557+
require.NoError(t, err)
558+
t.Cleanup(func() { _ = store.Close() })
559+
560+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
561+
StartAfter: "0002",
562+
})
563+
require.NoError(t, err)
564+
require.Empty(t, paths)
565+
})
566+
567+
t.Run("start-after before first key -> all returned", func(t *testing.T) {
568+
objects := []fakestorage.Object{
569+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0001"}},
570+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0002"}},
571+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0003"}},
572+
// decoy outside the prefix directory style
573+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0002"}},
574+
}
575+
server := fakestorage.NewServer(objects)
576+
defer server.Stop()
577+
578+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
579+
require.NoError(t, err)
580+
t.Cleanup(func() { _ = store.Close() })
581+
582+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
583+
StartAfter: "0000",
584+
})
585+
require.NoError(t, err)
586+
require.Equal(t, []string{"0001", "0002", "0003"}, paths)
587+
})
588+
589+
t.Run("start-after missing-but-between keys -> next greater", func(t *testing.T) {
590+
objects := []fakestorage.Object{
591+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0002"}},
592+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0004"}},
593+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet/0006"}},
594+
// decoy outside the prefix directory style
595+
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0002"}},
596+
}
597+
server := fakestorage.NewServer(objects)
598+
defer server.Stop()
599+
600+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
601+
require.NoError(t, err)
602+
t.Cleanup(func() { _ = store.Close() })
603+
604+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
605+
StartAfter: "0003",
606+
})
607+
require.NoError(t, err)
608+
require.Equal(t, []string{"0004", "0006"}, paths)
609+
})
610+
611+
t.Run("respects limit together with start-after", func(t *testing.T) {
612+
objects := make([]fakestorage.Object, 0, 10)
613+
for i := 0; i < 10; i++ {
614+
objects = append(objects, fakestorage.Object{
615+
ObjectAttrs: fakestorage.ObjectAttrs{
616+
BucketName: "test-bucket",
617+
Name: fmt.Sprintf("objects/testnet/%04d", i)},
618+
Content: []byte("x"),
619+
})
620+
}
621+
// decoy outside the prefix directory style
622+
objects = append(objects, fakestorage.Object{
623+
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-bucket", Name: "objects/testnet-foo/0002"},
624+
Content: []byte("x"),
625+
})
626+
627+
server := fakestorage.NewServer(objects)
628+
defer server.Stop()
629+
630+
store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
631+
require.NoError(t, err)
632+
t.Cleanup(func() { _ = store.Close() })
633+
634+
paths, err := store.ListFilePaths(context.Background(), ListFileOptions{
635+
StartAfter: "0004",
636+
Limit: 3,
637+
})
638+
require.NoError(t, err)
639+
require.Equal(t, []string{"0005", "0006", "0007"}, paths)
640+
})
641+
}

0 commit comments

Comments
 (0)