Filemerge is a utility for merging a large number of small HDFS files into a smaller number of large files. It is intended for use by Hadoop operations engineers and map-reduce application developers.
The structure of the code is simple. The actual merging is performed by a Pig
script created at run time from user-supplied parameters. These parameters
control the set of files to merge. The utility consists of a single file,
filemerge.py, that takes the input parameters and invokes the generated Pig
script. As such, the pig command must be available on the PATH of the
runtime user. The user specifies the input path, output path, topic, and the
files to be merged, either in year/month/day form, as a specific HDFS directory,
or as a list of HDFS directories in a file.
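Because filemerge shells out to pig, a wrapper can check up front that the command is resolvable before starting a long merge. The following is a minimal sketch of such a pre-flight check, assuming Python 3.3 or later for shutil.which. The helper name command_on_path is hypothetical and is not part of filemerge.

```python
import shutil


def command_on_path(name):
    """Return True if an executable called `name` can be found on PATH."""
    return shutil.which(name) is not None


if __name__ == "__main__":
    # "pig" is the command filemerge invokes. The check itself is an
    # illustrative pre-flight step, not something filemerge.py is known to do.
    if not command_on_path("pig"):
        print("pig was not found on PATH; filemerge will not be able to run")
```

The check only confirms that an executable with that name exists. It says nothing about whether the Pig installation is configured for the target cluster.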
Because the application code is small and self-contained, installation requires simply cloning the repository.
git clone https://github.com/intuit/filemerge.git
Note that filemerge itself does not have any dependencies besides the pig
command-line tool. However, running the test suite locally requires installation of
the test discovery and mocking packages. These dependencies are listed in
filemerge/requirements.txt and can be installed as follows.
cd filemerge
pip install -r requirements.txt
Finally, installation can be verified by running the test suite locally.
nosetests -w unit_tests -v
The full API of the script is available on the command line by typing
python filemerge.py -h
The help message is reproduced below for reference.
Usage:
python filemerge.py --topic=<'topic-in-single-quotes'>
--input-prefix=<'HDFS-location-in-single-quotes'>
--output-prefix=<'HDFS-location-in-single-quotes'>
--num-reducers=<any-positive-integer>
--queue=<hadoop queue name>
[--year=<4-digit-year>]
[--month=<month>]
[--day=<day>]
[--dir=<directory relative to input-prefix>]
[--file=<file with list of directories, relative to input-prefix>]
[--window=<window size in days>]
[--codec=<valid hadoop compression codec>]
[-r]
Options:
-h, --help show this help message and exit
-y YEAR, --year=YEAR Year for the merge
-m MONTH, --month=MONTH
Month for the merge
-d DAY, --day=DAY Day for the merge
-D DIRECTORY, --directory=DIRECTORY
Directory containing files to merge
-f FILE, --file=FILE File containing list of input directories
-w WINDOW, --window=WINDOW
Window in days (merge for the past *n* days
-l LOOKBACK, --lookback=LOOKBACK
Lookback period (merge for 1 day *n* days prior)
-t TOPIC, --topic=TOPIC
Topic for the merge
-i INPUT_PREFIX, --input-prefix=INPUT_PREFIX
Input directory prefix
-o OUTPUT_PREFIX, --output-prefix=OUTPUT_PREFIX
Output directory prefix
-n NUM_REDUCERS, --num-reducers=NUM_REDUCERS
Number of reducers
-c CODEC, --codec=CODEC
Compression codec to use
-q QUEUE, --queue=QUEUE
Mapreduce job queue
-r, --dry-run Dry run; create, but dont execute the Pig script
The arguments outside the square brackets are required and those inside the square brackets are optional, but a minimum set of the optional arguments is needed to compute the set of directories to be merged. The acceptable option groups are as follows:
- Group 1
  - year (-y)
  - year (-y), month (-m)
  - year (-y), month (-m), day (-d)
- Group 2
  - HDFS directory (-D)
- Group 3
  - file with a list of HDFS directories (-f)
- Group 4
  - window with a start date (-w); files for all days between start date minus window to start date will be merged
- Group 5
  - lookback with a start date (-l); files for a single day lookback days before the start date will be merged
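The grouping above can be expressed as a small check that reports which groups a set of arguments falls into. This is only a sketch of the rules as listed, not the validation filemerge.py itself performs; the function name matching_groups is hypothetical. The help text does not say whether groups may be combined (for example, a window together with an explicit date), so the sketch returns every matching group and leaves that decision to the caller.

```python
def matching_groups(year=None, month=None, day=None, directory=None,
                    file=None, window=None, lookback=None):
    """Return the option group numbers (1-5) matched by the arguments.

    Raises ValueError if no group is matched or if the date options do not
    form one of the Group 1 combinations.
    """
    groups = []
    if year is not None or month is not None or day is not None:
        # Group 1 allows only: year, year+month, or year+month+day.
        if year is None:
            raise ValueError("month and day require a year")
        if day is not None and month is None:
            raise ValueError("day requires a month")
        groups.append(1)
    if directory is not None:
        groups.append(2)
    if file is not None:
        groups.append(3)
    if window is not None:
        groups.append(4)
    if lookback is not None:
        groups.append(5)
    if not groups:
        raise ValueError("no option group selected")
    return groups


if __name__ == "__main__":
    print(matching_groups(year=2015, month=2))
    print(matching_groups(directory="jan2016"))
```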
These option groups are designed to enable merging at the directory, day, month,
or year level. The -f option offers the ability to merge non-contiguous directory
blocks. The -w and -l options allow merging of directories at periodic
intervals using a sliding window.
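To make the difference between the window and lookback options concrete, the date arithmetic described above can be sketched as follows. This is an illustration, not filemerge's own code: the function names are hypothetical, and the start date itself is assumed to be excluded from the window.

```python
from datetime import date, timedelta


def window_days(start, window):
    """Days from `start - window` up to, but not including, `start`."""
    return [start - timedelta(days=n) for n in range(window, 0, -1)]


def lookback_day(start, lookback):
    """The single day that lies `lookback` days before `start`."""
    return start - timedelta(days=lookback)


if __name__ == "__main__":
    today = date.today()
    # A window of 20 selects 20 consecutive days; a lookback of 20 selects one.
    print(len(window_days(today, 20)))
    print(lookback_day(today, 20))
```

Run daily from a scheduler, a window re-covers overlapping days on each run, while a lookback touches each day exactly once.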
One can further enhance the flexibility of these options by wrapping the
python call in a shell script that provides a custom list of directories,
selects non-contiguous months, chunks large directory lists into smaller parts, etc.
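As one example of such wrapping, a large directory list can be split into smaller list files, each of which can then be passed to a separate filemerge run through the -f option. The sketch below is an assumption about how a wrapper might do this. The function names and the directory_list_NNN.txt file naming are hypothetical.

```python
import os


def chunk(items, size):
    """Split `items` into consecutive lists of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    return [items[i:i + size] for i in range(0, len(items), size)]


def write_chunks(directories, size, dest_dir):
    """Write one directory-list file per chunk and return the file paths."""
    paths = []
    for index, part in enumerate(chunk(directories, size)):
        # File name pattern is illustrative; any unique name would do.
        path = os.path.join(dest_dir, "directory_list_{:03d}.txt".format(index))
        with open(path, "w") as handle:
            handle.write("\n".join(part) + "\n")
        paths.append(path)
    return paths
```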
The filemerge tool is written with operations and map-reduce application
developers in mind. Operations teams will need periodic merges based on the
retention policy and will typically use the tool with the -y, -m, and -d
options. Map-reduce application developers might need to merge single
directories or arbitrary groups of directories and will use the -D and -f
options.
The most common usage pattern for filemerge is to merge all files in a
directory and produce one output file (in a different directory). To merge files
under a specific directory, provide the base path using the -i option and
the final directory name using the -D option. In the following invocation,
/hdfs/path/to/clickstream is the base HDFS path and jan2016 is the
subdirectory that contains the files to be merged (in this case, for January
2016). In other words, the full path to the files that will be merged is:
/hdfs/path/to/clickstream/jan2016
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-D 'jan2016' \
-o '/hdfs/path/to/jan2016-merged' \
-t 'clickstream'
The following command invokes the script to merge the February 2015 data of the 'clickstream' directory in HDFS. This is the raw call to the filemerge Python script and will initiate 28 map-reduce jobs, one per day.
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-o '/hdfs/path/to/clickstream-merged' \
-t 'clickstream' \
-y 2015 \
-m 2
Simply omit the month and day options and the merge will be performed for the full year. The following command invokes the script to merge the entire 2015 data of the 'clickstream' directory with a 1 day chunk size. This will initiate 365 map-reduce jobs.
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-o '/hdfs/path/to/clickstream-merged' \
-t 'clickstream' \
-y 2015
Note that detecting files in a time window (e.g. a certain month or year)
requires filemerge to assume certain directory naming conventions. These
conventions are specified in filemerge/templates.py and can be user-defined.
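The job counts quoted above (28 for February 2015, 365 for all of 2015) follow from expanding a year or month into its individual days. The sketch below shows one way to do that expansion. It is not taken from filemerge, the function name is hypothetical, and the d_YYYYMMDD directory pattern is only an assumed example; the real convention lives in filemerge/templates.py.

```python
import calendar
from datetime import date, timedelta


def days_to_merge(year, month=None, day=None):
    """Expand a year, a year+month, or a year+month+day into a list of dates."""
    if month is None:
        if day is not None:
            raise ValueError("day requires a month")
        start, end = date(year, 1, 1), date(year, 12, 31)
    elif day is None:
        last = calendar.monthrange(year, month)[1]  # number of days in month
        start, end = date(year, month, 1), date(year, month, last)
    else:
        start = end = date(year, month, day)
    return [start + timedelta(days=n) for n in range((end - start).days + 1)]


if __name__ == "__main__":
    for current in days_to_merge(2015, 2):
        # "d_%Y%m%d" is an assumed naming pattern, used here for illustration.
        print(current.strftime("d_%Y%m%d"))
```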
To merge files under unrelated non-contiguous directories, list all the final
directory names in a file and pass the full file path to the -f option. In
the invocation below, -i captures the common portion of the path to all the
directories and the final directories are listed in the file.
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-o '/hdfs/path/to/clickstream-merged' \
-t 'clickstream' \
-f /local/filesystem/path/to/directory_list.txt
Let's assume that /local/filesystem/path/to/directory_list.txt contains
the following lines:
d_20150225
d_20160309
d_20150728
In that case all files under /hdfs/path/to/clickstream/{d_20150225,
d_20160309, d_20150728} will be merged. Note that they won't be merged into
the same file. Rather, three different output directories, one for each directory
listed in directory_list.txt, will be created.
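The way the input prefix and the listed names combine can be sketched as below. This is an illustration of the path resolution described above, not filemerge's own parsing code. The function name is hypothetical, and skipping blank lines is an assumption.

```python
import posixpath


def input_dirs(input_prefix, lines):
    """Join each non-empty line of a directory list onto the input prefix."""
    dirs = []
    for line in lines:
        name = line.strip()
        if name:  # assumption: blank lines are ignored
            dirs.append(posixpath.join(input_prefix, name))
    return dirs


if __name__ == "__main__":
    listed = ["d_20150225\n", "d_20160309\n", "d_20150728\n"]
    for path in input_dirs("/hdfs/path/to/clickstream", listed):
        print(path)
```

posixpath is used rather than os.path so that the HDFS paths are joined with forward slashes regardless of the local operating system.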
The following invocation of the filemerge script will merge files in the
clickstream directory for the last 20 days (not including today). The window
is datetime aware.
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-o '/hdfs/path/to/clickstream-merged' \
-t 'clickstream' \
-w 20
The following invocation of the filemerge script will merge files in the
clickstream topic for the day 20 days prior to today. The lookback is
datetime aware.
python filemerge/filemerge.py \
-i '/hdfs/path/to/clickstream' \
-o '/hdfs/path/to/clickstream-merged' \
-t 'clickstream' \
-l 20
For multi-directory merges, filemerge.py can be called from a script that
provides the list of directories and the merge frequency. The following wrapper
script shows how to merge 2015 files for a subset of directories. The script needs to
be present in the same directory as the filemerge.py script.
#!/bin/bash
# List of all HDFS subdirectories can be obtained as follows
# hadoop fs -ls /hdfs/base/path | sed -E "s:.*/hdfs/base/path/(.*)$:\\1:"
# Set of subdirectories to be merged, obtained from output of the
# above command
TOPICS=(
    businessevents
    customer-transactions
    desktop-clickstream
    mobile-clickstream-ios
    mobile-clickstream-android)
YEAR=2015
for TOPIC in "${TOPICS[@]}"; do
    OUTPUT_DIR="/hdfs/base/path/${TOPIC}-merged"
    # Double quotes so that the shell expands ${TOPIC}
    python filemerge/filemerge.py \
        -i "/hdfs/base/path/${TOPIC}" \
        -o "${OUTPUT_DIR}" \
        -t "${TOPIC}" \
        -y "${YEAR}"
done
Merging for custom months is straightforward and is similar to the looping
logic above. Once again, the following script needs to be located in the same directory
as filemerge.py.
#!/bin/bash
# Subset of months to be merged
MONTHS=(01 02 07 09 10 12)
YEAR=2015
TOPIC="clickstream"
OUTPUT_DIR="/hdfs/base/path/${TOPIC}-merged"
for MM in "${MONTHS[@]}"; do
    # Double quotes so that the shell expands ${TOPIC}
    python filemerge/filemerge.py \
        -i "/hdfs/base/path/${TOPIC}" \
        -o "${OUTPUT_DIR}" \
        -t "${TOPIC}" \
        -y "${YEAR}" \
        -m "${MM}"
done
The overarching point is that the unit of work for the merge logic is a directory. With that in mind, the actual logic can be customized in more ways than those shown above: simply write a wrapper shell script that sets up your variables and loops over them. These variables can be months, input directories, or output directories.
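The same looping pattern can also be written in Python. The sketch below builds one command line per topic and month using only the flags documented in the help message. The function name and the example topics are hypothetical, and the commands are only printed here. Adding -r keeps each run a dry run until the generated Pig scripts have been inspected.

```python
import itertools


def build_commands(topics, months, year, base_path, dry_run=True):
    """Return one filemerge argument list per (topic, month) pair."""
    commands = []
    for topic, month in itertools.product(topics, months):
        cmd = [
            "python", "filemerge/filemerge.py",
            "-i", "{}/{}".format(base_path, topic),
            "-o", "{}/{}-merged".format(base_path, topic),
            "-t", topic,
            "-y", str(year),
            "-m", str(month),
        ]
        if dry_run:
            cmd.append("-r")  # create, but do not execute, the Pig script
        commands.append(cmd)
    return commands


if __name__ == "__main__":
    # Example topics and months are placeholders for illustration.
    for command in build_commands(["clickstream", "businessevents"],
                                  [1, 2], 2015, "/hdfs/base/path"):
        print(" ".join(command))
```

Each returned list could be handed to subprocess.call to launch the merge. Building argument lists rather than shell strings avoids the quoting problems that shell wrappers are prone to.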