copy: add a session variable for the number of retries per batch
Previously, every batch of the non-atomic COPY would get retried up
to 5 times (meaning there would be at most 6 attempts). This commit
introduces a session variable that controls this number - it might
come in handy in the future as an additional knob to make COPY succeed
(e.g. in the context of DMS).

Release note: None
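As a rough illustration of the arithmetic above - this is a standalone sketch, not the actual pkg/util/retry code, and all names in it are made up for the example - a retry budget of N means at most N+1 attempts per batch:

package main

import "fmt"

// attemptBatch simulates inserting one COPY batch; it always fails here so
// that we can count how many attempts a given retry budget allows.
func attemptBatch() error { return fmt.Errorf("simulated retryable error") }

// runWithRetries tries the batch once and then retries it up to maxRetries
// more times, so the total number of attempts is maxRetries+1.
func runWithRetries(maxRetries int) (attempts int) {
	for attempts = 1; ; attempts++ {
		if attemptBatch() == nil || attempts > maxRetries {
			return attempts
		}
	}
}

func main() {
	fmt.Println(runWithRetries(5))  // the old hard-coded budget: at most 6 attempts
	fmt.Println(runWithRetries(10)) // raising the new knob to 10: at most 11 attempts
}

The diff below keeps these semantics but reads the budget from the new copy_num_retries_per_batch session variable instead of the hard-coded 5.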
yuzefovich committed Jan 6, 2024
1 parent 12589a1 commit 1c1d313
Showing 7 changed files with 46 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/sql/copy_from.go
@@ -1088,7 +1088,13 @@ func (c *copyMachine) insertRows(ctx context.Context, finalBatch bool) error {
var err error

rOpts := base.DefaultRetryOptions()
rOpts.MaxRetries = 5
rOpts.MaxRetries = int(c.p.SessionData().CopyNumRetriesPerBatch)
if rOpts.MaxRetries < 1 {
// MaxRetries == 0 means infinite number of attempts, and although
// CopyNumRetriesPerBatch should always be a positive number, let's be
// careful here.
rOpts.MaxRetries = 1
}
r := retry.StartWithCtx(ctx, rOpts)
for r.Next() {
if err = c.insertRowsInternal(ctx, finalBatch); err == nil {
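In the retry options used above, MaxRetries == 0 is interpreted as an unlimited number of attempts, which is why the new code clamps the session value to at least 1 even though the variable is validated to be positive. A minimal standalone version of that guard (hypothetical helper name, not part of the diff):

// clampMaxRetries returns a retry budget of at least 1: a value of 0 would be
// treated by the retry options as "retry forever", so the session value is
// defensively floored even though it should always be positive.
func clampMaxRetries(sessionValue int32) int {
	if sessionValue < 1 {
		return 1
	}
	return int(sessionValue)
}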
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -3421,6 +3421,10 @@ func (m *sessionDataMutator) SetCopyWritePipeliningEnabled(val bool) {
m.data.CopyWritePipeliningEnabled = val
}

func (m *sessionDataMutator) SetCopyNumRetriesPerBatch(val int32) {
m.data.CopyNumRetriesPerBatch = val
}

func (m *sessionDataMutator) SetOptSplitScanLimit(val int32) {
m.data.OptSplitScanLimit = val
}
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
@@ -5488,6 +5488,7 @@ client_encoding UTF8
client_min_messages notice
copy_from_atomic_enabled on
copy_from_retries_enabled on
copy_num_retries_per_batch 5
copy_transaction_quality_of_service background
copy_write_pipelining_enabled off
cost_scans_with_default_col_size off
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2801,6 +2801,7 @@ client_encoding UTF8 N
client_min_messages notice NULL NULL NULL string
copy_from_atomic_enabled on NULL NULL NULL string
copy_from_retries_enabled on NULL NULL NULL string
copy_num_retries_per_batch 5 NULL NULL NULL string
copy_transaction_quality_of_service background NULL NULL NULL string
copy_write_pipelining_enabled off NULL NULL NULL string
cost_scans_with_default_col_size off NULL NULL NULL string
@@ -2974,6 +2975,7 @@ client_encoding UTF8 N
client_min_messages notice NULL user NULL notice notice
copy_from_atomic_enabled on NULL user NULL on on
copy_from_retries_enabled on NULL user NULL on on
copy_num_retries_per_batch 5 NULL user NULL 5 5
copy_transaction_quality_of_service background NULL user NULL background background
copy_write_pipelining_enabled off NULL user NULL off off
cost_scans_with_default_col_size off NULL user NULL off off
@@ -3141,6 +3143,7 @@ client_min_messages NULL NULL NULL
copy_fast_path_enabled NULL NULL NULL NULL NULL
copy_from_atomic_enabled NULL NULL NULL NULL NULL
copy_from_retries_enabled NULL NULL NULL NULL NULL
copy_num_retries_per_batch NULL NULL NULL NULL NULL
copy_transaction_quality_of_service NULL NULL NULL NULL NULL
copy_write_pipelining_enabled NULL NULL NULL NULL NULL
cost_scans_with_default_col_size NULL NULL NULL NULL NULL
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
@@ -39,6 +39,7 @@ client_encoding UTF8
client_min_messages notice
copy_from_atomic_enabled on
copy_from_retries_enabled on
copy_num_retries_per_batch 5
copy_transaction_quality_of_service background
copy_write_pipelining_enabled off
cost_scans_with_default_col_size off
3 changes: 3 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
@@ -476,6 +476,9 @@ message LocalOnlySessionData {
// query plans with merge joins. When false, the optimizer does not attempt
// to plan merge joins.
bool optimizer_merge_joins_enabled = 119;
// CopyNumRetriesPerBatch determines the number of times a single batch of
// rows can be retried for non-atomic COPY.
int32 copy_num_retries_per_batch = 120;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
27 changes: 27 additions & 0 deletions pkg/sql/vars.go
@@ -2291,6 +2291,33 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`copy_num_retries_per_batch`: {
GetStringVal: makePostgresBoolGetStringValFn(`copy_num_retries_per_batch`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
if b <= 0 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"copy_num_retries_per_batch must be a positive value: %d", b)
}
if b > math.MaxInt32 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"cannot set copy_num_retries_per_batch to a value greater than %d: %d", math.MaxInt32, b)
}
m.SetCopyNumRetriesPerBatch(int32(b))
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return strconv.FormatInt(int64(evalCtx.SessionData().CopyNumRetriesPerBatch), 10), nil
},
GlobalDefault: func(sv *settings.Values) string {
return "5"
},
},

// CockroachDB extension.
`opt_split_scan_limit`: {
GetStringVal: makeIntGetStringValFn(`opt_split_scan_limit`),
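The copy_num_retries_per_batch setter above validates the string before storing it in the session data. A self-contained sketch of the same checks, using plain errors rather than pgerror and a hypothetical function name, for illustration only:

package main

import (
	"fmt"
	"math"
	"strconv"
)

// parseCopyNumRetries applies the same validation as the session-variable
// setter: the value must be a positive integer that also fits in an int32.
func parseCopyNumRetries(s string) (int32, error) {
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, err
	}
	if v <= 0 {
		return 0, fmt.Errorf("copy_num_retries_per_batch must be a positive value: %d", v)
	}
	if v > math.MaxInt32 {
		return 0, fmt.Errorf("cannot set copy_num_retries_per_batch to a value greater than %d: %d", math.MaxInt32, v)
	}
	return int32(v), nil
}

func main() {
	for _, in := range []string{"5", "0", "not-a-number"} {
		v, err := parseCopyNumRetries(in)
		fmt.Printf("%q -> %d, err=%v\n", in, v, err)
	}
}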
