From 20c7b68603fd475977f4484d40dd113ed3b1c77f Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Mon, 20 Nov 2023 21:54:22 +0100 Subject: [PATCH] chore(): added backward type casting --- internal/message/arrow_type_helper.go | 123 ++++++++++++++++++++++++++ internal/processors/openai/plugin.go | 2 +- internal/schema/schema.go | 4 +- 3 files changed, 126 insertions(+), 3 deletions(-) diff --git a/internal/message/arrow_type_helper.go b/internal/message/arrow_type_helper.go index 2e32c70..7e021a1 100644 --- a/internal/message/arrow_type_helper.go +++ b/internal/message/arrow_type_helper.go @@ -4,7 +4,9 @@ import ( "github.com/apache/arrow/go/v14/arrow" "github.com/apache/arrow/go/v14/arrow/array" "github.com/charmbracelet/log" + cqtypes "github.com/cloudquery/plugin-sdk/v4/types" "reflect" + "strconv" ) func inferArrowType(value interface{}) arrow.DataType { @@ -66,3 +68,124 @@ func getValue(column arrow.Array, rowIndex int) interface{} { return nil } } + +func ArrowToPg10(t arrow.DataType) string { + switch dt := t.(type) { + case *arrow.BooleanType: + return "boolean" + case *arrow.Int8Type: + return "smallint" + case *arrow.Int16Type: + return "smallint" + case *arrow.Int32Type: + return "int" + case *arrow.Int64Type: + return "bigint" + case *arrow.Uint8Type: + return "smallint" + case *arrow.Uint16Type: + return "int" + case *arrow.Uint32Type: + return "bigint" + case *arrow.Uint64Type: + return "numeric(20,0)" + case *arrow.Float32Type: + return "real" + case *arrow.Float64Type: + return "double precision" + case arrow.DecimalType: + return "numeric(" + strconv.Itoa(int(dt.GetPrecision())) + "," + strconv.Itoa(int(dt.GetScale())) + ")" + case *arrow.StringType: + return "text" + case *arrow.BinaryType: + return "bytea" + case *arrow.LargeBinaryType: + return "bytea" + case *arrow.TimestampType: + return "timestamp without time zone" + case *arrow.Time32Type, *arrow.Time64Type: + return "time without time zone" + case *arrow.Date32Type, *arrow.Date64Type: + return "date" + case *cqtypes.UUIDType: + return "uuid" + case *cqtypes.JSONType: + return "jsonb" + case *cqtypes.MACType: + return "macaddr" + case *cqtypes.InetType: + return "inet" + case *arrow.ListType: + return ArrowToPg10(dt.Elem()) + "[]" + case *arrow.FixedSizeListType: + return ArrowToPg10(dt.Elem()) + "[]" + case *arrow.LargeListType: + return ArrowToPg10(dt.Elem()) + "[]" + case *arrow.MapType: + return "text" + default: + return "text" + } +} + +// ArrowToCockroach converts arrow data type to cockroach data type. CockroachDB lacks support for +// some data types like macaddr and has different aliases for ints. +// See: https://www.cockroachlabs.com/docs/stable/int.html +func ArrowToCockroach(t arrow.DataType) string { + switch dt := t.(type) { + case *arrow.BooleanType: + return "boolean" + case *arrow.Int8Type: + return "int2" + case *arrow.Int16Type: + return "int2" + case *arrow.Int32Type: + return "int8" + case *arrow.Int64Type: + return "int8" + case *arrow.Uint8Type: + return "int2" + case *arrow.Uint16Type: + return "int8" + case *arrow.Uint32Type: + return "int8" + case *arrow.Uint64Type: + return "numeric(20,0)" + case *arrow.Float32Type: + return "real" + case *arrow.Float64Type: + return "double precision" + case arrow.DecimalType: + return "numeric(" + strconv.Itoa(int(dt.GetPrecision())) + "," + strconv.Itoa(int(dt.GetScale())) + ")" + case *arrow.StringType: + return "text" + case *arrow.BinaryType: + return "bytea" + case *arrow.LargeBinaryType: + return "bytea" + case *arrow.TimestampType: + return "timestamp without time zone" + case *arrow.Time32Type, *arrow.Time64Type: + return "time without time zone" + case *arrow.Date32Type, *arrow.Date64Type: + return "date" + case *cqtypes.UUIDType: + return "uuid" + case *cqtypes.JSONType: + return "jsonb" + case *cqtypes.MACType: + return "text" + case *cqtypes.InetType: + return "inet" + case *arrow.ListType: + return ArrowToCockroach(dt.Elem()) + "[]" + case *arrow.FixedSizeListType: + return ArrowToCockroach(dt.Elem()) + "[]" + case *arrow.LargeListType: + return ArrowToCockroach(dt.Elem()) + "[]" + case *arrow.MapType: + return "text" + default: + return "text" + } +} diff --git a/internal/processors/openai/plugin.go b/internal/processors/openai/plugin.go index b763c1b..d070e9e 100644 --- a/internal/processors/openai/plugin.go +++ b/internal/processors/openai/plugin.go @@ -30,6 +30,6 @@ func (p *Plugin) Process(context context.Context, msg message.Message) (message. // EvolveSchema will add a string column to the schema in order to store OpenAI response func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error { - streamSchema.AddField(p.config.StreamName, p.config.TargetField, arrow.BinaryTypes.String) + streamSchema.AddField(p.config.StreamName, p.config.TargetField, arrow.BinaryTypes.String, message.ArrowToPg10(arrow.BinaryTypes.String)) return nil } diff --git a/internal/schema/schema.go b/internal/schema/schema.go index 9399cff..1da3038 100644 --- a/internal/schema/schema.go +++ b/internal/schema/schema.go @@ -35,7 +35,7 @@ func (s *StreamSchemaObj) GetLatestSchema() []StreamSchema { return s.streamSchemaVersions[s.lastVersion] } -func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType) { +func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType, driverType string) { streamSchema := s.streamSchemaVersions[s.lastVersion] var streamSchemaCopy = make([]StreamSchema, len(streamSchema)) copy(streamSchemaCopy, streamSchema) @@ -44,7 +44,7 @@ func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.Data arrowColumn := Column{ Name: name, DatabrewType: fieldType.String(), - NativeConnectorType: "String", + NativeConnectorType: driverType, PK: false, Nullable: true, }