Skip to content

Commit

Permalink
chore(): added backward type casting
Browse files Browse the repository at this point in the history
  • Loading branch information
le-vlad committed Nov 20, 2023
1 parent 29e3f3e commit 20c7b68
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 3 deletions.
123 changes: 123 additions & 0 deletions internal/message/arrow_type_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/charmbracelet/log"
cqtypes "github.com/cloudquery/plugin-sdk/v4/types"
"reflect"
"strconv"
)

func inferArrowType(value interface{}) arrow.DataType {
Expand Down Expand Up @@ -66,3 +68,124 @@ func getValue(column arrow.Array, rowIndex int) interface{} {
return nil
}
}

func ArrowToPg10(t arrow.DataType) string {
switch dt := t.(type) {
case *arrow.BooleanType:
return "boolean"
case *arrow.Int8Type:
return "smallint"
case *arrow.Int16Type:
return "smallint"
case *arrow.Int32Type:
return "int"
case *arrow.Int64Type:
return "bigint"
case *arrow.Uint8Type:
return "smallint"
case *arrow.Uint16Type:
return "int"
case *arrow.Uint32Type:
return "bigint"
case *arrow.Uint64Type:
return "numeric(20,0)"
case *arrow.Float32Type:
return "real"
case *arrow.Float64Type:
return "double precision"
case arrow.DecimalType:
return "numeric(" + strconv.Itoa(int(dt.GetPrecision())) + "," + strconv.Itoa(int(dt.GetScale())) + ")"
case *arrow.StringType:
return "text"
case *arrow.BinaryType:
return "bytea"
case *arrow.LargeBinaryType:
return "bytea"
case *arrow.TimestampType:
return "timestamp without time zone"
case *arrow.Time32Type, *arrow.Time64Type:
return "time without time zone"
case *arrow.Date32Type, *arrow.Date64Type:
return "date"
case *cqtypes.UUIDType:
return "uuid"
case *cqtypes.JSONType:
return "jsonb"
case *cqtypes.MACType:
return "macaddr"
case *cqtypes.InetType:
return "inet"
case *arrow.ListType:
return ArrowToPg10(dt.Elem()) + "[]"
case *arrow.FixedSizeListType:
return ArrowToPg10(dt.Elem()) + "[]"
case *arrow.LargeListType:
return ArrowToPg10(dt.Elem()) + "[]"
case *arrow.MapType:
return "text"
default:
return "text"
}
}

// ArrowToCockroach converts arrow data type to cockroach data type. CockroachDB lacks support for
// some data types like macaddr and has different aliases for ints.
// See: https://www.cockroachlabs.com/docs/stable/int.html
func ArrowToCockroach(t arrow.DataType) string {
switch dt := t.(type) {
case *arrow.BooleanType:
return "boolean"
case *arrow.Int8Type:
return "int2"
case *arrow.Int16Type:
return "int2"
case *arrow.Int32Type:
return "int8"
case *arrow.Int64Type:
return "int8"
case *arrow.Uint8Type:
return "int2"
case *arrow.Uint16Type:
return "int8"
case *arrow.Uint32Type:
return "int8"
case *arrow.Uint64Type:
return "numeric(20,0)"
case *arrow.Float32Type:
return "real"
case *arrow.Float64Type:
return "double precision"
case arrow.DecimalType:
return "numeric(" + strconv.Itoa(int(dt.GetPrecision())) + "," + strconv.Itoa(int(dt.GetScale())) + ")"
case *arrow.StringType:
return "text"
case *arrow.BinaryType:
return "bytea"
case *arrow.LargeBinaryType:
return "bytea"
case *arrow.TimestampType:
return "timestamp without time zone"
case *arrow.Time32Type, *arrow.Time64Type:
return "time without time zone"
case *arrow.Date32Type, *arrow.Date64Type:
return "date"
case *cqtypes.UUIDType:
return "uuid"
case *cqtypes.JSONType:
return "jsonb"
case *cqtypes.MACType:
return "text"
case *cqtypes.InetType:
return "inet"
case *arrow.ListType:
return ArrowToCockroach(dt.Elem()) + "[]"
case *arrow.FixedSizeListType:
return ArrowToCockroach(dt.Elem()) + "[]"
case *arrow.LargeListType:
return ArrowToCockroach(dt.Elem()) + "[]"
case *arrow.MapType:
return "text"
default:
return "text"
}
}
2 changes: 1 addition & 1 deletion internal/processors/openai/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func (p *Plugin) Process(context context.Context, msg message.Message) (message.

// EvolveSchema will add a string column to the schema in order to store OpenAI response
func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {
streamSchema.AddField(p.config.StreamName, p.config.TargetField, arrow.BinaryTypes.String)
streamSchema.AddField(p.config.StreamName, p.config.TargetField, arrow.BinaryTypes.String, message.ArrowToPg10(arrow.BinaryTypes.String))
return nil
}
4 changes: 2 additions & 2 deletions internal/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *StreamSchemaObj) GetLatestSchema() []StreamSchema {
return s.streamSchemaVersions[s.lastVersion]
}

func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType) {
func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType, driverType string) {
streamSchema := s.streamSchemaVersions[s.lastVersion]
var streamSchemaCopy = make([]StreamSchema, len(streamSchema))
copy(streamSchemaCopy, streamSchema)
Expand All @@ -44,7 +44,7 @@ func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.Data
arrowColumn := Column{
Name: name,
DatabrewType: fieldType.String(),
NativeConnectorType: "String",
NativeConnectorType: driverType,
PK: false,
Nullable: true,
}
Expand Down

0 comments on commit 20c7b68

Please sign in to comment.