Skip to content

Commit

Permalink
fix: generate init record before importing in collection (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovaistariq committed May 14, 2023
1 parent 921b424 commit 644a825
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 47 deletions.
53 changes: 22 additions & 31 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,61 +79,52 @@ func evolveSchema(ctx context.Context, db string, coll string, docs []json.RawMe
return util.Error(err, "create or update collection")
}

func fixNumbers(ctx context.Context, coll string, docs []json.RawMessage) {
err := schema.Infer(&sch, coll, docs[0:1], PrimaryKey, AutoGenerate, 1)
util.Fatal(err, "infer schema")

if schema.HasArrayOfObjects {
b, err := json.Marshal(schema.DummyRecord)
util.Fatal(err, "marshal number fixer record")

err = client.Transact(ctx, config.GetProjectName(), func(ctx context.Context, tx driver.Tx) error {
_, err = tx.Insert(ctx, coll, []driver.Document{b})
util.Fatal(err, "insert")

_, err = tx.Delete(ctx, coll, driver.Filter("{}"))
util.Fatal(err, "delete")
func writeInitRecord(ctx context.Context, coll string, docs []json.RawMessage) {
initDoc, err := schema.GenerateInitDoc(&sch, docs[0])
log.Debug().Interface("initDoc", string(initDoc)).Msg("generating init record")

return nil
})
util.Fatal(err, "fix number transaction")
}
}
util.Fatal(err, "init record generation")

func guaranteeFloatsInFirstRecord(ctx context.Context, coll string, docs []json.RawMessage) {
if !FirstRecord {
return
}

cnt, err := client.GetDB().Count(ctx, coll, driver.Filter("{}"))

var ep *driver.Error

if err != nil {
var ep *driver.Error
if errors.As(err, &ep) && ep.Code == api.Code_NOT_FOUND {
log.Debug().Msg("collection doesn't exits, skip fixing numbers")
log.Debug().Msg("collection doesn't exits, skipping init record")
return
}

util.Fatal(err, "get count")
}

if cnt == 0 {
schema.DetectArrayOfObjects = true
schema.ReplaceNumber = true
if cnt != 0 {
log.Debug().Msg("collection is not empty, skipping init record")

fixNumbers(ctx, coll, docs)
FirstRecord = false

schema.DetectArrayOfObjects = false
schema.ReplaceNumber = false
return
}

err = client.Transact(ctx, config.GetProjectName(), func(ctx context.Context, tx driver.Tx) error {
_, err = tx.Insert(ctx, coll, []driver.Document{initDoc})
util.Fatal(err, "insert init record")

_, err = tx.Delete(ctx, coll, driver.Filter("{}"))
util.Fatal(err, "delete init record")

return nil
})
util.Fatal(err, "init record transaction")

FirstRecord = false
}

func insertWithInference(ctx context.Context, coll string, docs []json.RawMessage) error {
// FIXME: This is temporary fix, should moved to server ASAP
guaranteeFloatsInFirstRecord(ctx, coll, docs)
writeInitRecord(ctx, coll, docs)

ptr := unsafe.Pointer(&docs)

Expand Down
72 changes: 56 additions & 16 deletions schema/inference.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ var (
ErrExpectedNumber = fmt.Errorf("expected json.Number")
ErrUnsupportedType = fmt.Errorf("unsupported type")

HasArrayOfObjects bool
DummyRecord map[string]any
DetectArrayOfObjects bool
ReplaceNumber bool
HasArrayOfObjects bool
)

func newInompatibleSchemaError(name, oldType, oldFormat, newType, newFormat string) error {
Expand Down Expand Up @@ -257,11 +254,9 @@ func traverseArray(name string, existingField *schema.Field, newField *schema.Fi
newField.Items.Format = nf

if t == typeObject {
if DetectArrayOfObjects {
log.Debug().Msg("detected array of objects")
log.Debug().Msg("detected array of objects")

HasArrayOfObjects = true
}
HasArrayOfObjects = true

values, _ := reflect.ValueOf(v).Index(i).Interface().(map[string]any)
if err = traverseObject(name, newField.Items, newField.Items, values); err != nil {
Expand Down Expand Up @@ -349,10 +344,6 @@ func traverseFields(sch map[string]*schema.Field, fields map[string]any, autoGen
continue
}

if sch != nil && sch[name] != nil && sch[name].Type == typeNumber && ReplaceNumber {
fields[name] = 1.1
}

setAutoGenerate(autoGen, name, f)

sch[name] = f
Expand Down Expand Up @@ -391,10 +382,6 @@ func docToSchema(sch *schema.Schema, name string, data []byte, pk []string, auto
sch.PrimaryKey = []string{"id"}
}

if ReplaceNumber {
DummyRecord = m
}

return nil
}

Expand All @@ -410,3 +397,56 @@ func Infer(sch *schema.Schema, name string, docs []json.RawMessage, primaryKey [

return nil
}

func GenerateInitDoc(sch *schema.Schema, doc json.RawMessage) ([]byte, error) {
if sch.Fields == nil {
return nil, nil
}

var initDoc map[string]interface{}

dec := json.NewDecoder(bytes.NewBuffer(doc))
dec.UseNumber()

if err := dec.Decode(&initDoc); err != nil {
return nil, err
}

for name := range sch.Fields {
if err := initDocTraverseFields(sch.Fields[name], initDoc, name); err != nil {
log.Debug().Err(err).Msg("init doc traverse fields")
return nil, err
}
}

return json.Marshal(initDoc)
}

func initDocTraverseFields(field *schema.Field, doc map[string]any, fieldName string) error {
switch field.Type {
case typeNumber:
doc[fieldName] = 0.0000001
case typeObject:
vo := map[string]any{}
for name := range field.Fields {
if err := initDocTraverseFields(field.Fields[name], vo, name); err != nil {
return err
}
}

doc[fieldName] = vo
case typeArray:
if field.Items.Type == typeObject {
vo := map[string]any{}
for name := range field.Items.Fields {
if err := initDocTraverseFields(field.Items.Fields[name], vo, name); err != nil {
return err
}
}

doc[fieldName] = []map[string]any{vo}
}
}

return nil
}

0 comments on commit 644a825

Please sign in to comment.