Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2378,13 +2378,17 @@ func canExecuteStatementInUncommittedTransaction(reqCtx context.Context, ses FeS
func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, writer *io.PipeWriter, mysqlRrWr MysqlRrWr, skipWrite bool, epoch uint64) (_ bool, _ time.Duration, _ time.Duration, err error) {
var readTime, writeTime time.Duration
var payload []byte
readStart := time.Now()
start := time.Now()
defer func() {
if err != nil {
mysqlRrWr.FreeLoadLocal()
}
}()
payload, err = mysqlRrWr.ReadLoadLocalPacket()
readTime = time.Since(start)
if readTime > time.Minute {
logutil.Infof("ReadLoadLocalPacket longtime %v, size %d", readTime, len(payload))
}
if err != nil {
if errors.Is(err, errorInvalidLength0) {
return skipWrite, readTime, writeTime, err
Expand All @@ -2394,23 +2398,26 @@ func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, wri
}
return skipWrite, readTime, writeTime, err
}
readTime = time.Since(readStart)

//empty packet means the file is over.
length := len(payload)
if length == 0 {
size := len(payload)
if size == 0 {
return skipWrite, readTime, writeTime, errorInvalidLength0
}
ses.CountPayload(length)
ses.CountPayload(size)

// If inner error occurs(unexpected or expected(ctrl-c)), proc.Base.LoadLocalReader will be closed.
// Then write will return error, but we need to read the rest of the data and not write it to pipe.
// So we need a flag[skipWrite] to tell us whether we need to write the data to pipe.
// https://github.com/matrixorigin/matrixone/issues/6665#issuecomment-1422236478

writeStart := time.Now()
start = time.Now()
if !skipWrite {
_, err = writer.Write(payload)
writeTime = time.Since(start)
if writeTime > time.Minute {
logutil.Infof("WritePacket longtime %v, size %d", writeTime, len(payload))
}
if err != nil {
ses.Errorf(execCtx.reqCtx,
"Failed to load local file",
Expand All @@ -2419,7 +2426,6 @@ func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, wri
zap.Error(err))
skipWrite = true
}
writeTime = time.Since(writeStart)

}
return skipWrite, readTime, writeTime, err
Expand All @@ -2443,34 +2449,36 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
defer func() {
close(quitC)
}()
mysqlRrWr := ses.GetResponser().MysqlRrWr()
mysqlRwer := ses.GetResponser().MysqlRrWr()
defer func() {
err2 := writer.Close()
if err == nil {
err = err2
}
//free load local buffer anyway
mysqlRrWr.FreeLoadLocal()
mysqlRwer.FreeLoadLocal()
}()
err = plan2.InitInfileParam(param)
if err != nil {
return
}
err = mysqlRrWr.WriteLocalInfileRequest(param.Filepath)
err = mysqlRwer.WriteLocalInfileRequest(param.Filepath)
if err != nil {
return
}
var skipWrite bool
skipWrite = false
var readTime, writeTime time.Duration
var retError error
start := time.Now()
epoch, printEvery, minReadTime, maxReadTime, minWriteTime, maxWriteTime := uint64(0), uint64(1024*60), 24*time.Hour, time.Nanosecond, 24*time.Hour, time.Nanosecond
epoch, printTime, minReadTime, maxReadTime, minWriteTime, maxWriteTime := uint64(0), uint64(1024*60), 24*time.Hour, time.Nanosecond, 24*time.Hour, time.Nanosecond

skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRrWr, skipWrite, epoch)
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRwer, skipWrite, epoch)
if err != nil {
if errors.Is(err, errorInvalidLength0) {
return nil
}
retError = err
}
if readTime > maxReadTime {
maxReadTime = readTime
Expand All @@ -2486,13 +2494,34 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
minWriteTime = writeTime
}

const maxRetries = 100 // Maximum number of consecutive errors
const maxTotalTime = 3 * time.Minute // Maximum total consecutive processing time
var consecutiveErrors int
consecutiveLoopStartTime := time.Now()

for {
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRrWr, skipWrite, epoch)
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRwer, skipWrite, epoch)
if err != nil {
if errors.Is(err, errorInvalidLength0) {
if retError != nil {
err = retError
break
}
err = nil
break
}
retError = err
consecutiveErrors++
ses.Errorf(execCtx.reqCtx, "readThenWrite error (attempt %d): %v", consecutiveErrors, err)
time.Sleep(10 * time.Millisecond)

if consecutiveErrors >= maxRetries || time.Since(consecutiveLoopStartTime) > maxTotalTime {
return moerr.NewInternalErrorf(execCtx.reqCtx,
"load local file failed: consecutive errors (%d), timeout after %v", maxRetries, maxTotalTime)
}
} else {
consecutiveErrors = 0
consecutiveLoopStartTime = time.Now()
}

if readTime > maxReadTime {
Expand All @@ -2509,7 +2538,7 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
minWriteTime = writeTime
}

if epoch%printEvery == 0 {
if epoch%printTime == 0 {
if execCtx.isIssue3482 {
ses.Infof(execCtx.reqCtx, "load local '%s', epoch: %d, skipWrite: %v, minReadTime: %s, maxReadTime: %s, minWriteTime: %s, maxWriteTime: %s,\n", param.Filepath, epoch, skipWrite, minReadTime.String(), maxReadTime.String(), minWriteTime.String(), maxWriteTime.String())
}
Expand Down