Skip to content

Commit

Permalink
Merge pull request #28 from michaelklishin/explicit-client-argument
Browse files Browse the repository at this point in the history
Explicit client argument
  • Loading branch information
michaelklishin committed May 18, 2014
2 parents f0236cd + e2200f8 commit a32adb1
Show file tree
Hide file tree
Showing 24 changed files with 925 additions and 999 deletions.
10 changes: 10 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,13 @@
## Changes between Welle 2.0.x and 3.0

Welle 2.0 has [breaking API changes](http://blog.clojurewerkz.org/blog/2014/04/26/major-breaking-public-api-changes-coming-in-our-projects/) in most namespaces.

### HTTPComponents 4.3

Welle now excludes HTTPComponents dependency for Riak client and instead
uses version 4.3 which `clj-http` depends on.


## Changes between Welle 1.5.0 and 2.0

Welle 2.0 has **breaking API changes** in `welle.kv` functions.
Expand Down
35 changes: 19 additions & 16 deletions src/clojure/clojurewerkz/welle/buckets.clj
Expand Up @@ -11,7 +11,8 @@
(:refer-clojure :exclude [list])
(:require [clojurewerkz.welle.core :refer :all]
[clojurewerkz.welle.conversion :refer :all])
(:import [com.basho.riak.client IRiakClient IRiakObject]
(:import com.basho.riak.client.raw.RawClient
[com.basho.riak.client IRiakClient IRiakObject]
[com.basho.riak.client.bucket Bucket WriteBucket]
[com.basho.riak.client.http.response BucketResponse ListBucketsResponse]
[com.basho.riak.client.operations StoreObject FetchObject]
Expand All @@ -26,9 +27,9 @@

(defn fetch
"Fetches bucket properties"
[^String bucket-name]
[^RawClient client ^String bucket-name]
(merge {:name bucket-name}
(from-bucket-properties (.fetchBucket *riak-client* bucket-name))))
(from-bucket-properties (.fetchBucket client bucket-name))))

(defn update
"Updates bucket properties.
Expand Down Expand Up @@ -56,15 +57,17 @@
* big-vclock
* young-vclock
* old-vclock"
[^String bucket-name &{ :keys [allow-siblings last-write-wins n-val ^String backend
small-vclock big-vclock young-vclock old-vclock
r pr w dw pw rw
^Boolean not-found-ok ^Boolean basic-quorum ^Boolean enable-search
pre-commit-hooks
post-commit-hooks] :as options}]
(.updateBucket *riak-client* bucket-name (to-bucket-properties (or options {})))
(merge {:name bucket-name}
(from-bucket-properties (.fetchBucket *riak-client* bucket-name))))
([^RawClient client ^String bucket-name]
(update client bucket-name {}))
([^RawClient client ^String bucket-name { :keys [allow-siblings last-write-wins n-val ^String backend
small-vclock big-vclock young-vclock old-vclock
r pr w dw pw rw
^Boolean not-found-ok ^Boolean basic-quorum ^Boolean enable-search
pre-commit-hooks
post-commit-hooks] :as options}]
(.updateBucket client bucket-name (to-bucket-properties (or options {})))
(merge {:name bucket-name}
(from-bucket-properties (.fetchBucket client bucket-name)))))

(defn ^{:deprecated true} create
"The same as update. This name reveals the intent a bit better in some cases.
Expand All @@ -74,12 +77,12 @@

(defn list
"Returns buckets in the cluster as a set"
[]
(set (.listBuckets *riak-client*)))
[^RawClient client]
(set (.listBuckets client)))


(defn keys-in
"Returns list of keys in the bucket. With any non-trivial number of keys, this is a VERY EXPENSIVE operation
and typically should be avoided"
[^String bucket-name]
(.listKeys *riak-client* bucket-name))
[^RawClient client ^String bucket-name]
(.listKeys client bucket-name))
31 changes: 16 additions & 15 deletions src/clojure/clojurewerkz/welle/cache.clj
Expand Up @@ -11,7 +11,8 @@
"clojure.core.cache implementation(s) on top of Riak."
(:require [clojurewerkz.welle.kv :as kv]
[clojure.core.cache :as cache])
(:import clojure.core.cache.CacheProtocol
(:import com.basho.riak.client.raw.RawClient
clojure.core.cache.CacheProtocol
com.basho.riak.client.http.util.Constants))

;;
Expand All @@ -27,31 +28,31 @@
;; API
;;

