Skip to content

Commit

Permalink
Merge pull request #257 from Antonelli712/joa/single-use-id
Browse files Browse the repository at this point in the history
Fixes #208 Remove ID from put and cas outside of doc
  • Loading branch information
jonpither committed Jun 14, 2019
2 parents 8b3a1a1 + 1174040 commit bdd8986
Show file tree
Hide file tree
Showing 22 changed files with 126 additions and 117 deletions.
2 changes: 0 additions & 2 deletions bench/src/crux_bench/watdiv.clj
Expand Up @@ -402,7 +402,6 @@
(crux/submit-tx
crux
[[:crux.tx/put
::watdiv-ingestion-status
{:crux.db/id ::watdiv-ingestion-status :done? false}]])]
(crux/db crux tx-time tx-time) ;; block until indexed
(crux/db
Expand All @@ -411,7 +410,6 @@
(crux/submit-tx
crux
[[:crux.tx/put
::watdiv-ingestion-status
{:crux.db/id ::watdiv-ingestion-status
:watdiv/ingest-start-time time-before
:watdiv/kafka-ingest-time (- (.getTime kafka-ingest-done) (.getTime time-before))
Expand Down
8 changes: 4 additions & 4 deletions dev/patrik.clj
Expand Up @@ -166,13 +166,13 @@

(crux.api/submit-tx
crux
[[:crux.tx/put :a1
[[:crux.tx/put
{:crux.db/id :a1 :user/name "patrik" :user/post 1 :post/cost 30}]
[:crux.tx/put :a2
[:crux.tx/put
{:crux.db/id :a2 :user/name "patrik" :user/post 2 :post/cost 35}]
[:crux.tx/put :a3
[:crux.tx/put
{:crux.db/id :a3 :user/name "patrik" :user/post 3 :post/cost 5}]
[:crux.tx/put :a4
[:crux.tx/put
{:crux.db/id :a4 :user/name "niclas" :user/post 1 :post/cost 8}]])

(clojure.pprint/pprint
Expand Down
2 changes: 1 addition & 1 deletion docs/crux_node_only_system.clj
Expand Up @@ -37,7 +37,7 @@

(db/submit-tx
(:tx-log (:crux/cluster-node system))
[[:crux.tx/put :dbpedia.resource/Pablo-Picasso
[[:crux.tx/put
{:crux.db/id :dbpedia.resource/Pablo-Picasso
:name "Pablo"
:last-name "Picasso"}
Expand Down
4 changes: 2 additions & 2 deletions docs/examples.clj
Expand Up @@ -28,8 +28,8 @@
;; tag::submit-tx[]
(crux/submit-tx
system
[[:crux.tx/put :dbpedia.resource/Pablo-Picasso ; id for Kafka
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; id for Crux
[[:crux.tx/put
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; id
:name "Pablo"
:last-name "Picasso"}
#inst "2018-05-18T09:20:27.966-00:00"]]) ; valid time
Expand Down
3 changes: 0 additions & 3 deletions docs/get_started.adoc
Expand Up @@ -39,9 +39,6 @@ experiment with the <<rest.adoc#,REST API>>.
----
include::./examples.clj[tags=submit-tx]
----
Note that the ID for the Kafka transaction and the ID within the Crux document
must be the same. Restating the ID within the Crux document is currently necessary
although this may change in the future, as the standard API evolves.

== Querying

Expand Down
5 changes: 2 additions & 3 deletions docs/patterns.adoc
Expand Up @@ -36,7 +36,6 @@ link to it.
txs# (mapv
(fn [old-ent# ent#]
[:crux.tx/cas
(:crux.db/id old-ent#)
old-ent#
ent#])
entities#
Expand Down Expand Up @@ -83,7 +82,7 @@ link to it.
[entity-id new-attrs valid-time]
(let [entity-prev-value (crux/entity (crux/db system) entity-id)]
(crux/submit-tx system
[[:crux.tx/put entity-id
[[:crux.tx/put
(merge entity-prev-value new-attrs)
valid-time]])))

Expand Down Expand Up @@ -160,7 +159,7 @@ link to it.
:delete :crux.tx/delete-op
:cas :crux.tx/cas-op
:evict :crux.tx/evict-op)
[:crux.tx/cas #uuid "6f0232d0-f3f9-4020-a75f-17b067f41203"
[:crux.tx/cas
{:crux.db/id #uuid "6f0232d0-f3f9-4020-a75f-17b067f41203"
:name "John Wayne"
:username "jwa"}
Expand Down
6 changes: 3 additions & 3 deletions docs/rest.adoc
Expand Up @@ -238,7 +238,7 @@ curl -X GET $nodeURL/tx-log
----
({:crux.tx/tx-time #inst "2019-01-07T15:11:13.411-00:00",
:crux.api/tx-ops [[
:crux.tx/put "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e" "c28f6d258397651106b7cb24bb0d3be234dc8bd1"
:crux.tx/put "c28f6d258397651106b7cb24bb0d3be234dc8bd1"
#inst "2019-01-07T14:57:08.462-00:00"]],
:crux.tx/tx-id 0}
Expand All @@ -255,8 +255,8 @@ Takes a vector of transactions (any combination of `:put`, `:delete`, `:cas` and
----
curl -X POST \
-H "Content-Type: application/edn" \
-d '[[:crux.tx/put :ivan {:crux.db/id :ivan, :name "Ivan" :last-name "Petrov"}],
[:crux.tx/put :boris {:crux.db/id :boris, :name "Boris" :last-name "Petrov"}],
-d '[[:crux.tx/put {:crux.db/id :ivan, :name "Ivan" :last-name "Petrov"}],
[:crux.tx/put {:crux.db/id :boris, :name "Boris" :last-name "Petrov"}],
[:crux.tx/delete :maria #inst "2012-05-07T14:57:08.462-00:00"]]' \
$nodeURL/tx-log
----
Expand Down
26 changes: 11 additions & 15 deletions docs/transactions.adoc
Expand Up @@ -28,7 +28,7 @@ sequence of transaction operations:

[source,clj]
----
[[:crux.tx/put :dbpedia.resource/Pablo-Picasso
[[:crux.tx/put
{:crux.db/id :dbpedia.resource/Pablo-Picasso
:name "Pablo"
:last-name "Picasso"}
Expand Down Expand Up @@ -68,15 +68,13 @@ the supplied `valid time`.
[source,clojure]
----
[:crux.tx/put
:dbpedia.resource/Pablo-Picasso <1>
{:crux.db/id :dbpedia.resource/Pablo-Picasso :first-name :Pablo} <2>
#inst "2018-05-18T09:20:27.966-00:00"] <3>
{:crux.db/id :dbpedia.resource/Pablo-Picasso :first-name :Pablo} <1>
#inst "2018-05-18T09:20:27.966-00:00"] <2>
----

<1> The ID of the document to be used for the transaction log.
<2> The document itself. Note that the ID must also be included as part of the
document (this requirement to state the ID twice may change in future).
<3> `valid time`
<1> The document itself. Note that the ID must be included as part of the
document.
<2> `valid time`

Note that `valid time` is optional and defaults to transaction time,
which is taken from the Kafka log.
Expand All @@ -93,16 +91,14 @@ newer one, if the existing document is as expected.
[source,clojure]
----
[:crux.tx/cas
:dbpedia.resource/Pablo-Picasso <1>
{..} <1>
{..} <2>
{..} <3>
#inst "2018-05-18T09:21:31.846-00:00"] <4>
#inst "2018-05-18T09:21:31.846-00:00"] <3>
----

<1> The ID of the document being written
<2> Expected Document
<3> New document
<4> `valid time`
<1> Expected Document
<2> New document
<3> `valid time`

== Delete

Expand Down
5 changes: 2 additions & 3 deletions example/backup-restore/src/example_backup_restore/main.clj
Expand Up @@ -30,10 +30,10 @@

(api/submit-tx
syst
[[:crux.tx/put :id/jeff
[[:crux.tx/put
{:crux.db/id :id/jeff
:person/name "Jeff"}]
[:crux.tx/put :id/lia
[:crux.tx/put
{:crux.db/id :id/lia
:person/name "Lia"}]])

Expand All @@ -44,4 +44,3 @@
(api/document
(api/db syst #inst "2019-02-02"
#inst "2019-04-16T12:35:05.042-00:00")))

2 changes: 1 addition & 1 deletion example/imdb/src/imdb/main.clj
Expand Up @@ -46,7 +46,7 @@
(update doc key str/split #"\,"))
doc
(filter list-columns? (keys doc)))]
[:crux.tx/put doc-id (assoc doc :crux.db/id doc-id)])))))))
[:crux.tx/put (assoc doc :crux.db/id doc-id)])))))))
(log/infof "completed %s" file-path))))]
(doseq [f futures] @f)))

Expand Down
18 changes: 8 additions & 10 deletions example/repl-walkthrough/walkthrough.clj
Expand Up @@ -6,18 +6,18 @@


(def crux-options
{:kv-backend "crux.kv.memdb.MemKv" ; in-memory, see docs for LMDB/RocksDB storage
{:kv-backend "crux.kv.memdb.MemKv" ; in-memory, see docs for LMDB/RocksDB storage
:db-dir "data/db-dir-1"}) ; :db-dir is ignored when using MemKv


(def system (crux/start-standalone-system crux-options))


; transaction containing a `put` operation, optionally specifying a valid time
; transaction containing a `put` operation, optionally specifying a valid time
(crux/submit-tx
system
[[:crux.tx/put :dbpedia.resource/Pablo-Picasso ; id used for the transaction log
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; same id inside the document
[[:crux.tx/put
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; id
:name "Pablo"
:last-name "Picasso"
:location "Spain"}
Expand All @@ -27,13 +27,12 @@
; transaction containing a `cas` (compare-and-swap) operation
(crux/submit-tx
system
[[:crux.tx/cas
:dbpedia.resource/Pablo-Picasso ; id used for the transaction log
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; old version
[[:crux.tx/cas
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; old version
:name "Pablo"
:last-name "Picasso"
:location "Spain"}
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; new version
{:crux.db/id :dbpedia.resource/Pablo-Picasso ; new version
:name "Pablo"
:last-name "Picasso"
:height 1.63
Expand Down Expand Up @@ -77,8 +76,7 @@
; `put` the new version of the document again
(crux/submit-tx
system
[[:crux.tx/put
:dbpedia.resource/Pablo-Picasso
[[:crux.tx/put
{:crux.db/id :dbpedia.resource/Pablo-Picasso
:name "Pablo"
:last-name "Picasso"
Expand Down
Expand Up @@ -575,7 +575,6 @@
(api/submit-tx
crux
[[:crux.tx/put
id
{:crux.db/id id
:message-post/created created
:message-post/name name
Expand Down Expand Up @@ -614,7 +613,6 @@
(api/submit-tx
crux
[[:crux.tx/put
id
{:crux.db/id id
:message-post/created (instant/read-instant-date created)
:message-post/edited now
Expand Down
2 changes: 0 additions & 2 deletions src/crux/api.clj
Expand Up @@ -16,7 +16,6 @@
(defmulti tx-op first)

(defmethod tx-op :crux.tx/put [_] (s/cat :op #{:crux.tx/put}
:id :crux.db/id
:doc ::doc
:start-valid-time (s/? date?)
:end-valid-time (s/? date?)))
Expand All @@ -27,7 +26,6 @@
:end-valid-time (s/? date?)))

(defmethod tx-op :crux.tx/cas [_] (s/cat :op #{:crux.tx/cas}
:id :crux.db/id
:old-doc (s/nilable ::doc)
:new-doc ::doc
:at-valid-time (s/? date?)))
Expand Down
2 changes: 1 addition & 1 deletion src/crux/codec.clj
Expand Up @@ -32,7 +32,7 @@

; how they work
(comment
(api/submit-tx syst [:crux.tx/put :ids/ivan {:crux.db/id :ids/ivan :name "ivan"}])
(api/submit-tx syst [:crux.tx/put {:crux.db/id :ids/ivan :name "ivan"}])

; [roughly speaking] for queries by attr name and value
; in attribute+value+entity+content-hash-index-id
Expand Down
2 changes: 1 addition & 1 deletion src/crux/rdf.clj
Expand Up @@ -219,7 +219,7 @@
(when (zero? (long (mod n *ntriples-log-size*)))
(log/debug "submitted" n))
(let [tx-ops (vec (for [entity entities]
[:crux.tx/put (:crux.db/id entity) entity]))]
[:crux.tx/put entity]))]
(db/submit-tx tx-log tx-ops))
(+ n (count entities)))
0)))
Expand Down
31 changes: 23 additions & 8 deletions src/crux/tx.clj
Expand Up @@ -176,22 +176,37 @@
{:crux.index/index-version (idx/current-index-version kv)
:crux.tx-log/consumer-state (db/read-index-meta this :crux.tx-log/consumer-state)}))

(defmulti conform-tx-op first)

(defmethod conform-tx-op ::put [tx-op] (let [[op doc & args] tx-op
id (:crux.db/id doc)]
(into [::put id doc] args)))

(defmethod conform-tx-op ::cas [tx-op] (let [[op old-doc new-doc & args] tx-op
new-id (:crux.db/id new-doc)
old-id (:crux.db/id old-doc)]
(if (or (= nil old-id) (= new-id old-id))
(into [::cas new-id old-doc new-doc] args)
(throw (IllegalArgumentException.
(str "CAS, document id's do not match: " old-id " " new-id))))))

(defmethod conform-tx-op :default [tx-op] tx-op)

(defn tx-ops->docs [tx-ops]
(vec (for [[_ id & args] tx-ops
doc (filter map? args)]
(if (= (c/new-id id) (c/new-id (get doc :crux.db/id)))
doc
(throw (IllegalArgumentException.
(str "Document's id does not match the operation id: " (get doc :crux.db/id) " " id)))))))
(let [conformed-tx-ops (into [] (for [tx-op tx-ops] (conform-tx-op tx-op)))]
(vec (for [[op id & args] conformed-tx-ops
doc (filter map? args)]
doc))))

(defn tx-ops->tx-events [tx-ops]
(let [tx-events (mapv (fn [[op id & args]]
(let [conformed-tx-ops (into [] (for [tx-op tx-ops] (conform-tx-op tx-op)))
tx-events (mapv (fn [[op id & args]]
(into [op (str (c/new-id id))]
(for [arg args]
(if (map? arg)
(-> arg c/new-id str)
arg))))
tx-ops)]
conformed-tx-ops)]
(s/assert :crux.tx.event/tx-events tx-events)
tx-events))

Expand Down

0 comments on commit bdd8986

Please sign in to comment.