Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix parquet parser for decimal type #1272

Merged
merged 5 commits into from Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Binary file added pkg/lightning/mydump/examples/test.parquet
Binary file not shown.
210 changes: 187 additions & 23 deletions pkg/lightning/mydump/parquet_parser.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math/big"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -191,7 +192,16 @@ func NewParquetParser(
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
columnMetas = append(columnMetas, c)
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
newMeta := *c
columnMeta = &newMeta
if err := convertToLogicType(columnMeta); err != nil {
return nil, err
}
}
columnMetas = append(columnMetas, columnMeta)
}
}

Expand All @@ -203,6 +213,96 @@ func NewParquetParser(
}, nil
}

func convertToLogicType(se *parquet.SchemaElement) error {
logicalType := &parquet.LogicalType{}
switch *se.ConvertedType {
case parquet.ConvertedType_UTF8:
logicalType.STRING = &parquet.StringType{}
case parquet.ConvertedType_ENUM:
logicalType.ENUM = &parquet.EnumType{}
case parquet.ConvertedType_DECIMAL:
logicalType.DECIMAL = &parquet.DecimalType{
Scale: *se.Scale,
Precision: *se.Precision,
}
case parquet.ConvertedType_DATE:
logicalType.DATE = &parquet.DateType{}
case parquet.ConvertedType_TIME_MILLIS:
logicalType.TIME = &parquet.TimeType{
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIME_MICROS:
logicalType.TIME = &parquet.TimeType{
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MILLIS:
logicalType.TIMESTAMP = &parquet.TimestampType{
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MICROS:
logicalType.TIMESTAMP = &parquet.TimestampType{
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_UINT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: false,
}
case parquet.ConvertedType_UINT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: false,
}
case parquet.ConvertedType_UINT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: false,
}
case parquet.ConvertedType_UINT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: false,
}
case parquet.ConvertedType_INT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: true,
}
case parquet.ConvertedType_INT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: true,
}
case parquet.ConvertedType_INT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: true,
}
case parquet.ConvertedType_INT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: true,
}
case parquet.ConvertedType_JSON:
logicalType.JSON = &parquet.JsonType{}
case parquet.ConvertedType_BSON:
logicalType.BSON = &parquet.BsonType{}
// case parquet.ConvertedType_INTERVAL, parquet.ConvertedType_MAP, parquet.ConvertedType_MAP_KEY_VALUE, parquet.ConvertedType_LIST:
default:
return errors.Errorf("unsupported type: '%s'", *se.ConvertedType)
}
se.LogicalType = logicalType
return nil
}

// Pos returns the currently row number of the parquet file
func (pp *ParquetParser) Pos() (pos int64, rowID int64) {
return pp.curStart + int64(pp.curIndex), pp.lastRow.RowID
Expand Down Expand Up @@ -272,67 +372,130 @@ func (pp *ParquetParser) ReadRow() error {
pp.lastRow.Row = pp.lastRow.Row[:length]
}
for i := 0; i < length; i++ {
setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil {
return err
}
}
return nil
}

