Skip to content

Commit

Permalink
get by index
Browse files Browse the repository at this point in the history
  • Loading branch information
latolukasz committed Feb 21, 2024
1 parent 3814a8c commit a4364a6
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 240 deletions.
10 changes: 0 additions & 10 deletions bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ func (b *BindError) Error() string {
return "[" + b.Field + "] " + b.Message
}

// DuplicatedKeyBindError reports an attempt to store a value that collides
// with an existing row on a unique index.
type DuplicatedKeyBindError struct {
	Index   string   // name of the violated unique index
	ID      uint64   // ID of the entity already holding the value
	Columns []string // columns that make up the unique index
}

// Error implements the error interface, naming the violated index.
func (d *DuplicatedKeyBindError) Error() string {
	msg := "duplicated value for unique index '" + d.Index + "'"
	return msg
}

// Get returns the value bound under key. Missing keys yield the map's
// zero value (nil for an `any`-valued Bind).
func (b Bind) Get(key string) any {
	value := b[key]
	return value
}
Expand Down
28 changes: 15 additions & 13 deletions edit_entity_field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,21 @@ func testUpdateFieldExecute(t *testing.T, async, local, redis bool) {
assert.Equal(t, int16(12), entity.Int)

/* unique index */
err = runEditEntityField(orm, entity, "Name", "name 3", async)
assert.EqualError(t, err, "duplicated value for unique index 'Name'")
orm.ClearFlush()
err = runEditEntityField(orm, entity, "Name", "name 100", async)
assert.NoError(t, err)
entity, _ = GetByUniqueIndex[updateEntity](orm, "Name", "name 100")
assert.NotNil(t, entity)
assert.Equal(t, ids[1], uint64(entity.ID))
err = runEditEntityField(orm, entity, "Int", 100, async)
assert.NoError(t, err)
entity, _ = GetByUniqueIndex[updateEntity](orm, "Multi", 13, 100)
assert.NotNil(t, entity)
assert.Equal(t, ids[1], uint64(entity.ID))
if !async {
err = runEditEntityField(orm, entity, "Name", "name 3", async)
assert.EqualError(t, err, "Error 1062 (23000): Duplicate entry 'name 3' for key 'updateEntity.Name'")
orm.ClearFlush()
err = runEditEntityField(orm, entity, "Name", "name 100", async)
assert.NoError(t, err)
entity, _ = GetByUniqueIndex[updateEntity](orm, "Name", "name 100")
assert.NotNil(t, entity)
assert.Equal(t, ids[1], uint64(entity.ID))
err = runEditEntityField(orm, entity, "Int", 100, async)
assert.NoError(t, err)
entity, _ = GetByUniqueIndex[updateEntity](orm, "Multi", 13, 100)
assert.NotNil(t, entity)
assert.Equal(t, ids[1], uint64(entity.ID))
}
}

