Browse files

add tests for streaming with static bson, including S3.

  • Loading branch information...
1 parent 413c976 commit da4542091f987c78fed4ca3ab7be21a043e3bf86 @mpobrien mpobrien committed Apr 29, 2013
Showing with 42 additions and 7 deletions.
  1. +42 −7 testing/run_treasury.py
View
49 testing/run_treasury.py
@@ -199,16 +199,19 @@ def runstreamingjob(hostname, params, input_collection='mongo_hadoop.yield_histo
output_collection='mongo_hadoop.yield_historical.out',
readpref="primary",
input_auth=None,
- output_auth=None):
+ output_auth=None,
+ inputpath='/tmp/in',
+ outputpath='/tmp/out',
+ inputformat='com.mongodb.hadoop.mapred.MongoInputFormat',
+ outputformat='com.mongodb.hadoop.mapred.MongoOutputFormat'):
cmd = [os.path.join(HADOOP_HOME, "bin", "hadoop")]
- cmd.append("jar")
- cmd.append('$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*')
+ cmd += ['jar','$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*']
cmd += ["-libjars", STREAMING_JARPATH]
- cmd += ["-input", '/tmp/in']
- cmd += ["-output", '/tmp/out']
- cmd += ["-inputformat", 'com.mongodb.hadoop.mapred.MongoInputFormat']
- cmd += ["-outputformat", 'com.mongodb.hadoop.mapred.MongoOutputFormat']
+ cmd += ["-input", inputpath]
+ cmd += ["-output", outputpath]
+ cmd += ["-inputformat",inputformat]
+ cmd += ["-outputformat",outputformat]
cmd += ["-io", 'mongodb']
input_uri = 'mongodb://%s%s/%s?readPreference=%s' % (input_auth + "@" if input_auth else '', hostname, input_collection, readpref)
cmd += ['-jobconf', "mongo.input.uri=%s" % input_uri]
@@ -450,6 +453,7 @@ def test_treasury(self):
results = list(out_col.find({},{'_id':1}).sort("_id"))
self.assertTrue(len(results) == 14)
+
class TestS3BSON(Standalone):
@@ -480,6 +484,37 @@ def tearDown(self):
super(TestStaticBSON, self).tearDown();
shutil.rmtree(self.temp_outdir)
+
+ @unittest.skipIf(HADOOP_RELEASE.startswith('1.0') or HADOOP_RELEASE.startswith('0.20'),
+ 'streaming not supported')
+ def test_streaming_static(self):
+ PARAMETERS = DEFAULT_PARAMETERS.copy()
+ PARAMETERS["mapred.max.split.size"] = '200000'
+ inputpath = os.path.join("file://" + self.temp_outdir, "mongo_hadoop","yield_historical.in.bson")
+ runstreamingjob(self.server_hostname,
+ inputformat="com.mongodb.hadoop.mapred.BSONFileInputFormat",
+ inputpath=inputpath,
+ params=PARAMETERS)
+ out_col = self.server.connection()['mongo_hadoop']['yield_historical.out']
+ self.assertTrue(compare_results(out_col))
+
+ @unittest.skipIf(not AWS_ACCESSKEY or not AWS_SECRET, 'AWS credentials not provided')
+ @unittest.skipIf(HADOOP_RELEASE.startswith('1.0') or HADOOP_RELEASE.startswith('0.20'),
+ 'streaming not supported')
+ def test_streaming_s3_static(self):
+ PARAMETERS = DEFAULT_PARAMETERS.copy()
+ PARAMETERS["mapred.max.split.size"] = '200000'
+ PARAMETERS["fs.s3.awsAccessKeyId"] = AWS_ACCESSKEY
+ PARAMETERS["fs.s3.awsSecretAccessKey"] = AWS_SECRET
+ inputpath = "s3n://%s:%s@mongo-test-data/yield_historical.in.bson" % (AWS_ACCESSKEY, AWS_SECRET)
+ runstreamingjob(self.server_hostname,
+ inputformat="com.mongodb.hadoop.mapred.BSONFileInputFormat",
+ inputpath=inputpath,
+ params=PARAMETERS)
+ out_col = self.server.connection()['mongo_hadoop']['yield_historical.out']
+ self.assertTrue(compare_results(out_col))
+
+
def test_treasury(self):
PARAMETERS = DEFAULT_PARAMETERS.copy()
PARAMETERS["mongo.job.input.format"] = "com.mongodb.hadoop.BSONFileInputFormat"

0 comments on commit da45420

Please sign in to comment.