diff --git a/sql_runner/bigquery_target.go b/sql_runner/bigquery_target.go new file mode 100644 index 0000000..3ac5497 --- /dev/null +++ b/sql_runner/bigquery_target.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "log" + "strings" + + // Imports the Google Cloud BigQuery client package. + bq "cloud.google.com/go/bigquery" + "golang.org/x/net/context" + "google.golang.org/api/iterator" +) + +type BigQueryTarget struct { + Target + Client *bq.Client +} + +func (bqt BigQueryTarget) IsConnectable() bool { + var err error = nil + ctx := context.Background() + + client := bqt.Client + query := client.Query("SELECT 1") // empty query to test connection + + it, err := query.Read(ctx) + if err != nil { + log.Printf("ERROR: Failed to perform test query: %v", err) + return false + } + + var row []bq.Value + err = it.Next(&row) + if err != nil { + log.Printf("ERROR: Failed to read test query results: %v", err) + return false + } + + return fmt.Sprint(row) == "[1]" +} + +func NewBigQueryTarget(target Target) *BigQueryTarget { + projectID := target.Project + ctx := context.Background() + + client, err := bq.NewClient(ctx, projectID) + if err != nil { + log.Fatalf("ERROR: Failed to create client: %v", err) + } + + return &BigQueryTarget{target,client} +} + +func (bqt BigQueryTarget) GetTarget() Target { + return bqt.Target +} + +// Run a query against the target +// One statement per API call +func (bqt BigQueryTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus { + var affected int64 = 0 + var err error = nil + ctx := context.Background() + + if dryRun { + if bqt.IsConnectable() { + log.Printf("SUCCESS: Able to connect to target database, %s.", bqt.Project) + } else { + log.Printf("ERROR: Cannot connect to target database, %s.", bqt.Project) + } + return QueryStatus{query, query.Path, 0, nil} + } + + scripts := strings.Split(query.Script, ";") + + for _, script := range scripts { + if len(strings.TrimSpace(script)) > 0 { + q := bqt.Client.Query(script) + it, err := q.Read(ctx) + + if err != nil { + log.Printf("ERROR: %s.", err) + return QueryStatus{query, query.Path, int(affected), err} + } + + if showQueryOutput { + // TODO: replace with table output + for { + var values []bq.Value + err := it.Next(&values) + if err == iterator.Done { + break + } + if err != nil { + log.Printf("ERROR: %s.", err) + return QueryStatus{query, query.Path, int(affected), err} + } + fmt.Println(values) + } + } else { + jobId := q.JobID + job, err := bqt.Client.JobFromID(ctx, jobId) + + if err != nil { + log.Printf("ERROR: %s.", err) + return QueryStatus{query, query.Path, int(affected), err} + } + + aff := job.LastStatus().Statistics.Details.(*bq.QueryStatistics).NumDMLAffectedRows + affected += aff + } + } + } + + return QueryStatus{query, query.Path, int(affected), err} +} \ No newline at end of file diff --git a/sql_runner/playbook.go b/sql_runner/playbook.go index e1d4ab4..1beee63 100644 --- a/sql_runner/playbook.go +++ b/sql_runner/playbook.go @@ -21,8 +21,8 @@ type Playbook struct { type Target struct { Name, Type, Host, Database, Port, Username, - Password, Region, Account, Warehouse string - Ssl bool + Password, Region, Account, Warehouse, Project string + Ssl bool } type Step struct { diff --git a/sql_runner/run.go b/sql_runner/run.go index 4d12a73..0cb9b49 100644 --- a/sql_runner/run.go +++ b/sql_runner/run.go @@ -24,6 +24,7 @@ const ( POSTGRES_TYPE = "postgres" POSTGRESQL_TYPE = "postgresql" SNOWFLAKE_TYPE = "snowflake" + BIGQUERY_TYPE = "bigquery" ERROR_UNSUPPORTED_DB_TYPE = "Database type is unsupported" ERROR_FROM_STEP_NOT_FOUND = "The fromStep argument did not match any available steps" @@ -260,6 +261,11 @@ func routeAndRun(target Target, readySteps []ReadyStep, targetChan chan TargetSt snfl := NewSnowflakeTarget(tgt) targetChan <- runSteps(snfl, readySteps, dryRun, showQueryOutput) }(target) + case BIGQUERY_TYPE: + go func(tgt Target) { + bq := NewBigQueryTarget(tgt) + targetChan <- runSteps(bq, readySteps, dryRun, showQueryOutput) + }(target) default: targetChan <- unsupportedDbType(target.Name, target.Type) }