func runEditEntityField(orm ORM, entity *updateEntity, field string, value any, async bool) error {
Expand Down
65 changes: 40 additions & 25 deletions entity_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ type entitySchema struct {
fieldBindSetters map[string]fieldBindSetter
fieldSetters map[string]fieldSetter
fieldGetters map[string]fieldGetter
uniqueIndices map[string][]string
uniqueIndexes map[string]indexDefinition
uniqueIndexesColumns map[string][]string
cachedUniqueIndexes map[string]indexDefinition
references map[string]referenceDefinition
cachedReferences map[string]referenceDefinition
indexes map[string]indexDefinition
Expand Down Expand Up @@ -264,7 +266,7 @@ func (e *entitySchema) GetColumns() []string {
}

func (e *entitySchema) GetUniqueIndexes() map[string][]string {
return e.uniqueIndices
return e.uniqueIndexesColumns
}

func (e *entitySchema) GetSchemaChanges(orm ORM) (alters []Alter, has bool) {
Expand Down Expand Up @@ -392,34 +394,25 @@ func (e *entitySchema) init(registry *registry, entityType reflect.Type) error {
e.asyncCacheKey = asyncGroup
}
e.asyncTemporaryQueue = xsync.NewMPMCQueueOf[asyncTemporaryQueueEvent](10000)
e.uniqueIndices = make(map[string][]string)
for name, index := range uniqueIndices {
e.uniqueIndices[name] = make([]string, len(index))
e.uniqueIndexes = make(map[string]indexDefinition)
e.cachedIndexes = make(map[string]indexDefinition)
e.cachedUniqueIndexes = make(map[string]indexDefinition)
e.uniqueIndexesColumns = make(map[string][]string)
for indexName, index := range uniqueIndices {
e.uniqueIndexesColumns[indexName] = make([]string, len(index))
for i := 1; i <= len(index); i++ {
e.uniqueIndices[name][i-1] = index[i]
e.uniqueIndexesColumns[indexName][i-1] = index[i]
}
definition := createIndexDefinition(index, e)
e.uniqueIndexes[indexName] = definition
if definition.Cached {
e.cachedUniqueIndexes[indexName] = definition
}
}
for indexName, indexColumns := range indices {
where := ""
for i := 0; i < len(indexColumns); i++ {
if i > 0 {
where += " AND "
}
where += "`" + indexColumns[i+1] + "`=?"
}
cached := false
tags, hasTag := e.tags[indexColumns[1]]
if hasTag {
cached = tags["cached"] == "true"
}
columnsList := make([]string, len(indexColumns))
for j := 0; j < len(indexColumns); j++ {
columnsList[j] = indexColumns[j+1]
}

definition := indexDefinition{Where: where, Cached: cached, Columns: columnsList}
definition := createIndexDefinition(indexColumns, e)
e.indexes[indexName] = definition
if cached {
if definition.Cached {
e.cachedIndexes[indexName] = definition
}
}
Expand All @@ -439,6 +432,28 @@ func (e *entitySchema) init(registry *registry, entityType reflect.Type) error {
return nil
}

// createIndexDefinition builds an indexDefinition from a 1-based map of
// index column names. The WHERE clause matches every column with a `=?`
// placeholder; the index is marked Cached when the first column's field
// carries the tag cached="true" in the schema.
func createIndexDefinition(indexColumns map[int]string, e *entitySchema) indexDefinition {
	// indexColumns keys are 1-based positions; build the column list and
	// the WHERE clause in a single pass.
	columns := make([]string, len(indexColumns))
	where := ""
	for i := range columns {
		column := indexColumns[i+1]
		if i > 0 {
			where += " AND "
		}
		where += "`" + column + "`=?"
		columns[i] = column
	}
	cached := false
	if tags, hasTag := e.tags[indexColumns[1]]; hasTag {
		cached = tags["cached"] == "true"
	}
	return indexDefinition{Where: where, Cached: cached, Columns: columns}
}

func (e *entitySchema) validateIndexes(uniqueIndices map[string]map[int]string, indices map[string]map[int]string) error {
all := make(map[string]map[int]string)
for k, v := range uniqueIndices {
Expand Down
39 changes: 19 additions & 20 deletions flush.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package beeorm

import (
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -56,13 +57,22 @@ func (orm *ormImplementation) flush(async bool) error {
}
}
}
var err error
if !async {
func() {
var transactions []DBTransaction
defer func() {
for _, tx := range transactions {
tx.Rollback(orm)
}
if rec := recover(); rec != nil {
asErr, isErr := rec.(error)
if isErr {
err = asErr
return
}
err = fmt.Errorf("%v", rec)
}
}()
for code, actions := range orm.flushDBActions {
var d DBBase
Expand Down Expand Up @@ -91,7 +101,7 @@ func (orm *ormImplementation) flush(async bool) error {
orm.flushDBActions = nil
orm.flushPostActions = orm.flushPostActions[0:0]
orm.redisPipeLines = nil
return nil
return err
}

func (orm *ormImplementation) ClearFlush() {
Expand Down Expand Up @@ -287,20 +297,15 @@ func (orm *ormImplementation) handleInserts(async bool, schema *entitySchema, op
}
}
}
uniqueIndexes := schema.GetUniqueIndexes()
uniqueIndexes := schema.cachedUniqueIndexes
if len(uniqueIndexes) > 0 {
cache := orm.Engine().Redis(schema.getForcedRedisCode())
for indexName, indexColumns := range uniqueIndexes {
for indexName, definition := range uniqueIndexes {
hSetKey := schema.getCacheKey() + ":" + indexName
hField, hasKey := buildUniqueKeyHSetField(schema, indexColumns, bind, nil)
hField, hasKey := buildUniqueKeyHSetField(schema, definition.Columns, bind, nil)
if !hasKey {
continue
}
previousID, inUse := cache.HGet(orm, hSetKey, hField)
if inUse {
idAsUint, _ := strconv.ParseUint(previousID, 10, 64)
return &DuplicatedKeyBindError{Index: indexName, ID: idAsUint, Columns: indexColumns}
}
orm.RedisPipeLine(cache.GetConfig().GetCode()).HSet(hSetKey, hField, strconv.FormatUint(insert.ID(), 10))
}
}
Expand Down Expand Up @@ -445,12 +450,11 @@ func (orm *ormImplementation) handleUpdates(async bool, schema *entitySchema, op
}
}
}
uniqueIndexes := schema.GetUniqueIndexes()
if len(uniqueIndexes) > 0 {
if len(schema.cachedUniqueIndexes) > 0 {
cache := orm.Engine().Redis(schema.getForcedRedisCode())
for indexName, indexColumns := range uniqueIndexes {
for indexName, definition := range schema.cachedUniqueIndexes {
indexChanged := false
for _, column := range indexColumns {
for _, column := range definition.Columns {
_, changed := newBind[column]
if changed {
indexChanged = true
Expand All @@ -461,16 +465,11 @@ func (orm *ormImplementation) handleUpdates(async bool, schema *entitySchema, op
continue
}
hSetKey := schema.getCacheKey() + ":" + indexName
hField, hasKey := buildUniqueKeyHSetField(schema, indexColumns, newBind, forcedNew)
hField, hasKey := buildUniqueKeyHSetField(schema, definition.Columns, newBind, forcedNew)
if hasKey {
previousID, inUse := cache.HGet(orm, hSetKey, hField)
if inUse {
idAsUint, _ := strconv.ParseUint(previousID, 10, 64)
return &DuplicatedKeyBindError{Index: indexName, ID: idAsUint, Columns: indexColumns}
}
orm.RedisPipeLine(cache.GetConfig().GetCode()).HSet(hSetKey, hField, strconv.FormatUint(update.ID(), 10))
}
hFieldOld, hasKey := buildUniqueKeyHSetField(schema, indexColumns, oldBind, forcedOld)
hFieldOld, hasKey := buildUniqueKeyHSetField(schema, definition.Columns, oldBind, forcedOld)
if hasKey {
orm.RedisPipeLine(cache.GetConfig().GetCode()).HDel(hSetKey, hFieldOld)
}
Expand Down
56 changes: 25 additions & 31 deletions flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func testFlushInsert(t *testing.T, async, local, redis bool) {
newEntity.ReferenceRequired = Reference[flushEntityReference](reference.ID)
newEntity.Name = "Name"
assert.NotEmpty(t, newEntity.ID)
firstEntityID := newEntity.ID
assert.NoError(t, testFlush(orm, async))
loggerDB.Clear()

Expand Down Expand Up @@ -640,29 +639,25 @@ func testFlushInsert(t *testing.T, async, local, redis bool) {
assert.Equal(t, "TimeWithTimeNullable", err.(*BindError).Field)
orm.ClearFlush()

// duplicated key
newEntity = NewEntity[flushEntity](orm)
newEntity.City = "Another city "
newEntity.Name = "Name"
newEntity.ReferenceRequired = Reference[flushEntityReference](reference.ID)
err = testFlush(orm, async)
assert.EqualError(t, err, "duplicated value for unique index 'name'")
assert.Equal(t, uint64(firstEntityID), err.(*DuplicatedKeyBindError).ID)
assert.Equal(t, "name", err.(*DuplicatedKeyBindError).Index)
assert.Equal(t, []string{"Name"}, err.(*DuplicatedKeyBindError).Columns)
orm.ClearFlush()
if !async {

orm.Engine().Redis(DefaultPoolCode).FlushDB(orm)
LoadUniqueKeys(orm, false)
newEntity = NewEntity[flushEntity](orm)
newEntity.Name = "Name"
newEntity.ReferenceRequired = Reference[flushEntityReference](reference.ID)
err = testFlush(orm, async)
assert.EqualError(t, err, "duplicated value for unique index 'name'")
assert.Equal(t, uint64(firstEntityID), err.(*DuplicatedKeyBindError).ID)
assert.Equal(t, "name", err.(*DuplicatedKeyBindError).Index)
assert.Equal(t, []string{"Name"}, err.(*DuplicatedKeyBindError).Columns)
orm.ClearFlush()
// duplicated key
newEntity = NewEntity[flushEntity](orm)
newEntity.City = "Another city "
newEntity.Name = "Name"
newEntity.ReferenceRequired = Reference[flushEntityReference](reference.ID)
err = testFlush(orm, async)
assert.EqualError(t, err, "Error 1062 (23000): Duplicate entry 'Name' for key 'flushEntity.name'")
orm.ClearFlush()

orm.Engine().Redis(DefaultPoolCode).FlushDB(orm)
newEntity = NewEntity[flushEntity](orm)
newEntity.Name = "Name"
newEntity.ReferenceRequired = Reference[flushEntityReference](reference.ID)
err = testFlush(orm, async)
assert.EqualError(t, err, "Error 1062 (23000): Duplicate entry 'Name' for key 'flushEntity.name'")
orm.ClearFlush()
}
}

func testFlushDelete(t *testing.T, async, local, redis bool) {
Expand Down Expand Up @@ -1153,14 +1148,13 @@ func testFlushUpdate(t *testing.T, async, local, redis bool) {
newEntity.Name = "Name 2"
assert.NoError(t, testFlush(orm, async))

editedEntity = EditEntity(orm, editedEntity)
editedEntity.Name = "Name 2"
err = testFlush(orm, async)
assert.EqualError(t, err, "duplicated value for unique index 'name'")
assert.Equal(t, uint64(newEntity.ID), err.(*DuplicatedKeyBindError).ID)
assert.Equal(t, "name", err.(*DuplicatedKeyBindError).Index)
assert.Equal(t, []string{"Name"}, err.(*DuplicatedKeyBindError).Columns)
orm.ClearFlush()
if !async {
editedEntity = EditEntity(orm, editedEntity)
editedEntity.Name = "Name 2"
err = testFlush(orm, async)
assert.EqualError(t, err, "Error 1062 (23000): Duplicate entry 'Name 2' for key 'flushEntity.name'")
orm.ClearFlush()
}

editedEntity = EditEntity(orm, newEntity)
editedEntity.Name = "Name 3"
Expand Down
Loading

0 comments on commit a4364a6

Please sign in to comment.