diff --git a/pkg/lightning/mydump/examples/test.parquet b/pkg/lightning/mydump/examples/test.parquet new file mode 100644 index 000000000..3e5c48978 Binary files /dev/null and b/pkg/lightning/mydump/examples/test.parquet differ diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index 3a40f60cb..303ea42ca 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "math/big" "reflect" "strings" "time" @@ -191,7 +192,16 @@ func NewParquetParser( // NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name // though in this context, there is no difference between these two fields columns = append(columns, strings.ToLower(c.Name)) - columnMetas = append(columnMetas, c) + // transfer old ConvertedType to LogicalType + columnMeta := c + if c.ConvertedType != nil && c.LogicalType == nil { + newMeta := *c + columnMeta = &newMeta + if err := convertToLogicType(columnMeta); err != nil { + return nil, err + } + } + columnMetas = append(columnMetas, columnMeta) } } @@ -203,6 +213,100 @@ func NewParquetParser( }, nil } +func convertToLogicType(se *parquet.SchemaElement) error { + logicalType := &parquet.LogicalType{} + switch *se.ConvertedType { + case parquet.ConvertedType_UTF8: + logicalType.STRING = &parquet.StringType{} + case parquet.ConvertedType_ENUM: + logicalType.ENUM = &parquet.EnumType{} + case parquet.ConvertedType_DECIMAL: + logicalType.DECIMAL = &parquet.DecimalType{ + Scale: *se.Scale, + Precision: *se.Precision, + } + case parquet.ConvertedType_DATE: + logicalType.DATE = &parquet.DateType{} + case parquet.ConvertedType_TIME_MILLIS: + logicalType.TIME = &parquet.TimeType{ + IsAdjustedToUTC: true, + Unit: &parquet.TimeUnit{ + MILLIS: parquet.NewMilliSeconds(), + }, + } + case parquet.ConvertedType_TIME_MICROS: + logicalType.TIME = &parquet.TimeType{ + IsAdjustedToUTC: true, + Unit: &parquet.TimeUnit{ + MICROS: parquet.NewMicroSeconds(), + }, + } + case parquet.ConvertedType_TIMESTAMP_MILLIS: + logicalType.TIMESTAMP = &parquet.TimestampType{ + IsAdjustedToUTC: true, + Unit: &parquet.TimeUnit{ + MILLIS: parquet.NewMilliSeconds(), + }, + } + case parquet.ConvertedType_TIMESTAMP_MICROS: + logicalType.TIMESTAMP = &parquet.TimestampType{ + IsAdjustedToUTC: true, + Unit: &parquet.TimeUnit{ + MICROS: parquet.NewMicroSeconds(), + }, + } + case parquet.ConvertedType_UINT_8: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 8, + IsSigned: false, + } + case parquet.ConvertedType_UINT_16: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 16, + IsSigned: false, + } + case parquet.ConvertedType_UINT_32: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 32, + IsSigned: false, + } + case parquet.ConvertedType_UINT_64: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 64, + IsSigned: false, + } + case parquet.ConvertedType_INT_8: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 8, + IsSigned: true, + } + case parquet.ConvertedType_INT_16: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 16, + IsSigned: true, + } + case parquet.ConvertedType_INT_32: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 32, + IsSigned: true, + } + case parquet.ConvertedType_INT_64: + logicalType.INTEGER = &parquet.IntType{ + BitWidth: 64, + IsSigned: true, + } + case parquet.ConvertedType_JSON: + logicalType.JSON = &parquet.JsonType{} + case parquet.ConvertedType_BSON: + logicalType.BSON = &parquet.BsonType{} + // case parquet.ConvertedType_INTERVAL, parquet.ConvertedType_MAP, parquet.ConvertedType_MAP_KEY_VALUE, parquet.ConvertedType_LIST: + default: + return errors.Errorf("unsupported type: '%s'", *se.ConvertedType) + } + se.LogicalType = logicalType + return nil +} + // Pos returns the currently row number of the parquet file func (pp *ParquetParser) Pos() (pos int64, rowID int64) { return pp.curStart + int64(pp.curIndex), pp.lastRow.RowID @@ -272,7 +376,9 @@ func (pp *ParquetParser) ReadRow() error { pp.lastRow.Row = pp.lastRow.Row[:length] } for i := 0; i < length; i++ { - setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]) + if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil { + return err + } } return nil } @@ -280,71 +386,131 @@ func (pp *ParquetParser) ReadRow() error { // convert a parquet value to Datum // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md -func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) { +func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) error { switch v.Kind() { case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: d.SetUint64(v.Uint()) case reflect.Int8, reflect.Int16: d.SetInt64(v.Int()) case reflect.Int32, reflect.Int64: - setDatumByInt(d, v.Int(), meta) + return setDatumByInt(d, v.Int(), meta) case reflect.String: - d.SetString(v.String(), "") + setDatumByString(d, v.String(), meta) case reflect.Float32, reflect.Float64: d.SetFloat64(v.Float()) case reflect.Ptr: if v.IsNil() { d.SetNull() } else { - setDatumValue(d, v.Elem(), meta) + return setDatumValue(d, v.Elem(), meta) } default: - log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()), + log.L().Error("unknown value", zap.Stringer("kind", v.Kind()), zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface())) + return errors.Errorf("unknown value: %v", v) + } + return nil +} + +func setDatumByString(d *types.Datum, v string, meta *parquet.SchemaElement) { + if meta.LogicalType != nil && meta.LogicalType.DECIMAL != nil { + v = binaryToDecimalStr([]byte(v), int(meta.LogicalType.DECIMAL.Scale)) } + d.SetString(v, "") +} + +func binaryToDecimalStr(rawBytes []byte, scale int) string { + negative := rawBytes[0] > 127 + if negative { + for i := 0; i < len(rawBytes); i++ { + rawBytes[i] = ^rawBytes[i] + } + for i := len(rawBytes) - 1; i >= 0; i-- { + rawBytes[i] += 1 + if rawBytes[i] != 0 { + break + } + } + } + + intValue := big.NewInt(0) + intValue = intValue.SetBytes(rawBytes) + val := fmt.Sprintf("%0*d", scale, intValue) + dotIndex := len(val) - scale + var res strings.Builder + if negative { + res.WriteByte('-') + } + if dotIndex == 0 { + res.WriteByte('0') + } else { + res.WriteString(val[:dotIndex]) + } + if scale > 0 { + res.WriteByte('.') + res.WriteString(val[dotIndex:]) + } + return res.String() } // when the value type is int32/int64, convert to value to target logical type in tidb -func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) { - if meta.ConvertedType == nil { +func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error { + if meta.ConvertedType == nil && meta.LogicalType == nil { d.SetInt64(v) - return + return nil } - switch *meta.ConvertedType { - // decimal - case parquet.ConvertedType_DECIMAL: - minLen := *meta.Scale + 1 + + logicalType := meta.LogicalType + switch { + case logicalType.DECIMAL != nil: + if logicalType.DECIMAL.Scale == 0 { + d.SetInt64(v) + return nil + } + minLen := logicalType.DECIMAL.Scale + 1 if v < 0 { minLen++ } val := fmt.Sprintf("%0*d", minLen, v) dotIndex := len(val) - int(*meta.Scale) d.SetString(val[:dotIndex]+"."+val[dotIndex:], "") - case parquet.ConvertedType_DATE: + case logicalType.DATE != nil: dateStr := time.Unix(v*86400, 0).Format("2006-01-02") d.SetString(dateStr, "") - // convert all timestamp types (datetime/timestamp) to string - case parquet.ConvertedType_TIMESTAMP_MICROS: - dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") - case parquet.ConvertedType_TIMESTAMP_MILLIS: - dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") - // covert time types to string - case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS: - if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS { - v /= 1e3 - } - millis := v % 1e3 - v /= 1e3 - sec := v % 60 - v /= 60 - min := v % 60 - v /= 60 - d.SetString(fmt.Sprintf("%d:%d:%d.%3d", v, min, sec, millis), "") + case logicalType.TIMESTAMP != nil: + // convert all timestamp types (datetime/timestamp) to string + timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, "2006-01-02 15:04:05.999999", + "2006-01-02 15:04:05.999999Z", logicalType.TIMESTAMP.IsAdjustedToUTC) + d.SetString(timeStr, "") + case logicalType.TIME != nil: + // convert all timestamp types (datetime/timestamp) to string + timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z", + logicalType.TIME.IsAdjustedToUTC) + d.SetString(timeStr, "") default: d.SetInt64(v) } + return nil +} + +func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc bool) string { + var sec, nsec int64 + if units.MICROS != nil { + sec = v / 1e6 + nsec = (v % 1e6) * 1e3 + } else if units.MILLIS != nil { + sec = v / 1e3 + nsec = (v % 1e3) * 1e6 + } else { + // nano + sec = v / 1e9 + nsec = v % 1e9 + } + t := time.Unix(sec, nsec).UTC() + if utc { + return t.Format(utcFormat) + } + return t.Format(format) } func (pp *ParquetParser) LastRow() Row { diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index d86136a65..b6f280b9c 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -97,12 +97,10 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"` TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"` - Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` - Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` - Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` - Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"` - Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"` - Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` + Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` + Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` } dir := c.MkDir() @@ -118,15 +116,13 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { v := &Test{ Date: 18564, // 2020-10-29 TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!) - TimeMicros: 62775123000, // 17:26:15.123 - TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356 - TimestampMicros: 1603963672356956, // 2020-10-29T17:27:52.356956 + TimeMicros: 62775123456, // 17:26:15.123 + TimestampMillis: 1603963672356, // 2020-10-29T09:27:52.356Z + TimestampMicros: 1603963672356956, // 2020-10-29T09:27:52.356956Z Decimal1: -12345678, // -123456.78 Decimal2: 456, // 0.0456 Decimal3: 123456789012345678, // 1234567890123456.78 - Decimal4: "-12345678.09", - Decimal5: "-1234567890123456.78", - Decimal6: -1, // -0.0001 + Decimal6: -1, // -0.0001 } c.Assert(writer.Write(v), IsNil) c.Assert(writer.WriteStop(), IsNil) @@ -140,22 +136,19 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { c.Assert(err, IsNil) defer reader.Close() - c.Assert(len(reader.columns), Equals, 11) + c.Assert(len(reader.columns), Equals, 9) c.Assert(reader.ReadRow(), IsNil) - c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{ - types.NewCollationStringDatum("2020-10-29", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("-123456.78", "", 0), - types.NewCollationStringDatum("0.0456", "", 0), - types.NewCollationStringDatum("1234567890123456.78", "", 0), - types.NewCollationStringDatum("-12345678.09", "", 0), - types.NewCollationStringDatum("-1234567890123456.78", "", 0), - types.NewCollationStringDatum("-0.0001", "", 0), - }) + rowValue := []string{ + "2020-10-29", "17:26:15.123Z", "17:26:15.123456Z", "2020-10-29 09:27:52.356Z", "2020-10-29 09:27:52.356956Z", + "-123456.78", "0.0456", "1234567890123456.78", "-0.0001", + } + row := reader.lastRow.Row + c.Assert(len(rowValue), Equals, len(row)) + for i := 0; i < len(row); i++ { + c.Assert(row[i].Kind(), Equals, types.KindString) + c.Assert(rowValue[i], Equals, row[i].GetString()) + } type TestDecimal struct { Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=3, precision=5, basetype=INT32"` @@ -215,3 +208,60 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { } } } + +func (s testParquetParserSuite) TestParquetAurora(c *C) { + store, err := storage.NewLocalStorage("examples") + c.Assert(err, IsNil) + + fileName := "test.parquet" + r, err := store.Open(context.TODO(), fileName) + c.Assert(err, IsNil) + parser, err := NewParquetParser(context.TODO(), store, r, fileName) + c.Assert(err, IsNil) + + c.Assert(parser.Columns(), DeepEquals, []string{"id", "val1", "val2", "d1", "d2", "d3", "d4", "d5", "d6"}) + + expectedRes := [][]interface{}{ + {int64(1), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000"}, + { + int64(2), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "99999999999999999999", + "999999999999999999999999999999999999", "99999999999999999999.99999999", + }, + { + int64(3), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", "-99999999999999999999", + "-999999999999999999999999999999999999", "-99999999999999999999.99999999", + }, + { + int64(4), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000", + }, + { + int64(5), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "12345678901234567890", + "123456789012345678901234567890123456", "99999999999999999999.99999999", + }, + { + int64(6), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", + "-12345678901234567890", "-123456789012345678901234567890123456", + "-99999999999999999999.99999999", + }, + } + + for i := 0; i < len(expectedRes); i++ { + err = parser.ReadRow() + c.Assert(err, IsNil) + expectedValues := expectedRes[i] + row := parser.LastRow().Row + c.Assert(len(expectedValues), Equals, len(row)) + for j := 0; j < len(row); j++ { + switch v := expectedValues[j].(type) { + case int64: + c.Assert(v, Equals, row[j].GetInt64()) + case string: + c.Assert(v, Equals, row[j].GetString()) + default: + c.Error("unexpected value: ", expectedValues[j]) + } + } + } + + c.Assert(parser.ReadRow(), Equals, io.EOF) +}