onyx-datomic

Onyx plugin providing read and write facilities for batch processing a Datomic database.

Installation

In your project file:

[org.onyxplatform/onyx-datomic "0.9.15.1-SNAPSHOT"]

In your peer boot-up namespace:

(:require [onyx.plugin.datomic])

ABS Issues

  • No backoff in the log reader
  • Tests crashing
  • Checkpointing for the log reader can't be global / savepoints
    • Once savepoints work, fix onyx.plugin.input-log-test

Functions

read-datoms

Reads datoms out of a Datomic database via datomic.api/datoms.

Catalog entry:

{:onyx/name :read-datoms
 :onyx/plugin :onyx.plugin.datomic/read-datoms
 :onyx/type :input
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/t t
 :datomic/datoms-index :eavt
 :datomic/datoms-components []
 :datomic/datoms-per-segment 20
 :onyx/max-peers 1
 :onyx/batch-size batch-size
 :onyx/doc "Reads a sequence of datoms from the d/datoms API"}

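:datomic/datoms-components restricts the read to datoms whose leading index components match the given values, in the same way as the components arguments of datomic.api/datoms. See the Clojure Cookbook for examples.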
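
For instance, the catalog entry below is a sketch of reading only the datoms for a single attribute: pairing the :aevt index with one component should behave roughly like (d/datoms db :aevt :user/email). The :user/email attribute, the placeholder URI, the t-value and the batch size are hypothetical.

```clojure
;; Placeholder values: substitute your own database URI and t-value.
(def db-uri "datomic:mem://example")
(def t 1000)

;; Hypothetical catalog entry that reads only :user/email datoms,
;; roughly equivalent to (d/datoms db :aevt :user/email).
(def read-emails
  {:onyx/name :read-emails
   :onyx/plugin :onyx.plugin.datomic/read-datoms
   :onyx/type :input
   :onyx/medium :datomic
   :datomic/uri db-uri
   :datomic/t t
   :datomic/datoms-index :aevt
   :datomic/datoms-components [:user/email]
   :datomic/datoms-per-segment 20
   :onyx/max-peers 1
   :onyx/batch-size 20
   :onyx/doc "Reads :user/email datoms via the d/datoms API"})
```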

Lifecycle entry:

{:lifecycle/task :read-datoms
 :lifecycle/calls :onyx.plugin.datomic/read-datoms-calls}

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/t | integer | The t-value of the database to read from |
| :datomic/datoms-index | keyword | The Datomic index to use in the datomic.api/datoms call |
| :datomic/datoms-components | vector | Components of the Datomic index to match (see the datomic.api/datoms documentation) |
| :datomic/datoms-per-segment | integer | The number of datoms to compress into a single segment |
| :datomic/read-buffer | integer | The number of segments to buffer after partitioning; defaults to 1000 |

read-index-range

Reads datoms from an indexed attribute via datomic.api/index-range.

Catalog entry:

{:onyx/name :read-index-datoms
 :onyx/plugin :onyx.plugin.datomic/read-index-range
 :onyx/type :input
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/t t
 :datomic/index-attribute :your-indexed-attribute
 :datomic/index-range-start <<INDEX_START_VALUE>>
 :datomic/index-range-end <<INDEX_END_VALUE>>
 :datomic/datoms-per-segment 20
 :onyx/max-peers 1
 :onyx/batch-size batch-size
 :onyx/doc "Reads a range of datoms from the d/index-range API"}

Lifecycle entry:

{:lifecycle/task :read-index-datoms
 :lifecycle/calls :onyx.plugin.datomic/read-index-range-calls}

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/t | integer | The t-value of the database to read from |
| :datomic/index-attribute | keyword | The indexed attribute to read |
| :datomic/index-range-start | any | Start value of the index range (inclusive) |
| :datomic/index-range-end | any | End value of the index range (exclusive) |
| :datomic/datoms-per-segment | integer | The number of datoms to compress into a single segment |
| :datomic/read-buffer | integer | The number of segments to buffer after partitioning; defaults to 1000 |

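
A hedged example of a concrete range: assuming an indexed :user/age attribute (hypothetical), the entry below should read the same datoms as (d/index-range db :user/age 18 65). The small predicate in-index-range?, also hypothetical, restates the inclusive-start, exclusive-end rule from the table.

```clojure
(def db-uri "datomic:mem://example") ; placeholder URI
(def t 1000)                         ; placeholder t-value

;; Hypothetical entry for an indexed :user/age attribute, intended to read
;; the same datoms as (d/index-range db :user/age 18 65).
(def read-working-age
  {:onyx/name :read-working-age
   :onyx/plugin :onyx.plugin.datomic/read-index-range
   :onyx/type :input
   :onyx/medium :datomic
   :datomic/uri db-uri
   :datomic/t t
   :datomic/index-attribute :user/age
   :datomic/index-range-start 18
   :datomic/index-range-end 65
   :datomic/datoms-per-segment 20
   :onyx/max-peers 1
   :onyx/batch-size 20
   :onyx/doc "Reads :user/age datoms in [18, 65) via d/index-range"})

;; The range is half-open: the start value is included, the end value is not.
(defn in-index-range? [start end v]
  (and (<= start v) (< v end)))
```
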
read-log

Reads the transaction log via repeated, chunked calls to d/tx-range. The task continues to read transactions until :datomic/log-end-tx is reached, or indefinitely if :datomic/log-end-tx is nil.

Catalog entry:

{:onyx/name :read-log
 :onyx/plugin :onyx.plugin.datomic/read-log
 :onyx/type :input
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/log-start-tx <<OPTIONAL_TX_START_INDEX>>
 :datomic/log-end-tx <<OPTIONAL_TX_END_INDEX>>
 :onyx/max-peers 1
 :onyx/batch-size batch-size
 :onyx/doc "Reads a sequence of datoms from the d/log API"}

Lifecycle entry:

{:lifecycle/task :read-log
 :lifecycle/calls :onyx.plugin.datomic/read-log-calls}

The task emits the :done sentinel when it reaches :datomic/log-end-tx (exclusive).

Segments are emitted in the form {:t tx-id :data [[e a v t added] [e a v t added]]}.
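
As a sketch of consuming these segments downstream, the hypothetical function below keeps only the assertions (added is true) from one segment. It relies only on the segment shape shown above; the sample data in any usage is made up.

```clojure
(defn added-datoms
  "Returns [e a v] for each assertion in a read-log segment of the form
  {:t tx-id :data [[e a v t added] ...]}; retractions are dropped."
  [{:keys [data]}]
  (for [[e a v _t added] data
        :when added]
    [e a v]))
```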

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/log-start-tx | integer | Optional starting tx (inclusive) for the log read |
| :datomic/log-end-tx | integer | Optional ending tx (inclusive) for the log read. A sentinel is emitted when this tx is passed. |
| :datomic/read-buffer | integer | The number of segments to buffer after partitioning; defaults to 1000 |

commit-tx

Writes new entity maps to Datomic. If a value for :datomic/partition is supplied and the transaction data is in map form, tempids are assigned automatically in that partition. The tx-data returned by datomic.api/transact is injected into the pipeline event map under :datomic/written.

Catalog entry:

{:onyx/name :write-datoms
 :onyx/plugin :onyx.plugin.datomic/write-datoms
 :onyx/type :output
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/partition :my.database/optional-partition-name
 :onyx/batch-size batch-size
 :onyx/doc "Transacts segments to storage"}

Lifecycle entry:

{:lifecycle/task :write-datoms
 :lifecycle/calls :onyx.plugin.datomic/write-tx-calls}

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/partition | keyword | Optional. When supplied, :db/id tempids are added using this partition. |

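
The tempid rule for commit-tx can be sketched as a pure function. assign-partition is a hypothetical helper, not part of the plugin's API, and tempid-fn stands in for datomic.api/tempid so the sketch runs without Datomic. With Datomic on the classpath, the equivalent call would look like (assign-partition d/tempid :my.database/optional-partition-name {:user/name "Ann"}).

```clojure
(defn assign-partition
  "Hypothetical sketch of the commit-tx tempid rule: when a partition is
  given and the segment is an entity map, attach a :db/id tempid in that
  partition. List-form tx data passes through unchanged. tempid-fn stands
  in for datomic.api/tempid."
  [tempid-fn partition segment]
  (if (and partition (map? segment))
    (assoc segment :db/id (tempid-fn partition))
    segment))
```
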
commit-bulk-tx (asynchronous)

Writes the transaction stored under each segment's :tx key to a Datomic database. The value of :tx should be ready to pass to (d/transact conn tx), which lets you perform retractions and call arbitrary database functions. The tx-data returned by datomic.api/transact is injected into the pipeline event map under :datomic/written. To take advantage of the Datomic transactor's ability to pipeline transactions, the plugin asynchronously submits :onyx/batch-size transactions at once and then dereferences the resulting futures one by one. Parallelism can therefore be controlled by adjusting the batch size. This is the recommended way to transact in bulk.

Catalog entry:

{:onyx/name :write-bulk-datoms-async
 :onyx/plugin :onyx.plugin.datomic/write-bulk-datoms-async
 :onyx/type :output
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/partition :my.database/partition
 :onyx/batch-size batch-size
 :onyx/doc "Transacts segments to storage"}

An example value of :tx would look like the following:

(require '[datomic.api :as d])

{:tx [[:db/add (d/tempid :db.part/user) :db/doc "Hello world"]]}
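
Because :tx holds ordinary transaction data, an upstream :onyx/fn can build segments for retractions or built-in transaction functions as well as assertions. The helpers below are hypothetical; :db.fn/cas is Datomic's built-in compare-and-swap transaction function.

```clojure
(defn retraction-segment
  "A :tx segment that retracts one attribute value from an entity."
  [eid attr value]
  {:tx [[:db/retract eid attr value]]})

(defn cas-segment
  "A :tx segment that calls Datomic's built-in :db.fn/cas, which only sets
  attr to new-value if its current value is old-value."
  [eid attr old-value new-value]
  {:tx [[:db.fn/cas eid attr old-value new-value]]})
```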

Lifecycle entry:

{:lifecycle/task :write-bulk-datoms-async
 :lifecycle/calls :onyx.plugin.datomic/write-bulk-tx-async-calls}

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/partition | keyword | Optional. When supplied, :db/id tempids are added using this partition. |

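
The pipelining strategy described for the asynchronous writer can be sketched as follows. transact-all-async is a hypothetical helper, and transact-async-fn stands in for (partial d/transact-async conn). The key point is that mapv submits every transaction eagerly before the first deref.

```clojure
(defn transact-all-async
  "Submits every tx first, then derefs the results in submission order.
  transact-async-fn stands in for (partial d/transact-async conn)."
  [transact-async-fn txes]
  ;; mapv is eager, so all transactions are in flight before the first deref.
  (let [pending (mapv transact-async-fn txes)]
    (mapv deref pending)))
```
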
commit-bulk-tx (synchronous)

Exactly the same as commit-bulk-tx (asynchronous), but transacts each tx completely (blocking on the returned future) before proceeding to the next. You should generally prefer the async version.

Catalog entry:

{:onyx/name :write-bulk-datoms
 :onyx/plugin :onyx.plugin.datomic/write-bulk-datoms
 :onyx/type :output
 :onyx/medium :datomic
 :datomic/uri db-uri
 :datomic/partition :my.database/partition
 :onyx/batch-size batch-size
 :onyx/doc "Transacts segments to storage"}

An example value of :tx would look like the following:

(require '[datomic.api :as d])

{:tx [[:db/add (d/tempid :db.part/user) :db/doc "Hello world"]]}

Lifecycle entry:

{:lifecycle/task :write-bulk-datoms
 :lifecycle/calls :onyx.plugin.datomic/write-bulk-tx-calls}

Attributes

| key | type | description |
|-----|------|-------------|
| :datomic/uri | string | The URI of the Datomic database to connect to |
| :datomic/partition | keyword | Optional. When supplied, :db/id tempids are added using this partition. |

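
For contrast, here is a sketch of the synchronous strategy: each future is dereferenced before the next transaction is submitted, so at most one transaction is in flight. transact-all-sync is hypothetical, and transact-fn stands in for (partial d/transact conn).

```clojure
(defn transact-all-sync
  "Transacts each tx and blocks on its result before submitting the next.
  transact-fn stands in for (partial d/transact conn)."
  [transact-fn txes]
  ;; mapv walks txes in order; deref blocks before the next call is made.
  (mapv (fn [tx] (deref (transact-fn tx))) txes))
```
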
Datomic Params Injection via Lifecycles

The Datomic params lifecycles inject a Datomic db or conn into a task's :onyx/fn params.

inject-conn

Injects a Datomic conn into the event map. If :onyx/param? is true, the conn is also passed to the task's :onyx/fn as a parameter.

{:lifecycle/task :use-conn-task
 :lifecycle/calls :onyx.plugin.datomic/inject-conn-calls
 :datomic/uri db-uri
 :onyx/param? true
 :lifecycle/doc "Initialises datomic conn as a :onyx.core/param"}
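
A sketch of how an injected conn reaches a task function, assuming injected params are partially applied ahead of the segment so the function receives them as its leading arguments. curry-params and annotate-segment are hypothetical re-creations for illustration, not code from this plugin; a real task would use the conn, for example via (d/db conn).

```clojure
(defn curry-params
  "Partially applies each injected param, in order, ahead of the segment."
  [f params]
  (reduce partial f params))

;; Hypothetical :onyx/fn for :use-conn-task. With :onyx/param? true the conn
;; arrives first and the segment last.
(defn annotate-segment [conn segment]
  (assoc segment :conn-available? (some? conn)))
```
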

inject-db

Injects a Datomic db into the event map. If :onyx/param? is true, the db is also passed to the task's :onyx/fn as a parameter.

:datomic/basis-t is optional. If it is supplied, datomic.api/as-of is called on the db with :datomic/basis-t, so the task sees the database as of that point.

{:lifecycle/task :use-db-task
 :lifecycle/calls :onyx.plugin.datomic/inject-db-calls
 :datomic/uri db-uri
 :datomic/basis-t optional-basis-t
 :onyx/param? true
 :lifecycle/doc "Initialises datomic db as a :onyx.core/param"}
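
The :datomic/basis-t rule can be sketched as a pure function. resolve-db is hypothetical; db-fn and as-of-fn stand in for datomic.api/db and datomic.api/as-of, so with Datomic on the classpath the call would be (resolve-db d/db d/as-of conn basis-t).

```clojure
(defn resolve-db
  "Sketch of the :datomic/basis-t rule: use the current db, or an as-of
  view when a basis-t is supplied. db-fn and as-of-fn stand in for
  datomic.api/db and datomic.api/as-of."
  [db-fn as-of-fn conn basis-t]
  (let [db (db-fn conn)]
    (if (some? basis-t)
      (as-of-fn db basis-t)
      db)))
```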

Contributing

Pull requests into the master branch are welcome.

License

Copyright © 2015 Michael Drogalis

Distributed under the Eclipse Public License, the same as Clojure.