-
Notifications
You must be signed in to change notification settings - Fork 8
/
aws_sdk_s3.rb
77 lines (68 loc) · 2.04 KB
/
aws_sdk_s3.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
module Filey
  module DataSources
    # Data source that enumerates the objects of an S3 bucket and maps each one
    # to a Filey built from the object's key, last-modified time and ETag.
    #
    # Objects are processed by batches of threads unless the
    # +disable_parallel_processing+ environment variable is set.
    class AwsSdkS3 < DataSource
      # Batch size used when the config carries no +concurrency_level+ entry.
      DEFAULT_CONCURRENCY_LEVEL = 3

      # @param s3_bucket [#objects] bucket whose +objects+ are enumerated; every
      #   object must respond to +key+, +last_modified+ and +etag+
      # @param config [Hash, nil] options; +concurrency_level+ (symbol or string
      #   key) sets how many objects are processed at the same time
      # @yield [filey] optional callback invoked with every Filey right after it
      #   has been built; in parallel mode it is called from a worker thread
      def initialize(s3_bucket, config = { :concurrency_level => DEFAULT_CONCURRENCY_LEVEL }, &block)
        @s3_bucket = s3_bucket
        # Tolerate an explicit nil so slice_size can still fall back to the default.
        @config = config || {}
        @when_filey_loaded = block
      end

      private

      # Builds a Filey for every object in the bucket.
      #
      # @return [Array] the built Fileys, in the order the jobs finished (which
      #   may differ from the bucket's listing order in parallel mode)
      def do_internal_load
        fileys = []
        # Worker threads append concurrently, and Array#<< is not guaranteed to
        # be thread-safe on every Ruby implementation, so serialise the append.
        lock = Mutex.new
        collect_filey = lambda { |s3_object|
          filey = map_s3_object_to_filey(s3_object)
          lock.synchronize { fileys << filey }
        }
        in_parallel_or_sequentially collect_filey
        fileys
      end

      # Calls +operation_on_s3_object+ once for every object in the bucket.
      #
      # When the +disable_parallel_processing+ environment variable is set (to
      # any value) the calls run one after another on the current thread.
      # Otherwise the objects are handled in batches of +slice_size+ threads,
      # and each batch is joined before the next one starts.
      #
      # @param operation_on_s3_object [#call] receives a single S3 object
      def in_parallel_or_sequentially(operation_on_s3_object)
        jobs = @s3_bucket.objects.map { |s3_object|
          lambda { operation_on_s3_object.call s3_object }
        }

        if ENV['disable_parallel_processing']
          jobs.each(&:call)
        else
          jobs.each_slice(slice_size) { |batch|
            threads = batch.map { |job| Thread.new { job.call } }
            threads.each(&:join)
          }
        end
      end

      # @return [Object] the configured +concurrency_level+ (looked up under the
      #   symbol key first, then the string key), or DEFAULT_CONCURRENCY_LEVEL
      #   when neither is present
      def slice_size
        configured = @config[:concurrency_level] || @config['concurrency_level']
        configured || DEFAULT_CONCURRENCY_LEVEL
      end

      # Converts one S3 object into a Filey and reports it to the optional
      # callback given to the constructor.
      #
      # The key is split at its last "/": everything up to and including that
      # slash becomes the path (prefixed with "./"), the rest becomes the name.
      # A key without any slash yields the path "./".
      #
      # @param s3_object [#key, #last_modified, #etag]
      # @return [Filey]
      def map_s3_object_to_filey(s3_object)
        # rpartition treats the key as an opaque string, so keys that contain
        # newlines are split correctly as well.
        directory, separator, name = s3_object.key.rpartition('/')
        last_modified, md5 = last_modified_and_md5(s3_object)

        filey = Filey.new("./#{directory}#{separator}", name, last_modified, md5)
        @when_filey_loaded.call(filey) if @when_filey_loaded
        filey
      end

      # @param s3_object [#last_modified, #etag]
      # @return [Array] two elements: the object's last-modified value and its
      #   ETag with double quotes removed and cut at the first "-"
      def last_modified_and_md5(s3_object)
        last_modified = s3_object.last_modified
        # NOTE(review): the "-" suffix presumably marks a multipart upload, in
        # which case the remaining digest is not the MD5 of the whole object;
        # confirm that callers can live with that.
        md5 = s3_object.etag.delete('"').split('-', 2).first
        [last_modified, md5]
      end
    end
  end
end