Merge pull request #2 from lovegandhi/multi_part_upload

Multi part upload and cron style scheduler
commit bc1553188df97d3df825de6d826b34ab7185a431 2 parents 43642ab + cb13da5
@marcboeker authored
19 README.md
@@ -20,6 +20,9 @@ backup jobs, that will be run in the defined interval.
db: mongodb://user:password@host:port/database
location: bucket_name/prefix
versions: 5
+ compress_tar_file: False
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 22 * * 1-5
The s3 section contains the credentials to authenticate with AWS S3. The
jobs section contains a list of jobs that will be executed in the given
@@ -29,6 +32,22 @@ interval. Each job must contain the following keys:
* **db** - A URI that defines the database host, database name and auth credentials.
* **location** - The S3 bucket where the dump is stored, plus a key prefix.
* **versions** - Keep the latest X versions of the backup.
+* **compress_tar_file** - True/False. Whether to bzip2-compress the tar archive. A large backup might take too long to compress on smaller EC2 instances, so set this to False to skip compression.
+* **temp_directory** - (optional) Directory for storing the temporary dump and tar files. If not provided, the system's temp directory is used.
+* **cron** - (optional) A cron expression, e.g. `0 22 * * 1-5` runs Monday through Friday at 22:00 (10pm).
+  If it's not provided, **interval** is used instead.
+
+Cron explained:
+
+|Field name |Mandatory |Allowed values |Allowed special characters|
+|:------------|:--------:|:---------------|:-------------------------|
+|Minutes |Yes |0-59 |* / , - |
+|Hours |Yes |0-23 |* / , - |
+|Day of month |Yes |1-31 |* / , - ? L W |
+|Month |Yes |1-12 or JAN-DEC |* / , - |
+|Day of week |Yes |0-6 or SUN-SAT |* / , - ? L # |
+|Year |No |1970-2099 |* / , - |
+
Please note that the location option works like this:
7 examples/jobs.yml
@@ -7,9 +7,14 @@ jobs:
db: mongodb://user:password@host:port/database
location: bucket_name/prefix
versions: 5
+ compress_tar_file: False
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 1 * * 0-6
- interval: 1d
db: mongodb://user:password@host:port/database2
location: bucket_name/prefix_db2
versions: 2
-
+ compress_tar_file: True
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 2 * * 0-6
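
The cron values above are handed straight to rufus-scheduler (see `schedule_jobs` in lib/mongolicious/backup.rb below). A minimal sketch of the two scheduling modes, assuming rufus-scheduler 2.x as this gem uses; the expressions and job bodies are placeholders:

```ruby
require 'rufus/scheduler'

scheduler = Rufus::Scheduler.start_new

# cron style: every day of the week at 01:00
scheduler.cron '0 1 * * 0-6' do
  puts 'run backup job'
end

# interval style, used when no cron key is given: once a day
scheduler.every '1d' do
  puts 'run backup job'
end

scheduler.join # block forever, as Backup#schedule_jobs does
```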
143 lib/mongolicious/backup.rb
@@ -1,6 +1,6 @@
module Mongolicious
class Backup
-
+
# Initialize the backup system.
#
# @param [String] jobfile the path of the job configuration file.
@@ -8,14 +8,14 @@ class Backup
# @return [Backup]
def initialize(jobfile)
@conf = parse_jobfile(jobfile)
-
+
@storage = Storage.new(@conf['s3'])
@filesystem = Filesystem.new
@db = DB.new
-
+
schedule_jobs(@conf['jobs'])
end
-
+
protected
# Parse YAML job configuration.
@@ -23,8 +23,8 @@ def initialize(jobfile)
# @param [String] jobfile the path of the job configuration file.
#
# @return [Hash]
- def parse_jobfile(jobfile)
- YAML.load(File.read(jobfile))
+ def parse_jobfile(jobfile)
+ YAML.load(File.read(jobfile))
rescue Errno::ENOENT
Mongolicious.logger.error("Could not find job file at #{ARGV[0]}")
exit
@@ -32,49 +32,140 @@ def parse_jobfile(jobfile)
Mongolicious.logger.error("Could not parse job file #{ARGV[0]} - #{e}")
exit
end
-
+
# Schedule the jobs to be executed in the given interval.
#
# This method will block and keep running until it gets interrupted.
#
# @param [Array] jobs the list of jobs to be scheduled.
#
- # @return [nil]
+ # @return [nil]
def schedule_jobs(jobs)
scheduler = Rufus::Scheduler.start_new
-
+
jobs.each do |job|
- Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with interval #{job['interval']}")
- scheduler.every job['interval'] do
- backup(job)
+ if job['cron']
+ Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with cron: #{job['cron']}")
+ scheduler.cron job['cron'] do
+ backup(job)
+ end
+ else
+ Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with interval: #{job['interval']}")
+ scheduler.every job['interval'] do
+ backup(job)
+ end
end
- end
-
+ end
+
scheduler.join
- end
-
+ end
+
# Dump database, compress and upload it.
#
# @param [Hash] job the job to execute.
#
# @return [nil]
def backup(job)
- path = @filesystem.get_tmp_path
+ path = @filesystem.get_tmp_path(job['temp_directory'])
s3 = @storage.parse_location(job['location'])
db = @db.get_opts(job['db'])
-
+
Mongolicious.logger.info("Starting job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
@db.dump(db, path)
- @filesystem.compress(path)
-
- key = "#{s3[:prefix]}_#{Time.now.strftime('%m%d%Y_%H%M%S')}.tar.bz2"
- @storage.upload(s3[:bucket], key, path)
-
- @filesystem.cleanup(path)
+ path = @filesystem.compress(path, job['compress_tar_file'])
+ key = "#{s3[:prefix]}_#{Time.now.strftime('%Y%m%d_%H%M%S')}.tar.bz2"
+
+ min_file_size = 5 * (1024 * 1024) # 5 MB
+ max_file_size = 4 * (1024 * 1024 * 1024) # 4 GB
+ split_size = max_file_size
+ file_size = File.size("#{path}")
+ Mongolicious.logger.info("Total backup size: #{file_size} bytes")
+
+ if file_size > max_file_size
+ split_parts = file_size / max_file_size + (file_size % max_file_size > 0 ? 1 : 0)
+
+ last_part_size_in_bytes = file_size -
+ (max_file_size * ((split_parts - 1) <= 0 ? 1: (split_parts - 1)))
+
+ if last_part_size_in_bytes < min_file_size
+ # If we are sending the file in chunks we need to make sure that the last part of the
+ # file is bigger than the 5MB otherwise the whole upload will fail.
+ # If last part is smaller than 5MB then we distribute its bytes to the other parts
+ split_size = max_file_size +
+ (last_part_size_in_bytes/((split_parts - 1) <= 0 ? 1 : (split_parts - 1)))
+ end
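+ # Worked example (illustrative numbers): an 8194 MB archive gives
+ # split_parts = 3 and a last part of only 2 MB, below the 5 MB minimum
+ # described above, so split_size becomes 4097 MB and `split` produces
+ # two ~4 GB parts with no undersized trailing part.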
+
+ Mongolicious.logger.info("Splitting file into #{split_size} bytes/part before uploading.")
+ system("split -b #{split_size} #{path} #{path}.")
+
+ Mongolicious.logger.info("Deleting tar file: #{path}")
+ @filesystem.cleanup_tar_file(path)
+
+ # Get a list of all the split files bigfile.gzip.aa/ab/ac...
+ file_parts = Dir.glob("#{path}.*").sort
+ upload_id = @storage.initiate_multipart_upload(s3[:bucket], key)
+ part_ids = []
+
+ Mongolicious.logger.info("Uploading #{path} in #{file_parts.count} parts.")
+
+ file_parts.each_with_index do |part, position|
+ Mongolicious.logger.info("Uploading file part: #{part}")
+ part_number = (position + 1).to_s
+
+ File.open part do |file_part|
+ attempts = 0
+ max_attempts = 3
+
+ begin
+ # In production we would get frequent "Connection reset by peer" errors while uploading to S3.
+ # Retrying the upload can re-enter this begin block 30-40 minutes later, so we can't reuse
+ # the same socket, as it has timed out by then.
+ # See http://scie.nti.st/2008/3/14/amazon-s3-and-connection-reset-by-peer for an explanation of
+ # "connection reset by peer" and what you can do to fix the issue.
+ #
+ # issue with fog 0.5.1 https://github.com/fog/fog/issues/327
+ # fixed with: https://github.com/fog/fog/commit/597acf03631d3c21442f036a0433a2aa24f98345
+ # Fog 0.5.1 was released on January 31 2011
+ # Fix was issued on May 25 2011
+ # Whenever there is a connection reset, fog would not set the content length to the right value.
+
+ etag = @storage.upload_part(s3[:bucket], key, upload_id, part_number, file_part)
+ rescue Exception => exception
+ attempts += 1
+ Mongolicious.logger.warn("Retry #{attempts} of #{max_attempts}. Error while uploading part: #{part}")
+ Mongolicious.logger.warn(exception.message)
+ Mongolicious.logger.warn(exception.backtrace)
+ retry unless attempts >= max_attempts
+
+ Mongolicious.logger.error("Aborting upload! Error uploading part: #{part}")
+ @filesystem.cleanup_parts(file_parts)
+
+ # tell S3 that we are aborting the upload.
+ @storage.abort_multipart_upload(s3[:bucket], key, upload_id)
+
+ # There is nothing that we can do anymore
+ # Return from this method so that subsequent jobs can fire as scheduled.
+ return
+ end
+
+ part_ids << etag
+ end
+ end
+
+ Mongolicious.logger.info("Completing multipart upload.")
+ response = @storage.complete_multipart_upload(s3[:bucket], key, upload_id, part_ids)
+ Mongolicious.logger.info("#{response.inspect}\n\n")
+
+ @filesystem.cleanup_parts(file_parts)
+ else
+ @storage.upload(s3[:bucket], key, path)
+ @filesystem.cleanup_tar_file(path)
+ end
+
@storage.cleanup(s3[:bucket], s3[:prefix], job['versions'])
-
- Mongolicious.logger.info("Finishing job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
+
+ Mongolicious.logger.info("Finishing job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
end
end
28 lib/mongolicious/db.rb
@@ -3,44 +3,44 @@ class DB
# Initialize a new DB object.
#
- # @return [DB]
+ # @return [DB]
def initialize
-
+
end
# Parse the MongoDB URI.
#
# @param [String] db_uri the DB URI.
#
- # @return [Hash]
+ # @return [Hash]
def get_opts(db_uri)
uri = URI.parse(db_uri)
-
+
{
- :host => uri.host,
- :port => uri.port,
- :user => uri.user,
- :password => uri.password,
+ :host => uri.host,
+ :port => uri.port,
+ :user => uri.user,
+ :password => uri.password,
:db => uri.path.gsub('/', '')
}
- end
+ end
# Dump database using mongodump.
#
# @param [Hash] db the DB connection opts.
# @param [String] path the path, where the dump should be stored.
#
- # @return [nil]
+ # @return [nil]
def dump(db, path)
Mongolicious.logger.info("Dumping database #{db[:db]}")
-
+
cmd = "mongodump -d #{db[:db]} -h #{db[:host]}:#{db[:port]} -o #{path}"
cmd << " -u '#{db[:user]}' -p '#{db[:password]}'" unless (db[:user].nil? || db[:user].empty?)
cmd << " > /dev/null"
-
+
system(cmd)
raise "Error while backuing up #{db[:db]}" if $?.to_i != 0
end
-
+
end
-end
+end
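
For reference, a small sketch of what `DB#get_opts` and `DB#dump` do with a job's db URI; the host, credentials and paths below are made up:

```ruby
db = Mongolicious::DB.new.get_opts('mongodb://user:secret@db.example.com:27017/mydb')
# => {:host=>"db.example.com", :port=>27017, :user=>"user",
#     :password=>"secret", :db=>"mydb"}

# DB#dump(db, '/tmp/1700000000') then shells out to roughly:
#   mongodump -d mydb -h db.example.com:27017 -o /tmp/1700000000 \
#     -u 'user' -p 'secret' > /dev/null
```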
71 lib/mongolicious/filesystem.rb
@@ -3,41 +3,76 @@ class Filesystem
# Initialize a new Filesystem object.
#
- # @return [Filesystem]
+ # @return [Filesystem]
def initialize
-
+
end
# Compress the dump to a tar.bz2 archive.
#
# @param [String] path the path, where the dump is located.
#
- # @return [nil]
- def compress(path)
+ # @return [String]
+ def compress(path, compress_tar_file)
Mongolicious.logger.info("Compressing database #{path}")
-
- system("cd #{path} && tar -cjpf #{path}.tar.bz2 .")
+
+ system("cd #{path} && tar -cpf#{compress_tar_file ? 'j' : ''} #{path}.tar.bz2 .")
raise "Error while compressing #{path}" if $?.to_i != 0
+
+ # Remove the raw mongo dump now that we have the archive
+ FileUtils.rm_rf(path)
+
+ return "#{path}.tar.bz2"
end
-
+
# Generate tmp path for dump.
#
# @return [String]
- def get_tmp_path
- "#{Dir.tmpdir}/#{Time.now.to_i * rand}"
+ def get_tmp_path(temp_path)
+ if not temp_path
+ temp_path = Dir.tmpdir
+ end
+
+ Mongolicious.logger.info("Using #{temp_path} as root for our temp backup.")
+ return "#{temp_path}/#{Time.now.to_i}"
end
-
- # Remove dump and archive from tmp path.
+
+ # Remove the archive from the tmp path.
#
# @param [String] path the path, where the dump/archive is located.
#
# @return [nil]
- def cleanup(path)
+ def cleanup_tar_file(path)
Mongolicious.logger.info("Cleaning up local path #{path}")
-
- FileUtils.rm_rf(path)
- File.delete("#{path}.tar.bz2")
- end
-
+ begin
+ File.delete(path)
+ rescue => exception
+ Mongolicious.logger.error("Error trying to delete: #{path}")
+ Mongolicious.logger.info(exception.message)
+ end
+ end
+
+ # Remove all the bzip parts
+ #
+ # @param [Array] file_parts an array of paths
+ #
+ # @return [nil]
+ def cleanup_parts(file_parts)
+ Mongolicious.logger.info("Cleaning up file parts.")
+
+ if file_parts
+ file_parts.each do |part|
+ Mongolicious.logger.info("Deleting part: #{part}")
+ begin
+ File.delete(part)
+ rescue => exception
+ Mongolicious.logger.error("Error trying to delete part: #{part}")
+ Mongolicious.logger.error(exception.message)
+ Mongolicious.logger.error(exception.backtrace)
+ end
+ end
+ end
+ end
+
end
-end
+end
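
A short sketch of how the new Filesystem methods fit together; the paths are hypothetical. Note that with compress_tar_file set to False the archive is a plain (uncompressed) tar, but it still gets the .tar.bz2 suffix:

```ruby
fs   = Mongolicious::Filesystem.new
path = fs.get_tmp_path('/mnt/some_ebs_location/backups')
# => "/mnt/some_ebs_location/backups/1700000000"

# mongodump writes its output into `path`, then:
archive = fs.compress(path, false)
# => "/mnt/some_ebs_location/backups/1700000000.tar.bz2"

# once the upload (single or multipart) has finished:
fs.cleanup_tar_file(archive)
```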
89 lib/mongolicious/storage.rb
@@ -1,30 +1,30 @@
module Mongolicious
class Storage
-
+
# Initialize the storage object.
#
# @option opts [Hash] :access_id the Access ID of the S3 account.
- # @option opts [Hash] :secret_key the Secret Key of the S3 account.
+ # @option opts [Hash] :secret_key the Secret Key of the S3 account.
#
- # @return [Storage]
+ # @return [Storage]
def initialize(opts)
@con = Fog::Storage.new({
:provider => 'AWS',
:aws_access_key_id => opts['access_id'],
:aws_secret_access_key => opts['secret_key']
})
- end
-
+ end
+
# Parse the given location into a bucket and a prefix.
#
# @param [String] location the bucket/prefix location.
#
- # @return [Hash]
+ # @return [Hash]
def parse_location(location)
location = location.split('/')
{:bucket => location.first, :prefix => location[1..-1].join('/')}
- end
+ end
# Upload the given path to S3.
#
@@ -35,25 +35,80 @@ def parse_location(location)
# @return [Hash]
def upload(bucket, key, path)
Mongolicious.logger.info("Uploading archive to #{key}")
-
+
@con.put_object(
- bucket, key, File.open("#{path}.tar.bz2", 'r'),
- {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'}
- )
- end
-
+ bucket, key, File.open(path, 'r'),
+ {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'}
+ )
+ end
+
+ # Initiate a multipart upload to S3
+ # The content of this upload will be private.
+ #
+ # @param [String] bucket the bucket where to store the archive in.
+ # @param [String] key the key where the archive is stored under.
+ #
+ # @return [String] UploadId the id Amazon assigns to this multipart upload.
+ # It must be provided when uploading each part and when completing or aborting the upload.
+ def initiate_multipart_upload(bucket, key)
+ response = @con.initiate_multipart_upload(bucket, key,
+ {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'})
+
+ return response.body['UploadId']
+ end
+
+ # Upload a part for a multipart upload
+ #
+ # @param [String] bucket Name of bucket to add part to
+ # @param [String] key Name of object to add part to
+ # @param [String] upload_id Id of upload to add part to
+ # @param [String] part_number Index of part in upload
+ # @param [String] data Content of part
+ #
+ # @return [String] ETag etag of new object. Will be needed to complete upload
+ def upload_part(bucket, key, upload_id, part_number, data)
+ response = @con.upload_part(bucket, key, upload_id, part_number, data)
+
+ return response.headers['ETag']
+ end
+
+ # Complete a multipart upload
+ #
+ # @param [String] bucket Name of bucket to complete multipart upload for
+ # @param [String] key Name of object to complete multipart upload for
+ # @param [String] upload_id Id of upload to add part to
+ # @param [String] parts Array of etags for parts
+ #
+ # @return [Excon::Response]
+ def complete_multipart_upload(bucket, key, upload_id, parts)
+ response = @con.complete_multipart_upload(bucket, key, upload_id, parts)
+
+ return response
+ end
+
+ # Aborts a multipart upload
+ #
+ # @param [String] bucket Name of bucket to abort multipart upload on
+ # @param [String] key Name of object to abort multipart upload on
+ # @param [String] upload_id Id of upload to add part to
+ #
+ # @return [nil]
+ def abort_multipart_upload(bucket, key, upload_id)
+ @con.abort_multipart_upload(bucket, key, upload_id)
+ end
+
# Remove old versions of a backup.
#
# @param [String] bucket the bucket where the archive is stored in.
# @param [String] prefix the prefix where to look for outdated versions.
# @param [Integer] versions number of versions to keep.
#
- # @return [nil]
+ # @return [nil]
def cleanup(bucket, prefix, versions)
objects = @con.get_bucket(bucket, :prefix => prefix).body['Contents']
-
+
return if objects.size <= versions
-
+
objects[0...(objects.size - versions)].each do |o|
Mongolicious.logger.info("Removing outdated version #{o['Key']}")
@con.delete_object(bucket, o['Key'])
@@ -61,4 +116,4 @@ def cleanup(bucket, prefix, versions)
end
end
-end
+end
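
Taken together, the new Storage methods wrap fog's S3 calls in the usual three-step multipart flow: initiate, upload parts, complete (or abort on failure). A minimal usage sketch, with the bucket, key and part paths as placeholders:

```ruby
storage = Mongolicious::Storage.new('access_id' => 'AKIA...', 'secret_key' => '...')
s3      = storage.parse_location('bucket_name/prefix')
key     = "#{s3[:prefix]}_20240101_000000.tar.bz2"

upload_id = storage.initiate_multipart_upload(s3[:bucket], key)

part_ids = []
Dir.glob('/tmp/1700000000.tar.bz2.*').sort.each_with_index do |part, position|
  File.open(part) do |file_part|
    part_ids << storage.upload_part(s3[:bucket], key, upload_id, (position + 1).to_s, file_part)
  end
end

storage.complete_multipart_upload(s3[:bucket], key, upload_id, part_ids)
# on an unrecoverable error instead:
# storage.abort_multipart_upload(s3[:bucket], key, upload_id)
```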