forked from alienrobotwizard/swineherd
/
s3filesystem.rb
287 lines (252 loc) · 7.4 KB
/
s3filesystem.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
require 'tempfile'
module Swineherd
#
# Methods for interacting with Amazon's Simple Store Service (s3).
#
class S3FileSystem
include Swineherd::BaseFileSystem
attr_accessor :s3
#
# Initialize a new s3 file system, needs path to aws keys
#
def initialize aws_access_key_id, aws_secret_access_key
require 'right_aws'
@s3 = RightAws::S3.new(aws_access_key_id, aws_secret_access_key)
end
def open path, mode="r", &blk
S3File.new(path,mode,self,&blk)
end
def size path
sz = 0
if type(path) == "directory"
lr(path).each do |f|
sz += file_size(f)
end
else
sz += file_size(path)
end
sz
end
def file_size path
containing_bucket = bucket(path)
header = @s3.interface.head(containing_bucket, key_path(path))
header['content-length'].to_i
end
def rm path
bkt = bucket(path)
key = key_path(path)
if key.empty? # only the bucket was passed in, delete it
@s3.interface.force_delete_bucket(bkt)
else
case type(path)
when "directory" then
keys_to_delete = lr(path)
keys_to_delete.each do |k|
key_to_delete = key_path(k)
@s3.interface.delete(bkt, key_to_delete)
end
keys_to_delete
when "file" then
@s3.interface.delete(bkt, key)
[path]
end
end
end
def bucket path
uri = URI.parse(path)
uri.path.split('/').reject{|x| x.empty?}.first
end
def key_path path
uri = URI.parse(path)
File.join(uri.path.split('/').reject{|x| x.empty?}[1..-1])
end
def needs_trailing_slash pre
has_trailing_slash = pre.end_with? '/'
is_empty_prefix = pre.empty?
!(has_trailing_slash || is_empty_prefix)
end
def full_contents path
bkt = bucket(path)
pre = key_path(path)
pre += '/' if needs_trailing_slash(pre)
contents = []
s3.interface.incrementally_list_bucket(bkt, {'prefix' => pre, 'delimiter' => '/'}) do |res|
contents += res[:common_prefixes].map{|c| File.join(bkt,c)}
contents += res[:contents].map{|c| File.join(bkt, c[:key])}
end
contents
end
def exists? path
object = File.basename(path)
search_dir = File.dirname(path)
case search_dir
when '.' then # only a bucket was passed in
begin
(full_contents(object).size > 0)
rescue RightAws::AwsError => e
if e.message =~ /nosuchbucket/i
false
else
raise e
end
end
else
search_dir_contents = full_contents(search_dir).map{|c| File.basename(c).gsub(/\//, '')}
search_dir_contents.include?(object)
end
end
def mv srcpath, dstpath
src_bucket = bucket(srcpath)
dst_bucket = bucket(dstpath)
dst_key_path = key_path(dstpath)
mkpath(dstpath)
case type(srcpath)
when "directory" then
paths_to_copy = lr(srcpath)
common_dir = common_directory(paths_to_copy)
paths_to_copy.each do |path|
src_key = key_path(path)
dst_key = File.join(dst_key_path, path.gsub(common_dir, ''))
@s3.interface.move(src_bucket, src_key, dst_bucket, dst_key)
end
when "file" then
@s3.interface.move(src_bucket, key_path(srcpath), dst_bucket, dst_key_path)
end
end
def cp srcpath, dstpath
src_bucket = bucket(srcpath)
dst_bucket = bucket(dstpath)
dst_key_path = key_path(dstpath)
mkpath(dstpath)
case type(srcpath)
when "directory" then
paths_to_copy = lr(srcpath)
common_dir = common_directory(paths_to_copy)
paths_to_copy.each do |path|
src_key = key_path(path)
dst_key = File.join(dst_key_path, path.gsub(common_dir, ''))
@s3.interface.copy(src_bucket, src_key, dst_bucket, dst_key)
end
when "file" then
@s3.interface.copy(src_bucket, key_path(srcpath), dst_bucket, dst_key_path)
end
end
#
# This is a bit funny, there's actually no need to create a 'path' since
# s3 is nothing more than a glorified key-value store. When you create a
# 'file' (key) the 'path' will be created for you. All we do here is create
# the bucket unless it already exists.
#
def mkpath path
bkt = bucket(path)
key = key_path(path)
if key.empty?
@s3.interface.create_bucket(bkt)
else
@s3.interface.create_bucket(bkt) unless exists? bkt
end
path
end
def type path
return "unknown" unless exists? path
return "directory" if full_contents(path).size > 0
"file"
end
def entries dirpath
return unless type(dirpath) == "directory"
full_contents(dirpath)
end
# Recursively list paths
def lr path
paths = entries(path)
if paths
paths.map{|e| lr(e)}.flatten
else
path
end
end
#
# Ick.
#
def common_directory paths
dirs = paths.map{|path| path.split('/')}
min_size = dirs.map{|splits| splits.size}.min
dirs.map!{|splits| splits[0...min_size]}
uncommon_idx = dirs.transpose.each_with_index.find{|dirnames, idx| dirnames.uniq.length > 1}.last
dirs[0][0...uncommon_idx].join('/')
end
def put srcpath, destpath
dest_bucket = bucket(destpath)
if File.directory? srcpath
else
key = srcpath
end
@s3.interface.put(dest_path, key, File.open(srcpath))
end
def close *args
end
def put srcpath, destpath
dest_bucket = bucket(destpath)
if File.directory? srcpath
else
key = srcpath
end
@s3.interface.put(dest_path, key, File.open(srcpath))
end
class S3File
attr_accessor :path, :handle, :fs
#
# In order to open input and output streams we must pass around the s3 fs object itself
#
def initialize path, mode, fs, &blk
@fs = fs
@path = path
case mode
when "r" then
raise "#{fs.type(path)} is not a readable file - #{path}" unless fs.type(path) == "file"
when "w" then
raise "Path #{path} is a directory." unless (fs.type(path) == "file") || (fs.type(path) == "unknown")
@handle = Tempfile.new('s3filestream')
if block_given?
yield self
close
end
end
end
#
# Faster than iterating
#
def read
resp = fs.s3.interface.get_object(fs.bucket(path), fs.key_path(path))
resp
end
#
# This is a little hackety. That is, once you call (.each) on the object the full object starts
# downloading...
#
def readline
@handle ||= fs.s3.interface.get_object(fs.bucket(path), fs.key_path(path)).each
begin
@handle.next
rescue StopIteration, NoMethodError
@handle = nil
raise EOFError.new("end of file reached")
end
end
def write string
@handle.write(string)
end
def puts string
write(string+"\n")
end
def close
if @handle
@handle.read
fs.s3.interface.put(fs.bucket(path), fs.key_path(path), File.open(@handle.path, 'r'))
@handle.close
end
@handle = nil
end
end
end
end