Skip to content

Commit

Permalink
[fix] : fix #14521 (#14816)
Browse files Browse the repository at this point in the history
The previous code didn't take into account the out-of-order case such as

```sql
create table t(a int, b varchar, c varchar, primary key(c, a));
explain insert into t (b, c, a) values("b", "c", 1);
```

now fix it

Approved by: @ouyuanning, @sukki37
  • Loading branch information
jensenojs committed Mar 7, 2024
1 parent e4b1f98 commit 5d4ff58
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions pkg/sql/plan/build_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ func getPkValueExpr(builder *QueryBuilder, ctx CompilerContext, tableDef *TableD
pkColLength := len(pkPosInValues)
var colTyp *Type
var insertRowIdx int
var pkColIdx int
var pkOrder int

for insertRowIdx, pkColIdx = range pkPosInValues {
for insertRowIdx, pkOrder = range pkPosInValues {
valExprs := make([]*Expr, rowsCount)
rowTyp := bat.Vecs[insertRowIdx].GetType()
colTyp = makePlan2Type(rowTyp)
Expand Down Expand Up @@ -354,7 +354,7 @@ func getPkValueExpr(builder *QueryBuilder, ctx CompilerContext, tableDef *TableD
}
}
}
colExprs[pkColIdx] = valExprs
colExprs[pkOrder] = valExprs
}

if pkColLength == 1 {
Expand All @@ -364,7 +364,7 @@ func getPkValueExpr(builder *QueryBuilder, ctx CompilerContext, tableDef *TableD
Typ: colTyp,
Expr: &plan.Expr_Col{
Col: &ColRef{
ColPos: int32(pkColIdx),
ColPos: int32(pkOrder),
Name: tableDef.Pkey.PkeyColName,
},
},
Expand Down Expand Up @@ -393,7 +393,7 @@ func getPkValueExpr(builder *QueryBuilder, ctx CompilerContext, tableDef *TableD
Typ: colTyp,
Expr: &plan.Expr_Col{
Col: &ColRef{
ColPos: int32(pkColIdx),
ColPos: int32(pkOrder),
Name: tableDef.Pkey.PkeyColName,
},
},
Expand All @@ -414,40 +414,55 @@ func getPkValueExpr(builder *QueryBuilder, ctx CompilerContext, tableDef *TableD
return []*Expr{orExpr}
}
} else {

// insertRowIdx is the order in insert value SQL, pkColIdx is the order in tableDef.Pkey.Names
pkOrder2Idx := make(map[int]int)
for _, o := range pkPosInValues {
pkName := tableDef.Pkey.Names[o]
for i, c := range tableDef.Cols {
if c.Name == pkName {
pkOrder2Idx[o] = i
break
}
}
}

// multi cols pk & one row for insert
if rowsCount == 1 {
filterExprs := make([]*Expr, pkColLength)
for insertRowIdx, pkColIdx = range pkPosInValues {
for insertRowIdx, pkOrder = range pkPosInValues {
pkColIdx := pkOrder2Idx[pkOrder]
expr, err := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*Expr{{
Typ: tableDef.Cols[insertRowIdx].Typ,
Expr: &plan.Expr_Col{
Col: &ColRef{
ColPos: int32(pkColIdx),
Name: tableDef.Cols[insertRowIdx].Name,
ColPos: int32(pkOrder),
Name: tableDef.Cols[pkColIdx].Name,
},
},
}, colExprs[pkColIdx][0]})
}, colExprs[pkOrder][0]})
if err != nil {
return nil
}
filterExprs[pkColIdx] = expr
filterExprs[pkOrder] = expr
}
return filterExprs
} else {
// seems serial function have poor performance. we have to use or function
var orExpr *Expr
for i := 0; i < rowsCount; i++ {
var andExpr *Expr
for insertRowIdx, pkColIdx = range pkPosInValues {
for insertRowIdx, pkOrder = range pkPosInValues {
pkColIdx := pkOrder2Idx[pkOrder]
eqExpr, err := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*Expr{{
Typ: tableDef.Cols[insertRowIdx].Typ,
Expr: &plan.Expr_Col{
Col: &ColRef{
ColPos: int32(pkColIdx),
Name: tableDef.Cols[insertRowIdx].Name,
ColPos: int32(pkOrder),
Name: tableDef.Cols[pkColIdx].Name,
},
},
}, colExprs[pkColIdx][i]})
}, colExprs[pkOrder][i]})
if err != nil {
return nil
}
Expand Down

0 comments on commit 5d4ff58

Please sign in to comment.