Skip to content

Commit

Permalink
Support append mode query and use legacy SQL flag
Browse files Browse the repository at this point in the history
  • Loading branch information
hakobera committed Jun 12, 2016
1 parent 265f11a commit 13d02d0
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 1 deletion.
13 changes: 13 additions & 0 deletions examples/query_append.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
task :task1 do
requires :task2
run do
log input.table_name
end
end

task :task2, type: :bigquery_query do
param_set :query, "SELECT COUNT(*) AS cnt FROM [bigquery-public-data:samples.wikipedia]"
param_set :dataset_id, "test"
param_set :table_id, "dest_append"
param_set :mode, "append"
end
2 changes: 2 additions & 0 deletions lib/tumugi/plugin/bigquery/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def query(sql, mode: :truncate,
flatten_results: true,
priority: "INTERACTIVE",
use_query_cache: true,
use_legacy_sql: true,
user_defined_function_resources: nil,
project_id: nil,
job_project_id: nil,
Expand All @@ -191,6 +192,7 @@ def query(sql, mode: :truncate,
flatten_results: flatten_results,
priority: priority,
use_query_cache: use_query_cache,
use_legacy_sql: use_legacy_sql,
user_defined_function_resources: user_defined_function_resources,
project_id: project_id || @project_id,
job_project_id: job_project_id || @project_id,
Expand Down
21 changes: 20 additions & 1 deletion lib/tumugi/plugin/task/bigquery_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,38 @@ class BigqueryQueryTask < Tumugi::Task
param :project_id, type: :string
param :dataset_id, type: :string, required: true
param :table_id, type: :string, required: true
param :mode, type: :string, default: 'truncate' # append, empty
param :flatten_results, type: :bool, default: true
param :use_legacy_sql, type: :bool, default: true

param :wait, type: :int, default: 60

def output
@output ||= Tumugi::Plugin::BigqueryTableTarget.new(project_id: project_id, dataset_id: dataset_id, table_id: table_id)
end

def completed?
if mode.to_sym == :append && @state != :completed
false
else
super
end
end

def run
log "Launching Query"
log "Query: #{query}"
log "Query destination: #{output}"

bq_client = output.client
bq_client.query(query, project_id: project_id, dataset_id: output.dataset_id, table_id: output.table_id, wait: wait)
bq_client.query(query,
project_id: project_id,
dataset_id: output.dataset_id,
table_id: output.table_id,
mode: mode.to_sym,
flatten_results: flatten_results,
use_legacy_sql: use_legacy_sql,
wait: wait)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions test/cli_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Tumugi::Plugin::Bigquery::CLITest < Test::Unit::TestCase
'copy' => ['copy.rb', 'task1'],
'dataset' => ['dataset.rb', 'task1'],
'query' => ['query.rb', 'task1'],
'query_append' => ['query_append.rb', 'task1'],
}

def invoke(file, task, options)
Expand Down
1 change: 1 addition & 0 deletions test/plugin/bigquery/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class Tumugi::Plugin::Bigquery::ClientTest < Test::Unit::TestCase
flatten_results: true,
priority: "INTERACTIVE",
use_query_cache: true,
use_legacy_sql: true,
user_defined_function_resources: nil,
project_id: credential[:project_id],
job_project_id: credential[:project_id],
Expand Down
43 changes: 43 additions & 0 deletions test/plugin/task/bigquery_query_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
class Tumugi::Plugin::BigqueryQueryTaskTest < Test::Unit::TestCase
include Tumugi::Plugin::BigqueryTestHelper

class ExistTarget < Tumugi::Target
def exist?; true;end
end

class NotExistTarget < Tumugi::Target
def exist?; false; end
end

setup do
@klass = Class.new(Tumugi::Plugin::BigqueryQueryTask)
@klass.param_set :query, "SELECT COUNT(*) AS cnt FROM [bigquery-public-data:samples.wikipedia]"
Expand All @@ -19,6 +27,9 @@ class Tumugi::Plugin::BigqueryQueryTaskTest < Test::Unit::TestCase
assert_equal(TEST_DATASETS[0], task.dataset_id)
assert_equal('test', task.table_id)
assert_equal(ENV['PROJECT_ID'], task.project_id)
assert_equal('truncate', task.mode)
assert_equal(true, task.flatten_results)
assert_equal(true, task.use_legacy_sql)
assert_equal(60, task.wait)
end

Expand All @@ -37,6 +48,24 @@ class Tumugi::Plugin::BigqueryQueryTaskTest < Test::Unit::TestCase
end
end

data({
"truncate mode with completed state and exist target" => ["truncate", ExistTarget, :completed, true],
"truncate mode with completed state and not exist target" => ["truncate", NotExistTarget, :completed, false],
"append mode with pending state and exist target" => ["append", ExistTarget, :pending, false],
"append mode with pending state and not exist target" => ["append", NotExistTarget, :pending, false],
"append mode with completed state and exist target" => ["append", ExistTarget, :completed, true],
"append mode with completed state and not exist target" => ["append", NotExistTarget, :completed, false],
})
test "#complted?" do |(mode, target_klass, state, expected)|
@klass.param_set :mode, mode
@klass.send(:define_method, :output) do
target_klass.new
end
task = @klass.new
task.state = state
assert_equal(expected, task.completed?)
end

test "#output" do
task = @klass.new
output = task.output
Expand All @@ -53,4 +82,18 @@ class Tumugi::Plugin::BigqueryQueryTaskTest < Test::Unit::TestCase
result = output.client.list_tabledata(output.dataset_id, task.table_id)
assert_equal({:total_rows=>1, :next_token=>nil, :rows=>[{"cnt"=>"313797035"}]}, result)
end

test "#run with append mode" do
@klass.param_set :table_id, 'test_append'
@klass.param_set :mode, 'append'
task = @klass.new
output = task.output
task.run
result = output.client.list_tabledata(output.dataset_id, task.table_id)
assert_equal({:total_rows=>1, :next_token=>nil, :rows=>[{"cnt"=>"313797035"}]}, result)

task.run
result = output.client.list_tabledata(output.dataset_id, task.table_id)
assert_equal({:total_rows=>2, :next_token=>nil, :rows=>[{"cnt"=>"313797035"}, {"cnt"=>"313797035"}]}, result)
end
end

0 comments on commit 13d02d0

Please sign in to comment.