Skip to content

Commit

Permalink
Implement load task from Google Cloud Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
hakobera committed May 31, 2016
1 parent 0078167 commit 00cdbd3
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 18 deletions.
24 changes: 24 additions & 0 deletions examples/load.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Example load task: reads gs://tumugi-plugin-bigquery/test.csv and loads it
# into the `load_test` table, skipping the CSV header row.
task :task1, type: :bigquery_load do
  requires :task2

  param_set :bucket, 'tumugi-plugin-bigquery'
  param_set :key, 'test.csv'
  # Evaluated lazily; presumably `input` is the output of the required
  # :task2 -- confirm against the tumugi DSL.
  param_set :dataset_id, -> { input.dataset_id }
  param_set :table_id, 'load_test'
  param_set :skip_leading_rows, 1

  # Both CSV columns are nullable integers.
  column_names = %w[row_number value]
  param_set :schema, column_names.map { |column| { name: column, type: 'INTEGER', mode: 'NULLABLE' } }
end

# Dataset task required by :task1; it targets the 'test' dataset.
task(:task2, type: :bigquery_dataset) do
  param_set(:dataset_id, 'test')
end
6 changes: 6 additions & 0 deletions examples/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
row_number,value
1,1
2,2
3,3
4,4
5,5
39 changes: 21 additions & 18 deletions lib/tumugi/plugin/task/bigquery_load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ module Plugin
class BigqueryLoadTask < Tumugi::Task
Tumugi::Plugin.register_task('bigquery_load', self)

param :bucket, type: :string, required: true, auto_bind: false
param :key, type: :string, required: true, auto_bind: false
param :project_id, type: :string, auto_bind: false
param :dataset_id, type: :string, required: true, auto_bind: false
param :table_id, type: :string, required: true, auto_bind: false
param :bucket, type: :string, required: true
param :key, type: :string, required: true
param :project_id, type: :string
param :dataset_id, type: :string, required: true
param :table_id, type: :string, required: true

param :schema, auto_bind: false # type: :array
param :field_delimiter, type: :string, auto_bind: false, default: ','
param :mode, type: :string, auto_bind: false, default: 'append' # truncate, empty
param :allow_jagged_rows, type: :bool, auto_bind: false, default: false
param :max_bad_records, type: :integer, auto_bind: false, default: 0
param :ignore_unknown_values, type: :bool, auto_bind: false, default: false
param :allow_quoted_newlines, type: :bool, auto_bind: false, default: false
param :quote, type: :string, auto_bind: false, default: '"'
param :skip_leading_rows, type: :interger, auto_bind: false, default: 0
param :source_format, type: :string, auto_bind: false, default: 'CSV' # NEWLINE_DELIMITED_JSON, AVRO
param :wait, type: :integer, auto_bind: false, default: 60
# Optional load settings. Each one falls back to the default given here.
# `schema` has no default; `run` raises Tumugi::ParameterError when it is nil
# and `mode` is 'truncate' or 'empty'.
param :schema # type: :array
param :field_delimiter, type: :string, default: ','
param :mode, type: :string, default: 'append' # truncate, empty
param :allow_jagged_rows, type: :bool, default: false
param :max_bad_records, type: :integer, default: 0
param :ignore_unknown_values, type: :bool, default: false
param :allow_quoted_newlines, type: :bool, default: false
param :quote, type: :string, default: '"'
# Type was misspelled as :interger; use :integer like the other numeric params.
param :skip_leading_rows, type: :integer, default: 0
param :source_format, type: :string, default: 'CSV' # NEWLINE_DELIMITED_JSON, AVRO
param :wait, type: :integer, default: 60

def output
opts = { dataset_id: dataset_id, table_id: table_id }
Expand All @@ -35,8 +35,11 @@ def run
raise Tumugi::ParameterError.new("Parameter 'schema' is required when 'mode' is 'truncate' or 'empty'") if schema.nil?
end

key = "/#{key}" unless key.start_with?('/')
source_uri = "gs://#{bucket}#{key}"
object_id = key
unless object_id.start_with?('/')
object_id = "/#{key}"
end
source_uri = "gs://#{bucket}#{object_id}"
log "Source: #{source_uri}"
log "Destination: #{output}"

Expand Down
80 changes: 80 additions & 0 deletions test/plugin/task/bigquery_load_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require_relative '../../test_helper'
require 'tumugi/plugin/task/bigquery_load'

# Tests for Tumugi::Plugin::BigqueryLoadTask: parameter handling, the output
# target, and an end-to-end load of test.csv from Google Cloud Storage.
class Tumugi::Plugin::BigqueryLoadTaskTest < Test::Unit::TestCase
  include Tumugi::Plugin::BigqueryTestHelper

  setup do
    # Configure an anonymous subclass so param_set calls are made on it
    # rather than on BigqueryLoadTask itself.
    @klass = Class.new(Tumugi::Plugin::BigqueryLoadTask)
    @klass.param_set :bucket, 'tumugi-plugin-bigquery'
    @klass.param_set :key, 'test.csv'
    @klass.param_set :dataset_id, Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0]
    @klass.param_set :table_id, 'load_test'
    @klass.param_set :skip_leading_rows, 1
    @klass.param_set :schema, [
      {
        name: 'row_number',
        type: 'INTEGER',
        mode: 'NULLABLE'
      },
      {
        name: 'value',
        type: 'INTEGER',
        mode: 'NULLABLE'
      },
    ]
  end

  sub_test_case "parameters" do
    test "should set correctly" do
      task = @klass.new
      assert_equal('tumugi-plugin-bigquery', task.bucket)
      assert_equal('test.csv', task.key)
      # project_id is optional and is not set in setup.
      assert_nil(task.project_id)
      assert_equal(TEST_DATASETS[0], task.dataset_id)
      assert_equal('load_test', task.table_id)
      # `wait` is not set in setup, so the declared default (60) applies.
      assert_equal(60, task.wait)
    end

    # Each data set lists the required parameter(s) to clear before instantiation.
    data({
      "bucket" => [:bucket],
      "key" => [:key],
      "dataset_id" => [:dataset_id],
      "table_id" => [:table_id],
    })
    test "raise error when required parameter is not set" do |params|
      params.each do |param|
        @klass.param_set(param, nil)
      end
      assert_raise(Tumugi::ParameterError) do
        @klass.new
      end
    end
  end

  test "#output" do
    task = @klass.new
    output = task.output
    assert_kind_of(Tumugi::Plugin::BigqueryTableTarget, output)
    assert_equal(ENV['PROJECT_ID'], output.project_id)
    assert_equal(Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0], output.dataset_id)
    assert_equal('load_test', output.table_id)
  end

  # NOTE(review): the task's default mode is 'append', so the row count below
  # assumes the destination table is empty before the run -- confirm that
  # BigqueryTestHelper resets the test datasets between runs.
  test "#run" do
    task = @klass.new
    output = task.output
    task.run
    result = output.client.list_tabledata(task.dataset_id, task.table_id, project_id: task.project_id)
    assert_equal(5, result[:total_rows])

    # Cell values are compared as strings even though the schema is INTEGER.
    expected = [
      {"row_number"=>"1", "value"=>"1"},
      {"row_number"=>"2", "value"=>"2"},
      {"row_number"=>"3", "value"=>"3"},
      {"row_number"=>"4", "value"=>"4"},
      {"row_number"=>"5", "value"=>"5"}
    ]
    assert_equal(expected, result[:rows])
  end
end

0 comments on commit 00cdbd3

Please sign in to comment.