Skip to content

Commit

Permalink
ptsstorage: allow synthetic timestamps in pts storage
Browse files Browse the repository at this point in the history
Previously synthetic timestamps were causing failures in changefeeds if
checkpoint contained a synthetic timestamps. Timestamp representation
was parsed as decimal for storage which is not the case for synthetic
timestamps.
This commit changes pts storage to strip synthetic flag to mitigate the
issue.

Release note: None
  • Loading branch information
aliher1911 committed Nov 15, 2022
1 parent 47c7b3a commit 951add4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *storage) UpdateTimestamp(
) error {
row, err := p.ex.QueryRowEx(ctx, "protectedts-update", txn,
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
updateTimestampQuery, id.GetBytesMut(), timestamp.AsOfSystemTime())
updateTimestampQuery, id.GetBytesMut(), timestamp.WithSynthetic(false).AsOfSystemTime())
if err != nil {
return errors.Wrapf(err, "failed to update record %v", id)
}
Expand All @@ -97,7 +97,7 @@ func (p *storage) deprecatedProtect(
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
protectQueryWithoutTarget,
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.AsOfSystemTime(),
r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(),
r.MetaType, meta,
len(r.DeprecatedSpans), encodedSpans)
if err != nil {
Expand Down Expand Up @@ -177,7 +177,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
protectQuery,
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.AsOfSystemTime(),
r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(),
r.MetaType, meta,
len(r.DeprecatedSpans), encodedTarget, encodedTarget)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,35 @@ var testCases = []testCase{
}),
},
},
{
name: "Protect using synthetic timestamp",
ops: []op{
funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) {
rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now().WithSynthetic(true), "", nil, tableTarget(42),
tableSpan(42))
err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return tCtx.pts.Protect(ctx, txn, &rec)
})
require.NoError(t, err)
// Synthetic should be reset when writing timestamps to make it
// compatible with underlying sql schema.
rec.Timestamp.Synthetic = false
tCtx.state.Records = append(tCtx.state.Records, rec)
tCtx.state.Version++
tCtx.state.NumRecords++
tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans))
var encoded []byte
if tCtx.runWithDeprecatedSpans {
encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans})
require.NoError(t, err)
} else {
encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()})
require.NoError(t, err)
}
tCtx.state.TotalBytes += uint64(len(encoded))
}),
},
},
}

type testContext struct {
Expand Down

0 comments on commit 951add4

Please sign in to comment.