Copyright (c) 2012 Yahoo! Inc. All rights reserved.

Copyrights licensed under the New BSD License. See the accompanying LICENSE file for terms.

Welcome to the StreamingBz2Split wiki!

This is a hacky tool I wrote that was adopted before I could polish it. When I find the time, I will improve it. If you have any suggestions or feedback, drop me a note.

Usage:

[thiruvel@localhost StreamingBz2Split]$ ./run.sh -h

./run.sh: [-t] [-c <chunk size> | -n <number of chunks>] [-v] [-m no_maps] -i input_dir -o output_dir
  -t - Verify integrity of the input bzip2 files. OFF by default.
  -c - Chunk size of each bzip2 split in MB, final size of gzip files may vary. 4 by default.
  -n - Number of chunks to be generated, mutually exclusive to -c. Disabled by default.
  -v - Verify rowcounts between input and output - OFF by default.
  -m - Number of Maps to be launched, default number of maps = number of files.
  -i - Input dir. The directory should exist and contain bz2 files. Other files will be ignored.
  -o - Output dir. The directory will be cleaned if it exists and the output split files in .gz
       format will be placed here. It will also be used as a scratch directory.
  -h - Print usage

Example:

  1. Split into 8 chunks: $./run.sh -i /tmp/input -o /tmp/output -n 8
  2. Split into chunks of size approx. 4MB: $./run.sh -i /tmp/input -o /tmp/output -c 4
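  3. Combine the other documented flags, e.g. verify input integrity and row counts while splitting into ~16MB chunks with 4 maps: $./run.sh -t -v -c 16 -m 4 -i /tmp/input -o /tmp/output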

Execution:

[thiruvel@localhost StreamingBz2Split]$ hadoop fs -ls /tmp/input
Found 1 items
-rw-r--r--   1 thiruvel supergroup   66410623 2012-03-16 03:09 /tmp/input/CB.bz2
[thiruvel@localhost StreamingBz2Split]$ 

[thiruvel@localhost StreamingBz2Split]$ ./run.sh -i /tmp/input -o /tmp/output -n 8

Deleted hdfs://localhost:8020/tmp/output

/tmp/input/CB.bz2:/tmp/output/bz2out

packageJobJar: [splitFile.sh, splitBzip2.sh, verifyRecordCount.sh, /home/thiruvel/cluster/tmp/hadoop-unjar1287667628951594655/] [] /tmp/streamjob3293762976681085942.jar tmpDir=null
12/03/26 04:15:53 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/26 04:15:53 INFO streaming.StreamJob: getLocalDirs(): [/home/thiruvel/cluster/tmp/mapred/local]
12/03/26 04:15:53 INFO streaming.StreamJob: Running job: job_201201082309_1228
12/03/26 04:15:53 INFO streaming.StreamJob: To kill this job, run:
12/03/26 04:15:53 INFO streaming.StreamJob: hadoop-0.20.205.0.3.1112071329/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:50300 -kill job_201201082309_1228
12/03/26 04:15:53 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201201082309_1228
12/03/26 04:15:54 INFO streaming.StreamJob:  map 0%  reduce 0%
12/03/26 04:16:10 INFO streaming.StreamJob:  map 100%  reduce 0%
12/03/26 04:16:43 INFO streaming.StreamJob:  map 100%  reduce 100%
12/03/26 04:16:43 INFO streaming.StreamJob: Job complete: job_201201082309_1228
12/03/26 04:16:43 INFO streaming.StreamJob: Output: /tmp/output/hadoop_streaming_todelete

Deleted hdfs://localhost:8020/tmp/output/hadoop_streaming_todelete
Deleted hdfs://localhost:8020/tmp/output/scratchstreaming

-rw-r--r--   1 thiruvel supergroup    9096899 2012-03-26 04:16 /tmp/output/bz2out/chunk-1-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8761934 2012-03-26 04:16 /tmp/output/bz2out/chunk-2-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8523903 2012-03-26 04:16 /tmp/output/bz2out/chunk-3-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8869790 2012-03-26 04:16 /tmp/output/bz2out/chunk-4-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8580745 2012-03-26 04:16 /tmp/output/bz2out/chunk-5-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8496121 2012-03-26 04:16 /tmp/output/bz2out/chunk-6-CB.bz2
-rw-r--r--   1 thiruvel supergroup    8854693 2012-03-26 04:16 /tmp/output/bz2out/chunk-7-CB.bz2
-rw-r--r--   1 thiruvel supergroup    5272162 2012-03-26 04:16 /tmp/output/bz2out/chunk-8-CB.bz2

/tmp/output/bz2out/chunk-1-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-2-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-3-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-4-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-5-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-6-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-7-CB.bz2:/tmp/output
/tmp/output/bz2out/chunk-8-CB.bz2:/tmp/output

packageJobJar: [createGzipFromBzip.sh, /home/thiruvel/cluster/tmp/hadoop-unjar4063740201930314043/] [] /tmp/streamjob4343584794179189004.jar tmpDir=null
12/03/26 04:16:48 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/26 04:16:49 INFO streaming.StreamJob: getLocalDirs(): [/home/thiruvel/cluster/tmp/mapred/local]
12/03/26 04:16:49 INFO streaming.StreamJob: Running job: job_201201082309_1229
12/03/26 04:16:49 INFO streaming.StreamJob: To kill this job, run:
12/03/26 04:16:49 INFO streaming.StreamJob: hadoop-0.20.205.0.3.1112071329/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:50300 -kill job_201201082309_1229
12/03/26 04:16:49 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201201082309_1229
12/03/26 04:16:50 INFO streaming.StreamJob:  map 0%  reduce 0%
12/03/26 04:17:05 INFO streaming.StreamJob:  map 13%  reduce 0%
12/03/26 04:17:08 INFO streaming.StreamJob:  map 25%  reduce 0%
12/03/26 04:18:08 INFO streaming.StreamJob:  map 38%  reduce 0%
12/03/26 04:18:11 INFO streaming.StreamJob:  map 50%  reduce 0%
12/03/26 04:18:53 INFO streaming.StreamJob:  map 63%  reduce 0%
12/03/26 04:18:56 INFO streaming.StreamJob:  map 75%  reduce 0%
12/03/26 04:19:47 INFO streaming.StreamJob:  map 88%  reduce 0%
12/03/26 04:19:50 INFO streaming.StreamJob:  map 100%  reduce 0%
12/03/26 04:20:14 INFO streaming.StreamJob:  map 100%  reduce 100%
12/03/26 04:20:14 INFO streaming.StreamJob: Job complete: job_201201082309_1229
12/03/26 04:20:14 INFO streaming.StreamJob: Output: /tmp/output/hadoop_streaming_todelete

Deleted hdfs://localhost:8020/tmp/output/bz2out
Deleted hdfs://localhost:8020/tmp/output/hadoop_streaming_todelete
Deleted hdfs://localhost:8020/tmp/output/scratchstreaming

-rw-r--r--   1 thiruvel supergroup   17282201 2012-03-26 04:17 /tmp/output/chunk-1-CB.gz
-rw-r--r--   1 thiruvel supergroup   16683234 2012-03-26 04:17 /tmp/output/chunk-2-CB.gz
-rw-r--r--   1 thiruvel supergroup   16506643 2012-03-26 04:18 /tmp/output/chunk-3-CB.gz
-rw-r--r--   1 thiruvel supergroup   17012086 2012-03-26 04:18 /tmp/output/chunk-4-CB.gz
-rw-r--r--   1 thiruvel supergroup   16463777 2012-03-26 04:19 /tmp/output/chunk-5-CB.gz
-rw-r--r--   1 thiruvel supergroup   16322551 2012-03-26 04:19 /tmp/output/chunk-6-CB.gz
-rw-r--r--   1 thiruvel supergroup   17030241 2012-03-26 04:20 /tmp/output/chunk-7-CB.gz
-rw-r--r--   1 thiruvel supergroup   10128594 2012-03-26 04:19 /tmp/output/chunk-8-CB.gz
[thiruvel@localhost StreamingBz2Split]$ 

TODO:

1. Avoid the second Hadoop job, which generates gz from the bz2 splits.

As this was more of a hack, I simply concatenated the smaller bz2 files (the rec... files produced by bzip2recover) to generate a larger one, and didn't fix the header of the resulting split file. Without that fix, Unix tools (bunzip2 etc.) can read the entire bz2 file, but Hadoop cannot. Another hack is to bunzip2 and then gzip, which is what the second job does. The ideal approach is to generate a proper bz2 header for each split file and append the smaller files to it, but this was just a start.
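
To make the hack concrete, here is a minimal sketch, assuming the recNNNNN output naming of recent bzip2 releases (file names and block counts are illustrative):

    bzip2recover CB.bz2                       # emits rec00001CB.bz2, rec00002CB.bz2, ...
    cat rec0000[1-5]CB.bz2 > chunk-1-CB.bz2   # plain concatenation of recovered blocks
    bunzip2 -t chunk-1-CB.bz2                 # Unix tools accept the multi-stream result

Each rec file carries its own stream header, so the concatenation is a multi-stream bz2 that bunzip2 handles but the Hadoop 0.20.x decompressor does not.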

FAQ:

1. What's the status of bzip2 splitting support in Hadoop?

HADOOP-4012 and MAPREDUCE-830 added support for handling bz2 splits in 0.21, and it is now available in Hadoop 2. But most organizations run Hadoop 0.20.x (current stable branches: Hadoop 1.0.x), where this support is not yet available. See HADOOP-7823, where the backporting effort is ongoing.

2. Doesn't Pig support bz2 splitting? Why this tool?

Pig can read and process bz2 files in parallel, but the limitation is that the data must live in a directory whose name ends with '.bz2'. In most companies, few HDFS directories follow this convention; it would be nice if Pig [0.9 as of development] offered a knob for reading bz2 files out of a regular directory [correct me if I am wrong].

3. Can I not just distcp the data to another folder with .bz2 extension and process them using Pig?

Yes. That's another way.
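
For instance (source and destination paths are illustrative):

    hadoop distcp /data/mylogs /data/mylogs.bz2

Since the destination directory name ends in '.bz2', Pig can then read and split the bz2 files inside it in parallel.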

4. How do I know if the data generated by the tool is the same as original data?

I had written a hadoop-diff to verify this. I am in the process of open sourcing that as well.
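
In the meantime, a quick manual check is to compare row counts of the decompressed input and output streams, along the lines of (paths taken from the example above):

    hadoop fs -cat /tmp/input/CB.bz2 | bunzip2 | wc -l
    hadoop fs -cat '/tmp/output/chunk-*-CB.gz' | gunzip | wc -l

This is similar to what the -v option does for you.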

5. Why do we need two jobs? Isn't one sufficient?

See TODO 1. The second job exists purely to parallelize the second step - converting the smaller bz2 files to gz - so it scales with the number of chunks.

6. Can I run these on a Linux/Unix system?

Yes, just run splitBzip2.sh directly and set the environment variables it expects appropriately.

7. Are there any limitations?

Yes. If the bz2 file is large (above roughly 3GB - I don't remember the exact limit), bzip2recover will fail. You have to recompile bzip2 after increasing the number of blocks it can handle, and place the recompiled bzip2recover in the same directory as the tool. We have run this tool on 5+GB bz2 files with a recompiled bzip2recover and it has worked well for us.
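
A sketch of that recompile, assuming a bzip2 1.0.x source tree where the cap is the BZ_MAX_HANDLED_BLOCKS constant in bzip2recover.c (the raised value and destination path below are illustrative):

    # Raise the per-file block cap and rebuild just bzip2recover.
    sed -i 's/BZ_MAX_HANDLED_BLOCKS 50000/BZ_MAX_HANDLED_BLOCKS 500000/' bzip2recover.c
    make bzip2recover
    cp bzip2recover /path/to/StreamingBz2Split/   # illustrative destination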
