Skip to content

Commit

Permalink
Merge pull request #110 from replikativ/91-datomic-migration
Browse files Browse the repository at this point in the history
91 datomic migration
  • Loading branch information
kordano committed May 7, 2020
2 parents 3de63d9 + f160f6f commit 6eeb592
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 214 deletions.
39 changes: 25 additions & 14 deletions dev/sandbox.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,37 @@

(d/delete-database uri)

(def schema [{:db/ident :name
(def schema [{:db/ident :name
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/string}
{:db/ident :sibling
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/string}
{:db/ident :sibling
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref}
{:db/ident :age
:db/valueType :db.type/ref}
{:db/ident :age
:db/cardinality :db.cardinality/one
:db/valueType :db.type/long}])

:db/valueType :db.type/long}])
(d/create-database uri :initial-tx schema)

(def conn (d/connect uri))

(def result (d/transact conn [{:name "Alice", :age 25}
{:name "Bob", :age 35}
{:name "Charlie", :age 45 :sibling [[:name "Alice"] [:name "Bob"]]}]))
(def result (d/transact conn [{:name "Alice"
:age 25}
{:name "Bob"
:age 35}
{:name "Charlie"
:age 45
:sibling [[:name "Alice"] [:name "Bob"]]}]))

(d/q '[:find ?e ?a ?v ?t :in $ ?a :where [?e :name ?v ?t] [?e :age ?a]] @conn 35)

(d/q '[:find ?e ?v ?t :where [?e :name ?v ?t]] @conn)
(d/q {:query '{:find [?e ?n]
:in [$ ?a]
:where [[?e :name ?n] (not [?e :age ?a])]}
:args [@conn 35]
:limit 1
:offset 0})

)
)
30 changes: 18 additions & 12 deletions src/datahike/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@
transact!
dc/transact!)

(def ^{:arglists '([conn tx-data])
:doc "Load entities directly"}
load-entities
dc/load-entities)


(def ^{:arglists '([conn])
:doc "Releases a database connection"}
release dc/release)
Expand Down Expand Up @@ -215,17 +221,17 @@

(defmethod q clojure.lang.PersistentVector
[query & inputs]
(apply dq/q query inputs))
(dq/q {:query query :args inputs}))

(defmethod q clojure.lang.PersistentArrayMap
[query-map & arg-list]
(let [query (if (contains? query-map :query)
(:query query-map)
query-map)
args (if (contains? query-map :args)
(:args query-map)
arg-list)]
(apply dq/q query args)))
[{:keys [query args limit offset] :as query-map} & arg-list]
(let [query (or query query-map)
args (or args arg-list)]
(dq/q {:query query
:args args
:limit limit
:offset offset})))

(defn datoms
"Index lookup. Returns a sequence of datoms (lazy iterator over actual DB index) which components (e, a, v) match passed arguments.
Expand Down Expand Up @@ -471,7 +477,7 @@
[db]
(if (db/-temporal-index? db)
(HistoricalDB. db)
(throw (ex-info "as-of is only allowed on temporal indexed databases." {:config (db/-config db)}))))
(throw (ex-info "history is only allowed on temporal indexed databases." {:config (db/-config db)}))))

(defn- date? [d]
#?(:cljs (instance? js/Date d)
Expand All @@ -482,7 +488,7 @@
[db date]
{:pre [(or (int? date) (date? date))]}
(if (db/-temporal-index? db)
(AsOfDB. db (if (date? date) date (java.util.Date. ^long date)))
(AsOfDB. db date)
(throw (ex-info "as-of is only allowed on temporal indexed databases." {:config (db/-config db)}))))

(defn since
Expand All @@ -491,5 +497,5 @@
[db date]
{:pre [(or (int? date) (date? date))]}
(if (db/-temporal-index? db)
(SinceDB. db (if (date? date) date (java.util.Date. ^long date)))
(SinceDB. db date)
(throw (ex-info "since is only allowed on temporal indexed databases." {:config (db/-config db)}))))
72 changes: 40 additions & 32 deletions src/datahike/connector.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -24,50 +24,58 @@
[connection tx-data]
(transact! connection {:tx-data tx-data}))

(defn update-and-flush-db [connection tx-data update-fn]
(let [{:keys [db-after] :as tx-report} @(update-fn connection tx-data)
{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet schema rschema config max-tx]} db-after
store (:store @connection)
backend (kons/->KonserveBackend store)
eavt-flushed (di/-flush eavt backend)
aevt-flushed (di/-flush aevt backend)
avet-flushed (di/-flush avet backend)
temporal-index? (:temporal-index config)
temporal-eavt-flushed (when temporal-index? (di/-flush temporal-eavt backend))
temporal-aevt-flushed (when temporal-index? (di/-flush temporal-aevt backend))
temporal-avet-flushed (when temporal-index? (di/-flush temporal-avet backend))]
(<?? S (k/assoc-in store [:db]
(merge
{:schema schema
:rschema rschema
:config config
:max-tx max-tx
:eavt-key eavt-flushed
:aevt-key aevt-flushed
:avet-key avet-flushed}
(when temporal-index?
{:temporal-eavt-key temporal-eavt-flushed
:temporal-aevt-key temporal-aevt-flushed
:temporal-avet-key temporal-avet-flushed}))))
(reset! connection (assoc db-after
:eavt eavt-flushed
:aevt aevt-flushed
:avet avet-flushed
:temporal-eavt temporal-eavt-flushed
:temporal-aevt temporal-aevt-flushed
:temporal-avet temporal-avet-flushed))
tx-report))

(defmethod transact! clojure.lang.PersistentArrayMap
[connection {:keys [tx-data]}]
{:pre [(d/conn? connection)]}
(future
(locking connection
(let [{:keys [db-after] :as tx-report} @(d/transact connection tx-data)
{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet schema rschema config max-tx]} db-after
store (:store @connection)
backend (kons/->KonserveBackend store)
eavt-flushed (di/-flush eavt backend)
aevt-flushed (di/-flush aevt backend)
avet-flushed (di/-flush avet backend)
temporal-index? (:temporal-index config)
temporal-eavt-flushed (when temporal-index? (di/-flush temporal-eavt backend))
temporal-aevt-flushed (when temporal-index? (di/-flush temporal-aevt backend))
temporal-avet-flushed (when temporal-index? (di/-flush temporal-avet backend))]
(<?? S (k/assoc-in store [:db]
(merge
{:schema schema
:rschema rschema
:config config
:max-tx max-tx
:eavt-key eavt-flushed
:aevt-key aevt-flushed
:avet-key avet-flushed}
(when temporal-index?
{:temporal-eavt-key temporal-eavt-flushed
:temporal-aevt-key temporal-aevt-flushed
:temporal-avet-key temporal-avet-flushed}))))
(reset! connection (assoc db-after
:eavt eavt-flushed
:aevt aevt-flushed
:avet avet-flushed
:temporal-eavt temporal-eavt-flushed
:temporal-aevt temporal-aevt-flushed
:temporal-avet temporal-avet-flushed))
tx-report))))
(update-and-flush-db connection tx-data d/transact))))

(defn transact [connection tx-data]
(try
(deref (transact! connection tx-data))
(catch Exception e
(throw (.getCause e)))))

(defn load-entities [connection entities]
(future
(locking connection
(update-and-flush-db connection entities d/load-entities))))

(defn release [connection]
(ds/release-store (get-in @connection [:config :storage]) (:store @connection)))

Expand Down

0 comments on commit 6eeb592

Please sign in to comment.