Skip to content

Commit

Permalink
Refactoring API client
Browse files Browse the repository at this point in the history
  • Loading branch information
hakobera committed Jun 4, 2016
1 parent e0e0f99 commit e206160
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 39 deletions.
6 changes: 3 additions & 3 deletions lib/tumugi/plugin/google_cloud_storage/atomic_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ module Tumugi
module Plugin
module GoogleCloudStorage
class AtomicFile < Tumugi::AtomicFile
def initialize(path, client)
def initialize(path, fs)
super(path)
@client = client
@fs = fs
end

def move_to_final_destination(temp_file)
@client.upload(temp_file, path)
@fs.upload(temp_file, path)
end
end
end
Expand Down
78 changes: 42 additions & 36 deletions lib/tumugi/plugin/google_cloud_storage/file_system.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'uri'
require 'json'
require 'googleauth/service_account'
require 'google/apis/storage_v1'
require 'google/apis/drive_v3'
require 'tumugi/file_system'

module Tumugi
Expand All @@ -10,7 +12,7 @@ class FileSystem < Tumugi::FileSystem
attr_reader :client

def initialize(config)
@client = create_client(config)
save_config(config)
end

#######################################################################
Expand All @@ -33,16 +35,16 @@ def remove(path, recursive: true)
raise Tumugi::FileSystemError.new("Cannot delete root of bucket at path '#{path}'") if root?(key)

if obj_exist?(bucket, key)
@client.delete_object(bucket, key, options: default_request_options)
client.delete_object(bucket, key, options: request_options)
wait_until { !obj_exist?(bucket, key) }
true
elsif directory?(path)
raise Tumugi::FileSystemError.new("Path '#{path}' is a directory. Must use recursive delete") if !recursive

objs = entries(path).map(&:name)
@client.batch do |client|
client.batch do |client|
objs.each do |obj|
client.delete_object(bucket, obj, options: default_request_options)
client.delete_object(bucket, obj, options: request_options)
end
end
wait_until { !directory?(path) }
Expand Down Expand Up @@ -80,7 +82,7 @@ def directory?(path)
true
else
# Any objects with this prefix
objects = @client.list_objects(bucket, prefix: obj, max_results: 20, options: default_request_options)
objects = client.list_objects(bucket, prefix: obj, max_results: 20, options: request_options)
!!(objects.items && objects.items.size > 0)
end
end
Expand All @@ -95,7 +97,7 @@ def entries(path)
next_page_token = ''

until next_page_token.nil?
objects = @client.list_objects(bucket, prefix: obj, page_token: next_page_token, options: default_request_options)
objects = client.list_objects(bucket, prefix: obj, page_token: next_page_token, options: request_options)
if objects && objects.items
results.concat(objects.items)
next_page_token = objects.next_page_token
Expand All @@ -120,7 +122,7 @@ def move(src_path, dest_path, raise_if_exist: false)
def upload(media, path, content_type: nil)
bucket, key = path_to_bucket_and_key(path)
obj = Google::Apis::StorageV1::Object.new(bucket: bucket, name: key)
@client.insert_object(bucket, obj, upload_source: media, content_type: content_type, options: default_request_options)
client.insert_object(bucket, obj, upload_source: media, content_type: content_type, options: request_options)
wait_until { obj_exist?(bucket, key) }
rescue
process_error($!)
Expand All @@ -131,7 +133,7 @@ def download(path, download_path: nil, mode: 'r', &block)
if download_path.nil?
download_path = Tempfile.new('tumugi_gcs_file_system').path
end
@client.get_object(bucket, key, download_dest: download_path, options: default_request_options)
client.get_object(bucket, key, download_dest: download_path, options: request_options)
wait_until { File.exist?(download_path) }

