Merge 9cb47ec into 76602a4
rzats committed Nov 9, 2018
2 parents 76602a4 + 9cb47ec commit c794133
Showing 9 changed files with 210 additions and 8 deletions.
1 change: 0 additions & 1 deletion .travis.yml
@@ -3,7 +3,6 @@ sudo: false

language: go
go:
- 1.8.5
- 1.9.2
- tip

4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,3 +1,7 @@
Version 0.8.0 (2018-11-08)
--------------------------
Add support for BigQuery (#92)

Version 0.7.1 (2018-10-15)
--------------------------
Fix non-default Snowflake region from being invalidated (#134)
6 changes: 3 additions & 3 deletions README.md
@@ -17,8 +17,8 @@ Used with **[Snowplow][snowplow]** for scheduled SQL-based transformations of ev
Assuming you are running on **64bit Linux**:

```bash
> wget http://dl.bintray.com/snowplow/snowplow-generic/sql_runner_0.7.1_linux_amd64.zip
> unzip sql_runner_0.7.1_linux_amd64.zip
> wget http://dl.bintray.com/snowplow/snowplow-generic/sql_runner_0.8.0_linux_amd64.zip
> unzip sql_runner_0.8.0_linux_amd64.zip
> ./sql-runner -usage
```

@@ -38,7 +38,7 @@ limitations under the License.
[travis]: https://travis-ci.org/snowplow/sql-runner
[travis-image]: https://travis-ci.org/snowplow/sql-runner.png?branch=master

[release-image]: http://img.shields.io/badge/release-0.7.1-6ad7e5.svg?style=flat
[release-image]: http://img.shields.io/badge/release-0.8.0-6ad7e5.svg?style=flat
[releases]: https://github.com/snowplow/sql-runner/releases

[license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.7.1
0.8.0
26 changes: 26 additions & 0 deletions config/bigquery.yml.sample
@@ -0,0 +1,26 @@
targets:
  - name: "My BigQuery database"
    type: bigquery
    project: ADD HERE # Project ID as shown in the GCP console's front page
    # Service account credentials should also be implicitly provided: https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually
variables:
  foo: bar
steps:
  - name: ADD HERE
    queries:
      - name: ADD HERE
        file: ADD REL/ABS PATH
        template: true
  - name: ADD HERE
    queries:
      - name: ADD HERE
        file: ADD REL/ABS PATH
        template: true
      - name: ADD HERE
        file: ADD REL/ABS PATH
        template: true
  - name: ADD HERE
    queries:
      - name: ADD HERE
        file: ADD REL/ABS PATH
        template: true
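
The credentials comment in the sample relies on Application Default Credentials rather than anything stored in the playbook itself. A minimal sketch of what that implies, assuming `GOOGLE_APPLICATION_CREDENTIALS` points at a service-account key file and using a placeholder project ID:

```go
package main

import (
	"context"
	"log"

	bq "cloud.google.com/go/bigquery"
)

func main() {
	// NewClient resolves Application Default Credentials: it first checks the
	// GOOGLE_APPLICATION_CREDENTIALS environment variable for a key file, then
	// falls back to gcloud user credentials or the GCE/GKE metadata server.
	ctx := context.Background()
	client, err := bq.NewClient(ctx, "my-gcp-project") // placeholder project ID
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	log.Printf("Authenticated against project %s", client.Project())
}
```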
167 changes: 167 additions & 0 deletions sql_runner/bigquery_target.go
@@ -0,0 +1,167 @@
package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	bq "cloud.google.com/go/bigquery"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
	"github.com/olekukonko/tablewriter"
)

type BigQueryTarget struct {
	Target
	Client *bq.Client
}

func (bqt BigQueryTarget) IsConnectable() bool {
	var err error = nil
	ctx := context.Background()

	client := bqt.Client
	query := client.Query("SELECT 1") // empty query to test connection

	it, err := query.Read(ctx)
	if err != nil {
		log.Printf("ERROR: Failed to perform test query: %v", err)
		return false
	}

	var row []bq.Value
	err = it.Next(&row)
	if err != nil {
		log.Printf("ERROR: Failed to read test query results: %v", err)
		return false
	}

	return fmt.Sprint(row) == "[1]"
}

func NewBigQueryTarget(target Target) *BigQueryTarget {
	projectID := target.Project
	ctx := context.Background()

	client, err := bq.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("ERROR: Failed to create client: %v", err)
	}

	return &BigQueryTarget{target, client}
}

func (bqt BigQueryTarget) GetTarget() Target {
	return bqt.Target
}

// Run a query against the target
// One statement per API call
func (bqt BigQueryTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus {
	var affected int64 = 0
	var err error = nil
	var schema bq.Schema = nil
	ctx := context.Background()

	if dryRun {
		if bqt.IsConnectable() {
			log.Printf("SUCCESS: Able to connect to target database, %s.", bqt.Project)
		} else {
			log.Printf("ERROR: Cannot connect to target database, %s.", bqt.Project)
		}
		return QueryStatus{query, query.Path, 0, nil}
	}

	scripts := strings.Split(query.Script, ";")

	for _, script := range scripts {
		if len(strings.TrimSpace(script)) > 0 {
			// If showing query output, perform a dry run to get column metadata
			if showQueryOutput {
				dq := bqt.Client.Query(script)
				dq.DryRun = true
				dqJob, err := dq.Run(ctx)
				if err != nil {
					log.Printf("ERROR: Failed to dry run job: %s.", err)
					return QueryStatus{query, query.Path, int(affected), err}
				}

				schema = dqJob.LastStatus().Statistics.Details.(*bq.QueryStatistics).Schema
			}

			q := bqt.Client.Query(script)

			job, err := q.Run(ctx)
			if err != nil {
				log.Printf("ERROR: Failed to run job: %s.", err)
				return QueryStatus{query, query.Path, int(affected), err}
			}

			it, err := job.Read(ctx)
			if err != nil {
				log.Printf("ERROR: Failed to read job results: %s.", err)
				return QueryStatus{query, query.Path, int(affected), err}
			}

			if showQueryOutput {
				err = printBqTable(it, schema)
				if err != nil {
					log.Printf("ERROR: Failed to print output: %s.", err)
					return QueryStatus{query, query.Path, int(affected), err}
				}
			} else {
				queryStats := job.LastStatus().Statistics.Details.(*bq.QueryStatistics)
				aff := queryStats.NumDMLAffectedRows
				affected += aff
			}
		}
	}

	return QueryStatus{query, query.Path, int(affected), err}
}

func printBqTable(rows *bq.RowIterator, schema bq.Schema) error {
	outputBuffer := make([][]string, 0, 10)

	for {
		var row []bq.Value
		err := rows.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		outputBuffer = append(outputBuffer, bqStringify(row))
	}

	if len(outputBuffer) > 0 {
		log.Printf("QUERY OUTPUT:\n")
		table := tablewriter.NewWriter(os.Stdout)

		// Get columns from table schema
		columns := make([]string, len(schema))
		for i, field := range schema {
			columns[i] = field.Name
		}
		table.SetHeader(columns)

		table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
		table.SetCenterSeparator("|")

		for _, row := range outputBuffer {
			table.Append(row)
		}

		table.Render() // Send output
	}
	return nil
}

func bqStringify(row []bq.Value) []string {
	var line []string
	for _, element := range row {
		line = append(line, fmt.Sprint(element))
	}
	return line
}
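
For context, a rough standalone driver for the new target — not part of this commit, and assuming a ReadyQuery only needs its Script and Path fields populated, which matches how RunQuery consumes it above. It would live in the same `main` package as the file shown here:

```go
// Hypothetical ad-hoc driver for BigQueryTarget; illustration only.
func runBigQueryAdHoc() {
	target := Target{
		Name:    "My BigQuery database",
		Type:    "bigquery",
		Project: "my-gcp-project", // placeholder project ID
	}
	bqt := NewBigQueryTarget(target)

	if !bqt.IsConnectable() {
		log.Fatalf("ERROR: Cannot connect to BigQuery project %s", target.Project)
	}

	// Two statements: RunQuery splits the script on ";" and issues one API call per statement.
	query := ReadyQuery{Script: "SELECT 1; SELECT 2;", Path: "ad-hoc"}
	bqt.RunQuery(query, false, true) // dryRun=false, showQueryOutput=true
}
```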
2 changes: 1 addition & 1 deletion sql_runner/main.go
@@ -25,7 +25,7 @@ import (
const (
	CLI_NAME        = "sql-runner"
	CLI_DESCRIPTION = `Run playbooks of SQL scripts in series and parallel on Redshift and Postgres`
	CLI_VERSION     = "0.7.1"
	CLI_VERSION     = "0.8.0"

	SQLROOT_BINARY   = "BINARY"
	SQLROOT_PLAYBOOK = "PLAYBOOK"
4 changes: 2 additions & 2 deletions sql_runner/playbook.go
@@ -21,8 +21,8 @@ type Playbook struct {

type Target struct {
	Name, Type, Host, Database, Port, Username,
	Password, Region, Account, Warehouse string
	Ssl bool
	Password, Region, Account, Warehouse, Project string
	Ssl bool
}
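
As an aside, the new Project field is what a `project:` key in a BigQuery playbook target would populate. A minimal sketch of that mapping — not necessarily the YAML library this repo uses — with gopkg.in/yaml.v2, which matches keys to lowercased field names by default:

```go
package main

import (
	"fmt"
	"log"

	yaml "gopkg.in/yaml.v2"
)

// Only the fields relevant to the sketch are shown here.
type Target struct {
	Name, Type, Project string
	Ssl                 bool
}

func main() {
	doc := []byte(`
name: "My BigQuery database"
type: bigquery
project: my-gcp-project # placeholder project ID
`)
	var t Target
	if err := yaml.Unmarshal(doc, &t); err != nil {
		log.Fatalf("Failed to parse target: %v", err)
	}
	fmt.Printf("%s -> %s (project %s)\n", t.Name, t.Type, t.Project)
}
```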

type Step struct {
6 changes: 6 additions & 0 deletions sql_runner/run.go
@@ -24,6 +24,7 @@ const (
	POSTGRES_TYPE   = "postgres"
	POSTGRESQL_TYPE = "postgresql"
	SNOWFLAKE_TYPE  = "snowflake"
	BIGQUERY_TYPE   = "bigquery"

	ERROR_UNSUPPORTED_DB_TYPE = "Database type is unsupported"
	ERROR_FROM_STEP_NOT_FOUND = "The fromStep argument did not match any available steps"
@@ -260,6 +261,11 @@ func routeAndRun(target Target, readySteps []ReadyStep, targetChan chan TargetSt
			snfl := NewSnowflakeTarget(tgt)
			targetChan <- runSteps(snfl, readySteps, dryRun, showQueryOutput)
		}(target)
	case BIGQUERY_TYPE:
		go func(tgt Target) {
			bq := NewBigQueryTarget(tgt)
			targetChan <- runSteps(bq, readySteps, dryRun, showQueryOutput)
		}(target)
	default:
		targetChan <- unsupportedDbType(target.Name, target.Type)
	}
