
Welcome to the StreamingBz2Split wiki!

This is a hacky tool that I wrote, and it was adopted before I could make it a lot better. When I do have time, I will improve it. If you have any suggestions or feedback, drop me a note.

Usage:

[thiruvel@localhost StreamingBz2Split]$ ./run.sh -h

./run.sh: [-t] [-c <chunk size> | -n <number of chunks>] [-v] [-m no_maps] -i input_dir -o output_dir
  -t - Verify integrity of the input bzip2 files. OFF by default.
  -c - Chunk size of each bzip2 split in MB, final size of gzip files may vary. 4 by default.
  -n - Number of chunks to be generated, mutually exclusive to -c. Disabled by default.
  -v - Verify rowcounts between input and output - OFF by default.
  -m - Number of Maps to be launched, default number of maps = number of files.
  -i - Input dir. The directory should exist and contain bz2 files. Other files will be ignored.
  -o - Output dir. The directory will be cleaned if it exists and the output split files in .gz
       format will be placed here. It will also be used as a scratch directory.
  -h - Print usage

Example:

  1. Split into 8 chunks: $./run.sh -i /tmp/input -o /tmp/output -n 8
  2. Split into chunks of size approx. 4MB: $./run.sh -i /tmp/input -o /tmp/output -c 4
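  3. Same split with input integrity checks (-t) and rowcount verification (-v) turned on
     (a sketch combining the flags documented above, not a verbatim run):
     $./run.sh -t -v -i /tmp/input -o /tmp/output -n 8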

Execution:

[thiruvel@localhost StreamingBz2Split]$ hadoop fs -ls /tmp/input
Found 1 items
-rw-r--r--   1 thiruvel supergroup   66410623 2012-03-16 03:09 /tmp/input/CB.bz2
[thiruvel@localhost StreamingBz2Split]$ 

[thiruvel@localhost StreamingBz2Split]$ ./run.sh -i /tmp/input -o /tmp/output -n 8

Deleted hdfs://localhost:8020/tmp/output

/tmp/input/CB.bz2:/tmp/output/bz2out

packageJobJar: [splitFile.sh, splitBzip2.sh, verifyRecordCount.sh, /home/thiruvel/cluster/tmp/hadoop-unjar1287667628951594655/] [] /tmp/streamjob3293762976681085942.jar tmpDir=null
12/03/26 04:15:53 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/26 04:15:53 INFO streaming.StreamJob: getLocalDirs(): [/home/thiruvel/cluster/tmp/mapred/local]
12/03/26 04:15:53 INFO streaming.StreamJob: Running job: job_201201082309_1228
12/03/26 04:15:53 INFO streaming.StreamJob: To kill this job, run:
12/03/26 04:15:53 INFO streaming.StreamJob: hadoop-0.20.205.0.3.1112071329/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:50300 -kill job_201201082309_1228
12/03/26 04:15:53 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201201082309_1228
12/03/26 04:15:54 INFO streaming.StreamJob:  map 0%  reduce 0%
12/03/26 04:16:10 INFO streaming.StreamJob:  map 100%  reduce 0%
12/03/26 04:16:43 INFO streaming.StreamJob:  map 100%  reduce 100%
12/03/26 04:16:43 INFO streaming.StreamJob: Job complete: job_201201082309_1228
12/03/26 04:16:43 INFO streaming.StreamJob: Output: /tmp/output/hadoop_streaming_todelete

Deleted hdfs://localhost:8020/tmp/output/hadoop_streaming_todelete
Deleted hdfs://localhost:8020/tmp/output/scratchstreaming

-rw-r--r--   1 thiruvel supergroup    9096899 2012-03-26 04:16 /tmp/output/bz2out/chunk-1-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8761934 2012-03-26 04:16 /tmp/output/bz2out/chunk-2-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8523903 2012-03-26 04:16 /tmp/output/bz2out/chunk-3-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8869790 2012-03-26 04:16 /tmp/output/bz2out/chunk-4-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8580745 2012-03-26 04:16 /tmp/output/bz2out/chunk-5-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8496121 2012-03-26 04:16 /tmp/output/bz2out/chunk-6-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8854693 2012-03-26 04:16 /tmp/output/bz2out/chunk-7-CB.bz2
-rw-r--r--   1 thiruvel supergroup    5272162 2012-03-26 04:16 /tmp/output/bz2out/chunk-8-CB.bz2

/tmp/output/bz2out/chunk-1-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-2-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-3-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-4-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-5-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-6-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-7-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-8-CB.bz2:/tmp/output

packageJobJar: [createGzipFromBzip.sh, /home/thiruvel/cluster/tmp/hadoop-unjar4063740201930314043/] [] /tmp/streamjob4343584794179189004.jar tmpDir=null
12/03/26 04:16:48 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/26 04:16:49 INFO streaming.StreamJob: getLocalDirs(): [/home/thiruvel/cluster/tmp/mapred/local]
12/03/26 04:16:49 INFO streaming.StreamJob: Running job: job_201201082309_1229
12/03/26 04:16:49 INFO streaming.StreamJob: To kill this job, run:
12/03/26 04:16:49 INFO streaming.StreamJob: hadoop-0.20.205.0.3.1112071329/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:50300 -kill job_201201082309_1229
12/03/26 04:16:49 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201201082309_1229
12/03/26 04:16:50 INFO streaming.StreamJob:  map 0%  reduce 0%
12/03/26 04:17:05 INFO streaming.StreamJob:  map 13%  reduce 0%
12/03/26 04:17:08 INFO streaming.StreamJob:  map 25%  reduce 0%
12/03/26 04:18:08 INFO streaming.StreamJob:  map 38%  reduce 0%
12/03/26 04:18:11 INFO streaming.StreamJob:  map 50%  reduce 0%
12/03/26 04:18:53 INFO streaming.StreamJob:  map 63%  reduce 0%
12/03/26 04:18:56 INFO streaming.StreamJob:  map 75%  reduce 0%
12/03/26 04:19:47 INFO streaming.StreamJob:  map 88%  reduce 0%
12/03/26 04:19:50 INFO streaming.StreamJob:  map 100%  reduce 0%
12/03/26 04:20:14 INFO streaming.StreamJob:  map 100%  reduce 100%
12/03/26 04:20:14 INFO streaming.StreamJob: Job complete: job_201201082309_1229
12/03/26 04:20:14 INFO streaming.StreamJob: Output: /tmp/output/hadoop_streaming_todelete

Deleted hdfs://localhost:8020/tmp/output/bz2out
Deleted hdfs://localhost:8020/tmp/output/hadoop_streaming_todelete
Deleted hdfs://localhost:8020/tmp/output/scratchstreaming

-rw-r--r--   1 thiruvel supergroup   17282201 2012-03-26 04:17 /tmp/output/chunk-1-CB.gz
-rw-r--r--   1 thiruvel supergroup   16683234 2012-03-26 04:17 /tmp/output/chunk-2-CB.gz
-rw-r--r--   1 thiruvel supergroup   16506643 2012-03-26 04:18 /tmp/output/chunk-3-CB.gz
-rw-r--r--   1 thiruvel supergroup   17012086 2012-03-26 04:18 /tmp/output/chunk-4-CB.gz
-rw-r--r--   1 thiruvel supergroup   16463777 2012-03-26 04:19 /tmp/output/chunk-5-CB.gz
-rw-r--r--   1 thiruvel supergroup   16322551 2012-03-26 04:19 /tmp/output/chunk-6-CB.gz
-rw-r--r--   1 thiruvel supergroup   17030241 2012-03-26 04:20 /tmp/output/chunk-7-CB.gz
-rw-r--r--   1 thiruvel supergroup   10128594 2012-03-26 04:19 /tmp/output/chunk-8-CB.gz
[thiruvel@localhost StreamingBz2Split]$ 

TODO:

1. Avoid the second Hadoop job, which generates gz files from the bz2 splits.

As this was more of a hack, I just concatenated the smaller bz2 files (rec...) to generate each split file; the metadata for the split file has to be generated appropriately, which I didn't do. Unix tools (bunzip2 etc.) can read the entire concatenated bz2 file, but Hadoop cannot. A quicker workaround is to bunzip2 and then gzip the splits, which is what the second job does. The ideal way is to construct each split file as a proper bz2 file, which would eliminate the second job and also make the first job faster. That's in the works.
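
As an illustration of why the concatenated splits are fine locally but not for Hadoop, here is a minimal sketch (the rec* file names are hypothetical):

    # Hypothetical file names. Gluing bz2 streams together yields a file that
    # stock bunzip2 can test and decompress end to end.
    cat rec00001CB.bz2 rec00002CB.bz2 > chunk-1-CB.bz2
    bunzip2 -tvv chunk-1-CB.bz2          # integrity test passes on the concatenation
    bunzip2 -c chunk-1-CB.bz2 | wc -l    # all records are readable locally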

2. Write it entirely in Java.

Currently, lots of files are produced on the local file system. Writing the tool entirely in Java would ensure the smaller files, with appropriate bz2 metadata (see #1), can be generated without writing those temporary files to disk. The tool took about two hours from conceptualization to implementation; shell script makes it very easy to prototype, and it has been running that way since. A Java implementation would also run on Windows.

FAQ:

1. What's the status of bzip2 splitting support in Hadoop?

HADOOP-4012 and MAPREDUCE-830 added support for splitting bz2 files in 0.21, and it is available in Hadoop 2. But most organizations run Hadoop 0.20.x (the current stable branches are Hadoop 1.0.x), where this support is not yet available. See HADOOP-7823, where the backporting effort is in progress.

2. Doesn't Pig support bz2 splitting? Why this tool?

Pig can read and process bz2 files in parallel, but the limitation is that the data must live in a directory whose name ends with '.bz2'. Not many directories on HDFS follow this naming convention in most companies, and it would be nice if Pig [0.9 as of this development] offered a knob to treat a regular directory of bz2 files the same way [correct me if I am wrong].

3. Can I not just distcp the data to another folder with a .bz2 extension and process it using Pig?

Yes. That's another way, but I haven't measured the latency for large files.
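
For reference, a rough sketch of that workaround (paths are hypothetical); the extra copy is the price you pay per dataset:

    # Hypothetical paths: copy the data into a directory whose name ends in
    # '.bz2' so Pig treats it as bzip2 input, then load from the new location.
    hadoop distcp /tmp/input /tmp/input.bz2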

4. How do I know if the data generated by the tool is the same as original data?

I have written a hadoop-diff to verify this and am in the process of open sourcing it as well.
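
Until that is out, a quick manual check is to compare rowcounts yourself, which is roughly what the -v option automates. A sketch using the paths from the run above:

    # Rowcount of the original bz2 input vs. the generated gz splits;
    # the two numbers should match.
    hadoop fs -cat /tmp/input/CB.bz2 | bunzip2 -c | wc -l
    hadoop fs -cat /tmp/output/chunk-*-CB.gz | gunzip -c | wc -l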

5. Why do we need two jobs? Isn't one sufficient?

See TODO #1. The second job exists just to parallelize the second step, converting the smaller bz2 files to gz, so that it scales well. The teams that have adopted this tool haven't complained yet, as it solves a big problem for them. But the second job is not strictly required, and removing it is on the TODO list.

6. Can I run these on a Linux/Unix system?

Yes, just run splitBzip2.sh and set the environment variables at the top of the script, or export them before you run it.
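
A hedged sketch of such a local run; the variable names below are placeholders only, the real ones are listed at the top of splitBzip2.sh:

    # Placeholder variable names -- check the top of splitBzip2.sh for the real ones.
    export INPUT_FILE=/data/logs/CB.bz2
    export OUTPUT_DIR=/data/logs/splits
    export CHUNK_SIZE_MB=4
    ./splitBzip2.sh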

7. What are the input file requirements?

The line delimiter should be '\n', because tools like 'tail' and 'cat' are used, and this is how most logs are generated anyway.
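
A quick way to sanity-check an input file before running the tool (file name is hypothetical):

    # Peek at both ends of the decompressed stream; records should be complete
    # lines terminated by '\n'.
    bunzip2 -c CB.bz2 | head -3
    bunzip2 -c CB.bz2 | tail -3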

8. Are there any limitations?

Yes. If the bz2 file is large (beyond roughly 3GB; I don't remember the exact limit), bzip2recover will fail. You have to recompile bzip2 after increasing the number of blocks it can handle and place the recompiled bzip2recover in the same directory as the tool. We have run this tool on 5+GB bz2 files using a recompiled bzip2recover, and that has worked well for us.
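
For reference, a sketch of that recompile, assuming the limit being hit is the BZ_MAX_HANDLED_BLOCKS constant in bzip2recover.c (50000 in the bzip2 1.0.x sources); the version and the new limit below are only examples:

    # Raise bzip2recover's block limit and rebuild just that binary.
    tar xzf bzip2-1.0.6.tar.gz && cd bzip2-1.0.6
    sed -i 's/BZ_MAX_HANDLED_BLOCKS 50000/BZ_MAX_HANDLED_BLOCKS 200000/' bzip2recover.c
    make bzip2recover
    cp bzip2recover /path/to/StreamingBz2Split/   # keep it next to the tool's scripts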

9. What was the intention behind the hack?

I was processing HDFS Namenode audit logs, and these logs are large even with bz2 compression (they can reach 10GB on Yahoo!'s largest clusters). Processing a year of such logs was going to impact our projects a lot. Hence this tool was born.