Skip to content

Commit

Permalink
Merge branch 'master' of github.com:surt666/dynamo4clj
Browse files Browse the repository at this point in the history
  • Loading branch information
surt666 committed Apr 24, 2012
2 parents 2b6b1fa + 52f8b02 commit 1e1af99
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 37 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject dynamo4clj "1.0.3"
(defproject dynamo4clj "1.0.5"
:description "Amazon DynamoDB API"
:dependencies [[org.clojure/clojure "1.4.0"]
[com.amazonaws/aws-java-sdk "1.3.8"]
Expand Down
93 changes: 57 additions & 36 deletions src/dynamo4clj/core.clj
Original file line number Diff line number Diff line change
@@ -1,43 +1,63 @@
(ns dynamo4clj.core
(:use [clojure.algo.generic.functor :only (fmap)]
[clojure.walk :only (stringify-keys keywordize-keys)])
(:import [com.amazonaws.auth AWSCredentials BasicAWSCredentials PropertiesCredentials]
(:import [com.amazonaws.auth STSSessionCredentialsProvider AWSCredentials BasicAWSCredentials PropertiesCredentials]
[com.amazonaws.services.dynamodb AmazonDynamoDBClient]
[com.amazonaws.services.dynamodb.model AttributeValue AttributeValueUpdate AttributeAction PutItemRequest QueryRequest Key GetItemRequest DeleteItemRequest ScanRequest UpdateItemRequest ReturnValue Condition ComparisonOperator KeysAndAttributes BatchGetItemRequest BatchGetItemResult BatchResponse BatchWriteItemRequest WriteRequest PutRequest DeleteRequest]
[com.amazonaws AmazonServiceException ClientConfiguration Protocol]
[java.util HashMap Properties]))

(defn get-client
(^AmazonDynamoDBClient []
"Configure client from aws.properties and config.properties"
(def refresh (* 1000 60 40)) ; 40 minutes

(defn get-client
([]
"Configures a client from aws.properties and config.properties"
(let [credstream (.getResourceAsStream (clojure.lang.RT/baseLoader) "aws.properties")
configstream (.getResourceAsStream (clojure.lang.RT/baseLoader) "config.properties")
creds (PropertiesCredentials. credstream)
config (ClientConfiguration.)
provider-config (ClientConfiguration.)
props (Properties.)]
(. props (load configstream))
(if (= (. props (getProperty "protocol")) "https") (. config (setProtocol Protocol/HTTPS)) (. config (setProtocol Protocol/HTTP)))
(. config (setMaxErrorRetry 3))
(when-not (nil? (. props (getProperty "proxy-host"))) (. config (setProxyHost (. props (getProperty "proxy-host")))))
(when-not (nil? (. props (getProperty "proxy-port"))) (. config (setProxyPort (Integer/parseInt (. props (getProperty "proxy-port"))))))
(doto (AmazonDynamoDBClient. creds config) (.setEndpoint (. props (getProperty "region"))))))
(^AmazonDynamoDBClient [{:keys [access-key secret-key proxy-host proxy-port region protocol] :as config}]
(. provider-config (setMaxErrorRetry 3))
(. provider-config (setProtocol Protocol/HTTPS)) ; provider-config must use https
(when-not (nil? (. props (getProperty "proxy-host"))) (. config (setProxyHost (. props (getProperty "proxy-host"))))(. provider-config (setProxyHost (. props (getProperty "proxy-host")))))
(when-not (nil? (. props (getProperty "proxy-port"))) (. config (setProxyPort (Integer/parseInt (. props (getProperty "proxy-port")))))(. provider-config (setProxyPort (Integer/parseInt (. props (getProperty "proxy-port"))))) )
(if (= (. props (getProperty "protocol")) "https") (. config (setProtocol Protocol/HTTPS)) (. config (setProtocol Protocol/HTTP)))
(let [provider (STSSessionCredentialsProvider. (PropertiesCredentials. credstream) provider-config)
client-map {:session-provider provider :time (System/currentTimeMillis) :client (doto (AmazonDynamoDBClient. provider config) (.setEndpoint (. props (getProperty "region"))))}]
(atom client-map ))))

([{:keys [access-key secret-key proxy-host proxy-port protocol region] :as configuration}]
"Configures a client
:access-key mandatory
:secret-key mandatory
:region mandatory (ex. for europe dynamodb.eu-west-1.amazonaws.com )
:proxy-host optional
:proxy-port integer optional
:protocol http|https optional
"
(let [creds (BasicAWSCredentials. access-key secret-key)
config (ClientConfiguration.)]
(let [config (ClientConfiguration.)
provider-config (ClientConfiguration.)]
(if (= protocol "https") (. config (setProtocol Protocol/HTTPS)) (. config (setProtocol Protocol/HTTP)))
(. config (setMaxErrorRetry 3))
(when proxy-host (. config (setProxyHost proxy-host )))
(when (number? proxy-port) (. config (setProxyPort proxy-port)))
(doto (AmazonDynamoDBClient. creds config) (.setEndpoint region )))))
(. provider-config (setProtocol Protocol/HTTPS)) ; provider-config must use https
(. provider-config (setMaxErrorRetry 3))
(when proxy-host (. config (setProxyHost proxy-host ))(. provider-config (setProxyHost proxy-host )))
(when (number? proxy-port) (. config (setProxyPort proxy-port))(. provider-config (setProxyPort proxy-port)))
(let [provider (STSSessionCredentialsProvider. (BasicAWSCredentials. access-key secret-key) provider-config)
client-map {:session-provider provider
:time (System/currentTimeMillis)
:client (doto (AmazonDynamoDBClient. provider config) (.setEndpoint region))}]
(atom client-map )))))

(defn- ^AmazonDynamoDBClient refresh-client [client-atom]
(let [{:keys [client time session-provider] :as client-map} @client-atom
now (System/currentTimeMillis)]
(when (> now (+ time refresh))
(.refresh ^STSSessionCredentialsProvider session-provider)
(swap! client-atom #(assoc % :time now )))
client))

;
;
Expand Down Expand Up @@ -81,11 +101,12 @@
(fmap get-value (into {} item))))


(defn get-item [^AmazonDynamoDBClient client table hash-key & [range-key]]
(defn get-item [client table hash-key & [range-key]]
"Retrieve an item from a table by its hash key."
(let [key (if range (item-key-range hash-key range-key) (item-key hash-key))
ires (. client (getItem (doto (GetItemRequest.) (.withTableName table) (.withKey key))))]
(with-meta (keywordize-keys (to-map (.getItem ires))) {:consumed-capacity-units (.getConsumedCapacityUnits ires)})))
ires (. (refresh-client client) (getItem (doto (GetItemRequest.) (.withTableName table) (.withKey key))))
item (keywordize-keys (to-map (.getItem ires)))]
(when item (with-meta item {:consumed-capacity-units (.getConsumedCapacityUnits ires)}))))

