Skip to content

Commit

Permalink
sql/opt: propagate row-level locking mode to zigzag joins
Browse files Browse the repository at this point in the history
This commit updates the execbuilder to propagate row-level locking modes
through transformations from standard Scan and Join operations to
ZigZagJoins, and allows for the use of zigzag joins when explicit
row-level locking modes are in use.

Release note (sql change): table scans performed as a part of zigzag
joins now respect the row-level locking strength and wait policy
specified by the optional FOR SHARE/UPDATE [NOWAIT] clause on SELECT
statements.
  • Loading branch information
nvanbenschoten committed Apr 22, 2022
1 parent ab365d8 commit a8b951b
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 104 deletions.
33 changes: 23 additions & 10 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,15 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *zigzagJoinNode:
for _, side := range n.sides {
if side.scan.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// ZigZag joins that are performing row-level locking cannot
// currently be distributed because their locks would not be
// propagated back to the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}
}
if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
Expand Down Expand Up @@ -2530,11 +2539,13 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
}

sides[i] = zigzagPlanningSide{
desc: side.scan.desc,
index: side.scan.index,
cols: side.scan.cols,
eqCols: side.eqCols,
fixedValues: valuesSpec,
desc: side.scan.desc,
index: side.scan.index,
cols: side.scan.cols,
eqCols: side.eqCols,
fixedValues: valuesSpec,
lockingStrength: side.scan.lockingStrength,
lockingWaitPolicy: side.scan.lockingWaitPolicy,
}
}

Expand All @@ -2547,11 +2558,13 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
}

type zigzagPlanningSide struct {
desc catalog.TableDescriptor
index catalog.Index
cols []catalog.Column
eqCols []int
fixedValues *execinfrapb.ValuesCoreSpec
desc catalog.TableDescriptor
index catalog.Index
cols []catalog.Column
eqCols []int
fixedValues *execinfrapb.ValuesCoreSpec
lockingStrength descpb.ScanLockingStrength
lockingWaitPolicy descpb.ScanLockingWaitPolicy
}

type zigzagPlanningInfo struct {
Expand Down
19 changes: 12 additions & 7 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
wantedCols exec.TableColumnOrdinalSet,
fixedVals []tree.TypedExpr,
eqCols []exec.TableColumnOrdinal,
locking opt.Locking,
) (zigzagPlanningSide, error) {
desc := table.(*optTable).desc
colCfg := makeScanColumnsConfig(table, wantedCols)
Expand All @@ -719,11 +720,13 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
// TODO (cucaroach): update indexUsageStats.

return zigzagPlanningSide{
desc: desc,
index: index.(*optIndex).idx,
cols: cols,
eqCols: eqColOrdinals,
fixedValues: valuesSpec,
desc: desc,
index: index.(*optIndex).idx,
cols: cols,
eqCols: eqColOrdinals,
fixedValues: valuesSpec,
lockingStrength: descpb.ToScanLockingStrength(locking.Strength),
lockingWaitPolicy: descpb.ToScanLockingWaitPolicy(locking.WaitPolicy),
}, nil
}

