Skip to content

Commit

Permalink
WIP BigQuery target
Browse files Browse the repository at this point in the history
  • Loading branch information
rzats committed Oct 18, 2018
1 parent 76602a4 commit d187f17
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 2 deletions.
116 changes: 116 additions & 0 deletions sql_runner/bigquery_target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

import (
"fmt"
"log"
"strings"

// Imports the Google Cloud BigQuery client package.
bq "cloud.google.com/go/bigquery"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
)

type BigQueryTarget struct {
Target
Client *bq.Client
}

func (bqt BigQueryTarget) IsConnectable() bool {
var err error = nil
ctx := context.Background()

client := bqt.Client
query := client.Query("SELECT 1") // empty query to test connection

it, err := query.Read(ctx)
if err != nil {
log.Printf("ERROR: Failed to perform test query: %v", err)
return false
}

var row []bq.Value
err = it.Next(&row)
if err != nil {
log.Printf("ERROR: Failed to read test query results: %v", err)
return false
}

return fmt.Sprint(row) == "[1]"
}

func NewBigQueryTarget(target Target) *BigQueryTarget {
projectID := target.Project
ctx := context.Background()

client, err := bq.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("ERROR: Failed to create client: %v", err)
}

return &BigQueryTarget{target,client}
}

func (bqt BigQueryTarget) GetTarget() Target {
return bqt.Target
}

// Run a query against the target
// One statement per API call
func (bqt BigQueryTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus {
var affected int64 = 0
var err error = nil
ctx := context.Background()

if dryRun {
if bqt.IsConnectable() {
log.Printf("SUCCESS: Able to connect to target database, %s.", bqt.Project)
} else {
log.Printf("ERROR: Cannot connect to target database, %s.", bqt.Project)
}
return QueryStatus{query, query.Path, 0, nil}
}

scripts := strings.Split(query.Script, ";")

for _, script := range scripts {
if len(strings.TrimSpace(script)) > 0 {
q := bqt.Client.Query(script)
it, err := q.Read(ctx)

if err != nil {
log.Printf("ERROR: %s.", err)
return QueryStatus{query, query.Path, int(affected), err}
}

if showQueryOutput {
// TODO: replace with table output
for {
var values []bq.Value
err := it.Next(&values)
if err == iterator.Done {
break
}
if err != nil {
log.Printf("ERROR: %s.", err)
return QueryStatus{query, query.Path, int(affected), err}
}
fmt.Println(values)
}
} else {
jobId := q.JobID
job, err := bqt.Client.JobFromID(ctx, jobId)

if err != nil {
log.Printf("ERROR: %s.", err)
return QueryStatus{query, query.Path, int(affected), err}
}

aff := job.LastStatus().Statistics.Details.(*bq.QueryStatistics).NumDMLAffectedRows
affected += aff
}
}
}

return QueryStatus{query, query.Path, int(affected), err}
}
4 changes: 2 additions & 2 deletions sql_runner/playbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Playbook struct {

type Target struct {
Name, Type, Host, Database, Port, Username,
Password, Region, Account, Warehouse string
Ssl bool
Password, Region, Account, Warehouse, Project string
Ssl bool
}

type Step struct {
Expand Down
6 changes: 6 additions & 0 deletions sql_runner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
POSTGRES_TYPE = "postgres"
POSTGRESQL_TYPE = "postgresql"
SNOWFLAKE_TYPE = "snowflake"
BIGQUERY_TYPE = "bigquery"

ERROR_UNSUPPORTED_DB_TYPE = "Database type is unsupported"
ERROR_FROM_STEP_NOT_FOUND = "The fromStep argument did not match any available steps"
Expand Down Expand Up @@ -260,6 +261,11 @@ func routeAndRun(target Target, readySteps []ReadyStep, targetChan chan TargetSt
snfl := NewSnowflakeTarget(tgt)
targetChan <- runSteps(snfl, readySteps, dryRun, showQueryOutput)
}(target)
case BIGQUERY_TYPE:
go func(tgt Target) {
bq := NewBigQueryTarget(tgt)
targetChan <- runSteps(bq, readySteps, dryRun, showQueryOutput)
}(target)
default:
targetChan <- unsupportedDbType(target.Name, target.Type)
}
Expand Down

0 comments on commit d187f17

Please sign in to comment.