SQL Bulk Insert Upsert Destination

Thibaut Barrère edited this page May 26, 2020 · 6 revisions

Kiba Pro's SQLBulkInsert destination integrates tightly with the Sequel gem to provide an efficient and flexible way to insert large batches of rows.

You can also perform batch upserts on Postgres (via INSERT ON CONFLICT) and on MySQL (via ON DUPLICATE KEY UPDATE). More information below.

Currently tested against: PostgreSQL 9.5+, MySQL 5.5+, Ruby 2.4-2.7.

Requirements: make sure to add these to your Gemfile:

  • sequel gem
  • pg gem (if you connect to Postgres)
  • mysql2 gem (if you connect to MySQL)

Important note: all the rows must be Hash instances with exactly the same keys. An error will be raised otherwise.
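If you want to catch mismatched rows before they reach the destination, a check along these lines can help. This is only a sketch: rows_with_mismatched_keys is a hypothetical helper (not part of Kiba Pro), it takes the first row as the reference, and it assumes that key order does not matter.

```ruby
require "set"

# Hypothetical helper (not part of Kiba Pro): returns the rows whose keys
# differ from the keys of the first row. The first row must be a Hash.
def rows_with_mismatched_keys(rows)
  return [] if rows.empty?

  expected = rows.first.keys.to_set
  rows.reject { |row| row.is_a?(Hash) && row.keys.to_set == expected }
end

rows = [
  { sku: "A1", price: 10 },
  { price: 12, sku: "B2" }, # same keys, different order: accepted here
  { sku: "C3" }             # missing :price: reported
]

bad_rows = rows_with_mismatched_keys(rows)
```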

Typical use

require 'kiba-pro/destinations/sql_bulk_insert'

destination Kiba::Pro::Destinations::SQLBulkInsert,
  # NOTE: a live Sequel connection must be passed here
  database: db,
  table: :products,
  buffer_size: 20_000

You can tweak the buffer size - make sure to benchmark your setup before doing so!

row_pre_processor option

You can massage the row before its insertion using the row_pre_processor callback.

destination Kiba::Pro::Destinations::SQLBulkInsert,
  row_pre_processor: -> (row) { xxx }

Where xxx can be:

  • some recomputation of the row (e.g.: row.except(:created_at))
  • nil to indicate that the row should not be inserted
  • an Array of rows (e.g.: [row.fetch(:first_sub_row), row.fetch(:second_sub_row)])
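The three possible return values can be modelled in plain Ruby as follows. This sketch only illustrates the contract described above; rows_to_insert is a hypothetical helper and does not reflect how Kiba Pro handles the callback internally.

```ruby
# Hypothetical helper: turns a row_pre_processor result into a list of rows.
def rows_to_insert(result)
  case result
  when nil   then []       # the row is skipped
  when Hash  then [result] # a single (possibly recomputed) row
  when Array then result   # one input row fans out to several rows
  else raise ArgumentError, "unexpected pre-processor result: #{result.class}"
  end
end

# Three example callbacks, one per documented case.
drop_timestamps = ->(row) { row.reject { |key, _| key == :created_at } }
skip_inactive   = ->(row) { row[:active] ? row : nil }
split_sub_rows  = ->(row) { [row.fetch(:first_sub_row), row.fetch(:second_sub_row)] }

recomputed = rows_to_insert(drop_timestamps.call({ id: 1, created_at: "2020-05-26" }))
skipped    = rows_to_insert(skip_inactive.call({ id: 2, active: false }))
fanned_out = rows_to_insert(
  split_sub_rows.call({ first_sub_row: { id: 3 }, second_sub_row: { id: 4 } })
)
```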

This feature also allows you to target different databases or tables in a single Kiba pass:

destination Kiba::Pro::Destinations::SQLBulkInsert,
  # SNIP
  table: 'orders',
  row_pre_processor: -> (row) { row.except(:line_items) }
destination Kiba::Pro::Destinations::SQLBulkInsert,
  # SNIP
  table: 'order_line_items',
  row_pre_processor: -> (row) { row.fetch(:line_items) }

Note: Hash#except is available via require 'active_support/core_ext/hash/except'.
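To see what each destination receives, here is a small standalone sketch using an assumed row shape (the id, customer, sku and quantity fields are made up for illustration). It uses Hash#reject instead of Hash#except so that it runs without ActiveSupport.

```ruby
# Assumed shape of a row arriving at both destinations.
order_row = {
  id: 42,
  customer: "ACME",
  line_items: [
    { order_id: 42, sku: "A1", quantity: 2 },
    { order_id: 42, sku: "B2", quantity: 1 }
  ]
}

# Equivalent to row.except(:line_items), without the ActiveSupport dependency.
orders_pre_processor = ->(row) { row.reject { |key, _| key == :line_items } }
# Returns an Array, so each line item becomes its own inserted row.
line_items_pre_processor = ->(row) { row.fetch(:line_items) }

row_for_orders_table = orders_pre_processor.call(order_row)
rows_for_line_items_table = line_items_pre_processor.call(order_row)
```

The first destination gets a single row without the nested items, while the second gets one row per line item.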

When targeting tables with relationships, you can use before_flush and after_initialize to make sure the parent table is flushed first, so that foreign key constraints are satisfied.

before_flush and after_initialize options

These callbacks are mostly useful when you write to 2 destinations at once, with foreign key constraints requiring that the first destination is always flushed before the second:

first_destination = nil

destination Kiba::Pro::Destinations::SQLBulkInsert,
  after_initialize: -> (d) { first_destination = d }

destination Kiba::Pro::Destinations::SQLBulkInsert,
  before_flush: -> { first_destination.flush }
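The resulting flush order can be illustrated with a toy buffering class. ToyBufferedDestination is a hypothetical stand-in written for this sketch, not Kiba Pro's implementation; it only assumes that after_initialize receives the destination and that before_flush runs before the buffer is written.

```ruby
# Toy stand-in for a buffering destination (NOT Kiba Pro's implementation).
class ToyBufferedDestination
  def initialize(name:, log:, buffer_size:, after_initialize: nil, before_flush: nil)
    @name = name
    @log = log
    @buffer_size = buffer_size
    @before_flush = before_flush
    @buffer = []
    after_initialize&.call(self)
  end

  def write(row)
    @buffer << row
    flush if @buffer.size >= @buffer_size
  end

  def flush
    @before_flush&.call
    return if @buffer.empty?

    @log << [@name, @buffer.size] # records which table was written, and how many rows
    @buffer.clear
  end
end

log = []
first_destination = nil

ToyBufferedDestination.new(
  name: :orders, log: log, buffer_size: 100,
  after_initialize: ->(d) { first_destination = d }
)
second_destination = ToyBufferedDestination.new(
  name: :order_line_items, log: log, buffer_size: 2,
  before_flush: -> { first_destination.flush }
)

first_destination.write({ id: 1 })
second_destination.write({ order_id: 1, sku: "A1" })
second_destination.write({ order_id: 1, sku: "B2" }) # buffer full: triggers a flush
```

Even though the parent buffer is far from full, it is written before the child rows that reference it.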

MySQL "ON DUPLICATE KEY UPDATE" support

You can use the dataset keyword parameter to dynamically adjust the target dataset and use ON DUPLICATE KEY UPDATE:

destination Kiba::Pro::Destinations::SQLBulkInsert,
  dataset: -> (dataset) {
    dataset.on_duplicate_key_update(:column_name)
  }

See Sequel MySQL related documentation for full syntax.

Postgres "INSERT ON CONFLICT" support

You can use the dataset keyword parameter to dynamically adjust the target dataset:

destination Kiba::Pro::Destinations::SQLBulkInsert,
  dataset: -> (dataset) {
    # manipulate the dataset here then return it, e.g.:
    dataset.insert_conflict(target: :id)
  }

See Sequel Postgres documentation for full syntax.

To skip the insert of a row when a row with the same id already exists (ON CONFLICT DO NOTHING), use:

dataset.insert_conflict(target: :id)

One can also work with a given constraint, instead of a target:

dataset.insert_conflict(constraint: xyz)

To replace only a few columns with new values in case of update, you must explicitly refer to new values with Sequel[:excluded]:

dataset.insert_conflict(
  target: :id,
  update: {
    price: Sequel[:excluded][:price],
    quantity: Sequel[:excluded][:quantity]
  }
)

To replace all columns except a few, here is a nice trick:

# NOTE: this requires Ruby 2.6+
update_clause = (db[:items].columns - [:id]).to_h { |c| [c, Sequel[:excluded][c]] }

destination Kiba::Pro::Destinations::SQLBulkInsert,
  # SNIP
  dataset: -> (dataset) {
    dataset.insert_conflict(
      target: :id,
      update: update_clause
    )
  }

One can also refer to both replaced and previous values, e.g.:

dataset: -> (dataset) {
  dataset.insert_conflict(
    target: :id,
    update: {
      count: Sequel[:excluded][:count] + Sequel[:counters][:count]
    }
  )
}
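In this update clause, Sequel[:excluded] refers to the row that was proposed for insertion, and Sequel[:counters] to the row already present in the counters table. The plain-Ruby sketch below models that behaviour in memory, with a Hash keyed by id standing in for the table. upsert_adding_counts is a hypothetical helper used for illustration only, and no database is involved.

```ruby
# In-memory model of:
#   INSERT ... ON CONFLICT (id) DO UPDATE SET count = excluded.count + counters.count
# `table` maps id => row; `incoming_rows` are the rows proposed for insertion.
def upsert_adding_counts(table, incoming_rows)
  incoming_rows.each do |excluded|
    existing = table[excluded.fetch(:id)]
    if existing
      # Conflict on id: combine the proposed value with the stored one.
      existing[:count] = excluded.fetch(:count) + existing.fetch(:count)
    else
      # No conflict: plain insert.
      table[excluded.fetch(:id)] = excluded.dup
    end
  end
  table
end

counters = { 1 => { id: 1, count: 10 } }
upsert_adding_counts(counters, [{ id: 1, count: 5 }, { id: 2, count: 3 }])
```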

Finally, one can conditionally decide to update using the update_where Sequel keyword:

dataset: -> (dataset) {
  dataset.insert_conflict(
    target: :id,
    update: { ... },
    update_where: (Sequel[:excluded][:updated_at] >= Sequel[:items][:updated_at])