(cache/defcache BasicWelleCache [^String bucket ^String content-type ^Integer w]
(cache/defcache BasicWelleCache [^RawClient client ^String bucket ^String content-type ^Integer w]
cache/CacheProtocol
(lookup [c k]
(get-in (kv/fetch-one (.bucket c) k) [:result :value]))
(get-in (kv/fetch-one (.client c) (.bucket c) k) [:result :value]))
(has? [c k]
(:has-value? (kv/fetch (.bucket c) k :head-only true)))
(:has-value? (kv/fetch (.client c) (.bucket c) k {:head-only true})))
(hit [this k]
this)
(miss [c k v]
(kv/store (.bucket c) k v :content-type (.content-type c) :w (.w c))
(kv/store (.client c) (.bucket c) k v {:content-type (.content-type c) :w (.w c)})
c)
(evict [c k]
(kv/delete (.bucket c) k :w (.w c))
(kv/delete (.client c) (.bucket c) k {:w (.w c)})
c)
(seed [c m]
(doseq [[k v] m]
(kv/store (.bucket c) k v :content-type (.content-type c) :w (.w c)))
(kv/store (.client c) (.bucket c) k v {:content-type (.content-type c) :w (.w c)}))
c))

(defn basic-welle-cache-factory
([]
(BasicWelleCache. default-cache-bucket default-content-type 1))
([^String bucket]
(BasicWelleCache. bucket default-content-type 1))
([^String bucket ^String content-type ^Integer w]
(BasicWelleCache. bucket content-type w))
([^String bucket base ^String content-type ^Integer w]
(cache/seed (BasicWelleCache. bucket content-type w) base)))
([^RawClient client]
(BasicWelleCache. client default-cache-bucket default-content-type 1))
([^RawClient client ^String bucket]
(BasicWelleCache. client bucket default-content-type 1))
([^RawClient client ^String bucket ^String content-type ^Integer w]
(BasicWelleCache. client bucket content-type w))
([^RawClient client ^String bucket base ^String content-type ^Integer w]
(cache/seed (BasicWelleCache. client bucket content-type w) base)))
80 changes: 14 additions & 66 deletions src/clojure/clojurewerkz/welle/core.clj
Expand Up @@ -28,12 +28,10 @@

(def ^{:private true :const true} default-url "http://127.0.0.1:8098/riak")

(def ^:dynamic ^RawClient *riak-client*)

(def ^{:const true} default-cluster-connection-limit 32)


(defn ^clojurewerkz.welle.HTTPClient
(defn ^HTTPClient
connect
"Creates an HTTP client for a given URL, optionally with a custom client ID.
With no arguments, connects to localhost on the default Riak port."
Expand All @@ -47,18 +45,7 @@
(.setClientId c client-id)
c)))

(defn connect!
"Creates an HTTP client for a given URL, and sets the global variable
*riak-client*. All Welle functions which are not passed a client will use
this client by default."
([]
(alter-var-root (var *riak-client*) (constantly (connect))))
([^String url]
(alter-var-root (var *riak-client*) (constantly (connect url))))
([^String url ^String client-id]
(alter-var-root (var *riak-client*) (constantly (connect url client-id)))))

(defn connect-via-pb
(defn ^PBClientAdapter connect-via-pb
"Creates a Protocol Buffers client for the given host and port, or, by
default, to localhost on the default Riak PB port."
([]
Expand All @@ -67,15 +54,6 @@
(doto (PBClientAdapter. (com.basho.riak.pbc.RiakClient. host port))
(.generateAndSetClientId))))

(defn connect-via-pb!
"Creates a Protocol Buffers client for the given host and port, and sets the
global variable *riak-client*. All Welle functions which are not passed a
client will use this client by default."
([]
(alter-var-root (var *riak-client*) (constantly (connect-via-pb))))
([host port]
(alter-var-root (var *riak-client*) (constantly (connect-via-pb host port)))))

(defprotocol HTTPClusterConfigurator
(http-cluster-config-from [self]))

Expand All @@ -93,20 +71,13 @@
(.build))))
res)))

(defn ^com.basho.riak.client.raw.RawClient
(defn ^RawClient
connect-to-cluster
"Creates an HTTP cluster client."
[endpoints]
(let [^ClusterConfig cc (http-cluster-config-from endpoints)]
(HTTPClusterClient. cc)))

(defn connect-to-cluster!
"Creates an HTTP cluster client, and sets the global variable *riak-client*.
All Welle functions which are not passed a client will use this client by
default."
[endpoints]
(alter-var-root (var *riak-client*) (constantly (connect-to-cluster endpoints))))

(defprotocol PBClusterConfigurator
(pbc-cluster-config-from [self]))

Expand All @@ -128,59 +99,36 @@
(.build)))))
res)))