// convert a parquet value to Datum
//
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) error {
switch v.Kind() {
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
d.SetUint64(v.Uint())
case reflect.Int8, reflect.Int16:
d.SetInt64(v.Int())
case reflect.Int32, reflect.Int64:
setDatumByInt(d, v.Int(), meta)
return setDatumByInt(d, v.Int(), meta)
case reflect.String:
d.SetString(v.String(), "")
setDatumByString(d, v.String(), meta)
3pointer marked this conversation as resolved.
Show resolved Hide resolved
case reflect.Float32, reflect.Float64:
d.SetFloat64(v.Float())
case reflect.Ptr:
if v.IsNil() {
d.SetNull()
} else {
setDatumValue(d, v.Elem(), meta)
return setDatumValue(d, v.Elem(), meta)
}
default:
log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
return errors.Errorf("unknown value: %v", v)
glorv marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func setDatumByString(d *types.Datum, v string, meta *parquet.SchemaElement) {
if meta.LogicalType != nil && meta.LogicalType.DECIMAL != nil {
v = binaryToDecimalStr([]byte(v), int(meta.LogicalType.DECIMAL.Scale))
}
d.SetString(v, "")
}

func binaryToDecimalStr(rawBytes []byte, scale int) string {
negative := rawBytes[0] > 127
if negative {
for i := 0; i < len(rawBytes); i++ {
rawBytes[i] = ^rawBytes[i]
}
for i := len(rawBytes) - 1; i >= 0; i-- {
rawBytes[i] += 1
if rawBytes[i] != 0 {
break
}
}
}

intValue := big.NewInt(0)
intValue = intValue.SetBytes(rawBytes)
val := fmt.Sprintf("%0*d", scale, intValue)
dotIndex := len(val) - scale
var res strings.Builder
if negative {
res.WriteByte('-')
}
if dotIndex == 0 {
res.WriteByte('0')
} else {
res.Write([]byte(val[:dotIndex]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
res.Write([]byte(val[:dotIndex]))
res.WriteString(val[:dotIndex])

}
if scale > 0 {
res.WriteByte('.')
res.Write([]byte(val[dotIndex:]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
res.Write([]byte(val[dotIndex:]))
res.WriteString(val[dotIndex:])

}
return res.String()
}

// when the value type is int32/int64, convert to value to target logical type in tidb
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
if meta.ConvertedType == nil {
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error {
if meta.ConvertedType == nil && meta.LogicalType == nil {
d.SetInt64(v)
return
return nil
}
switch *meta.ConvertedType {
// decimal
case parquet.ConvertedType_DECIMAL:
minLen := *meta.Scale + 1

logicalType := meta.LogicalType
switch {
case logicalType.DECIMAL != nil:
if logicalType.DECIMAL.Scale == 0 {
d.SetInt64(v)
return nil
}
minLen := logicalType.DECIMAL.Scale + 1
if v < 0 {
minLen++
}
val := fmt.Sprintf("%0*d", minLen, v)
dotIndex := len(val) - int(*meta.Scale)
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
case parquet.ConvertedType_DATE:
case logicalType.DATE != nil:
dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
d.SetString(dateStr, "")
// convert all timestamp types (datetime/timestamp) to string
case parquet.ConvertedType_TIMESTAMP_MICROS:
dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
case parquet.ConvertedType_TIMESTAMP_MILLIS:
dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
case logicalType.TIMESTAMP != nil:
// convert all timestamp types (datetime/timestamp) to string
var sec, nsec int64
units := logicalType.TIMESTAMP.Unit
if units.MICROS != nil {
sec = v / 1e6
nsec = (v % 1e6) * 1e3
} else if units.MILLIS != nil {
sec = v / 1e3
nsec = (v % 1e3) * 1e6
} else {
// nano
sec = v / 1e9
nsec = v % 1e9
}
// TODO: how to deal with TimeZone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there more we need to do about time zones?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The time/timestamp value read from parquet is either UTC or Local timezone (based on the IsAdjustedToUTC setting). Since currently lightning don't support set timezone (always set to target cluster's timezone). Not sure how should we deal with the unknown timezone offset if IsAdjustedToUTC = false (at least the default is true). @kennytm PTAL

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the produced string 2006-01-02 15:04:05.999999Z when isAdjustedToUTC is true, and 2006-01-02 15:04:05.999999 when it is false.

(why do we only support 3 fractional digits instead of 6?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

dateStr := time.Unix(sec, nsec).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
// covert time types to string
case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
case logicalType.TIME != nil:
units := logicalType.TIME.Unit
if units.NANOS != nil {
v /= 1e6
} else if units.MICROS != nil {
v /= 1e3
}
millis := v % 1e3
Expand All @@ -345,6 +508,7 @@ func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
default:
d.SetInt64(v)
}
return nil
}

func (pp *ParquetParser) LastRow() Row {
Expand Down