(defn- create-keys-and-attributes [keys]
(let [ka (KeysAndAttributes.)]
Expand All @@ -99,10 +120,10 @@
res
(recur (rest r) (assoc res ((first r) 0) (create-keys-and-attributes ((first r) 1)))))))

(defn get-batch-items [^AmazonDynamoDBClient client requests]
(defn get-batch-items [client requests]
"requests is a vector of vectors of the following form [[table1 [hash1 hash3]] [table2 [[hash1 range1] [hash4 range4]]]]"
(let [ri (get-request-items requests)
batchresult (. client (batchGetItem (doto (BatchGetItemRequest.) (.withRequestItems ri))))
batchresult (. (refresh-client client)(batchGetItem (doto (BatchGetItemRequest.) (.withRequestItems ri))))
tables (map #(first %) requests)]
(loop [t tables res {}]
(if (empty? t)
Expand All @@ -113,30 +134,30 @@

;;TODO specify returnvalues for all calls

(defn delete-item [^AmazonDynamoDBClient client table hash-key & [range-key]]
(defn delete-item [client table hash-key & [range-key]]
"Delete an item from a table by its hash key and optional range-key. Return old value"
(let [req (if range-key
(doto (DeleteItemRequest.) (.withTableName table) (.withKey (item-key-range hash-key range-key)) (.withReturnValues ReturnValue/ALL_OLD))
(doto (DeleteItemRequest.) (.withTableName table) (.withKey (item-key hash-key)) (.withReturnValues ReturnValue/ALL_OLD)))
dres (. client (deleteItem req))
dres (. (refresh-client client)(deleteItem req))
attr (.getAttributes dres)]
(with-meta (keywordize-keys (to-map attr)) {:consumed-capacity-units (.getConsumedCapacityUnits dres)})))


(defn insert-item [^AmazonDynamoDBClient client table item]
(defn insert-item [client table item]
"Insert item (map) in table. Returns empty map if new key, else returns the old value"
(let [req (doto (PutItemRequest.) (.withTableName table) (.withItem (fmap to-attr-value (stringify-keys item))) (.withReturnValues ReturnValue/ALL_OLD))
pres (. client (putItem req))
pres (. (refresh-client client) (putItem req))
attr (.getAttributes pres)]
(with-meta (if attr (keywordize-keys (to-map attr)) {}) {:consumed-capacity-units (.getConsumedCapacityUnits pres)})))


(defn update-item [^AmazonDynamoDBClient client table hash-key attr & [range-key]]
(defn update-item [client table hash-key attr & [range-key]]
"Update item (map) in table with optional attributes"
(let [key (if range-key (item-key-range hash-key range-key) (item-key hash-key))
attrupd (fmap to-attr-value-update (stringify-keys attr))
req (doto (UpdateItemRequest.) (.withTableName table) (.withKey key) (.withReturnValues ReturnValue/ALL_NEW) (.withAttributeUpdates attrupd))
ures (. client (updateItem req))]
ures (. (refresh-client client) (updateItem req))]
(with-meta (keywordize-keys (to-map (.getAttributes ures))) {:consumed-capacity-units (.getConsumedCapacityUnits ures)})))

(defn- create-condition [c]
Expand All @@ -158,23 +179,23 @@

(defn find-items
"Find items with key and optional range. Range has the form [operator param1 param2] or [operator param1], and return-attributes is a vector of attributes to return as in [attr1 attr2]"
([^AmazonDynamoDBClient client table key consistent range-condition return-attributes]
([client table key consistent range-condition return-attributes]
(let [condition (create-condition range-condition)
reqq (cond
(empty? range-condition) (doto (QueryRequest.) (.withTableName table) (.withHashKeyValue (to-attr-value key)) (.withConsistentRead consistent))
(not (empty? range-condition)) (doto (QueryRequest.) (.withTableName table) (.withHashKeyValue (to-attr-value key)) (.withRangeKeyCondition condition) (.withConsistentRead consistent)))
req (if (empty? return-attributes) reqq (doto reqq (.withAttributesToGet ^java.util.Collection (map #(name %) return-attributes))))]
(let [qres (. client (query req))]
(let [qres (. (refresh-client client) (query req))]
(with-meta (keywordize-keys (map to-map (.getItems qres)))
{:consumed-capacity-units (.getConsumedCapacityUnits qres) :count (.getCount qres) :last-key (.getLastEvaluatedKey qres)}))))
([^AmazonDynamoDBClient client table key consistent range-condition]
([client table key consistent range-condition]
(find-items client table key consistent range-condition nil))
([^AmazonDynamoDBClient client table key consistent]
([client table key consistent]
(find-items client table key consistent nil nil)))

(defn scan
"Return the items in a DynamoDB table. Conditions is vector of tuples like [field operator param1 param2] or [field operator param1]. Return-attributes is a vector of attributes to return as in [attr1 attr2]"
([^AmazonDynamoDBClient client table conditions return-attributes]
([client table conditions return-attributes]
(let [conds (loop [c (first conditions) res {}]
(if (empty? c)
res
Expand All @@ -183,12 +204,12 @@
(empty? conds) (doto (ScanRequest.) (.withTableName table))
(not (empty? conds)) (doto (ScanRequest.) (.withTableName table) (.withScanFilter conds)))
req (if (empty? return-attributes) reqq (doto reqq (.withAttributesToGet ^java.util.Collection (map #(name %) return-attributes))))]
(let [sres (. client (scan req))]
(let [sres (. (refresh-client client) (scan req))]
(with-meta (keywordize-keys (map to-map (.getItems sres)))
{:consumed-capacity-units (.getConsumedCapacityUnits sres) :count (.getCount sres) :last-key (.getLastEvaluatedKey sres)})))))
([^AmazonDynamoDBClient client table conditions]
([client table conditions]
(scan client table conditions nil))
([^AmazonDynamoDBClient client table]
([client table]
(scan client table nil nil)))

(defn- map2attrmap [m]
Expand Down Expand Up @@ -228,4 +249,4 @@
(batch-del-write client write-map "w"))

(defn batch-delete [client delete-map]
(batch-del-write client delete-map "d"))
(batch-del-write client delete-map "d"))
Loading

0 comments on commit 1e1af99

Please sign in to comment.