(defn ^com.basho.riak.client.raw.RawClient
(defn ^PBClusterClient
connect-to-cluster-via-pb
"Creates a Protocol Buffers cluster client given a sequence of string
endpoints."
[endpoints]
(let [^ClusterConfig cc (pbc-cluster-config-from endpoints)]
(PBClusterClient. cc)))

(defn connect-to-cluster-via-pb!
"Creates a Protocol Buffers cluster client given a sequence of string
endpoints, and sets the global variable *riak-client*. All Welle functions
which are not passed a client will use this client by default."
[endpoints]
(alter-var-root (var *riak-client*) (constantly (connect-to-cluster-via-pb endpoints))))



(defmacro with-client
"Evaluates body within an implicit do, with the Welle client *riak-client*
bound to the given Riak client."
[client & forms]
`(binding [*riak-client* ~client]
(do ~@forms)))


(defn ping
"Pings a client."
([]
(.ping *riak-client*))
([^RawClient client]
(.ping client)))
[^RawClient client]
(.ping client))

(defn shutdown
"Shuts down a client."
([]
(.shutdown *riak-client*))
([^RawClient client]
(.shutdown client)))
[^RawClient client]
(.shutdown client))


(defn get-client-id
"The client ID used by a given client."
[]
(.getClientId *riak-client*))
[^RawClient client]
(.getClientId client))

(defn stats
"Returns statistics for a client."
[]
(.stats *riak-client*))
[^RawClient client]
(.stats client))

(defn get-base-url
"Returns base HTTP transport URL (e.g. http://127.0.0.1:8098)"
([]
(.getBaseUrl ^HTTPClient *riak-client*))
([^HTTPClient client]
(.getBaseUrl client)))
[^HTTPClient client]
(.getBaseUrl client))
53 changes: 28 additions & 25 deletions src/clojure/clojurewerkz/welle/counters.clj
Expand Up @@ -8,8 +8,7 @@
;; You must not remove this notice, or any other, from this software.

(ns clojurewerkz.welle.counters
(:require [clojurewerkz.welle.core :refer [*riak-client*]]
[clojurewerkz.welle.conversion :refer :all]
(:require [clojurewerkz.welle.conversion :refer :all]
[clojurewerkz.welle.kv :refer [default-retrier]])
(:import [com.basho.riak.client.raw StoreMeta FetchMeta DeleteMeta RawClient RiakResponse]
[com.basho.riak.client.cap Retrier DefaultRetrier ConflictResolver]))
Expand All @@ -26,19 +25,21 @@
`:value` (default 1): value to increment by
`:timeout`: query timeout
"
[^String bucket-name ^String counter-name &{ :keys [w dw pw
^long value
^Boolean return-body
^Integer timeout
^Retrier retrier]
:or {value 1
return-body true
retrier default-retrier}}]
(let [^StoreMeta md (to-store-meta w dw pw return-body nil nil timeout)
^Long value (or value 1)
^Long result (.attempt retrier ^Callable (fn []
(.incrementCounter *riak-client* bucket-name counter-name value md)))]
result))
([^RawClient client ^String bucket-name ^String counter-name]
(increment-counter client bucket-name counter-name {}))
([^RawClient client ^String bucket-name ^String counter-name {:keys [w dw pw
^long value
^Boolean return-body
^Integer timeout
^Retrier retrier]
:or {value 1
return-body true
retrier default-retrier}}]
(let [^StoreMeta md (to-store-meta w dw pw return-body nil nil timeout)
^Long value (or value 1)
^Long result (.attempt retrier ^Callable (fn []
(.incrementCounter client bucket-name counter-name value md)))]
result)))

(defn fetch-counter
"Fetches Riak counter.
Expand All @@ -49,13 +50,15 @@
`:if-modified-vclock`: a vclock instance to use for conditional get. Only supported by Protocol Buffers transport.
`:timeout`: query timeout
"
[^String bucket-name ^String counter &{:keys [r pr not-found-ok basic-quorum
return-deleted-vclock
if-modified-since if-modified-vclock
^Retrier retrier
^Integer timeout]
:or {retrier default-retrier}}]
(let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum nil nil if-modified-since if-modified-vclock timeout)
^Long result (.attempt retrier ^Callable (fn []
(.fetchCounter *riak-client* bucket-name counter md)))]
result))
([^RawClient client ^String bucket-name ^String counter]
(fetch-counter client bucket-name counter {}))
([^RawClient client ^String bucket-name ^String counter {:keys [r pr not-found-ok basic-quorum
return-deleted-vclock
if-modified-since if-modified-vclock
^Retrier retrier
^Integer timeout]
:or {retrier default-retrier}}]
(let [^FetchMeta md (to-fetch-meta r pr not-found-ok basic-quorum nil nil if-modified-since if-modified-vclock timeout)
^Long result (.attempt retrier ^Callable (fn []
(.fetchCounter client bucket-name counter md)))]
result)))

0 comments on commit a32adb1

Please sign in to comment.