From f435599951d1049ee00533ef02b0090d92867f5e Mon Sep 17 00:00:00 2001 From: chenmingsong Date: Sat, 30 Dec 2023 15:55:24 +0800 Subject: [PATCH] [cherry-pick] pr #13909 fix some unhandled error and redundant codes (#13910) some error was unhandled and it may cause hung. removed some unused codes like redundant type conversion. Approved by: @nnsgmsone, @sukki37 --- pkg/sql/compile/compile.go | 6 +++++- pkg/sql/compile/ddl.go | 8 ++++---- pkg/sql/compile/fuzzyCheck.go | 13 +++++++++---- pkg/sql/compile/runtime_filter.go | 2 +- pkg/sql/compile/scope.go | 6 +++++- pkg/sql/compile/types.go | 2 +- pkg/sql/compile/util.go | 4 ++-- 7 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 060933fd7320..48869bba4a94 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -528,7 +528,7 @@ func (c *Compile) runOnce() error { for i := range c.scope { wg.Add(1) scope := c.scope[i] - ants.Submit(func() { + errSubmit := ants.Submit(func() { defer func() { if e := recover(); e != nil { err := moerr.ConvertPanicError(c.ctx, e) @@ -541,6 +541,10 @@ func (c *Compile) runOnce() error { }() errC <- c.run(scope) }) + if errSubmit != nil { + errC <- errSubmit + wg.Done() + } } wg.Wait() close(errC) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index be95102f8ffe..a362aa0192bf 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -464,7 +464,7 @@ func (s *Scope) AlterTableInplace(c *Compile) error { } if !originHasIndexDef && addIndex != nil { newCt.Cts = append(newCt.Cts, &engine.IndexDef{ - Indexes: []*plan.IndexDef(addIndex), + Indexes: addIndex, }) } @@ -1331,7 +1331,7 @@ func (s *Scope) TruncateTable(c *Compile) error { for _, name := range tqry.PartitionTableNames { var err error if isTemp { - dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name)) + _, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name)) } else { _, err = dbSource.Truncate(c.ctx, name) } @@ -2268,8 +2268,8 @@ func makeAlterSequenceParam[T constraints.Integer](ctx context.Context, stmt *tr preStartWith := preLastSeqNum if stmt.StartWith != nil { startNum = getValue[T](stmt.StartWith.Minus, stmt.StartWith.Num) - if startNum < T(preStartWith) { - startNum = T(preStartWith) + if startNum < preStartWith { + startNum = preStartWith } } else { startNum = getInterfaceValue[T](preStartWith) diff --git a/pkg/sql/compile/fuzzyCheck.go b/pkg/sql/compile/fuzzyCheck.go index 5f101050cb85..94e3c3dffc54 100644 --- a/pkg/sql/compile/fuzzyCheck.go +++ b/pkg/sql/compile/fuzzyCheck.go @@ -4,13 +4,14 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package compile import ( @@ -132,7 +133,9 @@ func (f *fuzzyCheck) fill(ctx context.Context, bat *batch.Batch) error { // the last condition does not need to be followed by "and" one.WriteString(fmt.Sprintf("%s = %s", cAttrs[j], pkeys[j][i])) one.WriteByte(')') - one.WriteTo(&all) + if _, err = one.WriteTo(&all); err != nil { + return err + } // use or join each compound primary keys all.WriteString(" or ") @@ -144,7 +147,9 @@ func (f *fuzzyCheck) fill(ctx context.Context, bat *batch.Batch) error { } one.WriteString(fmt.Sprintf("%s = %s", cAttrs[j], pkeys[j][lastRow])) - one.WriteTo(&all) + if _, err = one.WriteTo(&all); err != nil { + return err + } f.condition = all.String() } @@ -377,7 +382,7 @@ func (f *fuzzyCheck) formatNonCompound(toCheck *vector.Vector, useInErr bool) ([ // datime time and timestamp type can not split by space directly func (f *fuzzyCheck) handletimesType(toCheck *vector.Vector) []string { - result := []string{} + result := make([]string, 0, toCheck.Length()) typ := toCheck.GetType() if typ.Oid == types.T_timestamp { diff --git a/pkg/sql/compile/runtime_filter.go b/pkg/sql/compile/runtime_filter.go index 68eafdf17b7d..68f9f7233430 100644 --- a/pkg/sql/compile/runtime_filter.go +++ b/pkg/sql/compile/runtime_filter.go @@ -76,7 +76,7 @@ func ApplyRuntimeFilters( case pipeline.RuntimeFilter_MIN_MAX: evaluators[i] = &RuntimeZonemapFilter{ - Zm: objectio.ZoneMap(filter.Data), + Zm: filter.Data, } } } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 36ca4efafb56..e0914fab3d58 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -114,7 +114,7 @@ func (s *Scope) MergeRun(c *Compile) error { for i := range s.PreScopes { wg.Add(1) scope := s.PreScopes[i] - ants.Submit(func() { + errSubmit := ants.Submit(func() { defer func() { if e := recover(); e != nil { err := moerr.ConvertPanicError(c.ctx, e) @@ -136,6 +136,10 @@ func (s *Scope) MergeRun(c *Compile) error { errChan <- scope.ParallelRun(c, scope.IsRemote) } }) + if errSubmit != nil { + errChan <- errSubmit + wg.Done() + } } defer wg.Wait() diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index d416388264a2..80d55b9cc5ad 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -68,7 +68,7 @@ const ( Replace ) -// Source contains information of a relation which will be used in execution, +// Source contains information of a relation which will be used in execution. type Source struct { PushdownId uint64 PushdownAddr string diff --git a/pkg/sql/compile/util.go b/pkg/sql/compile/util.go index f421a9a4ef3b..a858d51be990 100644 --- a/pkg/sql/compile/util.go +++ b/pkg/sql/compile/util.go @@ -169,8 +169,8 @@ func genInsertMOIndexesSql(eg engine.Engine, proc *process.Process, databaseId s case *engine.IndexDef: for _, indexdef := range def.Indexes { ctx, cancelFunc := context.WithTimeout(proc.Ctx, time.Second*30) - defer cancelFunc() index_id, err := eg.AllocateIDByKey(ctx, ALLOCID_INDEX_KEY) + cancelFunc() if err != nil { return "", err } @@ -247,8 +247,8 @@ func genInsertMOIndexesSql(eg engine.Engine, proc *process.Process, databaseId s } case *engine.PrimaryKeyDef: ctx, cancelFunc := context.WithTimeout(proc.Ctx, time.Second*30) - defer cancelFunc() index_id, err := eg.AllocateIDByKey(ctx, ALLOCID_INDEX_KEY) + cancelFunc() if err != nil { return "", err }