This repository has been archived by the owner on Mar 22, 2024. It is now read-only.

Commit

use logback in uberjar
pingles committed Jun 21, 2016
0 parents commit 5d3d2fa
Showing 6 changed files with 256 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
target/

5 changes: 5 additions & 0 deletions Makefile
@@ -0,0 +1,5 @@
SOURCES=$(wildcard *.clj)

./target/big-replicate-standalone.jar: $(SOURCES)
	echo $(SOURCES)
	lein uberjar
46 changes: 46 additions & 0 deletions README.md
@@ -0,0 +1,46 @@
# Big Replicate
Replicates data between Google Cloud BigQuery projects. It is currently focused on copying Google Analytics Premium BigQuery export data from one project to another.

## Usage

The tool currently expects the destination project to already contain a dataset with the same name as the source dataset. It looks for any session tables that are missing from the destination dataset and replicates the most recent `--number` of them.

```
export GOOGLE_APPLICATION_CREDENTIALS="./service-account-key.json"
export SOURCE_PROJECT="some-project"
export DESTINATION_PROJECT="other-project"
export STAGING_BUCKET="gs://some-bucket"
export GCLOUD_PROJECT="$SOURCE_PROJECT"
export JVM_OPTS="-Dlogback.configurationFile=./logback.example.xml"

java $JVM_OPTS -jar target/big-replicate-standalone.jar \
  --source-project "$SOURCE_PROJECT" \
  --destination-project "$DESTINATION_PROJECT" \
  --google-cloud-bucket "$STAGING_BUCKET" \
  --datasets 98909919 \
  --number 30
```

Because only tables that are missing from the destination dataset are processed, existing tables are never overwritten.
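
The selection step is a set difference between the source and destination table references, sorted so the newest tables come first. A minimal sketch of the logic in `src/uswitch/big_replicate.clj` (names simplified, illustrative values):

```
(require '[clojure.set :as se])

;; sources and destinations are collections of table references,
;; e.g. {:project-id "p" :dataset-id "98909919" :table-id "ga_sessions_20160620"}
(defn tables-to-replicate [sources destinations number]
  (->> (se/difference (set sources) (set destinations))
       (sort-by :table-id)   ;; ga_sessions_YYYYMMDD ids sort chronologically
       (reverse)             ;; newest first
       (take number)))
```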

## Building

The tool is written in [Clojure](https://clojure.org) and requires [Leiningen](https://github.com/technomancy/leiningen).

```
$ lein uberjar
```

## To Do

* The tool currently assumes it is replicating only Google Analytics exported data. It would be nice to allow a table regular expression to be specified on the CLI so it is less specific to GA data (see the sketch below this list).
* The tool also assumes it is replicating between projects whose datasets share the same name. It might be useful to support copying across differently named datasets as well as across projects.
* Staging data is not automatically deleted once it has been loaded into the destination table.
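
As a rough illustration of the first item, a hypothetical `--table-filter` option (not part of the current code) could replace the hard-coded `ga_sessions_` pattern used by `sessions-sources`:

```
;; Hypothetical sketch only -- no --table-filter option exists yet.
;; Relies on the existing `sources` function in uswitch.big-replicate.
(def table-filter-option
  ["-f" "--table-filter REGEX" "Only replicate tables whose id matches REGEX"
   :default "ga_sessions_\\d+"])

(defn matching-sources [project-id datasets table-filter]
  (->> (sources project-id datasets)
       (filter #(re-matches (re-pattern table-filter) (:table-id %)))))
```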

## License

Copyright © 2016 uSwitch

Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.
12 changes: 12 additions & 0 deletions logback.example.xml
@@ -0,0 +1,12 @@
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="10 seconds">
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>[%d] %-5level %logger{36} - %msg%n%ex{full}</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
18 changes: 18 additions & 0 deletions project.clj
@@ -0,0 +1,18 @@
(defproject big-replicate "0.1.0-SNAPSHOT"
:description "Copies data between BigQuery projects"
:url "https://github.com/uswitch/big-replicate"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:main uswitch.big-replicate
:uberjar-name "big-replicate-standalone.jar"
:dependencies [[org.clojure/clojure "1.8.0"]
[gclouj/bigquery "0.2.4-SNAPSHOT" :exclusions [commons-logging]]
[org.clojure/tools.cli "0.3.1"]
[org.clojure/tools.logging "0.3.1"]
[org.clojure/core.async "0.2.385"]
[org.slf4j/slf4j-api "1.7.21"]
[org.slf4j/jcl-over-slf4j "1.7.21"]]
:profiles {:dev {:dependencies [[org.slf4j/slf4j-simple "1.7.21"]]
:jvm-opts ["-Dorg.slf4j.simpleLogger.defaultLogLevel=debug"]}
:uberjar {:dependencies [[ch.qos.logback/logback-classic "1.1.7"]]
:aot :all}})
173 changes: 173 additions & 0 deletions src/uswitch/big_replicate.clj
@@ -0,0 +1,173 @@
(ns uswitch.big-replicate
(:require [gclouj.bigquery :as bq]
[clojure.string :as s]
[clojure.tools.cli :refer (parse-opts)]
[clojure.set :as se]
[clojure.tools.logging :refer (info error debug)]
[clojure.core.async :as a])
(:import [com.google.api.client.googleapis.json GoogleJsonResponseException])
(:gen-class))

(def cli [["-s" "--source-project PROJECT_ID" "Source Google Cloud Project"]
["-d" "--destination-project PROJECT_ID" "Destination Google Cloud Project"]
["-g" "--google-cloud-bucket BUCKET" "Staging bucket to store exported data"]
["-i" "--datasets DATASETS" "Comma-separated list of Dataset IDs to export"]
["-n" "--number NUMBER" "Number of days to look back for missing tables"
:default 7 :parse-fn #(Integer/parseInt %)]
["-a" "--number-of-agents NUMBER" "Number of concurrent replication agents to run."
:default (.availableProcessors (Runtime/getRuntime)) :parse-fn #(Integer/parseInt %)]
["-h" "--help" "Display summary"]])

(defrecord TableReference [project-id dataset-id table-id])

(defn sources
"Finds all tables in dataset to replicate"
[project-id datasets]
(let [service (bq/service {:project-id project-id})]
(->> (bq/datasets service)
(map :dataset-id)
(filter (fn [dataset]
(let [id (get-in dataset [:dataset-id])]
(some (set datasets) [id]))))
(mapcat (fn [dataset]
(bq/tables service dataset)))
(map :table-id))))

(defn sessions-sources
"Finds Google Analytics session source tables"
[project-id datasets]
(->> (sources project-id datasets)
(filter (fn [table]
(re-matches #"ga_sessions_\d+" (:table-id table))))
(map (fn [{:keys [project-id dataset-id table-id]}] (TableReference. project-id dataset-id table-id)))))

(defn staging-location [bucket {:keys [dataset-id table-id] :as table-reference}]
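  ;; e.g. (staging-location "gs://some-bucket" {:dataset-id "98909919" :table-id "ga_sessions_20160620"})
  ;; => "gs://some-bucket/98909919/ga_sessions_20160620/*"  (illustrative values)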
(format "%s/%s/%s/*" bucket dataset-id table-id))






(defn extract-table [{:keys [source-table staging-bucket] :as current-state}]
{:pre [(.startsWith staging-bucket "gs://")
(not (.endsWith staging-bucket "/"))]}
(let [uri (staging-location staging-bucket source-table)]
(info "starting extract for" source-table "into" uri)
(let [job (bq/extract-job (bq/service) source-table uri)]
(assoc current-state
:state :wait-for-extract
:extract-uri uri
:job job))))

(defn pending? [job]
(= :pending (get-in job [:status :state])))

(defn failed? [job]
(and (= :done (get-in job [:status :state]))
(not (empty? (get-in job [:status :errors])))))

(defn poll-job [job]
(let [{:keys [job-id]} job
job-state (bq/job (bq/service) job-id)]
(cond (pending? job-state) [:pending job-state]
(bq/running? job-state) [:running job-state]
(failed? job-state) [:failed job-state]
(bq/successful? job-state) [:successful job-state])))

(defn wait-for-job [next-state {:keys [job] :as current-state}]
(let [[status job] (poll-job job)]
(cond (= :failed status) (assoc current-state
:state :failed
:job job)
(= :successful status) (assoc current-state
:state next-state)
:else (do (debug "waiting for job" job)
(Thread/sleep 30000)
(assoc current-state :job job)))))

(def wait-for-extract (partial wait-for-job :load))
(def wait-for-load (partial wait-for-job :completed))

(defn load-table [{:keys [destination-table source-table extract-uri dataset-location] :as current-state}]
(let [service (bq/service)
table (bq/table service source-table)
schema (get-in table [:definition :schema])]
(let [job (bq/load-job (bq/service) destination-table {:create-disposition :needed
:write-disposition :empty
:schema schema} [extract-uri])]
(info "starting load into" destination-table)
(assoc current-state
:state :wait-for-load
:job job))))


(defn failed [current-state]
(error "failed" current-state))

(defn completed [{:keys [destination-table]}]
(info "successfully replicated" destination-table))

(defn progress [current-state]
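  ;; Drives a single table through the replication state machine:
  ;; :extract -> :wait-for-extract -> :load -> :wait-for-load -> :completed
  ;; (a failed BigQuery job or an exception in any step moves it to :failed).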
(loop [current-state current-state]
(let [state (:state current-state)
op ({:extract extract-table
:wait-for-extract wait-for-extract
:failed failed
:load load-table
:wait-for-load wait-for-load
:completed completed} state)
next-state (try (op current-state)
(catch Exception ex
(assoc current-state
:state :failed
:exception ex)))]
(when next-state
(recur next-state)))))

(defn replicator-agent [in-ch completed-ch]
(a/thread
(loop [state (a/<!! in-ch)]
(when state
(progress state)
(a/>!! completed-ch (select-keys state [:source-table :destination-table]))
(recur (a/<!! in-ch))))))

(defn -main [& args]
(let [{:keys [options summary errors]} (parse-opts args cli)]
(when errors
(println errors)
(System/exit 1))
(when (:help options)
(println summary)
(System/exit 0))
(let [datasets (-> (:datasets options) (s/split #","))
sources (sessions-sources (:source-project options) datasets)
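          ;; destination tables are re-keyed with the source project id so that
          ;; the set difference below compares only dataset and table ids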
destinations (->> (sessions-sources (:destination-project options) datasets)
(map (fn [table]
(assoc table :project-id (:source-project options)))))
target (se/difference (set sources) (set destinations))
sorted-targets (->> target (sort-by :table-id) (reverse) (take (:number options)))
in-ch (a/chan)
completed-ch (a/chan)]
(a/thread
(doseq [t sorted-targets]
(let [{:keys [google-cloud-bucket destination-project dataset-location]} options
state {:source-table t
:destination-table (assoc t :project-id destination-project)
:staging-bucket google-cloud-bucket
:state :extract
:dataset-location dataset-location}]
(a/>!! in-ch state))))
(let [agents (:number-of-agents options)]
(info "creating" agents "replicator agents")
(dotimes [_ agents]
(replicator-agent in-ch completed-ch)))
(loop [n 1
m (a/<!! completed-ch)]
(let [expected-count (count sorted-targets)]
(when m
(info (format "%d/%d" n expected-count) "completed," m)
(if (= n expected-count)
(info "finished")
(recur (inc n) (a/<!! completed-ch)))))))))