if block_given?
Expand Down Expand Up @@ -164,13 +166,13 @@ def copy(src_path, dest_path, raise_if_exist: false)
copied_objs = []
entries(src_path).each do |entry|
suffix = entry.name[src_prefix.length..-1]
@client.copy_object(src_bucket, src_prefix + suffix,
dest_bucket, dest_prefix + suffix, options: default_request_options)
client.copy_object(src_bucket, src_prefix + suffix,
dest_bucket, dest_prefix + suffix, options: request_options)
copied_objs << (dest_prefix + suffix)
end
wait_until { copied_objs.all? {|obj| obj_exist?(dest_bucket, obj)} }
else
@client.copy_object(src_bucket, src_key, dest_bucket, dest_key, options: default_request_options)
client.copy_object(src_bucket, src_key, dest_bucket, dest_key, options: request_options)
wait_until { obj_exist?(dest_bucket, dest_key) }
end
rescue
Expand All @@ -186,7 +188,7 @@ def path_to_bucket_and_key(path)
def create_bucket(bucket)
unless bucket_exist?(bucket)
b = Google::Apis::StorageV1::Bucket.new(name: bucket)
@client.insert_bucket(@project_id, b, options: default_request_options)
client.insert_bucket(@project_id, b, options: request_options)
true
else
false
Expand All @@ -197,7 +199,7 @@ def create_bucket(bucket)

def remove_bucket(bucket)
if bucket_exist?(bucket)
@client.delete_bucket(bucket, options: default_request_options)
client.delete_bucket(bucket, options: request_options)
true
else
false
Expand All @@ -207,7 +209,7 @@ def remove_bucket(bucket)
end

def bucket_exist?(bucket)
@client.get_bucket(bucket, options: default_request_options)
client.get_bucket(bucket, options: request_options)
true
rescue => e
return false if e.status_code == 404
Expand All @@ -217,7 +219,7 @@ def bucket_exist?(bucket)
private

def obj_exist?(bucket, key)
@client.get_object(bucket, key, options: default_request_options)
client.get_object(bucket, key, options: request_options)
true
rescue => e
return false if e.status_code == 404
Expand All @@ -236,7 +238,7 @@ def add_path_delimiter(key)
end
end

def create_client(config)
def save_config(config)
if config.private_key_file.nil?
@project_id = config.project_id
client_email = config.client_email
Expand All @@ -247,35 +249,39 @@ def create_client(config)
client_email = json['client_email']
private_key = json['private_key']
end
@key = {
client_email: client_email,
private_key: private_key
}
end

def client
return @cached_client if @cached_client && @cached_client_expiration > Time.now

# https://cloud.google.com/storage/docs/authentication
scope = "https://www.googleapis.com/auth/devstorage.read_write"

if client_email and private_key
auth = Signet::OAuth2::Client.new(
token_credential_uri: "https://accounts.google.com/o/oauth2/token",
audience: "https://accounts.google.com/o/oauth2/token",
scope: scope,
issuer: client_email,
signing_key: OpenSSL::PKey.read(private_key))
# MEMO: signet-0.6.1 depend on Farady.default_connection
Faraday.default_connection.options.timeout = 60
auth.fetch_access_token!
client = Google::Apis::StorageV1::StorageService.new
scope = Google::Apis::StorageV1::AUTH_DEVSTORAGE_READ_WRITE

if @key[:client_email] and @key[:private_key]
options = {
json_key_io: StringIO.new(JSON.generate(@key)),
scope: scope
}
auth = Google::Auth::ServiceAccountCredentials.make_creds(options)
else
auth = Google::Auth.get_application_default([scope])
auth.fetch_access_token!
end

client = Google::Apis::StorageV1::StorageService.new
auth.fetch_access_token!
client.authorization = auth
client

@cached_client_expiration = Time.now + (auth.expires_in / 2)
@cached_client = client
end

def default_request_options
Google::Apis::RequestOptions.new({
def request_options
{
retries: 5,
timeout_sec: 60
})
}
end

def wait_until(&block)
Expand Down

0 comments on commit e206160

Please sign in to comment.