Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cursors can also implement IReduceInit so that people can transduce over them #1477

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 22 additions & 8 deletions crux-core/src/crux/io.clj
Expand Up @@ -3,22 +3,23 @@
[clojure.java.io :as io]
[clojure.java.shell :as sh]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[clojure.string :as str]
[clojure.tools.logging :as log]
[taoensso.nippy :as nippy])
(:import [crux.api ICursor]
[java.io Closeable DataInputStream DataOutputStream File IOException Reader]
[java.lang AutoCloseable]
[java.lang.ref PhantomReference ReferenceQueue]
(:import clojure.lang.IReduceInit
crux.api.ICursor
[java.io DataInputStream DataOutputStream File IOException Reader]
java.lang.AutoCloseable
[java.lang.management BufferPoolMXBean ManagementFactory]
[java.lang.ref PhantomReference ReferenceQueue]
java.net.ServerSocket
[java.nio.file Files FileVisitResult SimpleFileVisitor]
java.nio.file.attribute.FileAttribute
java.text.SimpleDateFormat
java.time.Duration
[java.util Collections Comparator Date IdentityHashMap Iterator Map PriorityQueue Properties]
[java.util.concurrent ThreadFactory]
java.util.concurrent.locks.StampedLock))
java.util.concurrent.locks.StampedLock
java.util.concurrent.ThreadFactory))

(s/def ::port (s/int-in 1 65536))

Expand Down Expand Up @@ -271,7 +272,20 @@
(.hasNext lazy-seq-iterator))

(close [_]
(close-fn)))
(close-fn))

IReduceInit
(reduce [this f init]
(try
(loop [acc init]
(if (.hasNext this)
(let [res (f acc (.next this))]
(if (reduced? res)
@res
(recur res)))
acc))
(finally
(.close this)))))

(defn ->cursor [close-fn ^Iterable sq]
(->Cursor close-fn (.iterator (lazy-seq sq))))
Expand Down
20 changes: 20 additions & 0 deletions crux-test/test/crux/query_test.clj
Expand Up @@ -3403,6 +3403,26 @@
[e :ticker/market m2]
[m2 :se/currency :currency/usd]]}))))

(t/deftest test-open-q-transducers
(fix/transact! *api* [{:crux.db/id :one, :val 1}
{:crux.db/id :two, :val 2}
{:crux.db/id :three, :val 3}])
(t/is (= #{:one :three}
(->> (api/open-q (api/db *api*) '{:find [id val],
:keys [id val]
:where [[id :val val]]})
(into #{} (comp (filter (comp odd? :val))
(map :id))))))

(t/is (= #{:one}
(->> (api/open-q (api/db *api*) '{:find [id val],
:keys [id val]
:where [[id :val val]]
:order-by [[val :asc]]})
(into #{} (comp (filter (comp odd? :val))
(map :id)
(take 1)))))))

(t/deftest test-order-by-when-not-specified-in-return-418
(fix/transact! *api* [{:crux.db/id :one
:val 1}
Expand Down