diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..154837e --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,32 @@ +version: 2 +jobs: + unit-test: + parallelism: 2 + working_directory: ~/orb + docker: + - image: omnyway/vulcan:latest + environment: + steps: + - checkout + - run: vulcan test + release: + parallelism: 1 + working_directory: ~/orb + docker: + - image: omnyway/vulcan:latest + environment: + - VERSION_FILE: ../VERSION + - GOROOT: "" + - GOPATH: "/root/.go" + steps: + - checkout + - run: vulcan next-tag > $VERSION_FILE + - run: export SEMVER=$(cat $VERSION_FILE); github-release release --user omnyway-labs --repo orb --tag $SEMVER --target $CIRCLE_SHA1 +workflows: + version: 2 + build_test_release: + jobs: + - release: + filters: + branches: + only: master diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8881c1c --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +/target +target +/classes +/checkouts +pom.xml +pom.xml.asc +*.jar +*.class +/.lein-* +/.nrepl-port +.hgignore +.hg/ +docker-compose.yml +assets +.cpcache/ +scratch.org \ No newline at end of file diff --git a/README.org b/README.org new file mode 100644 index 0000000..be485ba --- /dev/null +++ b/README.org @@ -0,0 +1,91 @@ +* About + +Clojure Library to create Cloudwatch Events and Rules for Lambdas + +=orb= is an abstraction over Cloudwatch Events that encapsulates given +Lambda Target, the Trigger(Schedule or Resource event) and the Input +to the Target when the Trigger occurs. + +#+BEGIN_SRC clojure +(require '[orb.core :as orb]) +(orb/init! {:auth-type :profile + :profile :qa}) +#+END_SRC + +* Scheduling Events + +Orb rules can be Schedule Events - cron or rate. +https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html + +Below orb sets up a schedule to trigger the given Lambda every 5 mins. +Input is an =edn= map + +#+BEGIN_SRC clojure +(orb/add "test-rule-1" + :lambda "arn:aws:lambda:us-east-1:ACC:function:my-lambda-name" + :rule "rate(5 minutes)" + :input {:cue :hello :param {:name "world"}}) +#+END_SRC + +Below orb sets up a schedule to trigger the given Lambda every +midnight UTC. + +#+BEGIN_SRC clojure +(orb/add "test-rule-2" + :lambda "arn:aws:lambda:us-east-1:ACC:function:my-lambda-name" + :rule "cron(0 0 * * ? *)" + :input {:cue :health.ping :param {}}) +#+END_SRC + +* Resource Events + +#+BEGIN_SRC clojure +(orb/add "test-rule-3" + :lambda "arn:aws:lambda:us-east-1:ACC:function:my-lambda-name" + :rule {:id :s3-bucket-change :bucket "my-s3-bucket"} + :input {:cue :upload-data :param {:path "s3-path"}} +#+END_SRC + + +To list rules + +#+BEGIN_SRC clojure +(orb/list) +=> ({:name "test-rule-1", + :state "ENABLED", + :rule "rate(5 minutes)"} + {:name "test-rule-2", + :state "ENABLED", + :rule "cron(0 0 * * ? *)"} + ...) +#+END_SRC + +To delete the rule + +#+BEGIN_SRC clojure +(orb/delete "test-rule-1") +#+END_SRC + +* Invoking Lambda + +=request= invokes the Lambda and returns the response value with +the associated log. + +#+BEGIN_SRC clojure +(orb/request lambda-name payload) +(orb/request "my-lambda-name" {:cue "health.ping" :param {}}) + +=> {:response "pong", + :log + ["START RequestId: 9d1578f3-d4b6-489e-958e-84e7c7cc30b2 Version: $LATEST" + ":input {:cue health.ping, :param {}}" + "END RequestId: 9d1578f3-d4b6-489e-958e-84e7c7cc30b2" + "REPORT RequestId: 9d1578f3-d4b6-489e-958e-84e7c7cc30b2\tDuration: 63.79 ms\tBilled Duration: 100 ms \tMemory Size: 1024 MB\tMax Memory Used: 199 MB\t"], + :error nil, + :version "$LATEST"} +#+END_SRC + +=send= is async and returns immediately with a trackable ID +#+BEGIN_SRC clojure +(orb/send "partman-lambda" {:cue :eventlog.add-partitions :param {}}) +#+END_SRC diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..e9a5f8a --- /dev/null +++ b/deps.edn @@ -0,0 +1,12 @@ +{:deps + {com.amazonaws/aws-java-sdk-events {:mvn/version "1.11.475"}, + com.amazonaws/aws-java-sdk-lambda {:mvn/version "1.11.475"}, + omnyway-labs/saw + {:git/url "https://github.com/omnyway-labs/saw.git", + :sha "52fcaa673306133c0f535c0bc70178e33f84bc3e", + :time "2019-05-03T16:11:18.000+0000", + :tag "0.1.19"}, + org.clojure/clojure {:mvn/version "1.9.0"}, + org.clojure/data.json {:mvn/version "0.2.6"}, + org.slf4j/slf4j-simple {:mvn/version "1.6.1"}}, + :paths ["src"]} diff --git a/src/orb/core.clj b/src/orb/core.clj new file mode 100644 index 0000000..18734ae --- /dev/null +++ b/src/orb/core.clj @@ -0,0 +1,35 @@ +(ns orb.core + (:refer-clojure :exclude [send delete list]) + (:require + [saw.core :as saw] + [orb.event :as event] + [orb.lambda :as lambda])) + +(defn request [fn-name payload] + (lambda/invoke :request-response fn-name payload)) + +(defn send [fn-name payload] + (lambda/invoke :event fn-name payload)) + +(defn list [] + (event/list-rules)) + +(defn add [name & {:keys [rule lambda input]}] + (let [rule-arn (event/add-rule name rule)] + (event/add-target name lambda input) + (lambda/add-permission lambda name rule-arn) + rule-arn)) + +(defn delete [name] + (-> (event/find-target name) + :arn + (lambda/remove-permission name)) + (event/delete-rule name) + :ok) + +(defn init! [auth] + (let [session (saw/login auth) + region (or (:region auth) "us-east-1")] + (event/init! region) + (lambda/init! region) + :ok)) diff --git a/src/orb/event.clj b/src/orb/event.clj new file mode 100644 index 0000000..87c1221 --- /dev/null +++ b/src/orb/event.clj @@ -0,0 +1,107 @@ +(ns orb.event + (:require + [saw.core :as saw] + [orb.event-pattern :as ep] + [clojure.data.json :as json]) + (:import + [com.amazonaws.regions Regions] + [com.amazonaws.services.cloudwatchevents + AmazonCloudWatchEventsClientBuilder] + [com.amazonaws.services.cloudwatchevents.model + PutTargetsRequest + RemoveTargetsRequest + Target + TestEventPatternRequest + PutRuleRequest + DeleteRuleRequest + RemoveTargetsRequest + ListRulesRequest + ListTargetsByRuleRequest + PutEventsRequest + PutEventsRequestEntry + TestEventPatternRequest])) + +(defonce client (atom nil)) + +(defn make-client [region] + (-> (AmazonCloudWatchEventsClientBuilder/standard) + (.withCredentials (saw/creds)) + (.withRegion region) + .build)) + +(defn as-target [t] + {:id (.getId t) + :arn (.getArn t) + :input (-> (.getInput t) + (json/read-str :key-fn keyword))}) + +(defn find-target [rule-name] + (->> (doto (ListTargetsByRuleRequest.) + (.withRule rule-name)) + (.listTargetsByRule @client) + (.getTargets) + (first) + (as-target))) + +(defn- as-rule [r] + {:name (.getName r) + :state (.getState r) + :rule (.getScheduleExpression r)}) + +(defn list-rules [] + (->> (ListRulesRequest.) + (.listRules @client) + (.getRules) + (map as-rule))) + +(defn put-schedule-rule [schedule-expression name] + (->> (doto (PutRuleRequest.) + (.withState "ENABLED") + (.withScheduleExpression schedule-expression) + (.withName name)) + (.putRule @client) + (.getRuleArn))) + +(defn put-event-rule [event-pattern name] + (let [exp (ep/make event-pattern)] + (->> (doto (PutRuleRequest.) + (.withState "ENABLED") + (.withEventPattern exp) + (.withName name)) + (.putRule @client) + (.getRuleArn)))) + +(defn make-target [rule-name orb-arn input] + (let [input-str (json/write-str input)] + (doto (Target.) + (.withId rule-name) + (.withInput input-str) + (.withArn orb-arn)))) + +(defn add-rule [rule-name rule] + (if (and (map? rule) (:event-id rule)) + (put-event-rule rule rule-name) + (put-schedule-rule rule rule-name))) + +(defn add-target [rule-name orb-arn input] + (->> (doto (PutTargetsRequest.) + (.withTargets [(make-target rule-name orb-arn input)]) + (.withRule rule-name)) + (.putTargets @client))) + +(defn delete-target [rule-name target-id] + (->> (doto (RemoveTargetsRequest.) + (.withRule rule-name) + (.withIds [target-id])) + (.removeTargets @client))) + +(defn delete-rule [rule-name] + (->> (find-target rule-name) + :id + (delete-target rule-name)) + (->> (doto (DeleteRuleRequest.) + (.withName rule-name)) + (.deleteRule @client))) + +(defn init! [region] + (reset! client (make-client region))) diff --git a/src/orb/event_pattern.clj b/src/orb/event_pattern.clj new file mode 100644 index 0000000..60fc46b --- /dev/null +++ b/src/orb/event_pattern.clj @@ -0,0 +1,16 @@ +(ns orb.event-pattern) + +(defn s3-bucket-event [{:keys [bucket-name]}] + {:source ["aws.s3"] + :detailType ["AWS API Call via CloudTrail"] + :detail {:eventSource ["s3.amazonaws.com"] + :eventName ["PutObject"] + :requestParameters + {"bucketName" [bucket-name]}}}) + +(def events + {:s3-bucket-change #'s3-bucket-event}) + +(defn make [{:keys [event-id] :as param}] + (when-let [f (get events event-id)] + (f param))) diff --git a/src/orb/lambda.clj b/src/orb/lambda.clj new file mode 100644 index 0000000..4b78f07 --- /dev/null +++ b/src/orb/lambda.clj @@ -0,0 +1,81 @@ +(ns orb.lambda + (:require + [clojure.string :as str] + [clojure.data.json :as json] + [saw.core :as saw]) + (:import + [java.util Base64] + [com.amazonaws.services.lambda + AWSLambdaClientBuilder] + [com.amazonaws.services.lambda.model + InvokeRequest + LogType + AddPermissionRequest + RemovePermissionRequest + InvocationType])) + +(def client (atom nil)) + +(defn make-client [region] + (-> (AWSLambdaClientBuilder/standard) + (.withCredentials (saw/creds)) + (.withRegion region) + .build)) + +(defn b64-decode [s] + (.decode (Base64/getDecoder) s)) + +(defn b64-encode [s] + (.encode (Base64/getEncoder) s)) + +(defn as-result [x] + {:response (->> (.getPayload x) + (.array) + (map char) + (apply str) + (json/read-str)) + :log (as-> (->> (.getLogResult x) + (b64-decode) + (map char) + (apply str)) r + (str/split r #"\n")) + :error (.getFunctionError x) + :version (.getExecutedVersion x)}) + +(defn as-log-type [type] + (condp = type + :tail (LogType/valueOf "Tail") + nil (LogType/valueOf "None"))) + +(defn as-invocation-type [type] + (condp = type + :request-response (InvocationType/valueOf "RequestResponse") + :event (InvocationType/valueOf "Event") + :dry-run (InvocationType/valueOf "DryRun"))) + +(defn invoke [type fn-name payload] + (->> (doto (InvokeRequest.) + (.withInvocationType (as-invocation-type type)) + (.withFunctionName fn-name) + (.withLogType (as-log-type :tail)) + (.withPayload (json/write-str payload))) + (.invoke @client) + (as-result))) + +(defn add-permission [fn-name rule-name rule-arn] + (->> (doto (AddPermissionRequest.) + (.withFunctionName fn-name) + (.withStatementId rule-name) + (.withPrincipal "events.amazonaws.com") + (.withAction "lambda:InvokeFunction") + (.withSourceArn rule-arn)) + (.addPermission @client))) + +(defn remove-permission [fn-name rule-name] + (->> (doto (RemovePermissionRequest.) + (.withFunctionName fn-name) + (.withStatementId rule-name)) + (.removePermission @client))) + +(defn init! [region] + (reset! client (make-client region))) diff --git a/test/orb/core_test.clj b/test/orb/core_test.clj new file mode 100644 index 0000000..230a021 --- /dev/null +++ b/test/orb/core_test.clj @@ -0,0 +1,16 @@ +(ns orb.core-test + (:require + [clojure.test :refer :all] + [orb.core :as orb])) + +(deftest ^:integration add-list-test + (orb/init! :qa-002) + (is (= [{:name "test-rule" + :state "ENABLED" + :rule "rate(5 minutes)"}] + (do + (orb/add "test-rule" + :rule "rate(5 minutes)" + :lambda (System/getenv "ORB_LAMBDA_ARN") + :input {:cue "health.ping" :param {}}) + (orb/list)))))