Skip to content

Commit

Permalink
sql/opt: propagate row-level locking mode to index, lookup, and inver…
Browse files Browse the repository at this point in the history
…ted joins

Fixes cockroachdb#56941.

This commit updates the execbuilder to propagate row-level locking modes
through transformations from standard Scan and Join operations to
specialized IndexJoin, LookupJoin, and InvertedJoin operations.

Release note (sql change): table scans performed as a part of index
joins, lookup joins, and inverted joins now respect the row-level
locking strength and wait policy specified by the optional
FOR SHARE/UPDATE [NOWAIT] clause on SELECT statements.
  • Loading branch information
nvanbenschoten committed Apr 22, 2022
1 parent 11a0a9f commit ab365d8
Show file tree
Hide file tree
Showing 19 changed files with 706 additions and 56 deletions.
16 changes: 16 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return rec.compose(shouldDistribute), nil

case *indexJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Index joins that are performing row-level locking cannot
// currently be distributed because their locks would not be
// propagated back to the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForPlanNode(n.table); err != nil {
Expand All @@ -467,6 +474,13 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return checkSupportForInvertedFilterNode(n)

case *invertedJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Inverted joins that are performing row-level locking cannot
// currently be distributed because their locks would not be
// propagated back to the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
if err := checkExpr(n.onExpr); err != nil {
return cannotDistribute, err
}
Expand Down Expand Up @@ -2425,6 +2439,8 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
Type: n.joinType,
MaintainOrdering: len(n.reqOrdering) > 0,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
}

fetchColIDs := make([]descpb.ColumnID, len(n.table.cols))
Expand Down
20 changes: 10 additions & 10 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,14 @@ func (e *distSQLSpecExecFactory) ConstructScan(
if err := rowenc.InitIndexFetchSpec(&trSpec.FetchSpec, e.planner.ExecCfg().Codec, tabDesc, idx, columnIDs); err != nil {
return nil, err
}
if params.Locking.IsLocking() {
trSpec.LockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength)
trSpec.LockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy)
if trSpec.LockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Scans that are performing row-level locking cannot currently be
// distributed because their locks would not be propagated back to
// the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
recommendation = cannotDistribute
}
trSpec.LockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength)
trSpec.LockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy)
if trSpec.LockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Scans that are performing row-level locking cannot currently be
// distributed because their locks would not be propagated back to
// the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
recommendation = cannotDistribute
}

// Note that we don't do anything about the possible filter here since we
Expand Down Expand Up @@ -646,6 +644,7 @@ func (e *distSQLSpecExecFactory) ConstructIndexJoin(
keyCols []exec.NodeColumnOrdinal,
tableCols exec.TableColumnOrdinalSet,
reqOrdering exec.OutputOrdering,
locking opt.Locking,
limitHint int64,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: index join")
Expand Down Expand Up @@ -683,6 +682,7 @@ func (e *distSQLSpecExecFactory) ConstructInvertedJoin(
onCond tree.TypedExpr,
isFirstJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking opt.Locking,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: inverted join")
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ message TableReaderSpec {

// Indicates the policy to be used by the scan for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_stength is FOR_NONE.
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 11 [(gogoproto.nullable) = false];

reserved 1, 2, 4, 6, 7, 8, 13, 14, 15, 16, 19;
Expand Down Expand Up @@ -158,7 +158,7 @@ message IndexSkipTableReaderSpec {

// Indicates the policy to be used by the scan for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_stength is FOR_NONE.
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 7 [(gogoproto.nullable) = false];
}

Expand Down Expand Up @@ -348,7 +348,7 @@ message JoinReaderSpec {

// Indicates the policy to be used by the join for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_stength is FOR_NONE.
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 10 [(gogoproto.nullable) = false];

// Indicates that the join reader should maintain the ordering of the input
Expand Down Expand Up @@ -709,6 +709,15 @@ message InvertedJoinerSpec {
// prefix_equality_columns should be equal to the number of non-inverted
// prefix columns in the index.
repeated uint32 prefix_equality_columns = 9 [packed = true];

// Indicates the row-level locking strength to be used by the scan. If set to
// FOR_NONE, no row-level locking should be performed.
optional sqlbase.ScanLockingStrength locking_strength = 12 [(gogoproto.nullable) = false];

// Indicates the policy to be used by the scan for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 13 [(gogoproto.nullable) = false];
}

// InvertedFiltererSpec is the specification of a processor that does filtering
Expand Down
18 changes: 16 additions & 2 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ func (b *Builder) scanParams(
}

// Raise error if row-level locking is part of a read-only transaction.
// TODO(nvanbenschoten): this check should be shared across all expressions
// that can perform row-level locking.
if locking.IsLocking() && b.evalCtx.TxnReadOnly {
return exec.ScanParams{}, opt.ColMap{}, pgerror.Newf(pgcode.ReadOnlySQLTransaction,
"cannot execute %s in a read-only transaction", locking.Strength.String())
Expand Down Expand Up @@ -1720,9 +1722,15 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) {

cols := join.Cols
needed, output := b.getColumns(cols, join.Table)

locking := join.Locking
if b.forceForUpdateLocking {
locking = forUpdateLocking
}

res := execPlan{outputCols: output}
res.root, err = b.factory.ConstructIndexJoin(
input.root, tab, keyCols, needed, res.reqOrdering(join), join.RequiredPhysical().LimitHintInt64(),
input.root, tab, keyCols, needed, res.reqOrdering(join), locking, join.RequiredPhysical().LimitHintInt64(),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -1811,7 +1819,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
tab := md.Table(join.Table)
idx := tab.Index(join.Index)

var locking opt.Locking
locking := join.Locking
if b.forceForUpdateLocking {
locking = forUpdateLocking
}
Expand Down Expand Up @@ -1926,6 +1934,11 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro
return execPlan{}, err
}

locking := join.Locking
if b.forceForUpdateLocking {
locking = forUpdateLocking
}

res.root, err = b.factory.ConstructInvertedJoin(
joinOpToJoinType(join.JoinType),
invertedExpr,
Expand All @@ -1937,6 +1950,7 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro
onExpr,
join.IsFirstJoinInPairedJoiner,
res.reqOrdering(join),
locking,
)
if err != nil {
return execPlan{}, err
Expand Down
Loading

0 comments on commit ab365d8

Please sign in to comment.