From 7afea39718ecfeb86f2042ac179f67eb05789a49 Mon Sep 17 00:00:00 2001 From: adrianwit Date: Sat, 19 May 2018 22:22:06 -0700 Subject: [PATCH] added batched inserts --- api.go | 1 + dml_builder.go | 3 + manager.go | 250 ++++++++++++++++++++++++++----------------- sql_dialect.go | 12 +++ test/travelers2.json | 2 +- 5 files changed, 168 insertions(+), 100 deletions(-) diff --git a/api.go b/api.go index 33fa8d0..8ab3303 100755 --- a/api.go +++ b/api.go @@ -25,6 +25,7 @@ type RecordMapper interface { type ParametrizedSQL struct { SQL string //Sql Values []interface{} //binding parameter values + Type int } //DmlProvider represents dml generator, which is responsible for providing parametrized sql, it takes operation type: diff --git a/dml_builder.go b/dml_builder.go index 5373b8f..f98c3c2 100755 --- a/dml_builder.go +++ b/dml_builder.go @@ -47,17 +47,20 @@ func (b *DmlBuilder) GetParametrizedSQL(sqlType int, valueProvider func(column s return &ParametrizedSQL{ SQL: b.InsertSQL, Values: b.readInsertValues(valueProvider), + Type: SQLTypeInsert, } case SQLTypeUpdate: return &ParametrizedSQL{ SQL: b.UpdateSQL, Values: b.readValues(*b.Columns, valueProvider), + Type: SQLTypeUpdate, } case SQLTypeDelete: return &ParametrizedSQL{ SQL: b.DeleteSQL, Values: b.readValues(b.TableDescriptor.PkColumns, valueProvider), + Type: SQLTypeDelete, } } panic(fmt.Sprintf("Unsupprted sqltype:%v", sqlType)) diff --git a/manager.go b/manager.go index f14bdea..01afe31 100755 --- a/manager.go +++ b/manager.go @@ -22,38 +22,38 @@ type AbstractManager struct { } //Config returns a config. -func (am *AbstractManager) Config() *Config { - return am.config +func (m *AbstractManager) Config() *Config { + return m.config } //ConnectionProvider returns a connection provider. -func (am *AbstractManager) ConnectionProvider() ConnectionProvider { - return am.connectionProvider +func (m *AbstractManager) ConnectionProvider() ConnectionProvider { + return m.connectionProvider } //Execute executes passed in sql with parameters. It returns sql result, or an error. -func (am *AbstractManager) Execute(sql string, sqlParameters ...interface{}) (result sql.Result, err error) { +func (m *AbstractManager) Execute(sql string, sqlParameters ...interface{}) (result sql.Result, err error) { var connection Connection - connection, err = am.Manager.ConnectionProvider().Get() + connection, err = m.Manager.ConnectionProvider().Get() if err != nil { return nil, err } defer connection.Close() - return am.Manager.ExecuteOnConnection(connection, sql, sqlParameters) + return m.Manager.ExecuteOnConnection(connection, sql, sqlParameters) } //ExecuteAll passed in SQL. It returns sql result, or an error. -func (am *AbstractManager) ExecuteAll(sqls []string) ([]sql.Result, error) { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) ExecuteAll(sqls []string) ([]sql.Result, error) { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return nil, err } defer connection.Close() - return am.Manager.ExecuteAllOnConnection(connection, sqls) + return m.Manager.ExecuteAllOnConnection(connection, sqls) } //ExecuteAllOnConnection executes passed in SQL on connection. It returns sql result, or an error. -func (am *AbstractManager) ExecuteAllOnConnection(connection Connection, sqls []string) ([]sql.Result, error) { +func (m *AbstractManager) ExecuteAllOnConnection(connection Connection, sqls []string) ([]sql.Result, error) { var result = make([]sql.Result, len(sqls)) @@ -71,7 +71,7 @@ func (am *AbstractManager) ExecuteAllOnConnection(connection Connection, sqls [] }() for i, sql := range sqls { var err error - result[i], err = am.Manager.ExecuteOnConnection(connection, sql, nil) + result[i], err = m.Manager.ExecuteOnConnection(connection, sql, nil) if err != nil { return result, err } @@ -80,34 +80,34 @@ func (am *AbstractManager) ExecuteAllOnConnection(connection Connection, sqls [] } //ReadAllWithHandler executes query with parameters and for each fetch row call reading handler with a scanner, to continue reading next row, scanner needs to return true. -func (am *AbstractManager) ReadAllWithHandler(query string, queryParameters []interface{}, readingHandler func(scanner Scanner) (toContinue bool, err error)) error { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) ReadAllWithHandler(query string, queryParameters []interface{}, readingHandler func(scanner Scanner) (toContinue bool, err error)) error { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return err } defer connection.Close() - return am.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, readingHandler) + return m.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, readingHandler) } //ReadAll executes query with parameters and fetches all table rows. The row is mapped to result slice pointer with record mapper. -func (am AbstractManager) ReadAll(resultSlicePointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) error { - connection, err := am.Manager.ConnectionProvider().Get() +func (m AbstractManager) ReadAll(resultSlicePointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) error { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return err } defer connection.Close() - return am.Manager.ReadAllOnConnection(connection, resultSlicePointer, query, queryParameters, mapper) + return m.Manager.ReadAllOnConnection(connection, resultSlicePointer, query, queryParameters, mapper) } //ReadAllOnConnection executes query with parameters on passed in connection and fetches all table rows. The row is mapped to result slice pointer with record mapper. -func (am *AbstractManager) ReadAllOnConnection(connection Connection, resultSlicePointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) error { +func (m *AbstractManager) ReadAllOnConnection(connection Connection, resultSlicePointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) error { toolbox.AssertPointerKind(resultSlicePointer, reflect.Slice, "resultSlicePointer") slice := reflect.ValueOf(resultSlicePointer).Elem() if mapper == nil { mapper = NewRecordMapperIfNeeded(mapper, reflect.TypeOf(resultSlicePointer).Elem().Elem()) } - err := am.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, func(scannalbe Scanner) (toContinue bool, err error) { + err := m.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, func(scannalbe Scanner) (toContinue bool, err error) { mapped, providerError := mapper.Map(scannalbe) if providerError != nil { return false, fmt.Errorf("failed to map row sql: %v due to %v", query, providerError.Error()) @@ -128,17 +128,17 @@ func (am *AbstractManager) ReadAllOnConnection(connection Connection, resultSlic } //ReadSingle executes query with parameters and reads on connection single table row. The row is mapped to result pointer with record mapper. -func (am *AbstractManager) ReadSingle(resultPointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) (success bool, err error) { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) ReadSingle(resultPointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) (success bool, err error) { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return false, err } defer connection.Close() - return am.Manager.ReadSingleOnConnection(connection, resultPointer, query, queryParameters, mapper) + return m.Manager.ReadSingleOnConnection(connection, resultPointer, query, queryParameters, mapper) } //ReadSingleOnConnection executes query with parameters on passed in connection and reads single table row. The row is mapped to result pointer with record mapper. -func (am *AbstractManager) ReadSingleOnConnection(connection Connection, resultPointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) (success bool, err error) { +func (m *AbstractManager) ReadSingleOnConnection(connection Connection, resultPointer interface{}, query string, queryParameters []interface{}, mapper RecordMapper) (success bool, err error) { toolbox.AssertKind(resultPointer, reflect.Ptr, "resultStruct") if mapper == nil { mapper = NewRecordMapperIfNeeded(mapper, reflect.TypeOf(resultPointer).Elem()) @@ -147,7 +147,7 @@ func (am *AbstractManager) ReadSingleOnConnection(connection Connection, resultP var mapped interface{} var elementType = reflect.TypeOf(resultPointer).Elem() - err = am.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, func(scanner Scanner) (toContinue bool, err error) { + err = m.Manager.ReadAllOnWithHandlerOnConnection(connection, query, queryParameters, func(scanner Scanner) (toContinue bool, err error) { mapped, err = mapper.Map(scanner) if err != nil { @@ -189,8 +189,8 @@ func (am *AbstractManager) ReadSingleOnConnection(connection Connection, resultP //PersistAll persists all table rows, dmlProvider is used to generate insert or update statement. It returns number of inserted, updated or error. //If driver allows this operation is executed in one transaction. -func (am *AbstractManager) PersistAll(dataPoiner interface{}, table string, provider DmlProvider) (int, int, error) { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) PersistAll(dataPoiner interface{}, table string, provider DmlProvider) (int, int, error) { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return 0, 0, err } @@ -201,33 +201,33 @@ func (am *AbstractManager) PersistAll(dataPoiner interface{}, table string, prov err = connection.Begin() if err != nil { - return 0, 0, fmt.Errorf("failed to start transaction on %v due to %v", am.config.Descriptor, err) + return 0, 0, fmt.Errorf("failed to start transaction on %v due to %v", m.config.Descriptor, err) } - inserted, updated, err := am.Manager.PersistAllOnConnection(connection, dataPoiner, table, provider) + inserted, updated, err := m.Manager.PersistAllOnConnection(connection, dataPoiner, table, provider) if err == nil { commitErr := connection.Commit() if commitErr != nil { - return 0, 0, fmt.Errorf("failed to commit on %v due to %v", am.config.Descriptor, commitErr) + return 0, 0, fmt.Errorf("failed to commit on %v due to %v", m.config.Descriptor, commitErr) } } else { rollbackErr := connection.Rollback() if rollbackErr != nil { - return 0, 0, fmt.Errorf("failed to rollback on %v due to %v, %v", am.config.Descriptor, err, rollbackErr) + return 0, 0, fmt.Errorf("failed to rollback on %v due to %v, %v", m.config.Descriptor, err, rollbackErr) } } return inserted, updated, err } //RegisterDescriptorIfNeeded register a table descriptor if there it is not present, returns a pointer to a table descriptor. -func (am *AbstractManager) RegisterDescriptorIfNeeded(table string, instance interface{}) (*TableDescriptor, error) { - if !am.tableDescriptorRegistry.Has(table) { +func (m *AbstractManager) RegisterDescriptorIfNeeded(table string, instance interface{}) (*TableDescriptor, error) { + if !m.tableDescriptorRegistry.Has(table) { descriptor, err := NewTableDescriptor(table, instance) if err != nil { return nil, err } - am.tableDescriptorRegistry.Register(descriptor) + m.tableDescriptorRegistry.Register(descriptor) } - var result = am.tableDescriptorRegistry.Get(table) + var result = m.tableDescriptorRegistry.Get(table) if result != nil { return result, nil } @@ -235,18 +235,18 @@ func (am *AbstractManager) RegisterDescriptorIfNeeded(table string, instance int } //PersistAllOnConnection persists on connection all table rows, dmlProvider is used to generate insert or update statement. It returns number of inserted, updated or error. -func (am *AbstractManager) PersistAllOnConnection(connection Connection, dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { +func (m *AbstractManager) PersistAllOnConnection(connection Connection, dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { toolbox.AssertPointerKind(dataPointer, reflect.Slice, "resultSlicePointer") structType := reflect.TypeOf(dataPointer).Elem().Elem() provider, err = NewDmlProviderIfNeeded(provider, table, structType) if err != nil { return 0, 0, err } - descriptor, err := am.RegisterDescriptorIfNeeded(table, dataPointer) + descriptor, err := m.RegisterDescriptorIfNeeded(table, dataPointer) if err != nil { return 0, 0, err } - insertables, updatables, err := am.Manager.ClassifyDataAsInsertableOrUpdatable(connection, dataPointer, table, provider) + insertables, updatables, err := m.Manager.ClassifyDataAsInsertableOrUpdatable(connection, dataPointer, table, provider) if err != nil { return 0, 0, err } @@ -273,7 +273,7 @@ func (am *AbstractManager) PersistAllOnConnection(connection Connection, dataPoi }) } - inserted, insertErr := am.Manager.PersistData(connection, insertables, table, provider, func(item interface{}) *ParametrizedSQL { + inserted, insertErr := m.Manager.PersistData(connection, insertables, table, provider, func(item interface{}) *ParametrizedSQL { return provider.Get(SQLTypeInsert, item) }) if insertErr != nil { @@ -287,7 +287,7 @@ func (am *AbstractManager) PersistAllOnConnection(connection Connection, dataPoi } } - updated, updateErr := am.Manager.PersistData(connection, updatables, table, provider, func(item interface{}) *ParametrizedSQL { + updated, updateErr := m.Manager.PersistData(connection, updatables, table, provider, func(item interface{}) *ParametrizedSQL { return provider.Get(SQLTypeUpdate, item) }) @@ -299,14 +299,14 @@ func (am *AbstractManager) PersistAllOnConnection(connection Connection, dataPoi } //PersistSingle persists single table row, dmlProvider is used to generate insert or update statement. It returns number of inserted, updated or error. -func (am *AbstractManager) PersistSingle(dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { +func (m *AbstractManager) PersistSingle(dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { slice := convertToTypesSlice(dataPointer) - inserted, updated, err = am.Manager.PersistAll(slice, table, provider) + inserted, updated, err = m.Manager.PersistAll(slice, table, provider) if err != nil { return 0, 0, err } if inserted > 0 { - descriptor, err := am.RegisterDescriptorIfNeeded(table, dataPointer) + descriptor, err := m.RegisterDescriptorIfNeeded(table, dataPointer) if err != nil { return 0, 0, err } @@ -319,65 +319,117 @@ func (am *AbstractManager) PersistSingle(dataPointer interface{}, table string, } //PersistSingleOnConnection persists on connection single table row, dmlProvider is used to generate insert or udpate statement. It returns number of inserted, updated or error. -func (am *AbstractManager) PersistSingleOnConnection(connection Connection, dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { +func (m *AbstractManager) PersistSingleOnConnection(connection Connection, dataPointer interface{}, table string, provider DmlProvider) (inserted int, updated int, err error) { slice := []interface{}{dataPointer} - return am.Manager.PersistAllOnConnection(connection, &slice, table, provider) + return m.Manager.PersistAllOnConnection(connection, &slice, table, provider) } //PersistData persist data on connection on table, keySetter is used to optionally set autoincrement column, sqlProvider handler will generate ParametrizedSQL with Insert or Update statement. -func (am *AbstractManager) PersistData(connection Connection, data []interface{}, table string, keySetter KeySetter, sqlProvider func(item interface{}) *ParametrizedSQL) (int, error) { +func (m *AbstractManager) PersistData(connection Connection, data []interface{}, table string, keySetter KeySetter, sqlProvider func(item interface{}) *ParametrizedSQL) (int, error) { var processed = 0 - for i, item := range data { + dialect := GetDatastoreDialect(m.config.DriverName) + canUseBatch := dialect != nil && dialect.CanPersistBatch() + + var batchControl = struct { + sql string + values []interface{} + firstSeq int64 + dataIndexes []int + }{ + values: []interface{}{}, + dataIndexes: []int{}, + } + + var updateId = func(index int, seq int64) { + if seq == 0 { + return + } + var ptrType = false + dataType := reflect.TypeOf(data[index]) + itemValue := reflect.ValueOf(data[index]) + if dataType.Kind() == reflect.Ptr { + dataType = dataType.Elem() + ptrType = true + } + if itemValue.Kind() == reflect.Ptr { + itemValue = itemValue.Elem() + ptrType = true + } + structPointerValue := reflect.New(dataType) + reflect.Indirect(structPointerValue).Set(itemValue) + keySetter.SetKey(structPointerValue.Interface(), seq) + if ptrType { + data[index] = structPointerValue.Interface() + } else { + data[index] = structPointerValue.Elem().Interface() + } + } + for i, item := range data { parametrizedSQL := sqlProvider(item) - if len(parametrizedSQL.Values) == 1 && strings.HasPrefix(parametrizedSQL.SQL, "UPDATE ") { + if len(parametrizedSQL.Values) == 1 && parametrizedSQL.Type == SQLTypeUpdate { //nothing to udpate, one parameter is ID=? without values to update continue } - result, err := am.Manager.ExecuteOnConnection(connection, parametrizedSQL.SQL, parametrizedSQL.Values) + + if parametrizedSQL.Type == SQLTypeInsert && canUseBatch && batchControl.firstSeq > 0 { + batchControl.dataIndexes = append(batchControl.dataIndexes, i) + if len(batchControl.sql) == 0 { + batchControl.sql = parametrizedSQL.SQL + batchControl.values = parametrizedSQL.Values + continue + } + valuesIndex := strings.Index(parametrizedSQL.SQL, " VALUES") + if valuesIndex != -1 { + batchControl.sql += "," + string(parametrizedSQL.SQL[valuesIndex+7:]) + batchControl.values = append(batchControl.values, parametrizedSQL.Values...) + } + continue + } + + result, err := m.Manager.ExecuteOnConnection(connection, parametrizedSQL.SQL, parametrizedSQL.Values) + if err != nil { + return 0, err + } + affected, err := result.RowsAffected() if err != nil { return 0, err } + processed += int(affected) + seq, _ := result.LastInsertId() + if canUseBatch && batchControl.firstSeq == 0 { + batchControl.firstSeq = seq + } + updateId(i, seq) + } + + if batchControl.sql != "" { + result, err := m.Manager.ExecuteOnConnection(connection, batchControl.sql, batchControl.values) + if err != nil { + return 0, err + } affected, err := result.RowsAffected() if err != nil { return 0, err } processed += int(affected) - seq, lastInsertErr := result.LastInsertId() - - if lastInsertErr == nil && seq > 0 { - var ptrType = false - dataType := reflect.TypeOf(data[i]) - itemValue := reflect.ValueOf(data[i]) - if dataType.Kind() == reflect.Ptr { - dataType = dataType.Elem() - ptrType = true - } - if itemValue.Kind() == reflect.Ptr { - itemValue = itemValue.Elem() - ptrType = true - } - structPointerValue := reflect.New(dataType) - reflect.Indirect(structPointerValue).Set(itemValue) - keySetter.SetKey(structPointerValue.Interface(), seq) - if ptrType { - data[i] = structPointerValue.Interface() - } else { - data[i] = structPointerValue.Elem().Interface() - } + for _, i := range batchControl.dataIndexes { + batchControl.firstSeq++ + updateId(i, batchControl.firstSeq) } } + return processed, nil } -func (am *AbstractManager) fetchDataInBatches(connection Connection, sqlsWihtArguments []*ParametrizedSQL, mapper RecordMapper) (*[][]interface{}, error) { +func (m *AbstractManager) fetchDataInBatches(connection Connection, sqlsWihtArguments []*ParametrizedSQL, mapper RecordMapper) (*[][]interface{}, error) { var rows = make([][]interface{}, 0) for _, sqlWihtArguments := range sqlsWihtArguments { if len(sqlWihtArguments.Values) == 0 { break } - err := am.Manager.ReadAllOnConnection(connection, &rows, sqlWihtArguments.SQL, sqlWihtArguments.Values, mapper) + err := m.Manager.ReadAllOnConnection(connection, &rows, sqlWihtArguments.SQL, sqlWihtArguments.Values, mapper) if err != nil { return nil, err } @@ -385,9 +437,9 @@ func (am *AbstractManager) fetchDataInBatches(connection Connection, sqlsWihtArg return &rows, nil } -func (am *AbstractManager) fetchExistingData(connection Connection, table string, pkValues [][]interface{}, provider DmlProvider) ([][]interface{}, error) { +func (m *AbstractManager) fetchExistingData(connection Connection, table string, pkValues [][]interface{}, provider DmlProvider) ([][]interface{}, error) { var rows = make([][]interface{}, 0) - descriptor := am.tableDescriptorRegistry.Get(table) + descriptor := m.tableDescriptorRegistry.Get(table) if len(pkValues) > 0 { descriptor := TableDescriptor{Table: table, PkColumns: descriptor.PkColumns} @@ -395,7 +447,7 @@ func (am *AbstractManager) fetchExistingData(connection Connection, table string sqlWithArguments := sqlBuilder.BuildBatchedQueryOnPk(descriptor.PkColumns, pkValues, batchSize) var mapper = NewColumnarRecordMapper(false, reflect.TypeOf(rows)) - batched, err := am.fetchDataInBatches(connection, sqlWithArguments, mapper) + batched, err := m.fetchDataInBatches(connection, sqlWithArguments, mapper) if err != nil { return nil, err } @@ -405,7 +457,7 @@ func (am *AbstractManager) fetchExistingData(connection Connection, table string } //ClassifyDataAsInsertableOrUpdatable classifies passed in data as insertable or updatable. -func (am *AbstractManager) ClassifyDataAsInsertableOrUpdatable(connection Connection, dataPointer interface{}, table string, provider DmlProvider) ([]interface{}, []interface{}, error) { +func (m *AbstractManager) ClassifyDataAsInsertableOrUpdatable(connection Connection, dataPointer interface{}, table string, provider DmlProvider) ([]interface{}, []interface{}, error) { if provider == nil { return nil, nil, errors.New("provider was nil") } @@ -423,7 +475,7 @@ func (am *AbstractManager) ClassifyDataAsInsertableOrUpdatable(connection Connec return true }) //fetch all existing pk values into rows to classify as updatable - rows, err := am.fetchExistingData(connection, table, pkValues, provider) + rows, err := m.fetchExistingData(connection, table, pkValues, provider) if err != nil { return nil, nil, fmt.Errorf("failed to fetch existing data: due to:\n\t%v", err.Error()) } @@ -450,25 +502,25 @@ func (am *AbstractManager) ClassifyDataAsInsertableOrUpdatable(connection Connec } //DeleteAll deletes all rows for passed in table, key provider is used to extract primary keys. It returns number of deleted rows or error. -func (am *AbstractManager) DeleteAll(dataPointer interface{}, table string, keyProvider KeyGetter) (deleted int, err error) { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) DeleteAll(dataPointer interface{}, table string, keyProvider KeyGetter) (deleted int, err error) { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return 0, err } err = connection.Begin() if err != nil { - return 0, fmt.Errorf("failed to start transaction on %v due to %v", am.config.Descriptor, err) + return 0, fmt.Errorf("failed to start transaction on %v due to %v", m.config.Descriptor, err) } - deleted, err = am.DeleteAllOnConnection(connection, dataPointer, table, keyProvider) + deleted, err = m.DeleteAllOnConnection(connection, dataPointer, table, keyProvider) if err == nil { commitErr := connection.Commit() if commitErr != nil { - return 0, fmt.Errorf("failed to commit on %v due to %v", am.config.Descriptor, commitErr) + return 0, fmt.Errorf("failed to commit on %v due to %v", m.config.Descriptor, commitErr) } } else { rollbackErr := connection.Rollback() if rollbackErr != nil { - return 0, fmt.Errorf("failed to rollback on %v due to %v, %v", am.config.Descriptor, err, rollbackErr) + return 0, fmt.Errorf("failed to rollback on %v due to %v, %v", m.config.Descriptor, err, rollbackErr) } } return deleted, err @@ -476,7 +528,7 @@ func (am *AbstractManager) DeleteAll(dataPointer interface{}, table string, keyP //DeleteAllOnConnection deletes all rows on connection from table, key provider is used to extract primary keys. It returns number of deleted rows or error. //If driver allows this operation is executed in one transaction. -func (am *AbstractManager) DeleteAllOnConnection(connection Connection, dataPointer interface{}, table string, keyProvider KeyGetter) (deleted int, err error) { +func (m *AbstractManager) DeleteAllOnConnection(connection Connection, dataPointer interface{}, table string, keyProvider KeyGetter) (deleted int, err error) { deleted = 0 structType := toolbox.DiscoverTypeByKind(dataPointer, reflect.Struct) @@ -484,9 +536,9 @@ func (am *AbstractManager) DeleteAllOnConnection(connection Connection, dataPoin if err != nil { return 0, err } - am.RegisterDescriptorIfNeeded(table, dataPointer) + m.RegisterDescriptorIfNeeded(table, dataPointer) - descriptor := am.tableDescriptorRegistry.Get(table) + descriptor := m.tableDescriptorRegistry.Get(table) toolbox.ProcessSlice(dataPointer, func(item interface{}) bool { if err != nil { return false @@ -501,7 +553,7 @@ func (am *AbstractManager) DeleteAllOnConnection(connection Connection, dataPoin } dml := fmt.Sprintf(deleteSQLTemplate, table, where) var result sql.Result - result, err = am.Manager.ExecuteOnConnection(connection, dml, keyProvider.Key(item)) + result, err = m.Manager.ExecuteOnConnection(connection, dml, keyProvider.Key(item)) if err != nil { return false } @@ -519,25 +571,25 @@ func (am *AbstractManager) DeleteAllOnConnection(connection Connection, dataPoin } //DeleteSingle deletes single row from table on for passed in data pointer, key provider is used to extract primary keys. It returns boolean if successful, or error. -func (am *AbstractManager) DeleteSingle(dataPointer interface{}, table string, keyProvider KeyGetter) (bool, error) { - connection, err := am.Manager.ConnectionProvider().Get() +func (m *AbstractManager) DeleteSingle(dataPointer interface{}, table string, keyProvider KeyGetter) (bool, error) { + connection, err := m.Manager.ConnectionProvider().Get() if err != nil { return false, err } err = connection.Begin() if err != nil { - return false, fmt.Errorf("failed to start transaction on %v due to %v", am.config.Descriptor, err) + return false, fmt.Errorf("failed to start transaction on %v due to %v", m.config.Descriptor, err) } - suceess, err := am.DeleteSingleOnConnection(connection, dataPointer, table, keyProvider) + suceess, err := m.DeleteSingleOnConnection(connection, dataPointer, table, keyProvider) if err == nil { commitErr := connection.Commit() if commitErr != nil { - return false, fmt.Errorf("failed to commit on %v due to %v", am.config.Descriptor, commitErr) + return false, fmt.Errorf("failed to commit on %v due to %v", m.config.Descriptor, commitErr) } } else { rollbackErr := connection.Rollback() if rollbackErr != nil { - return false, fmt.Errorf("failed to rollback on %v due to %v, %v", am.config.Descriptor, err, rollbackErr) + return false, fmt.Errorf("failed to rollback on %v due to %v, %v", m.config.Descriptor, err, rollbackErr) } } return suceess, err @@ -554,10 +606,10 @@ func convertToTypesSlice(dataPointer interface{}) interface{} { } //DeleteSingleOnConnection deletes data on connection from table on for passed in data pointer, key provider is used to extract primary keys. It returns true if successful. -func (am *AbstractManager) DeleteSingleOnConnection(connection Connection, dataPointer interface{}, table string, keyProvider KeyGetter) (bool, error) { +func (m *AbstractManager) DeleteSingleOnConnection(connection Connection, dataPointer interface{}, table string, keyProvider KeyGetter) (bool, error) { toolbox.AssertPointerKind(dataPointer, reflect.Struct, "dataPointer") slice := convertToTypesSlice(dataPointer) - deleted, err := am.Manager.DeleteAllOnConnection(connection, slice, table, keyProvider) + deleted, err := m.Manager.DeleteAllOnConnection(connection, slice, table, keyProvider) if err != nil { return false, err } @@ -565,7 +617,7 @@ func (am *AbstractManager) DeleteSingleOnConnection(connection Connection, dataP } //ExpandSQL expands sql with passed in arguments -func (am *AbstractManager) ExpandSQL(sql string, arguments []interface{}) string { +func (m *AbstractManager) ExpandSQL(sql string, arguments []interface{}) string { for _, arg := range arguments { var stringArg = toolbox.AsString(arg) if toolbox.IsString(arg) || toolbox.CanConvertToString(arg) { @@ -577,8 +629,8 @@ func (am *AbstractManager) ExpandSQL(sql string, arguments []interface{}) string } //TableDescriptorRegistry returns a table descriptor registry -func (am *AbstractManager) TableDescriptorRegistry() TableDescriptorRegistry { - return am.tableDescriptorRegistry +func (m *AbstractManager) TableDescriptorRegistry() TableDescriptorRegistry { + return m.tableDescriptorRegistry } //NewAbstractManager create a new abstract manager, it takes config, conneciton provider, and target (sub class) manager diff --git a/sql_dialect.go b/sql_dialect.go index e28fbb7..076e86d 100755 --- a/sql_dialect.go +++ b/sql_dialect.go @@ -361,6 +361,10 @@ type mySQLDialect struct { DatastoreDialect } +func (d mySQLDialect) CanPersistBatch() bool { + return true +} + func newMySQLDialect() mySQLDialect { return mySQLDialect{DatastoreDialect: NewSQLDatastoreDialect(ansiTableListSQL, ansiSequenceSQL, defaultSchemaSQL, ansiSchemaListSQL, ansiPrimaryKeySQL, mysqlDisableForeignCheck, mysqlEnableForeignCheck, defaultAutoincremetSQL, ansiTableInfo, 0)} } @@ -438,6 +442,10 @@ type pgDialect struct { DatastoreDialect } +func (d pgDialect) CanPersistBatch() bool { + return true +} + func newPgDialect() *pgDialect { return &pgDialect{DatastoreDialect: NewSQLDatastoreDialect(pgTableListSQL, "", pgCurrentSchemaSQL, pgSchemaListSQL, pgPrimaryKeySQL, "", "", pgAutoincrementSQL, ansiTableInfo, 0)} } @@ -492,6 +500,10 @@ type oraDialect struct { DatastoreDialect } +func (d oraDialect) CanPersistBatch() bool { + return true +} + //CreateDatastore create a new datastore (database/schema), it takes manager and target datastore func (d oraDialect) CreateDatastore(manager Manager, datastore string) error { var password, ok = manager.Config().Parameters["password"] diff --git a/test/travelers2.json b/test/travelers2.json index 310563f..e099582 100644 --- a/test/travelers2.json +++ b/test/travelers2.json @@ -1 +1 @@ -{"Achievements":["z","g"],"Id":20,"LastVisitTime":"2018-02-25 09:41:12","MostLikedCity":{"City":"Moscow","Souvenirs":["s3","sN"],"Visits":3},"Name":"Robin"} +{"Achievements":["z","g"],"Id":20,"LastVisitTime":"2018-05-19 10:21:29","MostLikedCity":{"City":"Moscow","Souvenirs":["s3","sN"],"Visits":3},"Name":"Robin"}