Skip to content

Commit

Permalink
WIP BigQuery target
Browse files Browse the repository at this point in the history
  • Loading branch information
rzats committed Oct 18, 2018
1 parent 76602a4 commit 7bfd3a0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
79 changes: 79 additions & 0 deletions sql_runner/bigquery_target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"fmt"
"google.golang.org/api/option"
"log"

// Imports the Google Cloud BigQuery client package.
bq "cloud.google.com/go/bigquery"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
)

type BigQueryTarget struct {
Target
Client *bq.Client
}

func (bqt BigQueryTarget) IsConnectable() bool {
ctx := context.Background()
client := bqt.Client
query := client.Query("SELECT 1") // empty query to test connection
it, err := query.Read(ctx)
if err != nil {
log.Printf("Error while performing test query: %v", err)
return false
}
var row []bq.Value
itErr := it.Next(&row)
if itErr != nil {
log.Printf("Error while reading test query results: %v", itErr)
return false
}
return fmt.Sprint(row) == "[1]"
}

func NewBigQueryTarget(target Target) *BigQueryTarget {
projectID := target.Project
ctx := context.Background()

creds, err := google.CredentialsFromJSON(ctx, []byte(target.Credentials), bq.Scope)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

client, err := bq.NewClient(ctx, projectID, option.WithCredentials(creds))
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

return &BigQueryTarget{target,client}
}

func (bqt BigQueryTarget) GetTarget() Target {
return bqt.Target
}

func (bqt BigQueryTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus {
var err error = nil
ctx := context.Background()
if dryRun {
if bqt.IsConnectable() {
log.Printf("SUCCESS: Able to connect to target database.")
} else {
log.Printf("ERROR: Cannot connect to target database.")
}
return QueryStatus{query, query.Path, 0, nil}
}

affected := 0 // TODO are affected rows returned by the client library?
q := bqt.Client.Query(query.Script)
it, err := q.Read(ctx)
if err != nil {
log.Printf("ERROR: %s.", err)
return QueryStatus{query, query.Path, int(affected), err}
}
println(it)
return QueryStatus{query, query.Path, affected, err}
}
3 changes: 2 additions & 1 deletion sql_runner/playbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Playbook struct {

type Target struct {
Name, Type, Host, Database, Port, Username,
Password, Region, Account, Warehouse string
Password, Region, Account, Warehouse,
Project, Credentials string
Ssl bool
}

Expand Down
6 changes: 6 additions & 0 deletions sql_runner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
POSTGRES_TYPE = "postgres"
POSTGRESQL_TYPE = "postgresql"
SNOWFLAKE_TYPE = "snowflake"
BIGQUERY_TYPE = "bigquery"

ERROR_UNSUPPORTED_DB_TYPE = "Database type is unsupported"
ERROR_FROM_STEP_NOT_FOUND = "The fromStep argument did not match any available steps"
Expand Down Expand Up @@ -260,6 +261,11 @@ func routeAndRun(target Target, readySteps []ReadyStep, targetChan chan TargetSt
snfl := NewSnowflakeTarget(tgt)
targetChan <- runSteps(snfl, readySteps, dryRun, showQueryOutput)
}(target)
case BIGQUERY_TYPE:
go func(tgt Target) {
bq := NewBigQueryTarget(tgt)
targetChan <- runSteps(bq, readySteps, dryRun, showQueryOutput)
}(target)
default:
targetChan <- unsupportedDbType(target.Name, target.Type)
}
Expand Down

0 comments on commit 7bfd3a0

Please sign in to comment.