diff --git a/ChangeLog.md b/ChangeLog.md index f672bc4..c312a74 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,13 @@ +## Changes between Welle 2.0.x and 3.0 + +Welle 2.0 has [breaking API changes](http://blog.clojurewerkz.org/blog/2014/04/26/major-breaking-public-api-changes-coming-in-our-projects/) in most namespaces. + +### HTTPComponents 4.3 + +Welle now excludes HTTPComponents dependency for Riak client and instead +uses version 4.3 which `clj-http` depends on. + + ## Changes between Welle 1.5.0 and 2.0 Welle 2.0 has **breaking API changes** in `welle.kv` functions. diff --git a/src/clojure/clojurewerkz/welle/buckets.clj b/src/clojure/clojurewerkz/welle/buckets.clj index bf45a66..2df3da6 100644 --- a/src/clojure/clojurewerkz/welle/buckets.clj +++ b/src/clojure/clojurewerkz/welle/buckets.clj @@ -11,7 +11,8 @@ (:refer-clojure :exclude [list]) (:require [clojurewerkz.welle.core :refer :all] [clojurewerkz.welle.conversion :refer :all]) - (:import [com.basho.riak.client IRiakClient IRiakObject] + (:import com.basho.riak.client.raw.RawClient + [com.basho.riak.client IRiakClient IRiakObject] [com.basho.riak.client.bucket Bucket WriteBucket] [com.basho.riak.client.http.response BucketResponse ListBucketsResponse] [com.basho.riak.client.operations StoreObject FetchObject] @@ -26,9 +27,9 @@ (defn fetch "Fetches bucket properties" - [^String bucket-name] + [^RawClient client ^String bucket-name] (merge {:name bucket-name} - (from-bucket-properties (.fetchBucket *riak-client* bucket-name)))) + (from-bucket-properties (.fetchBucket client bucket-name)))) (defn update "Updates bucket properties. @@ -56,15 +57,17 @@ * big-vclock * young-vclock * old-vclock" - [^String bucket-name &{ :keys [allow-siblings last-write-wins n-val ^String backend - small-vclock big-vclock young-vclock old-vclock - r pr w dw pw rw - ^Boolean not-found-ok ^Boolean basic-quorum ^Boolean enable-search - pre-commit-hooks - post-commit-hooks] :as options}] - (.updateBucket *riak-client* bucket-name (to-bucket-properties (or options {}))) - (merge {:name bucket-name} - (from-bucket-properties (.fetchBucket *riak-client* bucket-name)))) + ([^RawClient client ^String bucket-name] + (update client bucket-name {})) + ([^RawClient client ^String bucket-name { :keys [allow-siblings last-write-wins n-val ^String backend + small-vclock big-vclock young-vclock old-vclock + r pr w dw pw rw + ^Boolean not-found-ok ^Boolean basic-quorum ^Boolean enable-search + pre-commit-hooks + post-commit-hooks] :as options}] + (.updateBucket client bucket-name (to-bucket-properties (or options {}))) + (merge {:name bucket-name} + (from-bucket-properties (.fetchBucket client bucket-name))))) (defn ^{:deprecated true} create "The same as update. This name reveals the intent a bit better in some cases. @@ -74,12 +77,12 @@ (defn list "Returns buckets in the cluster as a set" - [] - (set (.listBuckets *riak-client*))) + [^RawClient client] + (set (.listBuckets client))) (defn keys-in "Returns list of keys in the bucket. With any non-trivial number of keys, this is a VERY EXPENSIVE operation and typically should be avoided" - [^String bucket-name] - (.listKeys *riak-client* bucket-name)) + [^RawClient client ^String bucket-name] + (.listKeys client bucket-name)) diff --git a/src/clojure/clojurewerkz/welle/cache.clj b/src/clojure/clojurewerkz/welle/cache.clj index 6fa1bf0..f288bc0 100644 --- a/src/clojure/clojurewerkz/welle/cache.clj +++ b/src/clojure/clojurewerkz/welle/cache.clj @@ -11,7 +11,8 @@ "clojure.core.cache implementation(s) on top of Riak." (:require [clojurewerkz.welle.kv :as kv] [clojure.core.cache :as cache]) - (:import clojure.core.cache.CacheProtocol + (:import com.basho.riak.client.raw.RawClient + clojure.core.cache.CacheProtocol com.basho.riak.client.http.util.Constants)) ;; @@ -27,31 +28,31 @@ ;; API ;; -(cache/defcache BasicWelleCache [^String bucket ^String content-type ^Integer w] +(cache/defcache BasicWelleCache [^RawClient client ^String bucket ^String content-type ^Integer w] cache/CacheProtocol (lookup [c k] - (get-in (kv/fetch-one (.bucket c) k) [:result :value])) + (get-in (kv/fetch-one (.client c) (.bucket c) k) [:result :value])) (has? [c k] - (:has-value? (kv/fetch (.bucket c) k :head-only true))) + (:has-value? (kv/fetch (.client c) (.bucket c) k {:head-only true}))) (hit [this k] this) (miss [c k v] - (kv/store (.bucket c) k v :content-type (.content-type c) :w (.w c)) + (kv/store (.client c) (.bucket c) k v {:content-type (.content-type c) :w (.w c)}) c) (evict [c k] - (kv/delete (.bucket c) k :w (.w c)) + (kv/delete (.client c) (.bucket c) k {:w (.w c)}) c) (seed [c m] (doseq [[k v] m] - (kv/store (.bucket c) k v :content-type (.content-type c) :w (.w c))) + (kv/store (.client c) (.bucket c) k v {:content-type (.content-type c) :w (.w c)})) c)) (defn basic-welle-cache-factory - ([] - (BasicWelleCache. default-cache-bucket default-content-type 1)) - ([^String bucket] - (BasicWelleCache. bucket default-content-type 1)) - ([^String bucket ^String content-type ^Integer w] - (BasicWelleCache. bucket content-type w)) - ([^String bucket base ^String content-type ^Integer w] - (cache/seed (BasicWelleCache. bucket content-type w) base))) + ([^RawClient client] + (BasicWelleCache. client default-cache-bucket default-content-type 1)) + ([^RawClient client ^String bucket] + (BasicWelleCache. client bucket default-content-type 1)) + ([^RawClient client ^String bucket ^String content-type ^Integer w] + (BasicWelleCache. client bucket content-type w)) + ([^RawClient client ^String bucket base ^String content-type ^Integer w] + (cache/seed (BasicWelleCache. client bucket content-type w) base))) diff --git a/src/clojure/clojurewerkz/welle/core.clj b/src/clojure/clojurewerkz/welle/core.clj index 41120f8..d4bd95c 100644 --- a/src/clojure/clojurewerkz/welle/core.clj +++ b/src/clojure/clojurewerkz/welle/core.clj @@ -28,12 +28,10 @@ (def ^{:private true :const true} default-url "http://127.0.0.1:8098/riak") -(def ^:dynamic ^RawClient *riak-client*) - (def ^{:const true} default-cluster-connection-limit 32) -(defn ^clojurewerkz.welle.HTTPClient +(defn ^HTTPClient connect "Creates an HTTP client for a given URL, optionally with a custom client ID. With no arguments, connects to localhost on the default Riak port." @@ -47,18 +45,7 @@ (.setClientId c client-id) c))) -(defn connect! - "Creates an HTTP client for a given URL, and sets the global variable - *riak-client*. All Welle functions which are not passed a client will use - this client by default." - ([] - (alter-var-root (var *riak-client*) (constantly (connect)))) - ([^String url] - (alter-var-root (var *riak-client*) (constantly (connect url)))) - ([^String url ^String client-id] - (alter-var-root (var *riak-client*) (constantly (connect url client-id))))) - -(defn connect-via-pb +(defn ^PBClientAdapter connect-via-pb "Creates a Protocol Buffers client for the given host and port, or, by default, to localhost on the default Riak PB port." ([] @@ -67,15 +54,6 @@ (doto (PBClientAdapter. (com.basho.riak.pbc.RiakClient. host port)) (.generateAndSetClientId)))) -(defn connect-via-pb! - "Creates a Protocol Buffers client for the given host and port, and sets the - global variable *riak-client*. All Welle functions which are not passed a - client will use this client by default." - ([] - (alter-var-root (var *riak-client*) (constantly (connect-via-pb)))) - ([host port] - (alter-var-root (var *riak-client*) (constantly (connect-via-pb host port))))) - (defprotocol HTTPClusterConfigurator (http-cluster-config-from [self])) @@ -93,20 +71,13 @@ (.build)))) res))) -(defn ^com.basho.riak.client.raw.RawClient +(defn ^RawClient connect-to-cluster "Creates an HTTP cluster client." [endpoints] (let [^ClusterConfig cc (http-cluster-config-from endpoints)] (HTTPClusterClient. cc))) -(defn connect-to-cluster! - "Creates an HTTP cluster client, and sets the global variable *riak-client*. - All Welle functions which are not passed a client will use this client by - default." - [endpoints] - (alter-var-root (var *riak-client*) (constantly (connect-to-cluster endpoints)))) - (defprotocol PBClusterConfigurator (pbc-cluster-config-from [self])) @@ -128,7 +99,7 @@ (.build))))) res))) -(defn ^com.basho.riak.client.raw.RawClient +(defn ^PBClusterClient connect-to-cluster-via-pb "Creates a Protocol Buffers cluster client given a sequence of string endpoints." @@ -136,51 +107,28 @@ (let [^ClusterConfig cc (pbc-cluster-config-from endpoints)] (PBClusterClient. cc))) -(defn connect-to-cluster-via-pb! - "Creates a Protocol Buffers cluster client given a sequence of string - endpoints, and sets the global variable *riak-client*. All Welle functions - which are not passed a client will use this client by default." - [endpoints] - (alter-var-root (var *riak-client*) (constantly (connect-to-cluster-via-pb endpoints)))) - - - -(defmacro with-client - "Evaluates body within an implicit do, with the Welle client *riak-client* - bound to the given Riak client." - [client & forms] - `(binding [*riak-client* ~client] - (do ~@forms))) - - (defn ping "Pings a client." - ([] - (.ping *riak-client*)) - ([^RawClient client] - (.ping client))) + [^RawClient client] + (.ping client)) (defn shutdown "Shuts down a client." - ([] - (.shutdown *riak-client*)) - ([^RawClient client] - (.shutdown client))) + [^RawClient client] + (.shutdown client)) (defn get-client-id "The client ID used by a given client." - [] - (.getClientId *riak-client*)) + [^RawClient client] + (.getClientId client)) (defn stats "Returns statistics for a client." - [] - (.stats *riak-client*)) + [^RawClient client] + (.stats client)) (defn get-base-url "Returns base HTTP transport URL (e.g. http://127.0.0.1:8098)" - ([] - (.getBaseUrl ^HTTPClient *riak-client*)) - ([^HTTPClient client] - (.getBaseUrl client))) + [^HTTPClient client] + (.getBaseUrl client)) diff --git a/src/clojure/clojurewerkz/welle/counters.clj b/src/clojure/clojurewerkz/welle/counters.clj index aa6a513..5b0d8da 100644 --- a/src/clojure/clojurewerkz/welle/counters.clj +++ b/src/clojure/clojurewerkz/welle/counters.clj @@ -8,8 +8,7 @@ ;; You must not remove this notice, or any other, from this software. (ns clojurewerkz.welle.counters - (:require [clojurewerkz.welle.core :refer [*riak-client*]] - [clojurewerkz.welle.conversion :refer :all] + (:require [clojurewerkz.welle.conversion :refer :all] [clojurewerkz.welle.kv :refer [default-retrier]]) (:import [com.basho.riak.client.raw StoreMeta FetchMeta DeleteMeta RawClient RiakResponse] [com.basho.riak.client.cap Retrier DefaultRetrier ConflictResolver])) @@ -26,19 +25,21 @@ `:value` (default 1): value to increment by `:timeout`: query timeout " - [^String bucket-name ^String counter-name &{ :keys [w dw pw - ^long value - ^Boolean return-body - ^Integer timeout - ^Retrier retrier] - :or {value 1 - return-body true - retrier default-retrier}}] - (let [^StoreMeta md (to-store-meta w dw pw return-body nil nil timeout) - ^Long value (or value 1) - ^Long result (.attempt retrier ^Callable (fn [] - (.incrementCounter *riak-client* bucket-name counter-name value md)))] - result)) + ([^RawClient client ^String bucket-name ^String counter-name] + (increment-counter client bucket-name counter-name {})) + ([^RawClient client ^String bucket-name ^String counter-name {:keys [w dw pw + ^long value + ^Boolean return-body + ^Integer timeout + ^Retrier retrier] + :or {value 1 + return-body true + retrier default-retrier}}] + (let [^StoreMeta md (to-store-meta w dw pw return-body nil nil timeout) + ^Long value (or value 1) + ^Long result (.attempt retrier ^Callable (fn [] + (.incrementCounter client bucket-name counter-name value md)))] + result))) (defn fetch-counter "Fetches Riak counter. @@ -49,13 +50,15 @@ `:if-modified-vclock`: a vclock instance to use for conditional get. Only supported by Protocol Buffers transport. `:timeout`: query timeout " - [^String bucket-name ^String counter &{:keys [r pr not-found-ok basic-quorum - return-deleted-vclock - if-modified-since if-modified-vclock - ^Retrier retrier - ^Integer timeout] - :or {retrier default-retrier}}] - (let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum nil nil if-modified-since if-modified-vclock timeout) - ^Long result (.attempt retrier ^Callable (fn [] - (.fetchCounter *riak-client* bucket-name counter md)))] - result)) + ([^RawClient client ^String bucket-name ^String counter] + (fetch-counter client bucket-name counter {})) + ([^RawClient client ^String bucket-name ^String counter {:keys [r pr not-found-ok basic-quorum + return-deleted-vclock + if-modified-since if-modified-vclock + ^Retrier retrier + ^Integer timeout] + :or {retrier default-retrier}}] + (let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum nil nil if-modified-since if-modified-vclock timeout) + ^Long result (.attempt retrier ^Callable (fn [] + (.fetchCounter client bucket-name counter md)))] + result))) diff --git a/src/clojure/clojurewerkz/welle/kv.clj b/src/clojure/clojurewerkz/welle/kv.clj index 65506ed..2be9703 100644 --- a/src/clojure/clojurewerkz/welle/kv.clj +++ b/src/clojure/clojurewerkz/welle/kv.clj @@ -5,10 +5,10 @@ [clojure.walk :refer [stringify-keys]]) (:import [com.basho.riak.client IRiakClient IRiakObject] [com.basho.riak.client.raw StoreMeta - FetchMeta - DeleteMeta - RawClient - RiakResponse] + FetchMeta + DeleteMeta + RawClient + RiakResponse] com.basho.riak.client.http.util.Constants [com.basho.riak.client.raw RiakResponse] [com.basho.riak.client.cap Retrier DefaultRetrier ConflictResolver] @@ -29,55 +29,47 @@ ;; (defn store - "Stores an object in Riak. Example: - - (kv/store bucket \"users\" \"mk\" - {:name \"Michael Klishnin\" - :langs [\"Clojure\" \"Node.js\"]} - :content-type \"application/clojure\") - - => ()" - [^String bucket-name - ^String key - value - & {:keys [w dw pw indexes links vclock ^String vtag ^Date last-modified - ^Boolean return-body ^Boolean if-none-match ^Boolean - if-not-modified ^Integer timeout content-type metadata ^Retrier - retrier ^ConflictResolver resolver] - :or {content-type Constants/CTYPE_OCTET_STREAM - metadata {} - retrier default-retrier}}] - - (let [v (serialize value content-type) - ^StoreMeta md (to-store-meta w dw pw return-body if-none-match - if-not-modified timeout) - ^IRiakObject ro (to-riak-object - {:bucket bucket-name - :key key - :value v - :content-type content-type - :metadata (stringify-keys metadata) - :indexes indexes - :links links - :vclock vclock - :vtag vtag - :last-modified last-modified}) - ;; implements Iterable. MK. - ^RiakResponse xs (.attempt retrier - ^Callable #(.store *riak-client* ro md)) - mf (if return-body - (comp deserialize-value from-riak-object) - from-riak-object) - ys (map mf xs) - result (if resolver - (.resolve resolver ys) - ys)] - {:vclock (.getVclock xs) - :has-siblings? (.hasSiblings xs) - :has-value? (.hasValue xs) - :deleted? (.isDeleted xs) - :modified? (not (.isUnmodified xs)) - :result result})) + "Stores an object in Riak." + ([^RawClient client ^String bucket-name ^String key value] + (store client bucket-name key value {})) + ([^RawClient client ^String bucket-name ^String key value + {:keys [w dw pw indexes links vclock ^String vtag ^Date last-modified + ^Boolean return-body ^Boolean if-none-match ^Boolean + if-not-modified ^Integer timeout content-type metadata ^Retrier + retrier ^ConflictResolver resolver] + :or {content-type Constants/CTYPE_OCTET_STREAM + metadata {} + retrier default-retrier}}] + (let [v (serialize value content-type) + ^StoreMeta md (to-store-meta w dw pw return-body if-none-match + if-not-modified timeout) + ^IRiakObject ro (to-riak-object + {:bucket bucket-name + :key key + :value v + :content-type content-type + :metadata (stringify-keys metadata) + :indexes indexes + :links links + :vclock vclock + :vtag vtag + :last-modified last-modified}) + ;; implements Iterable. MK. + ^RiakResponse xs (.attempt retrier + ^Callable #(.store client ro md)) + mf (if return-body + (comp deserialize-value from-riak-object) + from-riak-object) + ys (map mf xs) + result (if resolver + (.resolve resolver ys) + ys)] + {:vclock (.getVclock xs) + :has-siblings? (.hasSiblings xs) + :has-value? (.hasValue xs) + :deleted? (.isDeleted xs) + :modified? (not (.isUnmodified xs)) + :result result}))) (declare tombstone?) (defn fetch @@ -112,50 +104,38 @@ metadata, not its value? `:skip-deserialize` (true or false): should the deserialization of the value - be skipped? - - (fetch \"users\" \"mk\") - - => {:metadata {}, - :deleted? false, - :content-type \"application/clojure\", - :vtag \"53PBvB5tCAx6LWmXgviUKy\", - :vclock #, - :indexes {}, - :links (), - :last-modified #inst \"2014-02-07T23:52:09.620-00:00\", - :value {:name \"Michael Klishnin\" - :langs [\"Clojure\" \"Node.js\"]}}" - [^String bucket-name - ^String key - & {:keys [r pr not-found-ok basic-quorum head-only return-deleted-vclock - if-modified-since if-modified-vclock skip-deserialize ^Integer - timeout ^Retrier retrier ^ConflictResolver resolver] - :or {retrier default-retrier}}] - (let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum - head-only return-deleted-vclock - if-modified-since if-modified-vclock - timeout) - ^RiakResponse res (.attempt retrier - ^Callable #(.fetch *riak-client* - bucket-name key md)) - ;; return-deleted-vclock = we should return tombstones. See - ;; https://github.com/basho/riak-java-client/commit/416a901ff1de8e4eb559db21ac5045078d278e86 for more info. MK. - ros (if return-deleted-vclock - res - (remove #(.isDeleted ^IRiakObject %) res)) - xs (if skip-deserialize - (map from-riak-object ros) - (map (comp deserialize-value from-riak-object) ros)) - result (if resolver - (.resolve resolver xs) - xs)] - {:vclock (.getVclock res) - :has-siblings? (.hasSiblings res) - :has-value? (> (count ros) 0) - :deleted? (.isDeleted res) - :modified? (not (.isUnmodified res)) - :result result})) + be skipped?" + ([^RawClient client ^String bucket-name ^String key] + (fetch client bucket-name key {})) + ([^RawClient client ^String bucket-name ^String key + {:keys [r pr not-found-ok basic-quorum head-only return-deleted-vclock + if-modified-since if-modified-vclock skip-deserialize ^Integer + timeout ^Retrier retrier ^ConflictResolver resolver] + :or {retrier default-retrier}}] + (let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum + head-only return-deleted-vclock + if-modified-since if-modified-vclock + timeout) + ^RiakResponse res (.attempt retrier + ^Callable #(.fetch client + bucket-name key md)) + ;; return-deleted-vclock = we should return tombstones. See + ;; https://github.com/basho/riak-java-client/commit/416a901ff1de8e4eb559db21ac5045078d278e86 for more info. MK. + ros (if return-deleted-vclock + res + (remove #(.isDeleted ^IRiakObject %) res)) + xs (if skip-deserialize + (map from-riak-object ros) + (map (comp deserialize-value from-riak-object) ros)) + result (if resolver + (.resolve resolver xs) + xs)] + {:vclock (.getVclock res) + :has-siblings? (.hasSiblings res) + :has-value? (> (count ros) 0) + :deleted? (.isDeleted res) + :modified? (not (.isUnmodified res)) + :result result}))) (defn fetch-one "Fetches a single object. If siblings are found, passes them on to the @@ -184,39 +164,30 @@ Takes the same options as clojurewerkz.welle.kv/fetch and clojurewerkz.welle.kv/store. - Returns the same results as clojurewerkz.welle.kv/store. - - (kv/store bucket \"test\" {:age 34} - :content-type \"application/clojure\") - (kv/modify bucket \"test\" #(update-in % [:value :age] inc))" - [^String bucket-name - ^String key - f - & {:keys [r pr not-found-ok basic-quorum head-only return-deleted-vclock - if-modified-since if-modified-vclock skip-deserialize ^Retrier - retrier ^ConflictResolver resolver w dw pw indexes links ^String - vtag ^Date last-modified ^Boolean return-body ^Boolean - if-none-match ^Boolean if-not-modified content-type metadata] - :or {retrier default-retrier}}] + Returns the same results as clojurewerkz.welle.kv/store." + [^RawClient client ^String bucket-name ^String key f + {:keys [r pr not-found-ok basic-quorum head-only return-deleted-vclock + if-modified-since if-modified-vclock skip-deserialize ^Retrier + retrier ^ConflictResolver resolver w dw pw indexes links ^String + vtag ^Date last-modified ^Boolean return-body ^Boolean + if-none-match ^Boolean if-not-modified content-type metadata] + :or {retrier default-retrier}}] (let [{:keys [result vclock] :as m} - (fetch bucket-name - key - :r r - :pr pr - :not-found-ok not-found-ok - :basic-quorum basic-quorum - :head-only head-only - :return-deleted-vclock return-deleted-vclock - :if-modified-since if-modified-since - :if-modified-vclock if-modified-vclock - :skip-deserialize skip-deserialize - :retrier retrier - :resolver resolver) + (fetch client bucket-name key + {:r r + :pr pr + :not-found-ok not-found-ok + :basic-quorum basic-quorum + :head-only head-only + :return-deleted-vclock return-deleted-vclock + :if-modified-since if-modified-since + :if-modified-vclock if-modified-vclock + :skip-deserialize skip-deserialize + :retrier retrier + :resolver resolver}) m' (f result)] - (store bucket-name - key - (:value m') - :w w + (store client bucket-name key (:value m') + {:w w :dw dw :pw pw :indexes (get m' :indexes indexes) @@ -230,7 +201,7 @@ :content-type (get m' :content-type content-type) :metadata (get m' :metadata metadata) :retrier retrier - :resolver resolver))) + :resolver resolver}))) (defn index-query "Performs a secondary index (2i) query. Provided value can be either @@ -238,30 +209,29 @@ value query is performed. In the latter case, a range query is performed. Learn more in Riak's documentation on secondary indexes at http://docs.basho.com/riak/latest/dev/using/2i/" - [^String bucket-name field value] + [^RawClient client ^String bucket-name field value] (.attempt default-retrier - ^Callable (fn [] (.fetchIndex *riak-client* + ^Callable (fn [] (.fetchIndex client (to-index-query value bucket-name field))))) (defn delete "Deletes an object" - ([^String bucket-name ^String key] + ([^RawClient client ^String bucket-name ^String key] (.attempt default-retrier ^Callable (fn [] - (.delete *riak-client* bucket-name key)))) - ([^String bucket-name - ^String key - &{:keys [r pr w dw pw rw vclock timeout ^Retrier retrier] - :or {retrier default-retrier}}] - (.attempt retrier - ^Callable (fn [] - (.delete *riak-client* - bucket-name - key - (to-delete-meta r pr w dw pw rw vclock - timeout)))))) + (.delete client bucket-name key)))) + ([^RawClient client ^String bucket-name ^String key + {:keys [r pr w dw pw rw vclock timeout ^Retrier retrier] + :or {retrier default-retrier}}] + (.attempt retrier + ^Callable (fn [] + (.delete client + bucket-name + key + (to-delete-meta r pr w dw pw rw vclock + timeout)))))) (defn delete-all "Deletes multiple objects. This function relies on clojure.core/pmap to @@ -269,23 +239,23 @@ potential race conditions between individual delete operations is a problem. For deleting a very large number of keys (say, thousands), consider using map/reduce" - ([^String bucket-name keys] + ([^RawClient client ^String bucket-name keys] (doall (pmap (fn [^String k] - (delete bucket-name k)) + (delete client bucket-name k)) keys))) - ([^String bucket-name keys & rest] + ([^RawClient client ^String bucket-name keys & rest] (doall (pmap (fn [^String k] - (apply delete (concat [bucket-name k] rest))) + (apply delete (concat [client bucket-name k] rest))) keys)))) (defn delete-all-via-2i "Concurrently deletes multiple objects with keys retrieved via a secondary index (2i) query." - ([^String bucket-name field value] - (delete-all bucket-name (set (index-query bucket-name field value)))) - ([^String bucket-name field value & rest] - (let [keys (set (index-query bucket-name field value))] - (apply delete-all (concat [bucket-name keys] rest))))) + ([^RawClient client ^String bucket-name field value] + (delete-all client bucket-name (set (index-query bucket-name field value)))) + ([^RawClient client ^String bucket-name field value & rest] + (let [keys (set (index-query client bucket-name field value))] + (apply delete-all (concat [client bucket-name keys] rest))))) (defn tombstone? "Returns true if a given Riak object is a tombstone (was deleted but not yet diff --git a/src/clojure/clojurewerkz/welle/links.clj b/src/clojure/clojurewerkz/welle/links.clj index 4d3d3c2..e2b1dfa 100644 --- a/src/clojure/clojurewerkz/welle/links.clj +++ b/src/clojure/clojurewerkz/welle/links.clj @@ -8,10 +8,10 @@ ;; You must not remove this notice, or any other, from this software. (ns clojurewerkz.welle.links - (:require [clojurewerkz.welle.conversion :refer :all] - [clojurewerkz.welle.core :refer [*riak-client*]]) + (:require [clojurewerkz.welle.conversion :refer :all]) (:import com.basho.riak.client.raw.query.LinkWalkSpec - java.util.LinkedList)) + java.util.LinkedList + com.basho.riak.client.raw.RawClient)) ;; ;; Implementation @@ -56,9 +56,9 @@ (walk (start-at \"people\" \"peter\") (step \"people\" \"friend\" true))" - [starting-point & steps] + [^RawClient client starting-point & steps] (let [[bucket-name key] starting-point lws (LinkWalkSpec. (to-linked-list steps) bucket-name key) ;; iterable over a collection of linked lists, each of those has IRiakObjects - raw-result (.linkWalk *riak-client* lws)] + raw-result (.linkWalk client lws)] (map map-links (into [] raw-result)))) diff --git a/src/clojure/clojurewerkz/welle/mr.clj b/src/clojure/clojurewerkz/welle/mr.clj index ab73a58..0e076cf 100644 --- a/src/clojure/clojurewerkz/welle/mr.clj +++ b/src/clojure/clojurewerkz/welle/mr.clj @@ -8,9 +8,9 @@ ;; You must not remove this notice, or any other, from this software. (ns clojurewerkz.welle.mr - (:require [cheshire.custom :as json] - [clojurewerkz.welle.core :refer :all]) - (:import com.basho.riak.client.raw.query.MapReduceSpec)) + (:require [cheshire.custom :as json]) + (:import com.basho.riak.client.raw.RawClient + com.basho.riak.client.raw.query.MapReduceSpec)) ;; @@ -19,6 +19,6 @@ (defn map-reduce "Runs a map/reduce query" - [query] - (let [result (.mapReduce *riak-client* (MapReduceSpec. (json/encode query)))] + [^RawClient client query] + (let [result (.mapReduce client (MapReduceSpec. (json/encode query)))] (json/decode (.getResultRaw result) true))) diff --git a/src/clojure/clojurewerkz/welle/ring/session_store.clj b/src/clojure/clojurewerkz/welle/ring/session_store.clj index e6fe512..e311cea 100644 --- a/src/clojure/clojurewerkz/welle/ring/session_store.clj +++ b/src/clojure/clojurewerkz/welle/ring/session_store.clj @@ -11,7 +11,8 @@ (:require [ring.middleware.session.store :as ringstore] [clojurewerkz.welle.kv :as kv]) (:import [java.util UUID Date] - com.basho.riak.client.http.util.Constants)) + com.basho.riak.client.http.util.Constants + com.basho.riak.client.raw.RawClient)) ;; ;; Implementation @@ -32,14 +33,14 @@ ;; API ;; -(defrecord RiakSessionStore [^String bucket-name r w content-type]) +(defrecord RiakSessionStore [^RawClient client ^String bucket-name r w content-type]) (extend-protocol ringstore/SessionStore RiakSessionStore (read-session [store key] (if-let [m (and key - (let [reply (kv/fetch-one (.bucket-name store) key :r (.r store)) + (let [reply (kv/fetch-one (.client store) (.bucket-name store) key {:r (.r store)}) result (:result reply)] (when result (:value result))))] @@ -48,23 +49,23 @@ (write-session [store key data] (let [key (or key (str (UUID/randomUUID)))] - (kv/store (.bucket-name store) key (assoc data :date (Date.)) - :content-type (.content-type store) :w (.w store)) + (kv/store (.client store) (.bucket-name store) key (assoc data :date (Date.)) + {:content-type (.content-type store) :w (.w store)}) key)) (delete-session [store key] - (kv/delete (.bucket-name store) key :w (.w store)) + (kv/delete (.client store) (.bucket-name store) key {:w (.w store)}) nil)) (defn welle-store - ([] - (RiakSessionStore. default-session-store-bucket default-r default-w Constants/CTYPE_JSON_UTF8)) - ([^String bucket-name] - (RiakSessionStore. bucket-name default-r default-w Constants/CTYPE_JSON_UTF8)) - ([^String bucket-name r] - (RiakSessionStore. bucket-name r default-w Constants/CTYPE_JSON_UTF8)) - ([^String bucket-name r w] - (RiakSessionStore. bucket-name r w Constants/CTYPE_JSON_UTF8)) - ([^String bucket-name r w ^String content-type] - (RiakSessionStore. bucket-name r w content-type))) + ([^RawClient client] + (RiakSessionStore. client default-session-store-bucket default-r default-w Constants/CTYPE_JSON_UTF8)) + ([^RawClient client ^String bucket-name] + (RiakSessionStore. client bucket-name default-r default-w Constants/CTYPE_JSON_UTF8)) + ([^RawClient client ^String bucket-name r] + (RiakSessionStore. client bucket-name r default-w Constants/CTYPE_JSON_UTF8)) + ([^RawClient client ^String bucket-name r w] + (RiakSessionStore. client bucket-name r w Constants/CTYPE_JSON_UTF8)) + ([^RawClient client ^String bucket-name r w ^String content-type] + (RiakSessionStore. client bucket-name r w content-type))) diff --git a/src/clojure/clojurewerkz/welle/solr.clj b/src/clojure/clojurewerkz/welle/solr.clj index 53830b3..7024bc1 100644 --- a/src/clojure/clojurewerkz/welle/solr.clj +++ b/src/clojure/clojurewerkz/welle/solr.clj @@ -7,10 +7,10 @@ ;; the terms of this license. ;; You must not remove this notice, or any other, from this software. -(ns ^{:doc "Provides access to Riak Search via the Solr API. +(ns clojurewerkz.welle.solr + "Provides access to Riak Search via the Solr API. - Only HTTP transport is supported."} - clojurewerkz.welle.solr + Only HTTP transport is supported." (:require [clojurewerkz.welle.core :as wc] [clj-http.client :as http] [cheshire.core :as json] @@ -23,24 +23,18 @@ (defn- get-base-solr-url "Returns base Sorl API URL (e.g. http://127.0.0.1:8098/solr)" - ([] - (get-base-solr-url wc/*riak-client*)) - ([^HTTPClient client] - (str (.getBaseUrl client) "/solr"))) + [^HTTPClient client] + (str (.getBaseUrl client) "/solr")) (defn- get-solr-query-url "Returns Sorl query endpoint URL for the given index (e.g. http://127.0.0.1:8098/solr/production_index/select)" - ([^String index] - (get-solr-query-url wc/*riak-client* index)) - ([^HTTPClient client ^String index] - (str (get-base-solr-url wc/*riak-client*) "/" index "/select"))) + [^HTTPClient client ^String index] + (str (get-base-solr-url client) "/" index "/select")) (defn- get-solr-update-url "Returns Sorl update (index, delete, etc) endpoint URL for the given index (e.g. http://127.0.0.1:8098/solr/production_index/update)" - ([^String index] - (get-solr-update-url wc/*riak-client* index)) ([^HTTPClient client ^String index] - (str (get-base-solr-url wc/*riak-client*) "/" index "/update"))) + (str (get-base-solr-url client) "/" index "/update"))) (defn- delete-via-query-body [^String query] @@ -80,40 +74,44 @@ ;; (defn delete-via-query - ([^String query] - (let [url (get-solr-update-url) + ([^HTTPClient client ^String query] + (let [url (get-solr-update-url client) {:keys [body]} (http/post url {:content-type application-xml :body (delete-via-query-body query)})] nil)) - ([^String index ^String query] - (let [url (get-solr-update-url index) + ([^HTTPClient client ^String index ^String query] + (let [url (get-solr-update-url client index) {:keys [body]} (http/post url {:content-type application-xml :body (delete-via-query-body query)})] ;; looks like the response is always empty nil))) (defn index - ([doc] + ([^HTTPClient client doc] (let [url (get-solr-update-url) {:keys [body]} (http/post url {:content-type application-xml :body (index-document-body doc)})] doc)) - ([^String idx doc] + ([^HTTPClient client ^String idx doc] (let [url (get-solr-update-url idx) {:keys [body]} (http/post url {:content-type application-xml :body (index-document-body doc)})] ;; looks like the response is always empty doc))) (defn search - [^String index ^String query & {:as options}] - (let [url (get-solr-query-url index) - qp (merge options {"wt" "json" "q" query}) - {:keys [body]} (http/get url {:query-params qp})] - (json/parse-string body true))) + ([^HTTPClient client ^String index ^String query] + (search client index query {})) + ([^HTTPClient client ^String index ^String query {:as options}] + (let [url (get-solr-query-url client index) + qp (merge options {"wt" "json" "q" query}) + {:keys [body]} (http/get url {:query-params qp})] + (json/parse-string body true)))) (defn search-across-all-indexes - [^String query & {:as options}] - (let [url (get-solr-query-url) - qp (merge options {"wt" "json" "q" query}) - {:keys [body]} (http/get url {:query-params qp})] - (json/parse-string body true))) + ([^HTTPClient client ^String query] + (search-across-all-indexes client query {})) + ([^HTTPClient client ^String query {:as options}] + (let [url (get-solr-query-url client) + qp (merge options {"wt" "json" "q" query}) + {:keys [body]} (http/get url {:query-params qp})] + (json/parse-string body true)))) (defn total-hits [response] diff --git a/src/clojure/clojurewerkz/welle/testkit.clj b/src/clojure/clojurewerkz/welle/testkit.clj index 9e71480..1d98eb4 100644 --- a/src/clojure/clojurewerkz/welle/testkit.clj +++ b/src/clojure/clojurewerkz/welle/testkit.clj @@ -7,11 +7,12 @@ ;; the terms of this license. ;; You must not remove this notice, or any other, from this software. -(ns ^{:doc "Utility functions useful for unit and integration testing of applications - that use Welle"} - clojurewerkz.welle.testkit +(ns clojurewerkz.welle.testkit + "Utility functions useful for unit and integration testing of applications + that use Welle" (:require [clojurewerkz.welle.kv :as kv] - [clojurewerkz.welle.buckets :as wb])) + [clojurewerkz.welle.buckets :as wb]) + (:import com.basho.riak.client.raw.RawClient)) ;; ;; API @@ -21,6 +22,7 @@ "Drains the bucket with the provided name by deleting all the keys in it. For buckets with a large number of keys this may be a very expensive operation because it involves listing keys in the bucket." - [^String bucket-name] - (doseq [k (wb/keys-in bucket-name)] - (kv/delete bucket-name k :w 1))) + [^RawClient client ^String bucket-name] + (doseq [k (wb/keys-in client bucket-name)] + (kv/delete client bucket-name k {:w 1}))) + diff --git a/test/clojurewerkz/welle/test/buckets_test.clj b/test/clojurewerkz/welle/test/buckets_test.clj index fdc867b..a432972 100644 --- a/test/clojurewerkz/welle/test/buckets_test.clj +++ b/test/clojurewerkz/welle/test/buckets_test.clj @@ -1,81 +1,82 @@ (ns clojurewerkz.welle.test.buckets-test - (:require [clojure.test :refer :all] [clojurewerkz.welle.conversion :refer :all]) (:require [clojurewerkz.welle.core :as wc] [clojurewerkz.welle.buckets :as wb] - [clojurewerkz.welle.kv :as kv]) + [clojurewerkz.welle.kv :as kv] + [clojure.test :refer :all] + [clojurewerkz.welle.conversion :refer :all] + [clojurewerkz.welle.test.test-helpers :as th]) (:import [com.basho.riak.client IRiakClient IRiakObject] [com.basho.riak.client.bucket Bucket WriteBucket] [com.basho.riak.client.operations StoreObject FetchObject])) -(wc/connect!) - (defn- has-bucket-props [props] (doseq [prop [:allow-siblings :last-write-wins :r :w :pr :dw :rw :pw :search :n-val :backend :not-found-ok :small-vclock :big-vclock :young-vclock :old-vclock]] (is (contains? props prop)))) -;; -;; buckets/update -;; +(let [c (th/connect)] + ;; + ;; buckets/update + ;; -(deftest test-create-a-new-bucket-with-default-options - (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-1" - bucket-props (wb/update bucket-name)] - (has-bucket-props bucket-props))) + (deftest test-create-a-new-bucket-with-default-options + (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-1" + bucket-props (wb/update c bucket-name)] + (has-bucket-props bucket-props))) -(deftest test-create-a-new-bucket-with-allow-siblings - (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-2" - bucket-props (wb/update bucket-name :allow-siblings true)] - (has-bucket-props bucket-props) - (is (:allow-siblings bucket-props)))) + (deftest test-create-a-new-bucket-with-allow-siblings + (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-2" + bucket-props (wb/update c bucket-name {:allow-siblings true})] + (has-bucket-props bucket-props) + (is (:allow-siblings bucket-props)))) -(deftest test-create-a-new-bucket-with-last-write-wins - (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-3" - bucket-props (wb/update bucket-name :last-write-wins true)] - (has-bucket-props bucket-props) - (is (:last-write-wins bucket-props)))) + (deftest test-create-a-new-bucket-with-last-write-wins + (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-3" + bucket-props (wb/update c bucket-name {:last-write-wins true})] + (has-bucket-props bucket-props) + (is (:last-write-wins bucket-props)))) -(deftest test-create-a-new-bucket-with-explicitly-set-n-val - (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-4" - bucket-props (wb/update bucket-name :n-val 4)] - (has-bucket-props bucket-props) - (is (= 4 (:n-val bucket-props))))) + (deftest test-create-a-new-bucket-with-explicitly-set-n-val + (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-4" + bucket-props (wb/update c bucket-name {:n-val 4})] + (has-bucket-props bucket-props) + (is (= 4 (:n-val bucket-props))))) -(deftest test-create-a-new-bucket-with-explicitly-cap-values - (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-5" - bucket-props (wb/update bucket-name :r 1 :pr 2 :w 3 :dw 4 :pw 5 :rw 6)] - (has-bucket-props bucket-props) - (are [k v] (is (= (to-quorum v) (k bucket-props))) - :r 1 :pr 2 :w 3 :dw 4 :pw 5 :rw 6))) + (deftest test-create-a-new-bucket-with-explicitly-cap-values + (let [bucket-name "clojurewerkz.welle.buckets/create-bucket-5" + bucket-props (wb/update c bucket-name {:r 1 :pr 2 :w 3 :dw 4 :pw 5 :rw 6})] + (has-bucket-props bucket-props) + (are [k v] (is (= (to-quorum v) (k bucket-props))) + :r 1 :pr 2 :w 3 :dw 4 :pw 5 :rw 6))) -;; -;; buckets/list -;; + ;; + ;; buckets/list + ;; -(deftest test-listing-buckets - (wb/update "welle.test.a-bucket" :r 1) - ;; Riak does not seem to actually create a bucket until you store something in it. MK. - (kv/store "welle.test.a-bucket" "key" "value" :content-type "text/plain") - (let [buckets (wb/list)] - (is (set? buckets)) - (is (buckets "welle.test.a-bucket")))) + (deftest test-listing-buckets + (wb/update c "welle.test.a-bucket" {:r 1}) + ;; Riak does not seem to actually create a bucket until you store something in it. MK. + (kv/store c "welle.test.a-bucket" "key" "value" {:content-type "text/plain"}) + (let [buckets (wb/list c)] + (is (set? buckets)) + (is (buckets "welle.test.a-bucket")))) -;; -;; buckets/fetch, buckets/update -;; + ;; + ;; buckets/fetch, buckets/update + ;; -(deftest test-fetching-bucket-properties - (let [bucket-name "welle.test.a-bucket"] - (wb/update bucket-name :r 1 :last-write-wins true) - (kv/store bucket-name "key" "value" :content-type "text/plain") - ;; stricter true/false assertions because of the way properties builder treats nils. MK. - (is (true? (:last-write-wins (wb/fetch bucket-name)))) - (wb/update bucket-name :last-write-wins false) - (is (false? (:last-write-wins (wb/fetch bucket-name)))))) + (deftest test-fetching-bucket-properties + (let [bucket-name "welle.test.a-bucket"] + (wb/update c bucket-name {:r 1 :last-write-wins true}) + (kv/store c bucket-name "key" "value" {:content-type "text/plain"}) + ;; stricter true/false assertions because of the way properties builder treats nils. MK. + (is (true? (:last-write-wins (wb/fetch c bucket-name)))) + (wb/update c bucket-name {:last-write-wins false}) + (is (false? (:last-write-wins (wb/fetch c bucket-name))))))) diff --git a/test/clojurewerkz/welle/test/cache_test.clj b/test/clojurewerkz/welle/test/cache_test.clj index 93d5617..7250302 100644 --- a/test/clojurewerkz/welle/test/cache_test.clj +++ b/test/clojurewerkz/welle/test/cache_test.clj @@ -8,8 +8,6 @@ (:import [clojure.core.cache BasicCache FIFOCache LRUCache TTLCache] java.util.UUID)) -(wc/connect!) - ;; ;; Playground/Tests. These were necessary because clojure.core.cache has ;; little documentation, incomplete test suite and @@ -86,37 +84,39 @@ ;; (def ^{:private true :const true} bucket-name "welle.test.cache_entries") -(use-fixtures :each (fn [f] - (wb/update bucket-name :last-write-wins true :r 1 :w 1) - (f) - (drain bucket-name))) +(let [conn (wc/connect)] + (use-fixtures :each (fn [f] + (wb/update conn bucket-name {:last-write-wins true :r 1 :w 1}) + (f) + (drain conn bucket-name))) -(deftest ^{:cache true} - test-has?-with-basic-welle-cache - (testing "that has? returns false for misses" - (let [c (basic-welle-cache-factory bucket-name)] - (is (not (has? c (str (UUID/randomUUID))))) - (is (not (has? c (str (UUID/randomUUID))))))) - (testing "that has? returns true for hits" - (let [c (basic-welle-cache-factory bucket-name {"a" 1 "b" "cache" "c" 3/4} "application/json" 1)] - (is (has? c "a")) - (is (has? c "b")) - (is (has? c "c")) - (is (not (has? c "d")))))) + (deftest ^{:cache true} + test-has?-with-basic-welle-cache + (testing "that has? returns false for misses" + (let [c (basic-welle-cache-factory conn bucket-name)] + (is (not (has? c (str (UUID/randomUUID))))) + (is (not (has? c (str (UUID/randomUUID))))))) + (testing "that has? returns true for hits" + (let [c (basic-welle-cache-factory conn bucket-name {"a" 1 "b" "cache" "c" 3/4} "application/json" 1)] + (is (has? c "a")) + (is (has? c "b")) + (is (has? c "c")) + (is (not (has? c "d")))))) -(deftest ^{:cache true} - test-lookup-with-basic-welle-cache - (testing "that lookup returns nil for misses" - (let [c (basic-welle-cache-factory bucket-name)] - (are [v] (is (nil? (lookup c v))) - (str (UUID/randomUUID)) - "missing-key" - (str (gensym "missing-key"))))) - (testing "that lookup returns cached values for hits" - (let [l (Long/valueOf 10000) - c (basic-welle-cache-factory bucket-name {"skey" "Value" "lkey" l} "application/json" 1)] - (are [k v] (is (= v (lookup c k))) - "skey" "Value" - "lkey" l )))) + + (deftest ^{:cache true} + test-lookup-with-basic-welle-cache + (testing "that lookup returns nil for misses" + (let [c (basic-welle-cache-factory conn bucket-name)] + (are [v] (is (nil? (lookup c v))) + (str (UUID/randomUUID)) + "missing-key" + (str (gensym "missing-key"))))) + (testing "that lookup returns cached values for hits" + (let [l (Long/valueOf 10000) + c (basic-welle-cache-factory conn bucket-name {"skey" "Value" "lkey" l} "application/json" 1)] + (are [k v] (is (= v (lookup c k))) + "skey" "Value" + "lkey" l ))))) diff --git a/test/clojurewerkz/welle/test/conversion_test.clj b/test/clojurewerkz/welle/test/conversion_test.clj index 88a3558..d9d3756 100644 --- a/test/clojurewerkz/welle/test/conversion_test.clj +++ b/test/clojurewerkz/welle/test/conversion_test.clj @@ -10,8 +10,6 @@ com.basho.riak.client.raw.query.LinkWalkSpec [java.util Date UUID])) -(set! *warn-on-reflection* true) - (defn vclock-for [^String s] (BasicVClock. (CharsetUtils/utf8StringToBytes s))) diff --git a/test/clojurewerkz/welle/test/core_test.clj b/test/clojurewerkz/welle/test/core_test.clj index 4dfb26b..43a7077 100644 --- a/test/clojurewerkz/welle/test/core_test.clj +++ b/test/clojurewerkz/welle/test/core_test.clj @@ -4,73 +4,56 @@ (:require [clojurewerkz.welle.core :as wc] [clojure.test :refer :all])) -(set! *warn-on-reflection* true) - (deftest connect-using-http-client-and-default-host-and-port - (let [^RawClient client (wc/connect)] + (let [c (wc/connect)] (dotimes [x 10] - (.ping client) - (wc/ping client) - (wc/shutdown client)))) - -(deftest connect-using-http-client-default-host-and-port-and-default-client - (wc/connect!) - (dotimes [x 10] - (.ping ^RawClient wc/*riak-client*) - (wc/ping) - (wc/shutdown))) + (.ping c) + (wc/ping c) + (wc/shutdown c)))) (deftest connect-using-clustered-http-client - (let [^RawClient client (wc/connect-to-cluster ["http://127.0.0.1:8098/riak" - "http://localhost:8098/riak"])] + (let [c (wc/connect-to-cluster ["http://127.0.0.1:8098/riak" + "http://localhost:8098/riak"])] (dotimes [x 10] - (.ping client) - (wc/ping client) - (wc/shutdown client)))) + (.ping c) + (wc/ping c) + (wc/shutdown c)))) (deftest connect-using-clustered-http-client-with-port-specified - (let [^RawClient client (wc/connect-to-cluster ["http://127.0.0.1:8098/riak" - "http://localhost:8098/riak"])] + (let [c (wc/connect-to-cluster ["http://127.0.0.1:8098/riak" + "http://localhost:8098/riak"])] (dotimes [x 10] - (.ping client) - (wc/ping client) - (wc/shutdown client)))) + (.ping c) + (wc/ping c) + (wc/shutdown c)))) (deftest connect-using-clustered-http-client-with-config-instance (let [config (doto (HTTPClusterConfig. wc/default-cluster-connection-limit) (.addClient (-> (HTTPClientConfig$Builder.) (.withUrl "http://127.0.0.1:8098/riak") (.build)))) - ^RawClient client (wc/connect-to-cluster config)] + c (wc/connect-to-cluster config)] (dotimes [x 10] - (.ping client) - (wc/ping) - (wc/shutdown)))) - -(deftest connect-using-clustered-http-client-and-default-client - (wc/connect-to-cluster! ["http://127.0.0.1:8098/riak" - "http://localhost:8098/riak"]) - (dotimes [x 10] - (.ping ^RawClient wc/*riak-client*) - (wc/ping) - (wc/shutdown))) + (.ping c) + (wc/ping c) + (wc/shutdown c)))) (deftest connect-using-clustered-http-client-and-default-client-with-port-specified - (wc/connect-to-cluster! ["http://127.0.0.1:8098/riak" - "http://localhost:8098/riak"]) - (dotimes [x 10] - (.ping ^RawClient wc/*riak-client*) - (wc/ping) - (wc/shutdown))) + (let [c (wc/connect-to-cluster ["http://127.0.0.1:8098/riak" + "http://localhost:8098/riak"])] + (dotimes [x 10] + (.ping c) + (wc/ping c) + (wc/shutdown c)))) (deftest connect-using-clustered-http-client-and-default-client-with-config-instance (let [config (doto (HTTPClusterConfig. wc/default-cluster-connection-limit) (.addClient (-> (HTTPClientConfig$Builder.) (.withUrl "http://127.0.0.1:8098/riak") - (.build))))] - (wc/connect-to-cluster! config) + (.build)))) + c (wc/connect-to-cluster config)] (dotimes [x 10] - (.ping ^RawClient wc/*riak-client*) - (wc/ping) - (wc/shutdown)))) + (.ping ^RawClient c) + (wc/ping c) + (wc/shutdown c)))) diff --git a/test/clojurewerkz/welle/test/counters_test.clj b/test/clojurewerkz/welle/test/counters_test.clj index 79b39ec..eb4b9cf 100644 --- a/test/clojurewerkz/welle/test/counters_test.clj +++ b/test/clojurewerkz/welle/test/counters_test.clj @@ -7,19 +7,20 @@ [clojurewerkz.welle.testkit :refer [drain]])) (deftest test-counter - (let [bucket-name "clojurewerkz.welle.kv" + (let [conn (wc/connect) + bucket-name "clojurewerkz.welle.kv" counter "counter1" - bucket (wb/update bucket-name :allow-siblings true) - v1 (cnt/increment-counter bucket-name counter) - v2 (cnt/fetch-counter bucket-name counter) - v3 (cnt/increment-counter bucket-name counter :value 2) - v4 (cnt/fetch-counter bucket-name counter) - v5 (cnt/increment-counter bucket-name counter :value -1) - v6 (cnt/fetch-counter bucket-name counter)] + bucket (wb/update conn bucket-name {:allow-siblings true}) + v1 (cnt/increment-counter conn bucket-name counter) + v2 (cnt/fetch-counter conn bucket-name counter) + v3 (cnt/increment-counter conn bucket-name counter {:value 2}) + v4 (cnt/fetch-counter conn bucket-name counter) + v5 (cnt/increment-counter conn bucket-name counter {:value -1}) + v6 (cnt/fetch-counter conn bucket-name counter)] (is (= 1 v1)) (is (= 1 v2)) (is (= 3 v3)) (is (= 3 v4)) (is (= 2 v5)) (is (= 2 v6)) - (drain bucket-name))) + (drain conn bucket-name))) diff --git a/test/clojurewerkz/welle/test/indices_http_test.clj b/test/clojurewerkz/welle/test/indices_http_test.clj index d56913d..9f56418 100644 --- a/test/clojurewerkz/welle/test/indices_http_test.clj +++ b/test/clojurewerkz/welle/test/indices_http_test.clj @@ -8,84 +8,83 @@ (:import java.util.UUID com.basho.riak.client.http.util.Constants)) -(wc/connect!) - -(deftest ^{:2i true} test-indexes-on-converted-riak-objects +(let [conn (wc/connect)] + (deftest ^{:2i true} test-indexes-on-converted-riak-objects (let [bucket-name "clojurewerkz.welle.test.indices-http-test" - bucket (wb/update bucket-name) + bucket (wb/update conn bucket-name) k (str (UUID/randomUUID)) v "value" indexes {:email #{"john@example.com"} :username #{"johndoe"}} - stored (kv/store bucket-name k v :indexes indexes) - [fetched] (:result (kv/fetch bucket-name k))] + stored (kv/store conn bucket-name k v {:indexes indexes}) + [fetched] (:result (kv/fetch conn bucket-name k))] (is (:indexes fetched)) (is (= indexes (:indexes fetched))) - (kv/delete bucket-name k))) + (kv/delete conn bucket-name k))) (deftest ^{:2i true} test-basic-index-query-with-a-single-string-value (let [bucket-name "clojurewerkz.welle.test.indices-http-test" - bucket (wb/update bucket-name) + bucket (wb/update conn bucket-name) k (str (UUID/randomUUID)) v (.getBytes "value") indexes {:email #{"johndoe@example.com" "timsmith@example.com"}} - stored (kv/store bucket-name k v :indexes indexes :content-type Constants/CTYPE_OCTET_STREAM) - [idx-key] (kv/index-query bucket-name :email "johndoe@example.com") - [fetched] (:result (kv/fetch bucket-name idx-key))] + stored (kv/store conn bucket-name k v {:indexes indexes :content-type Constants/CTYPE_OCTET_STREAM}) + [idx-key] (kv/index-query conn bucket-name :email "johndoe@example.com") + [fetched] (:result (kv/fetch conn bucket-name idx-key))] (is (:indexes fetched)) (is (= (String. ^bytes (:value fetched)) (String. ^bytes v))) (is (= (:indexes fetched) indexes)) - (kv/delete bucket-name k))) + (kv/delete conn bucket-name k))) (deftest ^{:2i true} test-basic-index-query-with-a-range-of-string-values (let [bucket-name "clojurewerkz.welle.test.indices-http-test" - bucket (wb/update bucket-name) + bucket (wb/update conn bucket-name) k1 (str (UUID/randomUUID)) k2 (str (UUID/randomUUID)) k3 (str (UUID/randomUUID)) k4 (str (UUID/randomUUID)) v (.getBytes "value1") - _ (kv/store bucket-name k1 v :indexes {:username #{"abc"}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k2 v :indexes {:username #{"bcd"}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k3 v :indexes {:username #{"cde"}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k4 v :indexes {:username #{"def"}} :content-type Constants/CTYPE_OCTET_STREAM) - keys (set (kv/index-query bucket-name :username ["b" "d"]))] + _ (kv/store conn bucket-name k1 v {:indexes {:username #{"abc"}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k2 v {:indexes {:username #{"bcd"}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k3 v {:indexes {:username #{"cde"}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k4 v {:indexes {:username #{"def"}} :content-type Constants/CTYPE_OCTET_STREAM}) + keys (set (kv/index-query conn bucket-name :username ["b" "d"]))] (is (subset? #{k2 k3} keys)) - (kv/delete-all bucket-name [k1 k2 k3 k4]) - (kv/delete-all bucket-name keys))) + (kv/delete-all conn bucket-name [k1 k2 k3 k4]) + (kv/delete-all conn bucket-name keys))) (deftest ^{:2i true} test-basic-index-query-with-a-single-integer-value (let [bucket-name "clojurewerkz.welle.test.alt-indices-test" - bucket (wb/update bucket-name) + bucket (wb/update conn bucket-name) k (str (UUID/randomUUID)) v "value" indexes {:age 27} - stored (kv/store bucket-name k v :indexes indexes :content-type Constants/CTYPE_TEXT_UTF8) - [idx-key] (kv/index-query bucket-name :age 27) - [fetched] (:result (kv/fetch bucket-name idx-key))] + stored (kv/store conn bucket-name k v {:indexes indexes :content-type Constants/CTYPE_TEXT_UTF8}) + [idx-key] (kv/index-query conn bucket-name :age 27) + [fetched] (:result (kv/fetch conn bucket-name idx-key))] (is (:indexes fetched)) (is (= (:value fetched) v)) (is (= (:indexes fetched) {:age #{27}})) - (kv/delete bucket-name k))) + (kv/delete conn bucket-name k))) (deftest ^{:2i true} test-basic-index-query-with-a-range-of-integer-values (let [bucket-name "clojurewerkz.welle.test.indices-http-test" - bucket (wb/update bucket-name) + bucket (wb/update conn bucket-name) k1 (str (UUID/randomUUID)) k2 (str (UUID/randomUUID)) k3 (str (UUID/randomUUID)) k4 (str (UUID/randomUUID)) v (.getBytes "value1") - _ (kv/store bucket-name k1 v :indexes {:hops #{1 2 3 4}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k2 v :indexes {:hops #{5 6 7 8}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k3 v :indexes {:hops #{9 10 11 12}} :content-type Constants/CTYPE_OCTET_STREAM) - _ (kv/store bucket-name k4 v :indexes {:hops #{13 14 18 77}} :content-type Constants/CTYPE_OCTET_STREAM) - keys (set (kv/index-query bucket-name :hops [2 11]))] + _ (kv/store conn bucket-name k1 v {:indexes {:hops #{1 2 3 4}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k2 v {:indexes {:hops #{5 6 7 8}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k3 v {:indexes {:hops #{9 10 11 12}} :content-type Constants/CTYPE_OCTET_STREAM}) + _ (kv/store conn bucket-name k4 v {:indexes {:hops #{13 14 18 77}} :content-type Constants/CTYPE_OCTET_STREAM}) + keys (set (kv/index-query conn bucket-name :hops [2 11]))] (is (subset? #{k1 k2 k3} keys)) - (kv/delete-all bucket-name [k1 k2 k3 k4]) - (kv/delete-all bucket-name keys))) + (kv/delete-all conn bucket-name [k1 k2 k3 k4]) + (kv/delete-all conn bucket-name keys)))) diff --git a/test/clojurewerkz/welle/test/indices_pb_test.clj b/test/clojurewerkz/welle/test/indices_pb_test.clj index 4185ed0..a19b065 100644 --- a/test/clojurewerkz/welle/test/indices_pb_test.clj +++ b/test/clojurewerkz/welle/test/indices_pb_test.clj @@ -8,19 +8,18 @@ (:import java.util.UUID com.basho.riak.client.http.util.Constants)) -(wc/connect-via-pb!) - (deftest ^{:2i true :edge-features true} test-basic-index-query-with-a-single-string-value-over-pb - (let [bucket-name "clojurewerkz.welle.test.indices-pb-test" - bucket (wb/update bucket-name) + (let [conn (wc/connect-via-pb) + bucket-name "clojurewerkz.welle.test.indices-pb-test" + bucket (wb/update conn bucket-name) k (str (UUID/randomUUID)) v (.getBytes "value") indexes {:email #{"johndoe@example.com" "timsmith@example.com"}} - stored (kv/store bucket-name k v :indexes indexes :content-type Constants/CTYPE_OCTET_STREAM) - [idx-key] (kv/index-query bucket-name :email "johndoe@example.com") - [fetched] (kv/fetch bucket-name idx-key)] + stored (kv/store conn bucket-name k v {:indexes indexes :content-type Constants/CTYPE_OCTET_STREAM}) + [idx-key] (kv/index-query conn bucket-name :email "johndoe@example.com") + [fetched] (:result (kv/fetch conn bucket-name idx-key))] (is (:indexes fetched)) (is (= (String. ^bytes (:value fetched)) (String. ^bytes v))) (is (= (:indexes fetched) indexes)) - (kv/delete bucket-name k))) + (kv/delete conn bucket-name k))) diff --git a/test/clojurewerkz/welle/test/kv_test.clj b/test/clojurewerkz/welle/test/kv_test.clj index c52cf4a..ef2a184 100644 --- a/test/clojurewerkz/welle/test/kv_test.clj +++ b/test/clojurewerkz/welle/test/kv_test.clj @@ -6,14 +6,13 @@ [cheshire.custom :as json] [clojure.set :as set] [clojure.test :refer :all] - [clojurewerkz.welle.testkit :refer [drain]]) + [clojurewerkz.welle.testkit :refer [drain]] + [clojurewerkz.welle.test.test-helpers :as th]) (:import com.basho.riak.client.http.util.Constants java.util.UUID)) (println (str "Using Clojure version " *clojure-version*)) -(wc/connect!) - (defn- is-riak-object [m] (is (:vclock m)) @@ -25,358 +24,360 @@ (is (:indexes m)) (is (:value m))) -;; -;; Basics -;; - -(deftest test-basic-store-with-all-defaults - (let [bucket-name "clojurewerkz.welle.kv/store-with-all-defaults" - bucket (wb/update bucket-name) - k "store-with-all-defaults" - v "value" - stored (kv/store bucket-name k v) - {:keys [result] :as m} (kv/fetch bucket-name k :r 1) - fetched (first result)] - (is (not (:has-value? stored))) - (is (not (:has-siblings? stored))) - (is (:modified? stored)) - (is (= Constants/CTYPE_OCTET_STREAM (:content-type fetched))) - (is (= {} (:metadata fetched))) - (is (= v (String. ^bytes (:value fetched)))) - (is-riak-object fetched) - (drain bucket-name))) - - -;; -;; Automatic serialization/deserialization for common content types -;; - -(deftest test-basic-store-with-text-utf8-content-type - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-as-utf8-text" - v "value" - stored (kv/store bucket-name k v :content-type Constants/CTYPE_TEXT_UTF8) - {:keys [result] :as m} (kv/fetch bucket-name k) - ;; ex.: {:metadata {}, - ;; :deleted? false, - ;; :content-type "text/plain; charset=UTF-8", - ;; :vtag "\"2IxTVEGoIW8DCRT25rLl83\"", - ;; :vclock #, - ;; :indexes {}, - ;; :links (), - ;; :last-modified #inst "2013-05-22T17:17:30.000-00:00", - ;; :value "value"} - fetched (first result)] - (is (not (:has-value? stored))) - (is (= Constants/CTYPE_TEXT_UTF8 (:content-type fetched))) - (is (= {} (:metadata fetched))) - (is (= v (:value fetched))) - (is-riak-object fetched) - (drain bucket-name))) - - -(deftest test-basic-store-with-json-content-type - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-as-json" - v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} - stored (kv/store bucket-name k v :content-type Constants/CTYPE_JSON) - {:keys [result] :as m} (kv/fetch bucket-name k) - fetched (first result)] - (is (= Constants/CTYPE_JSON (:content-type fetched))) - (is (= {} (:metadata fetched))) - (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) - (is-riak-object fetched) - (drain bucket-name))) - - -(deftest test-basic-store-with-json-utf8-content-type - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-as-utf8-json" - v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} - stored (kv/store bucket-name k v :content-type Constants/CTYPE_JSON_UTF8) - {:keys [result] :as m} (kv/fetch bucket-name k) - fetched (first result)] - ;; cannot use constant value here see https://github.com/basho/riak-java-client/issues/125 - (is (= "application/json; charset=UTF-8" (:content-type fetched))) - (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) - (drain bucket-name))) - - -(deftest test-basic-store-with-jackson-smile-content-type - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - ct "application/jackson-smile" - k "store-as-jackson-smile" - v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} - stored (kv/store bucket-name k v :content-type ct) - {:keys [result] :as m} (kv/fetch bucket-name k) - fetched (first result)] - (is (= ct (:content-type fetched))) - (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) - (drain bucket-name))) - - -(deftest test-basic-store-with-application-clojure-content-type - (let [bucket-name "clojurewerkz.welle.kv2" - bucket (wb/update bucket-name :last-write-wins true) - k "store-as-clojure-data" - v {:city "New York City" :state "NY" :year 2011 :participants #{"johndoe" "timsmith" "michaelblack"} - :venue {:name "Sheraton New York Hotel & Towers" :address "811 Seventh Avenue" :street "Seventh Avenue"}} - ct "application/clojure" - stored (kv/store bucket-name k v :content-type ct) - {:keys [result] :as m} (kv/fetch-one bucket-name k)] - ;; cannot use constant value here see https://github.com/basho/riak-java-client/issues/125 - (is (= ct (:content-type result))) - (is (= v (:value result))) - (drain bucket-name))) - - -(deftest test-basic-store-with-json+gzip-content-type - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-as-gzipped-json" - v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} - ;; compatible with both HTTP and PB APIs. Content-Encoding would be a better - ;; idea here but PB cannot support it (as of Riak 1.1). MK. - ct "application/json+gzip" - stored (kv/store bucket-name k v :content-type ct) - {:keys [result] :as m} (kv/fetch bucket-name k) - fetched (first result)] - (is (not (:has-value? stored))) - (is (= ct (:content-type fetched))) - (is (= {} (:metadata fetched))) - (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) - (is-riak-object fetched) - (drain bucket-name))) - - -(deftest test-basic-store-with-text-utf8-content-type-and-return-body - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-as-utf8-text" - v "value" - {:keys [result]} (kv/store bucket-name k v :content-type Constants/CTYPE_TEXT_UTF8 :return-body true)] - (is (= v (-> result first :value))) - (drain bucket-name))) - - -;; -;; Metadata -;; - -(deftest test-basic-store-with-metadata - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k (str (UUID/randomUUID)) - v "value" - ;; metadata values currently have to be strings. MK. - metadata {:author "Joe" :density "5"} - _ (kv/store bucket-name k v :content-type Constants/CTYPE_TEXT_UTF8 :metadata metadata) - {:keys [result] :as m} (kv/fetch bucket-name k) - fetched (first result)] - (is (= Constants/CTYPE_TEXT_UTF8 (:content-type fetched))) - (is (= {"author" "Joe" "density" "5"} (:metadata fetched))) - (is (= v (:value fetched))) - (is-riak-object fetched) - (drain bucket-name))) - - -;; -;; Links -;; - -(deftest test-basic-store-with-links - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "store-with-links" - v "value" - links [{:bucket "pages" :key "clojurewerkz.org" :tag "links"}] - _ (kv/store bucket-name k v :content-type Constants/CTYPE_TEXT_UTF8 :links links) - {:keys [result] :as m} (kv/fetch-one bucket-name k)] - (is (= Constants/CTYPE_TEXT_UTF8 (:content-type result))) - (is (= links (:links result))) - (is (= v (:value result))) - (is-riak-object result) - (drain bucket-name))) - - - -;; -;; kv/fetch, kv/fetch-one -;; - -(deftest test-fetching-a-non-existent-object - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - {:keys [result]} (kv/fetch bucket-name (str (UUID/randomUUID)))] - (is (empty? result)))) - -(deftest test-optimistic-fetching-of-a-single-object - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k "optimistic-fetch" - v "value" - _ (kv/store bucket-name k v) - {:keys [result]} (kv/fetch-one bucket-name k :r 1)] - (is (= Constants/CTYPE_OCTET_STREAM (:content-type result))) - (is (= {} (:metadata result))) - (is (= v (String. ^bytes (:value result)))) - (is-riak-object result) - (drain bucket-name))) - - -(deftest test-fetch-one-with-skip-deserialize - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - ct Constants/CTYPE_JSON - k "skip-deserialize-fetch-one" - v {:name "Riak"} - _ (kv/store bucket-name k v :content-type ct) - {:keys [result]} (kv/fetch-one bucket-name k :r 1 :skip-deserialize true)] - (is (= ct (:content-type result))) - (is (= {} (:metadata result))) - (is (= (json/encode v) (String. ^bytes (:value result)))) - (is-riak-object result) - (drain bucket-name))) - -(deftest test-fetch-with-skip-deserialize - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - ct Constants/CTYPE_JSON - k "skip-deserialize-fetch" - v {:name "Riak"} - _ (kv/store bucket-name k v :content-type ct) - {:keys [result]} (kv/fetch bucket-name k :r 1 :skip-deserialize true) - fetched (first result)] - (is (= ct (:content-type fetched))) - (is (= {} (:metadata fetched))) - (is (= (json/encode v) (String. ^bytes (:value fetched)))) - (is-riak-object fetched) - (drain bucket-name))) - - -;; -;; kv/delete -;; - -(deftest test-fetching-deleted-value-with-rw=2 - (let [bucket-name "clojurewerkz.welle.kv3" - bucket (wb/update bucket-name :last-write-wins true) - k "delete-me" - v "another value"] - (drain bucket-name) - (Thread/sleep 150) - (is (not (:has-value? (kv/fetch bucket-name k :r 2)))) - (kv/store bucket-name k v) - (is (:has-value? (kv/fetch bucket-name k))) - (kv/delete bucket-name k :rw 2) - (kv/fetch bucket-name k :r 2) - ;; {:vclock #, - ;; :has-siblings? false, - ;; :has-value? false, - ;; :deleted? false, - ;; :modified? true, - ;; :result ()} - (is (not (:has-value? (kv/fetch bucket-name k :r 2)))))) - - -(deftest test-fetching-deleted-value-with-bucket-settings - (let [bucket-name "clojurewerkz.welle.kv4" - bucket (wb/update bucket-name) - k "delete-me" - v "another value"] - (drain bucket-name) - (Thread/sleep 150) - (is (not (:has-value? (kv/fetch-one bucket-name k :r 2)))) - (kv/store bucket-name k v) - (is (:has-value? (kv/fetch-one bucket-name k))) - (kv/delete bucket-name k :rw 2) - (is (not (:has-value? (kv/fetch-one bucket-name k :r 2)))))) - - -(deftest test-fetching-multiple-deleted-values-with-bucket-settings - (let [bucket-name "clojurewerkz.welle.kv5" - bucket (wb/update bucket-name) - key "delete-me" - value "another value" - key-values (map (fn [i] [(str key i) (str value i)]) (range 10))] - (drain bucket-name) - (Thread/sleep 150) - - (doseq [[k v] key-values] - (is (not (:has-value? (kv/fetch-one bucket-name k)))) - (kv/store bucket-name k v) - (is (:has-value? (kv/fetch-one bucket-name k)))) - (kv/delete-all bucket-name (map first key-values)) - (doseq [[k v] key-values] - (is (not (:has-value? (kv/fetch-one bucket-name k))))))) - -;; -;; kv/modify -;; - -(deftest test-modify-with-json-content-type-and-no-existing-value - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k (str (UUID/randomUUID)) - f (fn [[m]] - (update-in m [:value :influenced-by] set/union #{"Java" "Haskell"})) - updated (kv/modify bucket-name k f :r 1 :w 1 :content-type "application/json") - {:keys [result]} (kv/fetch-one bucket-name k)] - (is (= Constants/CTYPE_JSON (:content-type result))) - (is (= {} (:metadata result))) - (is (= (sort ["Java" "Haskell"]) - (sort (get-in result [:value :influenced-by])))) - (is-riak-object result) - (drain bucket-name))) - - -(deftest test-modify-with-json-content-type-and-one-existing-value - (let [bucket-name "clojurewerkz.welle.kv" - bucket (wb/update bucket-name) - k (str (UUID/randomUUID)) - v {:name "Clojure" :kind "Programming Language" :influenced-by #{"Common Lisp", "C#"}} - stored (kv/store bucket-name k v :content-type Constants/CTYPE_JSON) - f (fn [[m]] - (update-in m [:value :influenced-by] set/union #{"Java" "Haskell"})) - _ (kv/modify bucket-name k f :r 1 :w 1) - {:keys [result]} (kv/fetch-one bucket-name k)] - (is (empty? (:result stored))) - (is (= Constants/CTYPE_JSON (:content-type result))) - (is (= {} (:metadata result))) - (is (= (sort ["C#" "Common Lisp" "Java" "Haskell"]) - (sort (get-in result [:value :influenced-by])))) - (is-riak-object result) - (drain bucket-name))) - -(defn union-resolver - [default] - (conversion/resolver-from - (fn [siblings] - (condp = (count siblings) - 0 [{:value default - :content-type "application/clojure" - :metadata {}}] - 1 siblings - [(-> (first siblings) - (select-keys [:content-type :metadata :links]) - (assoc :value (apply set/union (map :value siblings))))])))) - -(deftest test-modify-vclocks - (let [bucket-name "clojurewerkz.welle.kv.siblings" - bucket (wb/update bucket-name :allow-siblings true) - k (str (UUID/randomUUID)) - append! (fn [x] (kv/modify bucket-name k - (fn [[o]] - (update-in o [:value] conj x)) - :resolver (union-resolver #{}) - :pr 2 - :pw 2)) - adds (doall (map append! (range 10))) - {:keys [result]} (kv/fetch bucket-name k :pr 3)] - ;; There should not be 10 siblings. - (is (< (count result) 4)) - (drain bucket-name))) +(let [conn (th/connect)] + + ;; + ;; Basics + ;; + + (deftest test-basic-store-with-all-defaults + (let [bucket-name "clojurewerkz.welle.kv/store-with-all-defaults" + bucket (wb/update conn bucket-name) + k "store-with-all-defaults" + v "value" + stored (kv/store conn bucket-name k v) + {:keys [result] :as m} (kv/fetch conn bucket-name k {:r 1}) + fetched (first result)] + (is (not (:has-value? stored))) + (is (not (:has-siblings? stored))) + (is (:modified? stored)) + (is (= Constants/CTYPE_OCTET_STREAM (:content-type fetched))) + (is (= {} (:metadata fetched))) + (is (= v (String. ^bytes (:value fetched)))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + ;; + ;; Automatic serialization/deserialization for common content types + ;; + + (deftest test-basic-store-with-text-utf8-content-type + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-as-utf8-text" + v "value" + stored (kv/store conn bucket-name k v {:content-type Constants/CTYPE_TEXT_UTF8}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + ;; ex.: {:metadata {}, + ;; :deleted? false, + ;; :content-type "text/plain; charset=UTF-8", + ;; :vtag "\"2IxTVEGoIW8DCRT25rLl83\"", + ;; :vclock #, + ;; :indexes {}, + ;; :links (), + ;; :last-modified #inst "2013-05-22T17:17:30.000-00:00", + ;; :value "value"} + fetched (first result)] + (is (not (:has-value? stored))) + (is (= Constants/CTYPE_TEXT_UTF8 (:content-type fetched))) + (is (= {} (:metadata fetched))) + (is (= v (:value fetched))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-json-content-type + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-as-json" + v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} + stored (kv/store conn bucket-name k v {:content-type Constants/CTYPE_JSON}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + fetched (first result)] + (is (= Constants/CTYPE_JSON (:content-type fetched))) + (is (= {} (:metadata fetched))) + (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-json-utf8-content-type + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-as-utf8-json" + v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} + stored (kv/store conn bucket-name k v {:content-type Constants/CTYPE_JSON_UTF8}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + fetched (first result)] + ;; cannot use constant value here see https://github.com/basho/riak-java-client/issues/125 + (is (= "application/json; charset=UTF-8" (:content-type fetched))) + (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-jackson-smile-content-type + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + ct "application/jackson-smile" + k "store-as-jackson-smile" + v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} + stored (kv/store conn bucket-name k v {:content-type ct}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + fetched (first result)] + (is (= ct (:content-type fetched))) + (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-application-clojure-content-type + (let [bucket-name "clojurewerkz.welle.kv2" + bucket (wb/update conn bucket-name {:last-write-wins true}) + k "store-as-clojure-data" + v {:city "New York City" :state "NY" :year 2011 :participants #{"johndoe" "timsmith" "michaelblack"} + :venue {:name "Sheraton New York Hotel & Towers" :address "811 Seventh Avenue" :street "Seventh Avenue"}} + ct "application/clojure" + stored (kv/store conn bucket-name k v {:content-type ct}) + {:keys [result] :as m} (kv/fetch-one conn bucket-name k)] + ;; cannot use constant value here see https://github.com/basho/riak-java-client/issues/125 + (is (= ct (:content-type result))) + (is (= v (:value result))) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-json+gzip-content-type + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-as-gzipped-json" + v {:name "Riak" :kind "Data store" :influenced-by #{"Dynamo"}} + ;; compatible with both HTTP and PB APIs. Content-Encoding would be a better + ;; idea here but PB cannot support it (as of Riak 1.1). MK. + ct "application/json+gzip" + stored (kv/store conn bucket-name k v {:content-type ct}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + fetched (first result)] + (is (not (:has-value? stored))) + (is (= ct (:content-type fetched))) + (is (= {} (:metadata fetched))) + (is (= {:kind "Data store" :name "Riak" :influenced-by ["Dynamo"]} (:value fetched))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + (deftest test-basic-store-with-text-utf8-content-type-and-return-body + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-as-utf8-text" + v "value" + {:keys [result]} (kv/store conn bucket-name k v {:content-type Constants/CTYPE_TEXT_UTF8 :return-body true})] + (is (= v (-> result first :value))) + (drain conn bucket-name))) + + + ;; + ;; Metadata + ;; + + (deftest test-basic-store-with-metadata + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k (str (UUID/randomUUID)) + v "value" + ;; metadata values currently have to be strings. MK. + metadata {:author "Joe" :density "5"} + _ (kv/store conn bucket-name k v {:content-type Constants/CTYPE_TEXT_UTF8 :metadata metadata}) + {:keys [result] :as m} (kv/fetch conn bucket-name k) + fetched (first result)] + (is (= Constants/CTYPE_TEXT_UTF8 (:content-type fetched))) + (is (= {"author" "Joe" "density" "5"} (:metadata fetched))) + (is (= v (:value fetched))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + ;; + ;; Links + ;; + + (deftest test-basic-store-with-links + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "store-with-links" + v "value" + links [{:bucket "pages" :key "clojurewerkz.org" :tag "links"}] + _ (kv/store conn bucket-name k v {:content-type Constants/CTYPE_TEXT_UTF8 :links links}) + {:keys [result] :as m} (kv/fetch-one conn bucket-name k)] + (is (= Constants/CTYPE_TEXT_UTF8 (:content-type result))) + (is (= links (:links result))) + (is (= v (:value result))) + (is-riak-object result) + (drain conn bucket-name))) + + + + ;; + ;; kv/fetch, kv/fetch-one + ;; + + (deftest test-fetching-a-non-existent-object + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + {:keys [result]} (kv/fetch conn bucket-name (str (UUID/randomUUID)))] + (is (empty? result)))) + + (deftest test-optimistic-fetching-of-a-single-object + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k "optimistic-fetch" + v "value" + _ (kv/store conn bucket-name k v) + {:keys [result]} (kv/fetch-one conn bucket-name k {:r 1})] + (is (= Constants/CTYPE_OCTET_STREAM (:content-type result))) + (is (= {} (:metadata result))) + (is (= v (String. ^bytes (:value result)))) + (is-riak-object result) + (drain conn bucket-name))) + + + (deftest test-fetch-one-with-skip-deserialize + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + ct Constants/CTYPE_JSON + k "skip-deserialize-fetch-one" + v {:name "Riak"} + _ (kv/store conn bucket-name k v {:content-type ct}) + {:keys [result]} (kv/fetch-one conn bucket-name k {:r 1 :skip-deserialize true})] + (is (= ct (:content-type result))) + (is (= {} (:metadata result))) + (is (= (json/encode v) (String. ^bytes (:value result)))) + (is-riak-object result) + (drain conn bucket-name))) + + (deftest test-fetch-with-skip-deserialize + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + ct Constants/CTYPE_JSON + k "skip-deserialize-fetch" + v {:name "Riak"} + _ (kv/store conn bucket-name k v {:content-type ct}) + {:keys [result]} (kv/fetch conn bucket-name k {:r 1 :skip-deserialize true}) + fetched (first result)] + (is (= ct (:content-type fetched))) + (is (= {} (:metadata fetched))) + (is (= (json/encode v) (String. ^bytes (:value fetched)))) + (is-riak-object fetched) + (drain conn bucket-name))) + + + ;; + ;; kv/delete + ;; + + (deftest test-fetching-deleted-value-with-rw=2 + (let [bucket-name "clojurewerkz.welle.kv3" + bucket (wb/update conn bucket-name {:last-write-wins true}) + k "delete-me" + v "another value"] + (drain conn bucket-name) + (Thread/sleep 150) + (is (not (:has-value? (kv/fetch conn bucket-name k {:r 2})))) + (kv/store conn bucket-name k v) + (is (:has-value? (kv/fetch conn bucket-name k))) + (kv/delete conn bucket-name k {:rw 2}) + (kv/fetch conn bucket-name k {:r 2}) + ;; {:vclock #, + ;; :has-siblings? false, + ;; :has-value? false, + ;; :deleted? false, + ;; :modified? true, + ;; :result ()} + (is (not (:has-value? (kv/fetch conn bucket-name k {:r 2})))))) + + + (deftest test-fetching-deleted-value-with-bucket-settings + (let [bucket-name "clojurewerkz.welle.kv4" + bucket (wb/update conn bucket-name) + k "delete-me" + v "another value"] + (drain conn bucket-name) + (Thread/sleep 150) + (is (not (:has-value? (kv/fetch-one conn bucket-name k {:r 2})))) + (kv/store conn bucket-name k v) + (is (:has-value? (kv/fetch-one conn bucket-name k))) + (kv/delete conn bucket-name k {:rw 2}) + (is (not (:has-value? (kv/fetch-one conn bucket-name k {:r 2})))))) + + + (deftest test-fetching-multiple-deleted-values-with-bucket-settings + (let [bucket-name "clojurewerkz.welle.kv5" + bucket (wb/update conn bucket-name) + key "delete-me" + value "another value" + key-values (map (fn [i] [(str key i) (str value i)]) (range 10))] + (drain conn bucket-name) + (Thread/sleep 150) + + (doseq [[k v] key-values] + (is (not (:has-value? (kv/fetch-one conn bucket-name k)))) + (kv/store conn bucket-name k v) + (is (:has-value? (kv/fetch-one conn bucket-name k)))) + (kv/delete-all conn bucket-name (map first key-values)) + (doseq [[k v] key-values] + (is (not (:has-value? (kv/fetch-one conn bucket-name k))))))) + + ;; + ;; kv/modify + ;; + + (deftest test-modify-with-json-content-type-and-no-existing-value + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k (str (UUID/randomUUID)) + f (fn [[m]] + (update-in m [:value :influenced-by] set/union #{"Java" "Haskell"})) + updated (kv/modify conn bucket-name k f {:r 1 :w 1 :content-type "application/json"}) + {:keys [result]} (kv/fetch-one conn bucket-name k)] + (is (= Constants/CTYPE_JSON (:content-type result))) + (is (= {} (:metadata result))) + (is (= (sort ["Java" "Haskell"]) + (sort (get-in result [:value :influenced-by])))) + (is-riak-object result) + (drain conn bucket-name))) + + + (deftest test-modify-with-json-content-type-and-one-existing-value + (let [bucket-name "clojurewerkz.welle.kv" + bucket (wb/update conn bucket-name) + k (str (UUID/randomUUID)) + v {:name "Clojure" :kind "Programming Language" :influenced-by #{"Common Lisp", "C#"}} + stored (kv/store conn bucket-name k v {:content-type Constants/CTYPE_JSON}) + f (fn [[m]] + (update-in m [:value :influenced-by] set/union #{"Java" "Haskell"})) + _ (kv/modify conn bucket-name k f {:r 1 :w 1}) + {:keys [result]} (kv/fetch-one conn bucket-name k)] + (is (empty? (:result stored))) + (is (= Constants/CTYPE_JSON (:content-type result))) + (is (= {} (:metadata result))) + (is (= (sort ["C#" "Common Lisp" "Java" "Haskell"]) + (sort (get-in result [:value :influenced-by])))) + (is-riak-object result) + (drain conn bucket-name))) + + (defn union-resolver + [default] + (conversion/resolver-from + (fn [siblings] + (condp = (count siblings) + 0 [{:value default + :content-type "application/clojure" + :metadata {}}] + 1 siblings + [(-> (first siblings) + (select-keys [:content-type :metadata :links]) + (assoc :value (apply set/union (map :value siblings))))])))) + + (deftest test-modify-vclocks + (let [bucket-name "clojurewerkz.welle.kv.siblings" + bucket (wb/update conn bucket-name {:allow-siblings true}) + k (str (UUID/randomUUID)) + append! (fn [x] (kv/modify conn bucket-name k + (fn [[o]] + (update-in o [:value] conj x)) + {:resolver (union-resolver #{}) + :pr 2 + :pw 2})) + adds (doall (map append! (range 10))) + {:keys [result]} (kv/fetch conn bucket-name k {:pr 3})] + ;; There should not be 10 siblings. + (is (< (count result) 4)) + (drain conn bucket-name)))) diff --git a/test/clojurewerkz/welle/test/links_test.clj b/test/clojurewerkz/welle/test/links_test.clj index 53deab3..f6f2f56 100644 --- a/test/clojurewerkz/welle/test/links_test.clj +++ b/test/clojurewerkz/welle/test/links_test.clj @@ -1,26 +1,23 @@ (ns clojurewerkz.welle.test.links-test - (:require [clojurewerkz.welle.core :as wc] - [clojurewerkz.welle.kv :as kv] + (:require [clojurewerkz.welle.kv :as kv] [clojurewerkz.welle.buckets :as wb] [clojurewerkz.welle.mr :as mr] [clojure.test :refer :all] [clojurewerkz.welle.testkit :refer [drain]] + [clojurewerkz.welle.test.test-helpers :as th] [clojurewerkz.welle.links :refer :all])) -(wc/connect!) - - (deftest ^{:links true} test-link-walking-with-a-single-step - (let [bucket-name "people" - _ (wb/update bucket-name)] - (drain bucket-name) - (kv/store bucket-name "joe" {:name "Joe" :age 30} :content-type "application/clojure") - (kv/store bucket-name "jane" {:name "Jane" :age 32} - :content-type "application/clojure" - :links [{:bucket bucket-name :key "joe" :tag "friend"}]) - (let [result (walk - (start-at "people" "jane") - (step "people" "friend" true))] + (let [conn (th/connect) + bucket-name "people" + _ (wb/update conn bucket-name)] + (drain conn bucket-name) + (kv/store conn bucket-name "joe" {:name "Joe" :age 30} {:content-type "application/clojure"}) + (kv/store conn bucket-name "jane" {:name "Jane" :age 32} + {:content-type "application/clojure" + :links [{:bucket bucket-name :key "joe" :tag "friend"}]}) + (let [result (walk conn + (start-at "people" "jane") + (step "people" "friend" true))] (is (= {:name "Joe" :age 30} (:value (ffirst result))))) - (drain bucket-name))) - \ No newline at end of file + (drain conn bucket-name))) diff --git a/test/clojurewerkz/welle/test/mr_test.clj b/test/clojurewerkz/welle/test/mr_test.clj index a604b5b..e0ebc2c 100644 --- a/test/clojurewerkz/welle/test/mr_test.clj +++ b/test/clojurewerkz/welle/test/mr_test.clj @@ -5,96 +5,95 @@ [clojurewerkz.welle.mr :as mr] [clojurewerkz.support.js :as js] [clojure.test :refer :all] - [clojurewerkz.welle.testkit :refer [drain]]) + [clojurewerkz.welle.testkit :refer [drain]] + [clojurewerkz.welle.test.test-helpers :as th]) (:import com.basho.riak.client.http.util.Constants)) -(wc/connect!) +(let [conn (th/connect)] + (deftest ^{:mr true} test-basic-map-reduce-with-erlang-builtins + (let [bucket-name "clojurewerkz.welle.mr1" + bucket (wb/update conn bucket-name {:last-write-wins true}) + store-fn #(kv/store conn bucket-name (str "java_" %) (str %) {:content-type "text/plain"})] + ;; Make sure we supply unordered values + (doseq [l (range 100 200)] (store-fn l)) + (doseq [l (range 0 100)] (store-fn l)) + (let [result (mr/map-reduce conn {:inputs bucket-name + :query [{:map {:language "erlang" + :module "riak_kv_mapreduce" + :function "map_object_value", + :keep false}} + {:reduce {:language "erlang" + :module "riak_kv_mapreduce" + :function "reduce_string_to_integer"}} + {:reduce {:language "erlang" + :module "riak_kv_mapreduce" + :function "reduce_sort"}}]})] + (is (= (vec (range 0 200)) result))) + (drain conn bucket-name))) -(deftest ^{:mr true} test-basic-map-reduce-with-erlang-builtins - (let [bucket-name "clojurewerkz.welle.mr1" - bucket (wb/update bucket-name :last-write-wins true) - store-fn #(kv/store bucket-name (str "java_" %) (str %) :content-type "text/plain")] - ;; Make sure we supply unordered values - (doseq [l (range 100 200)] (store-fn l)) - (doseq [l (range 0 100)] (store-fn l)) - (let [result (mr/map-reduce {:inputs bucket-name - :query [{:map {:language "erlang" - :module "riak_kv_mapreduce" - :function "map_object_value", - :keep false}} - {:reduce {:language "erlang" - :module "riak_kv_mapreduce" - :function "reduce_string_to_integer"}} - {:reduce {:language "erlang" - :module "riak_kv_mapreduce" - :function "reduce_sort"}}]})] - (is (= (vec (range 0 200)) result))) - (drain bucket-name))) + (deftest ^{:mr true} test-basic-map-reduce-with-mixed-builtins + (let [bucket-name "clojurewerkz.welle.mr2" + bucket (wb/update conn bucket-name)] + (doseq [l (range 0 200)] + (kv/store conn bucket-name (str "java_" l) (str l) {:content-type "text/plain"})) + (let [result (mr/map-reduce conn {:inputs bucket-name + :query [{:map {:language "erlang" + :module "riak_kv_mapreduce" + :function "map_object_value", + :keep false}} + {:reduce {:language "erlang" + :module "riak_kv_mapreduce" + :function "reduce_string_to_integer"}} + {:reduce {:language "javascript" + :name "Riak.reduceSum"}}]})] + (is (= [19900] result))) + (drain conn bucket-name))) -(deftest ^{:mr true} test-basic-map-reduce-with-mixed-builtins - (let [bucket-name "clojurewerkz.welle.mr2" - bucket (wb/update bucket-name)] - (doseq [l (range 0 200)] - (kv/store bucket-name (str "java_" l) (str l) :content-type "text/plain")) - (let [result (mr/map-reduce {:inputs bucket-name - :query [{:map {:language "erlang" - :module "riak_kv_mapreduce" - :function "map_object_value", - :keep false}} - {:reduce {:language "erlang" - :module "riak_kv_mapreduce" - :function "reduce_string_to_integer"}} - {:reduce {:language "javascript" - :name "Riak.reduceSum"}}]})] - (is (= [19900] result))) - (drain bucket-name))) + (deftest ^{:mr true} test-map-reduce-with-a-source-js-function + (let [bucket-name "clojurewerkz.welle.mr3" + _ (wb/update conn bucket-name)] + (drain conn bucket-name) + (kv/store conn bucket-name "1" {:state "CA" :quantity 1 :price 199.00} {:content-type Constants/CTYPE_JSON_UTF8}) + (kv/store conn bucket-name "2" {:state "NY" :quantity 2 :price 199.00} {:content-type Constants/CTYPE_JSON_UTF8}) + (kv/store conn bucket-name "3" {:state "NY" :quantity 1 :price 299.00} {:content-type Constants/CTYPE_JSON_UTF8}) + (kv/store conn bucket-name "4" {:state "IL" :quantity 2 :price 11.50 } {:content-type Constants/CTYPE_JSON_UTF8}) + (kv/store conn bucket-name "5" {:state "CA" :quantity 2 :price 2.95 } {:content-type Constants/CTYPE_JSON_UTF8}) + (kv/store conn bucket-name "6" {:state "IL" :quantity 3 :price 5.50 } {:content-type Constants/CTYPE_JSON_UTF8}) + (let [result (mr/map-reduce conn {:inputs bucket-name + :query [{:map {:language "javascript" :source (js/load-resource "js/fn1") :keep false}} + {:reduce {:language "javascript" :name "Riak.reduceSum"}}]})] + (is (= [941.4] result))) + (kv/delete-all conn bucket-name ["1" "2" "3" "4" "5" "6"]))) -(deftest ^{:mr true} test-map-reduce-with-a-source-js-function - (let [bucket-name "clojurewerkz.welle.mr3" - _ (wb/update bucket-name)] - (drain bucket-name) - (kv/store bucket-name "1" {:state "CA" :quantity 1 :price 199.00} :content-type Constants/CTYPE_JSON_UTF8) - (kv/store bucket-name "2" {:state "NY" :quantity 2 :price 199.00} :content-type Constants/CTYPE_JSON_UTF8) - (kv/store bucket-name "3" {:state "NY" :quantity 1 :price 299.00} :content-type Constants/CTYPE_JSON_UTF8) - (kv/store bucket-name "4" {:state "IL" :quantity 2 :price 11.50 } :content-type Constants/CTYPE_JSON_UTF8) - (kv/store bucket-name "5" {:state "CA" :quantity 2 :price 2.95 } :content-type Constants/CTYPE_JSON_UTF8) - (kv/store bucket-name "6" {:state "IL" :quantity 3 :price 5.50 } :content-type Constants/CTYPE_JSON_UTF8) - (let [result (mr/map-reduce {:inputs bucket-name - :query [{:map {:language "javascript" :source (js/load-resource "js/fn1") :keep false}} - {:reduce {:language "javascript" :name "Riak.reduceSum"}}]})] - (is (= [941.4] result))) - (kv/delete-all bucket-name ["1" "2" "3" "4" "5" "6"]))) + ;; this tests needs Riak Search to be enabled + (deftest ^{:mr true :search true} test-map-reduce-with-search-input-and-a-source-js-function + (let [bucket-name "clojurewerkz.welle.mr4" + _ (wb/update conn bucket-name {:enable-search true})] + (drain conn bucket-name) + (kv/store conn bucket-name "1" {:field "one"} {:content-type Constants/CTYPE_JSON}) + (kv/store conn bucket-name "2" {:field "two"} {:content-type Constants/CTYPE_JSON}) + (kv/store conn bucket-name "3" {:field "three"} {:content-type Constants/CTYPE_JSON}) + (kv/store conn bucket-name "4" {:field "four"} {:content-type Constants/CTYPE_JSON}) + (kv/store conn bucket-name "5" {:field "five"} {:content-type Constants/CTYPE_JSON}) + (kv/store conn bucket-name "6" {:field "six"} {:content-type Constants/CTYPE_JSON}) + (let [result (mr/map-reduce conn {:inputs {:bucket bucket-name + :query "field:five"} + :query [{:map {:language "javascript" :source "function(v) { return [v]; }" :keep true}}]})] + (is (= bucket-name (-> result first :bucket))) + (is (= "5" (-> result first :key)))) + (kv/delete-all conn bucket-name ["1" "2" "3" "4" "5" "6"]))) -;; this tests needs Riak Search to be enabled -(deftest ^{:mr true :search true} test-map-reduce-with-search-input-and-a-source-js-function - (let [bucket-name "clojurewerkz.welle.mr4" - _ (wb/update bucket-name :enable-search true)] - (drain bucket-name) - (kv/store bucket-name "1" {:field "one"} :content-type Constants/CTYPE_JSON) - (kv/store bucket-name "2" {:field "two"} :content-type Constants/CTYPE_JSON) - (kv/store bucket-name "3" {:field "three"} :content-type Constants/CTYPE_JSON) - (kv/store bucket-name "4" {:field "four"} :content-type Constants/CTYPE_JSON) - (kv/store bucket-name "5" {:field "five"} :content-type Constants/CTYPE_JSON) - (kv/store bucket-name "6" {:field "six"} :content-type Constants/CTYPE_JSON) - (let [result (mr/map-reduce {:inputs {:bucket bucket-name - :query "field:five"} - :query [{:map {:language "javascript" :source "function(v) { return [v]; }" :keep true}}]})] - (is (= bucket-name (-> result first :bucket))) - (is (= "5" (-> result first :key)))) - (kv/delete-all bucket-name ["1" "2" "3" "4" "5" "6"]))) - -(deftest ^{:mr true} test-map-reduce-with-reduce-slice - (let [bucket-name "clojurewerkz.welle.mr5" - bucket (wb/update bucket-name :last-write-wins true)] - (dotimes [i 20] - (kv/store bucket-name (str i) {:field i} :content-type Constants/CTYPE_JSON)) - (let [result (mr/map-reduce {:inputs bucket-name - :query [ - {:map {:language "javascript" :name "Riak.mapValuesJson"}} - {:reduce {:language "javascript" :name "Riak.reduceSort" :arg "function (a,b) {return parseInt(a.field) > parseInt(b.field)}"}} - {:reduce {:language "javascript", :name "Riak.reduceSlice", :arg [0, 2]}}]})] - (is (= [{:field 0} {:field 1}] result))) - (drain bucket-name))) \ No newline at end of file + (deftest ^{:mr true} test-map-reduce-with-reduce-slice + (let [bucket-name "clojurewerkz.welle.mr5" + bucket (wb/update conn bucket-name {:last-write-wins true})] + (dotimes [i 20] + (kv/store conn bucket-name (str i) {:field i} {:content-type Constants/CTYPE_JSON})) + (let [result (mr/map-reduce conn {:inputs bucket-name + :query [{:map {:language "javascript" :name "Riak.mapValuesJson"}} + {:reduce {:language "javascript" :name "Riak.reduceSort" :arg "function (a,b) {return parseInt(a.field) > parseInt(b.field)}"}} + {:reduce {:language "javascript", :name "Riak.reduceSlice", :arg [0, 2]}}]})] + (is (= [{:field 0} {:field 1}] result))) + (drain conn bucket-name)))) diff --git a/test/clojurewerkz/welle/test/ring/session_store_test.clj b/test/clojurewerkz/welle/test/ring/session_store_test.clj index 462fb28..a9966ca 100644 --- a/test/clojurewerkz/welle/test/ring/session_store_test.clj +++ b/test/clojurewerkz/welle/test/ring/session_store_test.clj @@ -3,30 +3,30 @@ [clojurewerkz.welle.kv :as kv] [clojure.test :refer :all] [clojurewerkz.welle.testkit :refer [drain]] + [clojurewerkz.welle.test.test-helpers :as th] [ring.middleware.session.store :refer :all] [clojurewerkz.welle.ring.session-store :refer :all])) -(wc/connect!) - -(defn purge-sessions +(let [conn (th/connect)] + (defn purge-sessions [f] - (drain "web_sessions") - (drain "sessions") + (drain conn "web_sessions") + (drain conn "sessions") (f) - (drain "web_sessions") - (drain "sessions")) + (drain conn "web_sessions") + (drain conn "sessions")) (use-fixtures :each purge-sessions) (deftest test-reading-a-session-that-does-not-exist - (let [store (welle-store)] + (let [store (welle-store conn)] (is (= {} (read-session store "a-missing-key-1228277"))))) (deftest test-reading-a-session-that-does-exist - (let [store (welle-store) + (let [store (welle-store conn) sk (write-session store nil {:library "Welle"}) m (read-session store sk)] (is sk) @@ -36,7 +36,7 @@ (deftest test-updating-a-session - (let [store (welle-store "sessions") + (let [store (welle-store conn "sessions") sk1 (write-session store nil {:library "Welle"}) sk2 (write-session store sk1 {:library "Ring"}) m (read-session store sk2)] @@ -48,7 +48,7 @@ (deftest test-deleting-a-session - (let [store (welle-store "sessions") + (let [store (welle-store conn "sessions") sk (write-session store nil {:library "Welle"})] (is (nil? (delete-session store sk))) - (is (= {} (read-session store sk))))) + (is (= {} (read-session store sk)))))) diff --git a/test/clojurewerkz/welle/test/search_test.clj b/test/clojurewerkz/welle/test/search_test.clj index 495fe2e..5729b93 100644 --- a/test/clojurewerkz/welle/test/search_test.clj +++ b/test/clojurewerkz/welle/test/search_test.clj @@ -7,19 +7,20 @@ [clojurewerkz.welle.testkit :refer [drain]]) (:import com.basho.riak.client.http.util.Constants)) -(wc/connect!) + (deftest ^{:search true} test-term-query-via-the-solr-api - (let [bucket-name "clojurewerkz.welle.solr.tweets" - bucket (wb/update bucket-name :last-write-wins true :enable-search true) + (let [conn (wc/connect) + bucket-name "clojurewerkz.welle.solr.tweets" + bucket (wb/update conn bucket-name {:last-write-wins true :enable-search true}) doc {:username "clojurewerkz" :text "Elastisch beta3 is out, several more @elasticsearch features supported github.com/clojurewerkz/elastisch, improved docs http://clojureelasticsearch.info #clojure" :timestamp "20120802T101232+0100" :id 1}] - (drain bucket-name) - (kv/store bucket-name "a-key" doc :content-type "application/json") - (let [result (wsolr/search bucket-name "username:clojurewerkz") + (drain conn bucket-name) + (kv/store conn bucket-name "a-key" doc {:content-type "application/json"}) + (let [result (wsolr/search conn bucket-name "username:clojurewerkz") hits (wsolr/hits-from result)] (is (= "a-key" (-> hits first :id))) (is (> (count hits) 0))) - (drain bucket-name))) + (drain conn bucket-name))) diff --git a/test/clojurewerkz/welle/test/test_helpers.clj b/test/clojurewerkz/welle/test/test_helpers.clj new file mode 100644 index 0000000..dda281c --- /dev/null +++ b/test/clojurewerkz/welle/test/test_helpers.clj @@ -0,0 +1,11 @@ +(ns clojurewerkz.welle.test.test-helpers + (:require [clojurewerkz.welle.core :as wc]) + (:import java.util.Random)) + +(defn connect + "Connects using either HTTP or PB transport (randomly chosen)" + [] + (let [rnd (Random.)] + (if (.nextBoolean rnd) + (wc/connect) + (wc/connect-via-pb))))