Expand All @@ -733,11 +736,13 @@ func (e *distSQLSpecExecFactory) ConstructZigzagJoin(
leftCols exec.TableColumnOrdinalSet,
leftFixedVals []tree.TypedExpr,
leftEqCols []exec.TableColumnOrdinal,
leftLocking opt.Locking,
rightTable cat.Table,
rightIndex cat.Index,
rightCols exec.TableColumnOrdinalSet,
rightFixedVals []tree.TypedExpr,
rightEqCols []exec.TableColumnOrdinal,
rightLocking opt.Locking,
onCond tree.TypedExpr,
reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
Expand All @@ -747,11 +752,11 @@ func (e *distSQLSpecExecFactory) ConstructZigzagJoin(

sides := make([]zigzagPlanningSide, 2)
var err error
sides[0], err = e.constructZigzagJoinSide(planCtx, leftTable, leftIndex, leftCols, leftFixedVals, leftEqCols)
sides[0], err = e.constructZigzagJoinSide(planCtx, leftTable, leftIndex, leftCols, leftFixedVals, leftEqCols, leftLocking)
if err != nil {
return nil, err
}
sides[1], err = e.constructZigzagJoinSide(planCtx, rightTable, rightIndex, rightCols, rightFixedVals, rightEqCols)
sides[1], err = e.constructZigzagJoinSide(planCtx, rightTable, rightIndex, rightCols, rightFixedVals, rightEqCols, rightLocking)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,15 @@ message ZigzagJoinerSpec {

// Fixed values, corresponding to a prefix of the index key columns.
optional ValuesCoreSpec fixed_values = 3 [(gogoproto.nullable) = false];

// Indicates the row-level locking strength to be used by the scan. If set to
// FOR_NONE, no row-level locking should be performed.
optional sqlbase.ScanLockingStrength locking_strength = 4 [(gogoproto.nullable) = false];

// Indicates the policy to be used by the scan for handling conflicting locks
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 5 [(gogoproto.nullable) = false];
}

repeated Side sides = 7 [(gogoproto.nullable) = false];
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,13 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) {
leftOrdinals, leftColMap := b.getColumns(leftCols, join.LeftTable)
rightOrdinals, rightColMap := b.getColumns(rightCols, join.RightTable)

leftLocking := join.LeftLocking
rightLocking := join.RightLocking
if b.forceForUpdateLocking {
leftLocking = forUpdateLocking
rightLocking = forUpdateLocking
}

allCols := joinOutputMap(leftColMap, rightColMap)

res := execPlan{outputCols: allCols}
Expand Down Expand Up @@ -2039,11 +2046,13 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) {
leftOrdinals,
leftFixedVals,
leftEqCols,
leftLocking,
rightTable,
rightIndex,
rightOrdinals,
rightFixedVals,
rightEqCols,
rightLocking,
onExpr,
res.reqOrdering(join),
)
Expand Down
96 changes: 82 additions & 14 deletions pkg/sql/opt/exec/execbuilder/testdata/select_for_update
Original file line number Diff line number Diff line change
Expand Up @@ -1824,28 +1824,29 @@ EXPLAIN (VERBOSE) SELECT * FROM inverted WHERE b @> '{1, 2}' FOR UPDATE
distribution: local
vectorized: true
·
index join
lookup join (inner)
│ columns: (a, b, c)
│ estimated row count: 12 (missing stats)
│ table: inverted@inverted_pkey
│ key columns: a
│ equality: (a) = (a)
│ equality cols are key
│ pred: b @> ARRAY[1,2]
│ locking strength: for update
└── • project
│ columns: (a)
│ estimated row count: 111 (missing stats)
│ estimated row count: 12 (missing stats)
└── • inverted filter
│ columns: (a, b_inverted_key)
│ inverted column: b_inverted_key
│ num spans: 1
└── • scan
columns: (a, b_inverted_key)
estimated row count: 111 (missing stats)
table: inverted@b_inv
spans: /1-/3
locking strength: for update
└── • zigzag join
columns: (a, b_inverted_key, a, b_inverted_key)
left table: inverted@b_inv
left columns: (a, b_inverted_key)
left fixed values: 1 column
left locking strength: for update
right table: inverted@b_inv
right columns: (a, b_inverted_key)
right fixed values: 1 column
right locking strength: for update

query T
EXPLAIN (VERBOSE) SELECT * FROM inverted WHERE b <@ '{1, 2}' FOR UPDATE
Expand Down Expand Up @@ -1916,6 +1917,73 @@ vectorized: true
spans: FULL SCAN
locking strength: for update

# ------------------------------------------------------------------------------
# Tests with zigzag joins.
# ------------------------------------------------------------------------------

statement ok
CREATE TABLE zigzag (
a INT PRIMARY KEY,
b INT,
c FLOAT,
d JSONB,
INDEX b_idx(b),
INDEX c_idx(c),
INVERTED INDEX d_idx(d)
)

query T
EXPLAIN (VERBOSE) SELECT a,b,c FROM zigzag WHERE b = 5 AND c = 6.0 FOR UPDATE
----
distribution: local
vectorized: true
·
• project
│ columns: (a, b, c)
│ estimated row count: 1 (missing stats)
└── • zigzag join
columns: (a, b, a, c)
pred: (b = 5) AND (c = 6.0)
left table: zigzag@b_idx
left columns: (a, b)
left fixed values: 1 column
left locking strength: for update
right table: zigzag@c_idx
right columns: (a, c)
right fixed values: 1 column
right locking strength: for update

query T
EXPLAIN (VERBOSE) SELECT * from zigzag where d @> '{"a": {"b": "c"}, "f": "g"}' FOR UPDATE
----
distribution: local
vectorized: true
·
• lookup join (inner)
│ columns: (a, b, c, d)
│ estimated row count: 12 (missing stats)
│ table: zigzag@zigzag_pkey
│ equality: (a) = (a)
│ equality cols are key
│ pred: d @> '{"a": {"b": "c"}, "f": "g"}'
│ locking strength: for update
└── • project
│ columns: (a)
│ estimated row count: 12 (missing stats)
└── • zigzag join
columns: (a, d_inverted_key, a, d_inverted_key)
left table: zigzag@d_idx
left columns: (a, d_inverted_key)
left fixed values: 1 column
left locking strength: for update
right table: zigzag@d_idx
right columns: (a, d_inverted_key)
right fixed values: 1 column
right locking strength: for update

# ------------------------------------------------------------------------------
# Tests with the NOWAIT lock wait policy.
# ------------------------------------------------------------------------------
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/opt/exec/explain/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,13 @@ func (e *emitter) emitNodeAttributes(n *Node) error {
if n := len(a.LeftFixedVals); n > 0 {
ob.Attrf("left fixed values", "%d column%s", n, util.Pluralize(int64(n)))
}
e.emitLockingPolicyWithPrefix("left ", a.LeftLocking)
e.emitTableAndIndex("right table", a.RightTable, a.RightIndex)
ob.Attrf("right columns", "(%s)", printColumns(rightCols))
if n := len(a.RightFixedVals); n > 0 {
ob.Attrf("right fixed values", "%d column%s", n, util.Pluralize(int64(n)))
}
e.emitLockingPolicyWithPrefix("right ", a.RightLocking)

case invertedFilterOp:
a := n.args.(*invertedFilterArgs)
Expand Down Expand Up @@ -943,13 +945,17 @@ func (e *emitter) spansStr(table cat.Table, index cat.Index, scanParams exec.Sca
}

func (e *emitter) emitLockingPolicy(locking opt.Locking) {
e.emitLockingPolicyWithPrefix("", locking)
}

func (e *emitter) emitLockingPolicyWithPrefix(keyPrefix string, locking opt.Locking) {
strength := descpb.ToScanLockingStrength(locking.Strength)
waitPolicy := descpb.ToScanLockingWaitPolicy(locking.WaitPolicy)
if strength != descpb.ScanLockingStrength_FOR_NONE {
e.ob.Attr("locking strength", strength.PrettyString())
e.ob.Attr(keyPrefix+"locking strength", strength.PrettyString())
}
if waitPolicy != descpb.ScanLockingWaitPolicy_BLOCK {
e.ob.Attr("locking wait policy", waitPolicy.PrettyString())
e.ob.Attr(keyPrefix+"locking wait policy", waitPolicy.PrettyString())
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ define ZigzagJoin {
# corresponding 1-1 to RightEqCols.
LeftEqCols []exec.TableColumnOrdinal

# Left row-level locking properties.
LeftLocking opt.Locking

# Right table and index.
RightTable cat.Table
RightIndex cat.Index
Expand All @@ -351,6 +354,9 @@ define ZigzagJoin {
# corresponding 1-1 to LeftEqCols.
RightEqCols []exec.TableColumnOrdinal

# Right row-level locking properties.
RightLocking opt.Locking

# OnCond is an extra filter that is evaluated on the results.
# TODO(radu): remove this (it can be a separate Select).
OnCond tree.TypedExpr
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/opt/memo/expr_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) {
tp.Childf("left fixed columns: %v = %v", t.LeftFixedCols, leftVals)
tp.Childf("right fixed columns: %v = %v", t.RightFixedCols, rightVals)
}
f.formatLockingWithPrefix(tp, "left ", t.LeftLocking)
f.formatLockingWithPrefix(tp, "right ", t.RightLocking)

case *MergeJoinExpr:
if !t.Flags.Empty() {
Expand Down Expand Up @@ -1397,6 +1399,12 @@ func (f *ExprFmtCtx) formatCol(label string, id opt.ColumnID, notNullCols opt.Co
// formatLocking adds a new treeprinter child for the row-level locking policy,
// if the policy is configured to perform row-level locking.
func (f *ExprFmtCtx) formatLocking(tp treeprinter.Node, locking opt.Locking) {
f.formatLockingWithPrefix(tp, "", locking)
}

func (f *ExprFmtCtx) formatLockingWithPrefix(
tp treeprinter.Node, labelPrefix string, locking opt.Locking,
) {
if !locking.IsLocking() {
return
}
Expand Down Expand Up @@ -1424,7 +1432,7 @@ func (f *ExprFmtCtx) formatLocking(tp treeprinter.Node, locking opt.Locking) {
default:
panic(errors.AssertionFailedf("unexpected wait policy"))
}
tp.Childf("locking: %s%s", strength, wait)
tp.Childf("%slocking: %s%s", labelPrefix, strength, wait)
}

// ScanIsReverseFn is a callback that is used to figure out if a scan needs to
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,22 @@ define ZigzagJoinPrivate {
LeftFixedCols ColList
RightFixedCols ColList

# LeftLocking and RightLocking represent the row-level locking modes of the
# scans over the ZigzagJoin's two tables. Most zigzag joins leave these
# unset (Strength = ForNone), which indicates that no row-level locking will
# be performed while joining the two tables. Stronger locking modes are used
# by SELECT .. FOR [KEY] UPDATE/SHARE statements and by the initial row
# retrieval of DELETE and UPDATE statements.
#
# The row-level locking modes also dictates the policy used by the
# ZigzagJoin when handling conflicting locks held by other active
# transactions. Most zigzag joins leave the policies set to the default
# (WaitPolicy = Block), but different wait policies are used by SELECT ..
# FOR UPDATE/SHARE SKIP LOCKED/NOWAIT statements to react differently to
# conflicting locks.
LeftLocking Locking
RightLocking Locking

# Cols is the set of columns produced by the zigzag join. This set can
# contain columns from either side's index.
Cols ColSet
Expand Down
Loading

0 comments on commit a8b951b

Please sign in to comment.