Multi part upload and cron style scheduler #2

Merged
merged 2 commits into from

2 participants

@lovegandhi
  1. I have added multi-part upload to Mongolicious.
    • Break apart a large upload into parts (a rough sketch of the sizing rule follows this list).
    • File parts are between 5MB and 4GB (Amazon requires multipart parts to be at least 5MB and at most 5GB each).
  2. Run jobs on a schedule (cron).
  3. Added an option to compress the tar file or skip compression.
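
A rough sketch of the part-sizing rule (illustration only, not the exact code in this pull request): cap each part at 4GB, and if the final remainder would fall under S3's 5MB minimum, spread those bytes across the other parts.

```ruby
# Illustration of the multipart sizing rule (not the PR's exact code).
MIN_PART = 5 * 1024 * 1024          # S3 minimum part size: 5 MB
MAX_PART = 4 * 1024 * 1024 * 1024   # cap used here: 4 GB (S3 allows up to 5 GB)

def split_size_for(file_size)
  return file_size if file_size <= MAX_PART           # small enough for a single PUT

  parts     = (file_size.to_f / MAX_PART).ceil
  last_part = file_size - MAX_PART * (parts - 1)

  if last_part < MIN_PART
    # Tail would be too small: grow every part a little so the last one stays >= 5 MB.
    MAX_PART + last_part / (parts - 1)
  else
    MAX_PART
  end
end

split_size_for(9 * 1024**3)   # 9 GB dump => split into parts of 4 GB, 4 GB and 1 GB
```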
Taulant Dhami added some commits
Taulant Dhami Implemented multi-part upload for large mongo dumps. Added option to run backup on a cron-style interval.

If there is a mongo dump that's over 4 GB, it will be sent over in multiple chunks.
You can run backups on a cron-style schedule (via rufus-scheduler) at a certain time every day/night, no matter when you started your script.
The mongo dump directory is deleted once the archive is created (to save space).
0cf86ec
Taulant Dhami Adding some documentation and aborting upload whenever there are errors
The documentation I am adding is based on my production environment, which runs an outdated Fog gem, so it might not apply to most people, but it might help someone else.
cb13da5
@marcboeker
Owner

Awesome contributions. Thank you!

@marcboeker marcboeker merged commit bc15531 into from
Commits on Dec 24, 2012
  1. Implemented multi-part upload for large mongo dumps. Added option to run backup on a cron-style interval.

    Taulant Dhami authored

    If there is a mongo dump that's over 4 GB, it will be sent over in multiple chunks.
    You can run backups on a cron-style schedule (via rufus-scheduler) at a certain time every day/night, no matter when you started your script.
    The mongo dump directory is deleted once the archive is created (to save space).
Commits on Jan 21, 2013
  1. Adding some documentation and aborting upload whenever there are errors

    Taulant Dhami authored
    The documentation I am adding is based on my production environment, which runs an outdated Fog gem, so it might not apply to most people, but it might help someone else.
19 README.md
@@ -20,6 +20,9 @@ backup jobs, that will be run in the defined interval.
db: mongodb://user:password@host:port/database
location: bucket_name/prefix
versions: 5
+ compress_tar_file: False
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 22 * * 1-5
The s3 section contains the credentials to authenticate with AWS S3. The
jobs section contains a list of jobs that will be executed in the given
@@ -29,6 +32,22 @@ interval. Each job must contain the following keys:
* **db** - Is a URI, that defines the database host, database name and auth credentials.
* **location** - The location is the S3 bucket, where to put the dump and a prefix.
* **versions** - Keep the latest X versions of the backup.
+* **compress_tar_file** - True/False. Compressing a large backup might take too long on smaller EC2 instances.
+* **temp_directory** - (optional) Use this directory for storing temporary dump and tar files. If not provided, the system's temp directory is used.
+* **cron** - (optional) A cron expression, e.g. 0 22 * * 1-5 runs every weekday (Mon-Fri) at 22:00 (10pm).
+  If it's not provided, **interval** is used instead.
+
+Cron explained:
+
+|Field name |Mandatory |Allowed values |Allowed special characters|
+|:------------|:--------:|:---------------|:-------------------------|
+|Minutes |Yes |0-59 |* / , - |
+|Hours        |Yes       |0-23            |* / , -                   |
+|Day of month |Yes |1-31 |* / , - ? L W |
+|Month |Yes |1-12 or JAN-DEC |* / , - |
+|Day of week |Yes |0-6 or SUN-SAT |* / , - ? L # |
+|Year         |No        |1970-2099       |* / , -                   |
+
Please note that the location option works like this:
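
For reference, the cron values above are handed straight to rufus-scheduler, which this branch uses for cron-style jobs. A minimal sketch of that call (shown against the 2.x-style start_new/cron/join calls used in backup.rb below):

```ruby
# gem 'rufus-scheduler' (the require path may be 'rufus/scheduler' in older versions)
require 'rufus-scheduler'

# Minimal sketch: the same start_new / cron / join calls that backup.rb uses.
scheduler = Rufus::Scheduler.start_new

# "0 22 * * 1-5" => every weekday at 22:00, regardless of when the process started.
scheduler.cron '0 22 * * 1-5' do
  puts "kicking off backup at #{Time.now}"
end

scheduler.join  # block so the process keeps running and firing jobs
```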
7 examples/jobs.yml
@@ -7,9 +7,14 @@ jobs:
db: mongodb://user:password@host:port/database
location: bucket_name/prefix
versions: 5
+ compress_tar_file: False
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 1 * * 0-6
- interval: 1d
db: mongodb://user:password@host:port/database2
location: bucket_name/prefix_db2
versions: 2
-
+ compress_tar_file: True
+ temp_directory: /mnt/some_ebs_location/backups
+ cron: 0 2 * * 0-6
143 lib/mongolicious/backup.rb
@@ -1,6 +1,6 @@
module Mongolicious
class Backup
-
+
# Initialize the backup system.
#
# @param [String] jobfile the path of the job configuration file.
@@ -8,14 +8,14 @@ class Backup
# @return [Backup]
def initialize(jobfile)
@conf = parse_jobfile(jobfile)
-
+
@storage = Storage.new(@conf['s3'])
@filesystem = Filesystem.new
@db = DB.new
-
+
schedule_jobs(@conf['jobs'])
end
-
+
protected
# Parse YAML job configuration.
@@ -23,8 +23,8 @@ def initialize(jobfile)
# @param [String] jobfile the path of the job configuration file.
#
# @return [Hash]
- def parse_jobfile(jobfile)
- YAML.load(File.read(jobfile))
+ def parse_jobfile(jobfile)
+ YAML.load(File.read(jobfile))
rescue Errno::ENOENT
Mongolicious.logger.error("Could not find job file at #{ARGV[0]}")
exit
@@ -32,49 +32,140 @@ def parse_jobfile(jobfile)
Mongolicious.logger.error("Could not parse job file #{ARGV[0]} - #{e}")
exit
end
-
+
# Schedule the jobs to be executed in the given interval.
#
# This method will block and keep running until it gets interrupted.
#
# @param [Array] jobs the list of jobs to be scheduled.
#
- # @return [nil]
+ # @return [nil]
def schedule_jobs(jobs)
scheduler = Rufus::Scheduler.start_new
-
+
jobs.each do |job|
- Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with interval #{job['interval']}")
- scheduler.every job['interval'] do
- backup(job)
+ if job['cron']
+ Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with cron: #{job['cron']}")
+ scheduler.cron job['cron'] do
+ backup(job)
+ end
+ else
+ scheduler.every job['interval'] do
+ Mongolicious.logger.info("Scheduled new job for #{job['db'].split('/').last} with interval: #{job['interval']}")
+ backup(job)
+ end
end
- end
-
+ end
+
scheduler.join
- end
-
+ end
+
# Dump database, compress and upload it.
#
# @param [Hash] job the job to execute.
#
# @return [nil]
def backup(job)
- path = @filesystem.get_tmp_path
+ path = @filesystem.get_tmp_path(job['temp_directory'])
s3 = @storage.parse_location(job['location'])
db = @db.get_opts(job['db'])
-
+
Mongolicious.logger.info("Starting job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
@db.dump(db, path)
- @filesystem.compress(path)
-
- key = "#{s3[:prefix]}_#{Time.now.strftime('%m%d%Y_%H%M%S')}.tar.bz2"
- @storage.upload(s3[:bucket], key, path)
-
- @filesystem.cleanup(path)
+ path = @filesystem.compress(path, job['compress_tar_file'])
+ key = "#{s3[:prefix]}_#{Time.now.strftime('%Y%m%d_%H%M%S')}.tar.bz2"
+
+ min_file_size = 5 * (1024 * 1024) # 5 MB
+ max_file_size = 4 * (1024 * 1024 * 1024) # 4 GB
+ split_size = max_file_size
+ file_size = File.size("#{path}")
+ Mongolicious.logger.info("Total backup size: #{file_size} bytes")
+
+ if file_size > max_file_size
+ split_parts = file_size / max_file_size + (file_size % max_file_size > 0 ? 1 : 0)
+
+ last_part_size_in_bytes = file_size -
+ (max_file_size * ((split_parts - 1) <= 0 ? 1: (split_parts - 1)))
+
+ if last_part_size_in_bytes < min_file_size
+          # If we are sending the file in chunks, we need to make sure that the last part of the
+          # file is at least 5MB, otherwise the whole upload will fail.
+          # If the last part is smaller than 5MB, we distribute its bytes to the other parts.
+ split_size = max_file_size +
+ (last_part_size_in_bytes/((split_parts - 1) <= 0 ? 1 : (split_parts - 1)))
+ end
+
+ Mongolicious.logger.info("Splitting file into #{split_size} bytes/part before uploading.")
+ system("split -b #{split_size} #{path} #{path}.")
+
+ Mongolicious.logger.info("Deleting tar file: #{path}")
+ @filesystem.cleanup_tar_file(path)
+
+ # Get a list of all the split files bigfile.gzip.aa/ab/ac...
+ file_parts = Dir.glob("#{path}.*").sort
+ upload_id = @storage.initiate_multipart_upload(s3[:bucket], key)
+ part_ids = []
+
+ Mongolicious.logger.info("Uploading #{path} in #{file_parts.count} parts.")
+
+ file_parts.each_with_index do |part, position|
+ Mongolicious.logger.info("Uploading file part: #{part}")
+ part_number = (position + 1).to_s
+
+ File.open part do |file_part|
+ attempts = 0
+ max_attempts = 3
+
+ begin
+              # In production we would get frequent "Connection reset by peer" errors while uploading to S3.
+              # Retrying the upload would re-enter this begin block after 30-40 minutes, so we can't reuse
+              # the same socket, as it has already timed out.
+ # http://scie.nti.st/2008/3/14/amazon-s3-and-connection-reset-by-peer for explanation on "connection
+ # reset by peer" and what you can do to fix the issue
+ #
+ # issue with fog 0.5.1 https://github.com/fog/fog/issues/327
+ # fixed with: https://github.com/fog/fog/commit/597acf03631d3c21442f036a0433a2aa24f98345
+ # Fog 0.5.1 was released on January 31 2011
+ # Fix was issued on May 25 2011
+              # Whenever there is a connection reset, fog would not set the content length to the right value.
+
+ etag = @storage.upload_part(s3[:bucket], key, upload_id, part_number, file_part)
+ rescue Exception => exception
+ attempts += 1
+ Mongolicious.logger.warn("Retry #{attempts} of #{max_attempts}. Error while uploading part: #{part}")
+ Mongolicious.logger.warn(exception.message)
+ Mongolicious.logger.warn(exception.backtrace)
+ retry unless attempts >= max_attempts
+
+ Mongolicious.logger.error("Aborting upload! Error uploading part: #{part}")
+ @filesystem.cleanup_parts(file_parts)
+
+ # tell S3 that we are aborting the upload.
+ @storage.abort_multipart_upload(s3[:bucket], key, upload_id)
+
+ # There is nothing that we can do anymore
+              # Return from this method so that subsequent jobs can fire as scheduled.
+ return
+ end
+
+ part_ids << etag
+ end
+ end
+
+ Mongolicious.logger.info("Completing multipart upload.")
+ response = @storage.complete_multipart_upload(s3[:bucket], key, upload_id, part_ids)
+ Mongolicious.logger.info("#{response.inspect}\n\n")
+
+ @filesystem.cleanup_parts(file_parts)
+ else
+ @storage.upload(s3[:bucket], key, path)
+ @filesystem.cleanup_tar_file(path)
+ end
+
@storage.cleanup(s3[:bucket], s3[:prefix], job['versions'])
-
- Mongolicious.logger.info("Finishing job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
+
+ Mongolicious.logger.info("Finishing job for #{db[:host]}:#{db[:port]}/#{db[:db]}")
end
end
28 lib/mongolicious/db.rb
@@ -3,44 +3,44 @@ class DB
    # Initialize a new DB object.
#
- # @return [DB]
+ # @return [DB]
def initialize
-
+
end
# Parse the MongoDB URI.
#
# @param [String] db_uri the DB URI.
#
- # @return [Hash]
+ # @return [Hash]
def get_opts(db_uri)
uri = URI.parse(db_uri)
-
+
{
- :host => uri.host,
- :port => uri.port,
- :user => uri.user,
- :password => uri.password,
+ :host => uri.host,
+ :port => uri.port,
+ :user => uri.user,
+ :password => uri.password,
:db => uri.path.gsub('/', '')
}
- end
+ end
# Dump database using mongodump.
#
# @param [Hash] db the DB connection opts.
# @param [String] path the path, where the dump should be stored.
#
- # @return [nil]
+ # @return [nil]
def dump(db, path)
Mongolicious.logger.info("Dumping database #{db[:db]}")
-
+
cmd = "mongodump -d #{db[:db]} -h #{db[:host]}:#{db[:port]} -o #{path}"
cmd << " -u '#{db[:user]}' -p '#{db[:password]}'" unless (db[:user].nil? || db[:user].empty?)
cmd << " > /dev/null"
-
+
system(cmd)
raise "Error while backuing up #{db[:db]}" if $?.to_i != 0
end
-
+
end
-end
+end
71 lib/mongolicious/filesystem.rb
@@ -3,41 +3,76 @@ class Filesystem
    # Initialize a new Filesystem object.
#
- # @return [Filesytem]
+    # @return [Filesystem]
def initialize
-
+
end
    # Compress the dump to a tar.bz2 archive.
#
# @param [String] path the path, where the dump is located.
#
- # @return [nil]
- def compress(path)
+ # @return [String]
+ def compress(path, compress_tar_file)
Mongolicious.logger.info("Compressing database #{path}")
-
- system("cd #{path} && tar -cjpf #{path}.tar.bz2 .")
+
+      system("cd #{path} && tar -cp#{compress_tar_file ? 'j' : ''}f #{path}.tar.bz2 .")
raise "Error while compressing #{path}" if $?.to_i != 0
+
+      # Remove the mongo dump directory now that we have the archive
+ FileUtils.rm_rf(path)
+
+ return "#{path}.tar.bz2"
end
-
+
# Generate tmp path for dump.
#
# @return [String]
- def get_tmp_path
- "#{Dir.tmpdir}/#{Time.now.to_i * rand}"
+ def get_tmp_path(temp_path)
+ if not temp_path
+ temp_path = Dir.tmpdir
+ end
+
+ Mongolicious.logger.info("Using #{temp_path} as root for our temp backup.")
+ return "#{temp_path}/#{Time.now.to_i}"
end
-
- # Remove dump and archive from tmp path.
+
+ # Remove dump from tmp path.
#
# @param [String] path the path, where the dump/archive is located.
#
# @return [nil]
- def cleanup(path)
+ def cleanup_tar_file(path)
Mongolicious.logger.info("Cleaning up local path #{path}")
-
- FileUtils.rm_rf(path)
- File.delete("#{path}.tar.bz2")
- end
-
+ begin
+ File.delete(path)
+ rescue => exception
+ Mongolicious.logger.error("Error trying to delete: #{path}")
+ Mongolicious.logger.info(exception.message)
+ end
+ end
+
+ # Remove all the bzip parts
+ #
+ # @param [Array] file_parts an array of paths
+ #
+    # @return [nil]
+ def cleanup_parts(file_parts)
+ Mongolicious.logger.info("Cleaning up file parts.")
+
+ if file_parts
+ file_parts.each do |part|
+ Mongolicious.logger.info("Deleting part: #{part}")
+ begin
+ File.delete(part)
+ rescue => exception
+ Mongolicious.logger.error("Error trying to delete part: #{part}")
+ Mongolicious.logger.error(exception.message)
+ Mongolicious.logger.error(exception.backtrace)
+ end
+ end
+ end
+ end
+
end
-end
+end
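
For clarity, compress_tar_file only toggles tar's bzip2 (j) flag; the output keeps the .tar.bz2 name either way. A tiny sketch (hypothetical path) of the command string compress builds:

```ruby
# Illustration only (hypothetical path): the command string compress() builds.
def tar_command(path, compress_tar_file)
  "cd #{path} && tar -cp#{compress_tar_file ? 'j' : ''}f #{path}.tar.bz2 ."
end

tar_command('/tmp/1356300000', true)   # => "cd /tmp/1356300000 && tar -cpjf /tmp/1356300000.tar.bz2 ."
tar_command('/tmp/1356300000', false)  # => "cd /tmp/1356300000 && tar -cpf /tmp/1356300000.tar.bz2 ."
```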
89 lib/mongolicious/storage.rb
@@ -1,30 +1,30 @@
module Mongolicious
class Storage
-
+
# Initialize the storage object.
#
# @option opts [Hash] :access_id the Access ID of the S3 account.
- # @option opts [Hash] :secret_key the Secret Key of the S3 account.
+ # @option opts [Hash] :secret_key the Secret Key of the S3 account.
#
- # @return [Storage]
+ # @return [Storage]
def initialize(opts)
@con = Fog::Storage.new({
:provider => 'AWS',
:aws_access_key_id => opts['access_id'],
:aws_secret_access_key => opts['secret_key']
})
- end
-
+ end
+
# Parse the given location into a bucket and a prefix.
#
# @param [String] location the bucket/prefix location.
#
- # @return [Hash]
+ # @return [Hash]
def parse_location(location)
location = location.split('/')
{:bucket => location.first, :prefix => location[1..-1].join('/')}
- end
+ end
# Upload the given path to S3.
#
@@ -35,25 +35,80 @@ def parse_location(location)
# @return [Hash]
def upload(bucket, key, path)
Mongolicious.logger.info("Uploading archive to #{key}")
-
+
@con.put_object(
- bucket, key, File.open("#{path}.tar.bz2", 'r'),
- {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'}
- )
- end
-
+ bucket, key, File.open(path, 'r'),
+ {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'}
+ )
+ end
+
+ # Initiate a multipart upload to S3
+ # content of this upload will be private
+ #
+ # @param [String] bucket the bucket where to store the archive in.
+ # @param [String] key the key where the archive is stored under.
+ #
+ # @return [String] UploadId the id where amazon will save all the parts.
+ # When uploading all the parts this will need to be provided.
+ def initiate_multipart_upload(bucket, key)
+ response = @con.initiate_multipart_upload(bucket, key,
+ {'x-amz-acl' => 'private', 'Content-Type' => 'application/x-tar'})
+
+ return response.body['UploadId']
+ end
+
+ # Upload a part for a multipart upload
+ #
+ # @param [String] bucket Name of bucket to add part to
+ # @param [String] key Name of object to add part to
+ # @param [String] upload_id Id of upload to add part to
+ # @param [String] part_number Index of part in upload
+    # @param [String] data Content of part
+ #
+ # @return [String] ETag etag of new object. Will be needed to complete upload
+ def upload_part(bucket, key, upload_id, part_number, data)
+ response = @con.upload_part(bucket, key, upload_id, part_number, data)
+
+ return response.headers['ETag']
+ end
+
+ # Complete a multipart upload
+ #
+ # @param [String] bucket Name of bucket to complete multipart upload for
+ # @param [String] key Name of object to complete multipart upload for
+ # @param [String] upload_id Id of upload to add part to
+ # @param [String] parts Array of etags for parts
+ #
+ # @return [Excon::Response]
+ def complete_multipart_upload(bucket, key, upload_id, parts)
+ response = @con.complete_multipart_upload(bucket, key, upload_id, parts)
+
+ return response
+ end
+
+ # Aborts a multipart upload
+ #
+ # @param [String] bucket Name of bucket to abort multipart upload on
+ # @param [String] key Name of object to abort multipart upload on
+ # @param [String] upload_id Id of upload to add part to
+ #
+ # @return [nil]
+ def abort_multipart_upload(bucket, key, upload_id)
+ @con.abort_multipart_upload(bucket, key, upload_id)
+ end
+
# Remove old versions of a backup.
#
# @param [String] bucket the bucket where the archive is stored in.
# @param [String] prefix the prefix where to look for outdated versions.
# @param [Integer] versions number of versions to keep.
#
- # @return [nil]
+ # @return [nil]
def cleanup(bucket, prefix, versions)
objects = @con.get_bucket(bucket, :prefix => prefix).body['Contents']
-
+
return if objects.size <= versions
-
+
objects[0...(objects.size - versions)].each do |o|
Mongolicious.logger.info("Removing outdated version #{o['Key']}")
@con.delete_object(bucket, o['Key'])
@@ -61,4 +116,4 @@ def cleanup(bucket, prefix, versions)
end
end
-end
+end
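
Taken together, the new Storage methods wrap Fog's S3 multipart API: initiate, upload each part, then complete (or abort on a fatal error). A condensed sketch of the flow as backup.rb drives it (bucket, key and paths are placeholders; retries and error handling omitted):

```ruby
# Condensed sketch of the multipart flow (placeholder names, no retries).
storage = Mongolicious::Storage.new('access_id' => 'AKIA...', 'secret_key' => 'secret')
bucket  = 'my-backups'                          # hypothetical bucket
key     = 'db_prefix_20130121_020000.tar.bz2'   # hypothetical key

upload_id = storage.initiate_multipart_upload(bucket, key)
etags     = []

# Parts produced by `split`, e.g. /tmp/1356300000.tar.bz2.aa, .ab, ...
Dir.glob('/tmp/1356300000.tar.bz2.*').sort.each_with_index do |part, i|
  File.open(part) do |f|
    etags << storage.upload_part(bucket, key, upload_id, (i + 1).to_s, f)
  end
end

storage.complete_multipart_upload(bucket, key, upload_id, etags)
# On an unrecoverable error, call storage.abort_multipart_upload(bucket, key, upload_id) instead.
```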