From a2204cf18a34cf2e280fe2668b31f9e5938af748 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 23 Jun 2021 17:48:40 +0800 Subject: [PATCH 1/5] fix decimal parquet parser --- pkg/lightning/mydump/examples/test.parquet | Bin 0 -> 2686 bytes pkg/lightning/mydump/parquet_parser.go | 210 +++++++++++++++++--- pkg/lightning/mydump/parquet_parser_test.go | 94 ++++++--- 3 files changed, 258 insertions(+), 46 deletions(-) create mode 100644 pkg/lightning/mydump/examples/test.parquet diff --git a/pkg/lightning/mydump/examples/test.parquet b/pkg/lightning/mydump/examples/test.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3e5c4897897fc0530e9e9368c6dddd7100bb879f GIT binary patch literal 2686 zcmbtWZ%k8H6u<9LXkl-a81LKb(q&B-nxduY-?u^tF!y0*5J#59ZCRAQ_O;m2@~16| zG6w9AlDS1(Nb~~;esIa6iA2m|2#X41qS*jVT!tZWn#@hM4|Q&0+@w4AwXBb|7`io0 zdha>s_dCCH?>#52ec@FrMmUIHGhqf6d03_8RCy@z;Z#&|#Uf({D>$D$ph|?~cs*{n z`-^(93mJ^xGRNN>a%}4Jzke0ISKG_*5Q7z1hg~)BqgVk0!2Xv$c0Y|yzfuOD&<+)JA59z%+K<69`=o z{Ww?-PLRvtsv6L|qQDb=x4XIRtEuLy`!_mK+u4o*h6E$L0hgzW)VOua>KF^E$|N;mg>6&gB)|?pXEwM9h8Q zUMH&DIQ#BuxwZ;pkn4vtFRmdM!fm*hN*8@P|6!l&*wy|RBJUyX zG7vQrs5$(`$ia(u4EN&kc*94NEu)NR+5d@oZP}hnuTyfUBtW77ZUJ# zc+*8RM+d~jLa|E-S`k)5f6^S;*oq23q!F+rOtebd!sRXTDloIWwZpa$gExVvMaME) z4U>W3oswY-iQpsyFF{<0Wi(n%zd@zN3XM~R@-##fRp{3D+W)KB6 z8@{6wlnS7fs*^~+rr3Qn_<%Ki$dFN(ZbcdsnQXuc)Z_=q=X11JozyFxmPG!LV#g@( zmCHtpY`N<6k;^`ktA*=ws)%ejc8Rx#V!x*hM{>*mHA*rFi{;7THW@sYC&6XYAp?90 z?je=kE`v|0B)F{MGr)%mQpW<1K!VRw?9Y^8sDQvT_q)9O9!AdKlo-Zo_B3tiVF<%o znpe&@s+RNd5?>e1&Qpd_HQ{?))5&<)SvAYRL*TfkaJ^Y*7NCpFQbU<{`NEMvQzMsa zGds*Y;)>HEA}v8P^j=G=FKP}q?KX#eQ9%@<0(V={u8;0%_UX&?k!ZNp8`Yci{(vti zM)YNGwd)&&5OD=WFek%;O^x-iYitb$1y2yX0P~sj%Z&Q6_KptIKS 127 + if negative { + for i := 0; i < len(rawBytes); i++ { + rawBytes[i] = ^rawBytes[i] + } + for i := len(rawBytes) - 1; i >= 0; i-- { + rawBytes[i] += 1 + if rawBytes[i] != 0 { + break + } + } + } + + intValue := big.NewInt(0) + intValue = intValue.SetBytes(rawBytes) + val := fmt.Sprintf("%0*d", scale, intValue) + dotIndex := len(val) - scale + var res strings.Builder + if negative { + res.WriteByte('-') + } + if dotIndex == 0 { + res.WriteByte('0') + } else { + res.Write([]byte(val[:dotIndex])) } + if scale > 0 { + res.WriteByte('.') + res.Write([]byte(val[dotIndex:])) + } + return res.String() } // when the value type is int32/int64, convert to value to target logical type in tidb -func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) { - if meta.ConvertedType == nil { +func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error { + if meta.ConvertedType == nil && meta.LogicalType == nil { d.SetInt64(v) - return + return nil } - switch *meta.ConvertedType { - // decimal - case parquet.ConvertedType_DECIMAL: - minLen := *meta.Scale + 1 + + logicalType := meta.LogicalType + switch { + case logicalType.DECIMAL != nil: + if logicalType.DECIMAL.Scale == 0 { + d.SetInt64(v) + return nil + } + minLen := logicalType.DECIMAL.Scale + 1 if v < 0 { minLen++ } val := fmt.Sprintf("%0*d", minLen, v) dotIndex := len(val) - int(*meta.Scale) d.SetString(val[:dotIndex]+"."+val[dotIndex:], "") - case parquet.ConvertedType_DATE: + case logicalType.DATE != nil: dateStr := time.Unix(v*86400, 0).Format("2006-01-02") d.SetString(dateStr, "") - // convert all timestamp types (datetime/timestamp) to string - case parquet.ConvertedType_TIMESTAMP_MICROS: - dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") - case parquet.ConvertedType_TIMESTAMP_MILLIS: - dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999") + case logicalType.TIMESTAMP != nil: + // convert all timestamp types (datetime/timestamp) to string + var sec, nsec int64 + units := logicalType.TIMESTAMP.Unit + if units.MICROS != nil { + sec = v / 1e6 + nsec = (v % 1e6) * 1e3 + } else if units.MILLIS != nil { + sec = v / 1e3 + nsec = (v % 1e3) * 1e6 + } else { + // nano + sec = v / 1e9 + nsec = v % 1e9 + } + // TODO: how to deal with TimeZone + dateStr := time.Unix(sec, nsec).Format("2006-01-02 15:04:05.999") d.SetString(dateStr, "") - // covert time types to string - case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS: - if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS { + case logicalType.TIME != nil: + units := logicalType.TIME.Unit + if units.NANOS != nil { + v /= 1e6 + } else if units.MICROS != nil { v /= 1e3 } millis := v % 1e3 @@ -345,6 +508,7 @@ func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) { default: d.SetInt64(v) } + return nil } func (pp *ParquetParser) LastRow() Row { diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index d86136a65..f5d418481 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -97,12 +97,10 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"` TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"` - Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` - Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` - Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` - Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"` - Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"` - Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` + Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` + Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` } dir := c.MkDir() @@ -124,9 +122,7 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { Decimal1: -12345678, // -123456.78 Decimal2: 456, // 0.0456 Decimal3: 123456789012345678, // 1234567890123456.78 - Decimal4: "-12345678.09", - Decimal5: "-1234567890123456.78", - Decimal6: -1, // -0.0001 + Decimal6: -1, // -0.0001 } c.Assert(writer.Write(v), IsNil) c.Assert(writer.WriteStop(), IsNil) @@ -140,22 +136,19 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { c.Assert(err, IsNil) defer reader.Close() - c.Assert(len(reader.columns), Equals, 11) + c.Assert(len(reader.columns), Equals, 9) c.Assert(reader.ReadRow(), IsNil) - c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{ - types.NewCollationStringDatum("2020-10-29", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("-123456.78", "", 0), - types.NewCollationStringDatum("0.0456", "", 0), - types.NewCollationStringDatum("1234567890123456.78", "", 0), - types.NewCollationStringDatum("-12345678.09", "", 0), - types.NewCollationStringDatum("-1234567890123456.78", "", 0), - types.NewCollationStringDatum("-0.0001", "", 0), - }) + rowValue := []string{ + "2020-10-29", "17:26:15.123", "17:26:15.123", "2020-10-29 17:27:52.356", "2020-10-29 17:27:52.356", + "-123456.78", "0.0456", "1234567890123456.78", "-0.0001", + } + row := reader.lastRow.Row + c.Assert(len(rowValue), Equals, len(row)) + for i := 0; i < len(row); i++ { + c.Assert(row[i].Kind(), Equals, types.KindString) + c.Assert(rowValue[i], Equals, row[i].GetString()) + } type TestDecimal struct { Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=3, precision=5, basetype=INT32"` @@ -215,3 +208,58 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { } } } + +func (s testParquetParserSuite) TestParquetAurora(c *C) { + store, err := storage.NewLocalStorage("examples") + c.Assert(err, IsNil) + + fileName := "test.parquet" + r, err := store.Open(context.TODO(), fileName) + c.Assert(err, IsNil) + parser, err := NewParquetParser(context.TODO(), store, r, fileName) + c.Assert(err, IsNil) + + c.Assert(parser.Columns(), DeepEquals, []string{"id", "val1", "val2", "d1", "d2", "d3", "d4", "d5", "d6"}) + + expectedRes := [][]interface{}{ + {int64(1), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000"}, + { + int64(2), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "99999999999999999999", + "999999999999999999999999999999999999", "99999999999999999999.99999999", + }, + { + int64(3), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", "-99999999999999999999", + "-999999999999999999999999999999999999", "-99999999999999999999.99999999", + }, + { + int64(4), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000", + }, + { + int64(5), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "12345678901234567890", + "123456789012345678901234567890123456", "99999999999999999999.99999999", + }, + { + int64(6), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", + "-12345678901234567890", "-123456789012345678901234567890123456", + "-99999999999999999999.99999999", + }, + } + + for i := 0; i < len(expectedRes); i++ { + err = parser.ReadRow() + c.Assert(err, IsNil) + expectedValues := expectedRes[i] + row := parser.LastRow().Row + c.Assert(len(expectedValues), Equals, len(row)) + for j := 0; j < len(row); j++ { + switch v := expectedValues[j].(type) { + case int64: + c.Assert(v, Equals, row[j].GetInt64()) + case string: + c.Assert(v, Equals, row[j].GetString()) + } + } + } + + c.Assert(parser.ReadRow(), Equals, io.EOF) +} From 205d0b1dbc8dc3521324614ff2a14872dcf51a11 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 23 Jun 2021 23:23:22 +0800 Subject: [PATCH 2/5] resolve comments --- pkg/lightning/mydump/parquet_parser.go | 12 ++++++++---- pkg/lightning/mydump/parquet_parser_test.go | 2 ++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index 740cbb5c9..e69a95221 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -229,24 +229,28 @@ func convertToLogicType(se *parquet.SchemaElement) error { logicalType.DATE = &parquet.DateType{} case parquet.ConvertedType_TIME_MILLIS: logicalType.TIME = &parquet.TimeType{ + IsAdjustedToUTC: true, Unit: &parquet.TimeUnit{ MILLIS: parquet.NewMilliSeconds(), }, } case parquet.ConvertedType_TIME_MICROS: logicalType.TIME = &parquet.TimeType{ + IsAdjustedToUTC: true, Unit: &parquet.TimeUnit{ MICROS: parquet.NewMicroSeconds(), }, } case parquet.ConvertedType_TIMESTAMP_MILLIS: logicalType.TIMESTAMP = &parquet.TimestampType{ + IsAdjustedToUTC: true, Unit: &parquet.TimeUnit{ MILLIS: parquet.NewMilliSeconds(), }, } case parquet.ConvertedType_TIMESTAMP_MICROS: logicalType.TIMESTAMP = &parquet.TimestampType{ + IsAdjustedToUTC: true, Unit: &parquet.TimeUnit{ MICROS: parquet.NewMicroSeconds(), }, @@ -401,7 +405,7 @@ func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) return setDatumValue(d, v.Elem(), meta) } default: - log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()), + log.L().Error("unknown value", zap.Stringer("kind", v.Kind()), zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface())) return errors.Errorf("unknown value: %v", v) } @@ -440,11 +444,11 @@ func binaryToDecimalStr(rawBytes []byte, scale int) string { if dotIndex == 0 { res.WriteByte('0') } else { - res.Write([]byte(val[:dotIndex])) + res.WriteString(val[:dotIndex]) } if scale > 0 { res.WriteByte('.') - res.Write([]byte(val[dotIndex:])) + res.WriteString(val[dotIndex:]) } return res.String() } @@ -488,7 +492,7 @@ func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error { sec = v / 1e9 nsec = v % 1e9 } - // TODO: how to deal with TimeZone + // TODO: how to deal with TimeZone if `IsAdjustedToUTC = false` dateStr := time.Unix(sec, nsec).Format("2006-01-02 15:04:05.999") d.SetString(dateStr, "") case logicalType.TIME != nil: diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index f5d418481..37082b1ef 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -257,6 +257,8 @@ func (s testParquetParserSuite) TestParquetAurora(c *C) { c.Assert(v, Equals, row[j].GetInt64()) case string: c.Assert(v, Equals, row[j].GetString()) + default: + c.Error("unexpected value: ", expectedValues[j]) } } } From 64e694c49f7111e1a85acc78acb5dc217e11fbe0 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 24 Jun 2021 10:20:23 +0800 Subject: [PATCH 3/5] fix utc --- pkg/lightning/mydump/parquet_parser.go | 56 ++++++++++----------- pkg/lightning/mydump/parquet_parser_test.go | 2 +- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index e69a95221..10a36d2e3 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -479,42 +479,40 @@ func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error { d.SetString(dateStr, "") case logicalType.TIMESTAMP != nil: // convert all timestamp types (datetime/timestamp) to string - var sec, nsec int64 - units := logicalType.TIMESTAMP.Unit - if units.MICROS != nil { - sec = v / 1e6 - nsec = (v % 1e6) * 1e3 - } else if units.MILLIS != nil { - sec = v / 1e3 - nsec = (v % 1e3) * 1e6 - } else { - // nano - sec = v / 1e9 - nsec = v % 1e9 - } - // TODO: how to deal with TimeZone if `IsAdjustedToUTC = false` - dateStr := time.Unix(sec, nsec).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") + timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, "2006-01-02 15:04:05.999999", + "2006-01-02 15:04:05.999999Z", logicalType.TIMESTAMP.IsAdjustedToUTC) + d.SetString(timeStr, "") case logicalType.TIME != nil: - units := logicalType.TIME.Unit - if units.NANOS != nil { - v /= 1e6 - } else if units.MICROS != nil { - v /= 1e3 - } - millis := v % 1e3 - v /= 1e3 - sec := v % 60 - v /= 60 - min := v % 60 - v /= 60 - d.SetString(fmt.Sprintf("%d:%d:%d.%3d", v, min, sec, millis), "") + // convert all timestamp types (datetime/timestamp) to string + timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z", + logicalType.TIME.IsAdjustedToUTC) + d.SetString(timeStr, "") default: d.SetInt64(v) } return nil } +func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc bool) string { + var sec, nsec int64 + if units.MICROS != nil { + sec = v / 1e6 + nsec = (v % 1e6) * 1e3 + } else if units.MILLIS != nil { + sec = v / 1e3 + nsec = (v % 1e3) * 1e6 + } else { + // nano + sec = v / 1e9 + nsec = v % 1e9 + } + t := time.Unix(sec, nsec) + if utc { + return t.UTC().Format(utcFormat) + } + return t.Format(format) +} + func (pp *ParquetParser) LastRow() Row { return pp.lastRow } diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index 37082b1ef..f12eaeecb 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -140,7 +140,7 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { c.Assert(reader.ReadRow(), IsNil) rowValue := []string{ - "2020-10-29", "17:26:15.123", "17:26:15.123", "2020-10-29 17:27:52.356", "2020-10-29 17:27:52.356", + "2020-10-29", "17:26:15.123", "17:26:15.123", "2020-10-29 17:27:52.356000Z", "2020-10-29 17:27:52.356956Z", "-123456.78", "0.0456", "1234567890123456.78", "-0.0001", } row := reader.lastRow.Row From 2b55338e024b229c5f1584317bf5c91c9db006ff Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 24 Jun 2021 10:39:48 +0800 Subject: [PATCH 4/5] fix test --- pkg/lightning/mydump/parquet_parser_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index f12eaeecb..b6f280b9c 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -116,9 +116,9 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { v := &Test{ Date: 18564, // 2020-10-29 TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!) - TimeMicros: 62775123000, // 17:26:15.123 - TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356 - TimestampMicros: 1603963672356956, // 2020-10-29T17:27:52.356956 + TimeMicros: 62775123456, // 17:26:15.123 + TimestampMillis: 1603963672356, // 2020-10-29T09:27:52.356Z + TimestampMicros: 1603963672356956, // 2020-10-29T09:27:52.356956Z Decimal1: -12345678, // -123456.78 Decimal2: 456, // 0.0456 Decimal3: 123456789012345678, // 1234567890123456.78 @@ -140,7 +140,7 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { c.Assert(reader.ReadRow(), IsNil) rowValue := []string{ - "2020-10-29", "17:26:15.123", "17:26:15.123", "2020-10-29 17:27:52.356000Z", "2020-10-29 17:27:52.356956Z", + "2020-10-29", "17:26:15.123Z", "17:26:15.123456Z", "2020-10-29 09:27:52.356Z", "2020-10-29 09:27:52.356956Z", "-123456.78", "0.0456", "1234567890123456.78", "-0.0001", } row := reader.lastRow.Row From f871acc8c02f6ceeae1c21f07a95b9c1a93d47f0 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 24 Jun 2021 11:48:20 +0800 Subject: [PATCH 5/5] fix time when IsAdjustedToUTC is false --- pkg/lightning/mydump/parquet_parser.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index 10a36d2e3..303ea42ca 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -506,9 +506,9 @@ func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc sec = v / 1e9 nsec = v % 1e9 } - t := time.Unix(sec, nsec) + t := time.Unix(sec, nsec).UTC() if utc { - return t.UTC().Format(utcFormat) + return t.Format(utcFormat) } return t.Format(format) }