Skip to content
This repository has been archived by the owner on Mar 22, 2024. It is now read-only.

Commit

Permalink
update to gclouj 0.2.5; add cleanup stage to delete temporary cloudst…
Browse files Browse the repository at this point in the history
…orage data (issue #3)
  • Loading branch information
pingles committed Jul 8, 2016
1 parent 1023285 commit 0506fc8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:uberjar-name "big-replicate-standalone.jar"
:dependencies [[org.clojure/clojure "1.8.0"]
[gclouj/bigquery "0.2.4" :exclusions [commons-logging]]
[gclouj/bigquery "0.2.5" :exclusions [commons-logging]]
[gclouj/storage "0.2.5"]
[org.clojure/tools.cli "0.3.1"]
[org.clojure/tools.logging "0.3.1"]
[org.clojure/core.async "0.2.385"]
Expand Down
46 changes: 28 additions & 18 deletions src/uswitch/big_replicate/sync.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns uswitch.big-replicate.sync
(:require [gclouj.bigquery :as bq]
[gclouj.storage :as cs]
[clojure.string :as s]
[clojure.tools.cli :refer (parse-opts)]
[clojure.set :as se]
Expand Down Expand Up @@ -39,23 +40,24 @@
(map (fn [{:keys [project-id dataset-id table-id]}] (TableReference. project-id dataset-id table-id)))))

(defn staging-location [bucket {:keys [dataset-id table-id] :as table-reference}]
(format "%s/%s/%s/*" bucket dataset-id table-id))


(let [prefix (format "%s/%s" dataset-id table-id)]
{:uri (format "%s/%s/*" bucket prefix)
:prefix prefix}))




(defn extract-table [{:keys [source-table staging-bucket] :as current-state}]
{:pre [(.startsWith staging-bucket "gs://")
(not (.endsWith staging-bucket "/"))]}
(let [uri (staging-location staging-bucket source-table)]
(let [{:keys [uri prefix]} (staging-location staging-bucket source-table)]
(info "starting extract for" source-table "into" uri)
(let [job (bq/extract-job (bq/service {:project-id (:project-id source-table)}) source-table uri)]
(assoc current-state
:state :wait-for-extract
:extract-uri uri
:job job))))
:state :wait-for-extract
:extract-uri uri
:staging-prefix prefix
:job job))))

(defn pending? [job]
(= :pending (get-in job [:status :state])))
Expand Down Expand Up @@ -85,7 +87,7 @@
(assoc current-state :job job)))))

(def wait-for-extract (partial wait-for-job :load))
(def wait-for-load (partial wait-for-job :completed))
(def wait-for-load (partial wait-for-job :cleanup))

(defn load-table [{:keys [destination-table source-table extract-uri] :as current-state}]
(let [table (bq/table (bq/service {:project-id (:project-id source-table)}) source-table)
Expand All @@ -100,6 +102,23 @@
:state :wait-for-load
:job job))))

(defn cleanup [{:keys [extract-uri staging-bucket staging-prefix] :as current-state}]
(let [bucket (st/replace staging-bucket "gs://" "")
storage (cs/service)]
(info "deleting staging location" extract-uri)
(debug "finding objects in" bucket "with prefix" staging-prefix)
(loop [objects (cs/blobs storage bucket staging-prefix)]
(if-let [blobs (seq objects)]
(let [blob (first blobs)]
(debug "deleting" blob)
(let [deleted? (cs/delete-blob storage (:id blob))]
(if deleted?
(recur (rest objects))
(assoc current-state
:state :failed
:cause (format "couldn't delete %s" (pr-str (:id blob)))))))
(assoc current-state :state :completed)))))


(defn failed [current-state]
(error "failed" current-state))
Expand All @@ -115,6 +134,7 @@
:failed failed
:load load-table
:wait-for-load wait-for-load
:cleanup cleanup
:completed completed} state)
next-state (try (op current-state)
(catch Exception ex
Expand All @@ -133,16 +153,6 @@
(recur (a/<!! in-ch))))))


;; sources:
;; project, dataset, table
;;
;; destinations:
;; table is input, project and dataset (although maybe overridden from cli)
;;
;; need to find tables to sync, but may have different project/dataset
;; so probably need to diff on just table
;; but then build destination based on source + cli

(defn missing-tables [sources destinations]
(let [s (map :table-id sources)
d (map :table-id destinations)
Expand Down

0 comments on commit 0506fc8

Please sign in to comment.