Skip to content


Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Jun 22, 2013
2 parents 22f28d1 + 220e51c commit 569f26d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 86 deletions.
15 changes: 7 additions & 8 deletions
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Current [semantic]( version:

[com.taoensso/faraday "0.8.0"] ; Alpha - subject to change
[com.taoensso/faraday "0.9.0"] ; Alpha - subject to change

# Faraday, a Clojure DynamoDB client
Expand All @@ -28,7 +28,7 @@ DynamoDB's done a fantastic job of hiding (in a good way) a lot of the complexit
Add the necessary dependency to your [Leiningen]( `project.clj` and `require` the library in your ns:

[com.taoensso/faraday "0.8.0"] ; project.clj
[com.taoensso/faraday "0.9.0"] ; project.clj
(ns my-app (:require [taoensso.faraday :as far])) ; ns

Expand All @@ -49,13 +49,12 @@ First thing is to make sure you've got an **[AWS DynamoDB account](
Well that was easy. How about we create a table? (This is actually one of the most complicated parts of working with DynamoDB since it requires understanding how DynamoDB [provisions capacity]( and how its [primary keys]( work. Anyway, we can safely ignore the specifics for now).

(far/create-table my-creds
{:name :my-table
:throughput {:read 1 :write 1} ; Read & write capacity (units/sec)
:hash-keydef {:name :id :type :n}} ; Primary key (:n => number type)
(far/create-table my-creds :my-table
[:id :n] ; Primary key named "id", (:n => number type)
{:throughput {:read 1 :write 1} ; Read & write capacity (units/sec)

;; Wait a minute for the table to be created... maybe grab a tasty sandwich?
;; Wait a minute for the table to be created... got a sandwich handy?

(far/list-tables my-creds)
=> [:my-table] ; There's our new table!
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.taoensso/faraday "0.8.0"
(defproject com.taoensso/faraday "0.9.0"
:description "Clojure DynamoDB client"
:url ""
:license {:name "Eclipse Public License"
Expand Down
185 changes: 112 additions & 73 deletions src/taoensso/faraday.clj
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,6 @@
ScanResult (as-map [r] (am-query|scan-result r
{:scanned-count (.getScannedCount r)}))

CreateTableResult (as-map [r] r) ; TODO
UpdateTableResult (as-map [r] r) ; TODO
DeleteTableResult (as-map [r] r) ; TODO

(as-map [r]
{:items (utils/keyword-map as-map (.getResponses r))
Expand All @@ -220,24 +216,28 @@
{:unprocessed (.getUnprocessedItems r)
:cc-units (cc-units (.getConsumedCapacity r))})

(as-map [r]
(let [t (.getTable r)]
{:name (keyword (.getTableName t))
:creation-date (.getCreationDateTime t)
:item-count (.getItemCount t)
:size (.getTableSizeBytes t)
:throughput (as-map (.getProvisionedThroughput t))
:indexes (as-map (.getLocalSecondaryIndexes t))
:status (utils/un-enum (.getTableStatus t))
(let [schema (as-map (.getKeySchema t))
defs (as-map (.getAttributeDefinitions t))]
(merge-with merge
(reduce-kv (fn [m k v] (assoc m (:name v) {:key-type (:type v)}))
{} schema)
(reduce-kv (fn [m k v] (assoc m (:name v) {:data-type (:type v)}))
{} defs)))}))
(as-map [d]
{:name (keyword (.getTableName d))
:creation-date (.getCreationDateTime d)
:item-count (.getItemCount d)
:size (.getTableSizeBytes d)
:throughput (as-map (.getProvisionedThroughput d))
:indexes (as-map (.getLocalSecondaryIndexes d))
:status (utils/un-enum (.getTableStatus d))
(let [schema (as-map (.getKeySchema d))
defs (as-map (.getAttributeDefinitions d))]
(merge-with merge
(reduce-kv (fn [m k v] (assoc m (:name v) {:key-type (:type v)}))
{} schema)
(reduce-kv (fn [m k v] (assoc m (:name v) {:data-type (:type v)}))
{} defs)))})

DescribeTableResult (as-map [r] (as-map (.getTable r)))
CreateTableResult (as-map [r] (as-map (.getTableDescription r)))
UpdateTableResult (as-map [r] (as-map (.getTableDescription r)))
DeleteTableResult (as-map [r] (as-map (.getTableDescription r)))

(as-map [d] {:name (keyword (.getIndexName d))
Expand Down Expand Up @@ -267,14 +267,14 @@
(defn block-while-table-status
"BLOCKS to poll for a change to table's status. On status change, polling will
terminate and the table's new description will be returned."
[creds table-name status & [{:keys [poll-ms timeout-ms timeout-val]
:or {poll-ms 5000}}]]
[creds table status & [{:keys [poll-ms timeout-ms timeout-val]
:or {poll-ms 5000}}]]
(assert (#{:creating :updating :deleting :active} (utils/un-enum status))
(str "Invalid table status: " status))
(let [polling-future
(loop []
(let [current-descr (describe-table creds table-name)]
(let [current-descr (describe-table creds table)]
(if-not (= (:status current-descr) (utils/un-enum status))
(do (Thread/sleep poll-ms)
Expand All @@ -287,13 +287,9 @@
(do (future-cancel polling-future)

(create-table mc {:name "delete-me5"
:throughput {:read 1 :write 1}
:hash-keydef {:name :id :type :s}})
(def descr (describe-table mc "delete-me5"))
(block-while-table-status mc "delete-me5" :creating) ; ~53000ms
(comment (create-table mc "delete-me5" [:id :s])
(block-while-table-status mc "delete-me5" :creating) ; ~53000ms
(def descr (describe-table mc "delete-me5")))

(defn- key-schema-element "Returns a new KeySchemaElement object."
[key-name key-type]
Expand All @@ -305,9 +301,9 @@
"Returns a [{<hash-key> KeySchemaElement}], or
[{<hash-key> KeySchemaElement} {<range-key> KeySchemaElement}]
vector for use as a table/index primary key."
[hash-keydef & [range-keydef]]
(cond-> [(key-schema-element (:name hash-keydef) :hash)]
range-keydef (conj (key-schema-element (:name range-keydef) :range))))
[[hname _ :as hash-keydef] & [[rname _ :as range-keydef]]]
(cond-> [(key-schema-element hname :hash)]
range-keydef (conj (key-schema-element rname :range))))

(defn- provisioned-throughput "Returns a new ProvisionedThroughput object."
[{read-units :read write-units :write :as throughput}]
Expand All @@ -317,14 +313,16 @@
(.setReadCapacityUnits (long read-units))
(.setWriteCapacityUnits (long write-units))))

(defn- attribute-defs "[{:name _ :type _} ...] defs -> [AttributeDefinition ...]"
(defn- keydefs "[<name> <type>] defs -> [AttributeDefinition ...]"
[hash-keydef range-keydef indexes]
(let [defs (->> (conj [] hash-keydef range-keydef)
(concat (mapv :range-keydef indexes))
(filterv identity))]
(fn [{key-name :name key-type :type :as def}]
(assert (and key-name key-type) (str "Malformed def: " def))
(fn [[key-name key-type :as def]]
(assert (and key-name key-type) (str "Malformed keydef: " def))
(assert (#{:s :n :ss :ns :b :bs} key-type)
(str "Invalid keydef type: " key-type))
(doto (AttributeDefinition.)
(.setAttributeName (name key-name))
(.setAttributeType (utils/enum key-type))))
Expand Down Expand Up @@ -353,55 +351,99 @@

(defn create-table
"Creates a table with options:
:name - Table name.
hash-keydef - [<name> <#{:s :n :ss :ns :b :bs}>].
:range-keydef - [<name> <#{:s :n :ss :ns :b :bs}>].
:throughput - {:read <units> :write <units>}.
:hash-keydef - {:name _ :type <#{:s :n :ss :ns :b :bs}>}.
:range-keydef - {:name _ :type <#{:s :n :ss :ns :b :bs}>}.
:indexes - [{:name _ :range-keydef _
:projection #{:all :keys-only [<attr> ...]}}].
:block? - Block for table to actually be active?"
[creds {table-name :name
:keys [throughput hash-keydef range-keydef indexes block?]
:or {throughput {:read 1 :write 1}}}]

(let [request-result
[creds table-name hash-keydef
& [{:keys [range-keydef throughput indexes block?]
:or {throughput {:read 1 :write 1}}}]]
(let [result
(.createTable (db-client creds)
(doto (CreateTableRequest.)
(.setTableName (name table-name))
(.setKeySchema (key-schema hash-keydef range-keydef))
(.setProvisionedThroughput (provisioned-throughput throughput))
(.setAttributeDefinitions (attribute-defs hash-keydef range-keydef indexes))
(.setAttributeDefinitions (keydefs hash-keydef range-keydef indexes))
(.setLocalSecondaryIndexes (local-indexes hash-keydef indexes)))))]

(if-not block?
(do (block-while-table-status creds table-name :creating)

(time (create-table mc {:name "delete-me7"
:throughput {:read 1 :write 1}
:hash-keydef {:name :id :type :s}
:block? true})))
(comment (time (create-table mc "delete-me7" [:id :s] {:block? true})))

(defn ensure-table "Creates a table iff it doesn't already exist."
[creds {table-name :name :as opts}]
(when-not (describe-table creds table-name) (create-table creds opts)))
[creds table-name & opts]
(when-not (describe-table creds table-name)
(apply create-table creds table-name opts)))

(defn- throughput-steps
"Dec by any amount, inc by <= 2x current amount, Ref.
x - start, x' - current, x* - goal."
[[r w] [r* w*]]
(let [step (fn [x* x'] (if (< x* x') x* (min x* (* 2 x'))))]
(loop [steps [] [r' w'] [r w]]
(if (and (>= r' r*) (>= w' w*))
(let [[r' w' :as this-step] [(step r* r') (step w* w')]]
(recur (conj steps this-step) this-step))))))

(comment (throughput-steps [1 1] [1 1])
(throughput-steps [1 1] [12 12])
(throughput-steps [3 3] [27 27])
(throughput-steps [17 8] [3 22])
(throughput-steps [1 1] [300 300]))

(defn update-table
"Updates a table. Ref. for important throughput
upgrade/downgrade limits."
[creds {table-name :name :keys [throughput]}]
(.updateTable (db-client creds)
(doto-maybe (UpdateTableRequest.) g
table-name (.setTableName (name g))
throughput (.setProvisionedThroughput (provisioned-throughput g))))))
"Updates a table. Allows automatic multi-step adjustments to conform to
update limits, Ref.
Returns a promise to which the final resulting table description will be
delivered. Deref this promise to block until update (all steps) complete."
[creds table throughput & [{:keys [max-reqs]
:or {max-reqs 5}}]]
(let [{read* :read write* :write} throughput
{:keys [status throughput]} (describe-table creds table)
{:keys [read write num-decreases-today]} throughput

decreasing? (or (< read* read) (< write* write))
steps (throughput-steps [read write] [read* write*])
nsteps (count steps)]
(when-not (empty? steps)
(cond (not= status :active)
(throw (Exception. (str "Invalid table status: " status)))
(and decreasing? (>= num-decreases-today 4)) ; API limit
(throw (Exception. (str "Max 4 decreases per 24hr period")))
(> nsteps max-reqs)
(throw (Exception. (str "`max-reqs` too low, needs reqs: " nsteps)))
(letfn [(run1 [[r' w']]
(.updateTable (db-client creds)
(doto (UpdateTableRequest.)
(.setTableName (name table))
(.setProvisionedThroughput (provisioned-throughput
{:read r' :write w'})))))
;; Returns _new_ descr when ready:
(block-while-table-status creds table :updating))]

(let [p (promise)]
(future (deliver p (peek (mapv run1 steps))))

(def dt (describe-table creds :faraday.tests.main))
(let [p (update-table creds :faraday.tests.main {:read 1 :write 1})]

(defn delete-table "Deletes a table, go figure."
[creds table-name]
(as-map (.deleteTable (db-client creds) (DeleteTableRequest. (name table-name)))))
[creds table]
(as-map (.deleteTable (db-client creds) (DeleteTableRequest. (name table)))))

;;;; API - items

Expand Down Expand Up @@ -713,7 +755,7 @@

;;;; Misc. helpers

;; TODO Automatic throughput (and > 2x) adjustment tools
;; TODO Automatic throughput adjustment tools

(defn items-by-attrs
"Groups one or more items by one or more attributes, returning a map of form
Expand Down Expand Up @@ -752,12 +794,9 @@
:secret-key ""})

(far/list-tables my-creds)

(far/create-table my-creds
{:name :my-table
:throughput {:read 1 :write 1}
:hash-key {:name :id
:type :n}})
(far/create-table my-creds :my-table [:id :n]
{:throughput {:read 1 :write 1}

(far/put-item my-creds
Expand Down
6 changes: 2 additions & 4 deletions test/taoensso/faraday/tests/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

(defn- setup {:expectations-options :before-run} []
(println "Setting up testing environment...")
(far/ensure-table creds
{:name ttable
:hash-keydef {:name :id :type :n}
:throughput {:read 1 :write 1}
(far/ensure-table creds ttable {:name :id :type :n}
{:throughput {:read 1 :write 1}
:block? true})
(println "Ready to roll..."))

Expand Down

0 comments on commit 569f26d

Please sign in to comment.