Skip to content

Commit

Permalink
Merge branch 'master' into hanfei/race
Browse files Browse the repository at this point in the history
  • Loading branch information
shenli committed May 8, 2017
2 parents e561b32 + 0d6490e commit e76e5ba
Show file tree
Hide file tree
Showing 25 changed files with 149 additions and 106 deletions.
4 changes: 2 additions & 2 deletions ddl/column.go
Expand Up @@ -347,7 +347,7 @@ func (d *ddl) backfillColumnInTxn(t table.Table, colMeta *columnMeta, handles []
return 0, errors.Trace(err)
}

rowColumns, err := tablecodec.DecodeRow(rowVal, colMeta.oldColMap)
rowColumns, err := tablecodec.DecodeRow(rowVal, colMeta.oldColMap, time.UTC)
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -364,7 +364,7 @@ func (d *ddl) backfillColumnInTxn(t table.Table, colMeta *columnMeta, handles []
}
newColumnIDs = append(newColumnIDs, colMeta.colID)
newRow = append(newRow, colMeta.defaultVal)
newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs)
newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs, time.UTC)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_test.go
Expand Up @@ -275,7 +275,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx context.Context, t table.Table,
}
colMap := make(map[int64]*types.FieldType)
colMap[col.ID] = &col.FieldType
rowMap, err := tablecodec.DecodeRow(data, colMap)
rowMap, err := tablecodec.DecodeRow(data, colMap, ctx.GetSessionVars().GetTimeZone())
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Expand Up @@ -416,7 +416,7 @@ func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, taskOpInfo *ind
cols := t.Cols()
idxInfo := taskOpInfo.tblIndex.Meta()
for i, idxRecord := range idxRecords {
rowMap, err := tablecodec.DecodeRow(rawRecords[i], taskOpInfo.colMap)
rowMap, err := tablecodec.DecodeRow(rawRecords[i], taskOpInfo.colMap, time.UTC)
if err != nil {
ret.err = errors.Trace(err)
return nil, ret
Expand Down
8 changes: 6 additions & 2 deletions distsql/xeval/eval.go
Expand Up @@ -14,6 +14,8 @@
package xeval

import (
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -59,14 +61,16 @@ type Evaluator struct {
fieldTps []*types.FieldType
valueLists map[*tipb.Expr]*decodedValueList
StatementCtx *variable.StatementContext
TimeZone *time.Location
}

// NewEvaluator creates a new Evaluator instance.
func NewEvaluator(sc *variable.StatementContext) *Evaluator {
func NewEvaluator(sc *variable.StatementContext, timeZone *time.Location) *Evaluator {
return &Evaluator{
Row: make(map[int64]types.Datum),
ColIDs: make(map[int64]int),
StatementCtx: sc,
TimeZone: timeZone,
}
}

Expand Down Expand Up @@ -100,7 +104,7 @@ func (e *Evaluator) SetRowValue(handle int64, row [][]byte, relatedColIDs map[in
} else {
data := row[offset]
ft := e.fieldTps[offset]
datum, err := tablecodec.DecodeColumnValue(data, ft)
datum, err := tablecodec.DecodeColumnValue(data, ft, e.TimeZone)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 6 additions & 4 deletions distsql/xeval/eval_control_funcs_test.go
Expand Up @@ -14,6 +14,8 @@
package xeval

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/types"
Expand All @@ -22,7 +24,7 @@ import (

func (s *testEvalSuite) TestEvalCaseWhen(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewIntDatum(100)
trueCond := types.NewIntDatum(1)
falseCond := types.NewIntDatum(0)
Expand Down Expand Up @@ -85,7 +87,7 @@ func (s *testEvalSuite) TestEvalCaseWhen(c *C) {

func (s *testEvalSuite) TestEvalIf(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewIntDatum(100)
trueCond, falseCond, null := types.NewIntDatum(1), types.NewIntDatum(0), types.Datum{}
expr1, expr2 := types.NewStringDatum("expr1"), types.NewStringDatum("expr2")
Expand Down Expand Up @@ -142,7 +144,7 @@ func (s *testEvalSuite) TestEvalIf(c *C) {

func (s *testEvalSuite) TestEvalNullIf(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewDatum(100)
null := types.Datum{}
tests := []struct {
Expand Down Expand Up @@ -182,7 +184,7 @@ func (s *testEvalSuite) TestEvalNullIf(c *C) {

func (s *testEvalSuite) TestEvalIfNull(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewDatum(100)
null, notNull, expr := types.Datum{}, types.NewStringDatum("left"), types.NewStringDatum("right")
tests := []struct {
Expand Down
8 changes: 4 additions & 4 deletions distsql/xeval/eval_test.go
Expand Up @@ -36,7 +36,7 @@ type testEvalSuite struct{}
// TODO: add more tests.
func (s *testEvalSuite) TestEval(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewIntDatum(100)
tests := []struct {
expr *tipb.Expr
Expand Down Expand Up @@ -406,7 +406,7 @@ func (s *testEvalSuite) TestLike(c *C) {
result: 0,
},
}
ev := NewEvaluator(new(variable.StatementContext))
ev := NewEvaluator(new(variable.StatementContext), time.Local)
for _, tt := range tests {
res, err := ev.Eval(tt.expr)
c.Check(err, IsNil)
Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *testEvalSuite) TestWhereIn(c *C) {
result: false,
},
}
ev := NewEvaluator(new(variable.StatementContext))
ev := NewEvaluator(new(variable.StatementContext), time.Local)
for _, tt := range tests {
res, err := ev.Eval(tt.expr)
c.Check(err, IsNil)
Expand All @@ -471,7 +471,7 @@ func (s *testEvalSuite) TestWhereIn(c *C) {

func (s *testEvalSuite) TestEvalIsNull(c *C) {
colID := int64(1)
xevaluator := NewEvaluator(new(variable.StatementContext))
xevaluator := NewEvaluator(new(variable.StatementContext), time.Local)
xevaluator.Row[colID] = types.NewIntDatum(100)
null, trueAns, falseAns := types.Datum{}, types.NewIntDatum(1), types.NewIntDatum(0)
tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Expand Up @@ -832,7 +832,7 @@ func (b *executorBuilder) buildAnalyze(v *plan.Analyze) Executor {
func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) *tipb.DAGRequest {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = b.getStartTS()
dagReq.TimeZoneOffset = timeZoneOffset()
dagReq.TimeZoneOffset = timeZoneOffset(b.ctx)
sc := b.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for _, p := range plans {
Expand Down
23 changes: 12 additions & 11 deletions executor/distsql.go
Expand Up @@ -465,7 +465,7 @@ func (e *XSelectIndexExec) nextForSingleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, schema)
err = decodeRawValues(values, schema, e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -477,11 +477,11 @@ func (e *XSelectIndexExec) nextForSingleRead() (*Row, error) {
}
}

func decodeRawValues(values []types.Datum, schema *expression.Schema) error {
func decodeRawValues(values []types.Datum, schema *expression.Schema, loc *time.Location) error {
var err error
for i := 0; i < schema.Len(); i++ {
if values[i].Kind() == types.KindRaw {
values[i], err = tablecodec.DecodeColumnValue(values[i].GetRaw(), schema.Columns[i].RetType)
values[i], err = tablecodec.DecodeColumnValue(values[i].GetRaw(), schema.Columns[i].RetType, loc)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -607,7 +607,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<
func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq := new(tipb.SelectRequest)
selIdxReq.StartTs = e.startTS
selIdxReq.TimeZoneOffset = timeZoneOffset()
selIdxReq.TimeZoneOffset = timeZoneOffset(e.ctx)
selIdxReq.Flags = statementContextToFlags(e.ctx.GetSessionVars().StmtCtx)
selIdxReq.IndexInfo = distsql.IndexToProto(e.table.Meta(), e.index)
if e.desc {
Expand Down Expand Up @@ -748,7 +748,7 @@ func (e *XSelectIndexExec) extractRowsFromPartialResult(t table.Table, partialRe
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.Schema())
err = decodeRawValues(values, e.Schema(), e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -766,7 +766,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult
selTableReq.OrderBy = e.sortItemsPB
}
selTableReq.StartTs = e.startTS
selTableReq.TimeZoneOffset = timeZoneOffset()
selTableReq.TimeZoneOffset = timeZoneOffset(e.ctx)
selTableReq.Flags = statementContextToFlags(e.ctx.GetSessionVars().StmtCtx)
selTableReq.TableInfo = &tipb.TableInfo{
TableId: e.table.Meta().ID,
Expand Down Expand Up @@ -841,7 +841,7 @@ func (e *XSelectTableExec) Schema() *expression.Schema {
func (e *XSelectTableExec) doRequest() error {
selReq := new(tipb.SelectRequest)
selReq.StartTs = e.startTS
selReq.TimeZoneOffset = timeZoneOffset()
selReq.TimeZoneOffset = timeZoneOffset(e.ctx)
selReq.Flags = statementContextToFlags(e.ctx.GetSessionVars().StmtCtx)
selReq.Where = e.where
selReq.TableInfo = &tipb.TableInfo{
Expand Down Expand Up @@ -932,7 +932,7 @@ func (e *XSelectTableExec) Next() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.schema)
err = decodeRawValues(values, e.schema, e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -951,8 +951,9 @@ func (e *XSelectTableExec) slowQueryInfo(duration time.Duration) string {
}

// timeZoneOffset returns the local time zone offset in seconds.
func timeZoneOffset() int64 {
_, offset := time.Now().Zone()
func timeZoneOffset(ctx context.Context) int64 {
loc := ctx.GetSessionVars().GetTimeZone()
_, offset := time.Now().In(loc).Zone()
return int64(offset)
}

Expand Down Expand Up @@ -982,7 +983,7 @@ func setPBColumnsDefaultValue(ctx context.Context, pbColumns []*tipb.ColumnInfo,
return errors.Trace(err)
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(d)
pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(d, ctx.GetSessionVars().GetTimeZone())
if err != nil {
return errors.Trace(err)
}
Expand Down
25 changes: 25 additions & 0 deletions executor/executor_test.go
Expand Up @@ -1360,3 +1360,28 @@ func (s *testSuite) TestConvertToBit(c *C) {
tk.MustExec(`insert t select a from t1`)
tk.MustQuery("select a+0 from t").Check(testkit.Rows("20090101000000"))
}

func (s *testSuite) TestTimestampTimeZone(c *C) {
defer func() {
s.cleanEnv(c)
testleak.AfterTest(c)()
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t (ts timestamp)")
tk.MustExec("set time_zone = '+00:00'")
tk.MustExec("insert into t values ('2017-04-27 22:40:42')")
// The timestamp will get different value if time_zone session variable changes.
tests := []struct {
timezone string
expect string
}{
{"+10:00", "2017-04-28 08:40:42"},
{"-6:00", "2017-04-27 16:40:42"},
}
for _, tt := range tests {
tk.MustExec(fmt.Sprintf("set time_zone = '%s'", tt.timezone))
tk.MustQuery(fmt.Sprintf("select * from t where ts = '%s'", tt.expect)).Check(testkit.Rows(tt.expect))
}
}
2 changes: 1 addition & 1 deletion executor/join.go
Expand Up @@ -302,7 +302,7 @@ func (e *HashJoinExec) decodeRow(data []byte) (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.smallExec.Schema())
err = decodeRawValues(values, e.smallExec.Schema(), e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/new_distsql.go
Expand Up @@ -91,7 +91,7 @@ func (e *TableReaderExecutor) Next() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.schema)
err = decodeRawValues(values, e.schema, e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (e *IndexReaderExecutor) Next() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.schema)
err = decodeRawValues(values, e.schema, e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions inspectkv/inspectkv.go
Expand Up @@ -16,6 +16,7 @@ package inspectkv
import (
"io"
"reflect"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
Expand Down Expand Up @@ -404,7 +405,7 @@ func rowWithCols(txn kv.Retriever, t table.Table, h int64, cols []*table.Column)
}
colTps[col.ID] = &col.FieldType
}
row, err := tablecodec.DecodeRow(value, colTps)
row, err := tablecodec.DecodeRow(value, colTps, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -456,7 +457,7 @@ func iterRecords(retriever kv.Retriever, t table.Table, startKey kv.Key, cols []
return errors.Trace(err)
}

rowMap, err := tablecodec.DecodeRow(it.Value(), colMap)
rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, time.UTC)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion inspectkv/inspectkv_test.go
Expand Up @@ -16,6 +16,7 @@ package inspectkv
import (
"fmt"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -414,7 +415,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
row := []types.Datum{v, {}}
colIDs := []int64{2, 3}
value, err := tablecodec.EncodeRow(row, colIDs)
value, err := tablecodec.EncodeRow(row, colIDs, time.UTC)
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion plan/plan_to_pb.go
Expand Up @@ -114,7 +114,7 @@ func setPBColumnsDefaultValue(ctx context.Context, pbColumns []*tipb.ColumnInfo,
return errors.Trace(err)
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(d)
pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(d, ctx.GetSessionVars().GetTimeZone())
if err != nil {
return errors.Trace(err)
}
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/session.go
Expand Up @@ -286,6 +286,15 @@ func (s *SessionVars) GetNextPreparedStmtID() uint32 {
return s.preparedStmtID
}

// GetTimeZone returns the value of time_zone session variable.
func (s *SessionVars) GetTimeZone() *time.Location {
loc := s.TimeZone
if loc == nil {
loc = time.Local
}
return loc
}

// special session variables.
const (
SQLModeVar = "sql_mode"
Expand Down

0 comments on commit e76e5ba

Please sign in to comment.