# NOTE(review): this patch hunk also adds testing/build.sh (a sed/sbt/copy-jars
# build helper); it shares this physical line and is unchanged here.
#
# mongo_manager: spin up standalone mongod servers, mongos routers and replica
# sets on localhost for mongo-hadoop integration tests.
try:
    import json
except ImportError:
    # Python < 2.6 fallback
    import simplejson as json
import logging
import os
import random
import shutil
import signal
import socket
import subprocess
import sys
import time

import pymongo

# Every test server binds on localhost; ports are handed out sequentially
# starting at DB_PORT (default 4000).
hostname = 'localhost'  # socket.gethostname()
start_port = int(os.environ.get('DB_PORT', 4000))
standalone_count = 0
mongos_count = 0
# Binary locations are overridable so CI can point at a specific build.
mongod = os.environ.get('MONGOD', 'mongod')
mongos = os.environ.get('MONGOS', 'mongos')
mongoimport = os.environ.get('MONGOIMPORT', 'mongoimport')
print("> MongoDB Path: %s " % mongod)
replsets = 0


def mongo_import(host, db, collection, filename):
    """Shell out to `mongoimport` to load `filename` into db.collection.

    host: 'host:port' string (or anything str()-able that mongoimport accepts).
    Returns the mongoimport exit status (0 on success).
    BUGFIX: the exit status was previously discarded, so import failures
    went unnoticed until a test asserted on missing data.
    """
    cmd = [mongoimport,
           '--host', str(host),
           '--db', db,
           '--collection', collection,
           '--file', filename]
    return subprocess.call(cmd)
class MongosManager(object):
    """Manages a single `mongos` router process."""

    def __init__(self, port=None, home=None):
        global start_port, replsets, mongos_count
        self.home = home
        if not port:
            self.port = start_port
            start_port += 1
        else:
            # BUGFIX: an explicitly requested port was previously ignored
            # (self.port was never assigned in this branch).
            self.port = int(port)
        if not self.home:
            self.home = os.environ.get("HOME")
        self.name = "mongos" + str(mongos_count)
        mongos_count += 1
        self.logpath = os.path.join(self.home, "log_" + self.name)
        self.dbpath = os.path.join(self.home, "data_" + self.name)
        # BUGFIX: kill_all_members() reads self.pid; define it before start.
        self.pid = None

    def connection(self):
        """A pymongo connection to this router."""
        return pymongo.Connection('localhost:' + str(self.port))

    def start_mongos(self, confighost, shards, noauth=False, fresh=True, addShards=False):
        """Start mongos pointed at `confighost`; optionally addShard each
        entry of `shards`. Returns the 'host:port' string of the router.
        Raises Exception if the router never opens its port."""
        global mongos
        self.host = '%s:%d' % (hostname, self.port)
        if fresh:
            try:
                logging.info("Deleting dbpath '%s'" % self.dbpath)
                shutil.rmtree(self.dbpath)
            except OSError:
                pass  # dbpath doesn't exist yet
            try:
                logging.info("Deleting logpath '%s'" % self.logpath)
                shutil.rmtree(self.logpath)
            except OSError:
                pass  # logpath doesn't exist yet

        if not os.path.exists(self.home):
            os.makedirs(self.home)
        os.chdir(self.home)

        self.start_time = time.time()
        self.info = {}
        host = '%s:%d' % (hostname, self.port)
        path = os.path.join(self.dbpath, 'db' + str(self.port))
        if not os.path.exists(path):
            logging.info("Making Data directory: %s" % path)
            os.makedirs(path)

        member_logpath = os.path.join(self.logpath, 'db' + str(self.port) + '.log')
        if not os.path.exists(os.path.dirname(member_logpath)):
            os.makedirs(os.path.dirname(member_logpath))

        # Shared-secret key file for cluster auth; must be mode 0600.
        keyFilePath = os.path.join(path, 'keyFile')
        keyFile = open(keyFilePath, 'w+')
        try:
            keyFile.write('password1')
        finally:
            keyFile.close()
        os.chmod(keyFilePath, int('0600', 8))

        cmd = [
            mongos,
            '--configdb', confighost,
            '--port', str(self.port),
            '--logpath', member_logpath,
            # '--nohttpinterface',
            '-vvvvv',
            '--ipv6',
        ]
        if not noauth:
            cmd.extend(['--keyFile', keyFilePath])

        print("starting %s" % cmd)
        logging.info('Starting %s' % ' '.join(cmd))
        proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT)
        res = self.wait_for(proc, self.port)
        self.pid = proc.pid
        print("pid is %s" % self.pid)

        # BUGFIX: check startup *before* connecting; previously addShard ran
        # first and would hang/fail against a router that never came up.
        if not res:
            raise Exception("Couldn't execute %s" % ' '.join(cmd))

        if addShards:
            mongos_cxn = pymongo.Connection(self.host)
            admin = mongos_cxn['admin']
            for shard in shards:
                admin.command("addShard", shard)
        return host

    def wait_for(self, proc, port):
        """Poll until `port` accepts connections or the process exits.
        Returns True when connectable, False (after killing) on failure."""
        trys = 0
        return_code = proc.poll()
        while return_code is None:  # and trys < 100:  # ~25 seconds
            trys += 1
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                try:
                    s.connect((hostname, port))
                    return True
                except (IOError, socket.error):
                    time.sleep(0.25)
            finally:
                s.close()
            return_code = proc.poll()

        # BUGFIX: was a bare kill_all_members() call -> NameError at runtime.
        self.kill_all_members()
        return False

    def kill_all_members(self, sig=2):
        """Send `sig` (default SIGINT) to the router if it was started."""
        if self.pid is not None:
            print("killing %s" % self.pid)
            if sys.platform in ('win32', 'cygwin'):
                os.kill(self.pid, signal.CTRL_C_EVENT)
            else:
                os.kill(self.pid, sig)


class StandaloneManager(object):
    """Manages a single standalone `mongod` process."""

    def __init__(self, port=None, home=None):
        global start_port, replsets, standalone_count
        self.home = home
        if not port:
            self.port = start_port
            start_port += 1
        else:
            # BUGFIX: an explicitly requested port was previously ignored.
            self.port = int(port)
        if not self.home:
            self.home = os.environ.get("HOME")
        self.name = "standalone" + str(standalone_count)
        standalone_count += 1
        self.logpath = os.path.join(self.home, "log_" + self.name)
        self.dbpath = os.path.join(self.home, "data_" + self.name)
        # BUGFIX: kill_all_members() reads self.pid; define it before start.
        self.pid = None

    def connection(self):
        """A pymongo connection to this server."""
        return pymongo.Connection('localhost:' + str(self.port))

    def wait_for(self, proc, port):
        """Poll until `port` accepts connections or the process exits."""
        trys = 0
        return_code = proc.poll()
        while return_code is None:  # and trys < 100:  # ~25 seconds
            trys += 1
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                try:
                    s.connect((hostname, port))
                    return True
                except (IOError, socket.error):
                    time.sleep(0.25)
            finally:
                s.close()
            return_code = proc.poll()

        # BUGFIX: was a bare kill_all_members() call -> NameError at runtime.
        self.kill_all_members()
        return False

    def kill_all_members(self, sig=2):
        """Send `sig` (default SIGINT) to the server if it was started."""
        if self.pid is not None:
            print("killing %s" % self.pid)
            if sys.platform in ('win32', 'cygwin'):
                os.kill(self.pid, signal.CTRL_C_EVENT)
            else:
                os.kill(self.pid, sig)

    def start_mongod(self, port, noauth=False):
        """Launch mongod on `port`; returns 'host:port' or raises."""
        self.host = '%s:%d' % (hostname, port)
        path = os.path.join(self.dbpath, 'db' + str(port))
        if not os.path.exists(path):
            logging.info("Making Data directory: %s" % path)
            os.makedirs(path)

        keyFilePath = os.path.join(path, 'keyFile')
        keyFile = open(keyFilePath, 'w+')
        try:
            keyFile.write('password1')
        finally:
            keyFile.close()
        os.chmod(keyFilePath, int('0600', 8))

        member_logpath = os.path.join(self.logpath, 'db' + str(port) + '.log')
        if not os.path.exists(os.path.dirname(member_logpath)):
            logging.info("Making Log directory: %s" % os.path.dirname(member_logpath))
            os.makedirs(os.path.dirname(member_logpath))
        cmd = [
            mongod,
            '--dbpath', path,
            '--port', str(port),
            '--logpath', member_logpath,
            '--nojournal',
            # Various attempts to make startup faster on Mac by limiting
            # the size of files created at startup
            '--nohttpinterface',
            '--noprealloc',
            '--smallfiles',
            '--nssize', '1',
            '-vvvvv',
            # Ensure that Mongo starts with all its features turned on so we
            # can test them!
            '--ipv6',
        ]

        if not noauth:
            cmd.extend(['--auth',
                        '--keyFile', keyFilePath])

        logging.info('Starting %s' % ' '.join(cmd))
        # NOTE(review): stdout=PIPE is never drained; with -vvvvv a chatty
        # mongod could fill the pipe and stall -- confirm before relying on it.
        proc = subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        res = self.wait_for(proc, port)
        self.pid = proc.pid
        print("pid is %s" % self.pid)
        if not res:
            raise Exception("Couldn't execute %s" % ' '.join(cmd))
        return self.host

    def start_server(self, fresh=False, noauth=False):
        """Optionally wipe dbpath/logpath, then start mongod.
        `noauth` is passed through to start_mongod (new, default keeps the
        old behavior). Returns 'host:port'."""
        if fresh:
            try:
                logging.info("Deleting dbpath '%s'" % self.dbpath)
                shutil.rmtree(self.dbpath)
            except OSError:
                pass  # dbpath doesn't exist yet
            try:
                logging.info("Deleting logpath '%s'" % self.logpath)
                shutil.rmtree(self.logpath)
            except OSError:
                pass  # logpath doesn't exist yet

        # If we just deleted the working dir (because it was equal to, or
        # beneath, the dbpath or logpath), then recreate the working directory
        # and change to the new copy of it.
        if not os.path.exists(self.home):
            os.makedirs(self.home)
        os.chdir(self.home)

        self.start_time = time.time()
        self.info = {}
        self.host = self.start_mongod(self.port, noauth=noauth)
        return self.host
class ReplicaSetManager(object):
    """Builds and manages a local replica set (or master/slave pair).

    Ports are allocated sequentially from the module-wide counter; member
    process info is tracked in self.members_info keyed by 'host:port'.
    """

    def __init__(self, port=None, name=None, home=None, with_arbiter=True,
                 num_members=3, master_slave=False, noauth=False):
        global start_port, replsets
        self.home = home
        self.num_members = num_members
        self.with_arbiter = with_arbiter
        self.master_slave = master_slave
        self.noauth = noauth
        self.members_info = {}

        if not name:
            self.name = "replset" + str(replsets)
            replsets += 1
        else:
            # BUGFIX: an explicitly supplied set name was previously ignored,
            # leaving self.name unset (AttributeError on first use).
            self.name = name
        if not port:
            self.port = start_port
            start_port += num_members
        else:
            self.port = int(port)
        if not self.home:
            self.home = os.environ.get("HOME")
        self.logpath = os.path.join(self.home, "log_" + self.name)
        self.dbpath = os.path.join(self.home, "data_" + self.name)
        # BUGFIX: get_members_in_state() reads self.members, which was only
        # created inside start_set(); define it up front.
        self.members = []

    def get_shard_string(self):
        """'setname/host1:port1,host2:port2' form accepted by addShard."""
        return self.name + "/" + ",".join(self.members_info.keys())

    def kill_members(self, members, sig=2):
        """Kill the given 'host:port' members and drop them from members_info."""
        for member in list(members):  # copy: we delete from members_info below
            if member not in self.members_info:
                # We killed this member earlier, but it's still in the
                # replSetGetStatus() output as 'stateStr: (not reachable/healthy)'
                continue

            try:
                pid = self.members_info[member]['pid']
                logging.info('Killing pid %s' % pid)
                # Not sure if cygwin makes sense here...
                if sys.platform in ('win32', 'cygwin'):
                    os.kill(pid, signal.CTRL_C_EVENT)
                else:
                    os.kill(pid, sig)

                # Make sure it's dead
                os.waitpid(pid, 0)
                logging.info('Killed.')
            except OSError:
                pass  # already dead

            del self.members_info[member]

    def kill_all_members(self):
        # list() so we don't iterate a view of a dict kill_members mutates.
        self.kill_members(list(self.members_info.keys()))

    def kill_primary(self):
        primary = self.get_primary()
        self.kill_members(primary)
        return primary

    def kill_secondary(self):
        secondary = self.get_random_secondary()
        self.kill_members(secondary)
        return secondary

    def kill_all_secondaries(self):
        # BUGFIX: called the nonexistent get_all_secondaries() before.
        secondaries = self.get_secondaries()
        self.kill_members(secondaries)
        return secondaries

    def get_master(self):
        """Master/slave mode: members started with --master."""
        return [
            name for name, info in self.members_info.items()
            if info.get('master')
        ]

    def get_slaves(self):
        """Master/slave mode: members started with --slave."""
        return [
            name for name, info in self.members_info.items()
            if info.get('slave')
        ]

    def get_primary(self):
        return self.get_members_in_state(1)

    def get_random_secondary(self):
        secondaries = self.get_members_in_state(2)
        if len(secondaries):
            return [secondaries[random.randrange(0, len(secondaries))]]
        return secondaries

    def get_secondaries(self):
        return self.get_members_in_state(2)

    def get_arbiters(self):
        return self.get_members_in_state(7)

    def wait_for(self, proc, port):
        """Poll until `port` accepts connections or the process exits."""
        trys = 0
        return_code = proc.poll()
        # NOTE(review): an already-exited process is treated as success here,
        # unlike the other managers -- confirm this is intentional.
        if return_code is not None:
            return True
        while return_code is None:  # and trys < 100:  # ~25 seconds
            trys += 1
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                try:
                    s.connect((hostname, port))
                    return True
                except (IOError, socket.error):
                    time.sleep(0.25)
            finally:
                s.close()

            return_code = proc.poll()

        logging.error(
            "Couldn't start member on port %s, return_code = %s, killing other"
            " members and quitting" % (port, return_code)
        )

        # BUGFIX: was a bare kill_all_members() call -> NameError at runtime.
        self.kill_all_members()
        return False

    def start_member(
        self,
        port,
        _id,
        arbiter=False,
        master=False,
        slave=False,
        source_port=None,
        noauth=False,
        extras=None
    ):
        """
        Start a replica-set member on port.
        @param port: Int, the port to start this member on
        @param _id: Unique int
        @param arbiter: Bool, whether to make this an arbiter
        @param master: Whether this is a master in a master-slave pair
        @param slave: Whether this is a slave in a master-slave pair
        @param source_port: If this is a slave, the master's port number
        @return: Member info, suitable as part of replSetInitiate()
        """
        # BUGFIX: extras was a mutable default argument shared across calls.
        extras = {} if extras is None else extras
        host = '%s:%d' % (hostname, port)
        path = os.path.join(self.dbpath, 'db' + str(port))
        if not os.path.exists(path):
            logging.info("Making Data directory: %s" % path)
            os.makedirs(path)

        # Create a keyFile for the --auth argument to mongod
        keyFilePath = os.path.join(path, 'keyFile')
        keyFile = open(keyFilePath, 'w+')
        try:
            keyFile.write('password1')
        finally:
            keyFile.close()
        os.chmod(keyFilePath, int('0600', 8))

        member_logpath = os.path.join(self.logpath, 'db' + str(port) + '.log')
        if not os.path.exists(os.path.dirname(member_logpath)):
            logging.info("Making Log directory: %s" % os.path.dirname(member_logpath))
            os.makedirs(os.path.dirname(member_logpath))
        cmd = [
            mongod,
            '--dbpath', path,
            '--port', str(port),
            '--logpath', member_logpath,
            '--nojournal',
            # Various attempts to make startup faster on Mac by limiting
            # the size of files created at startup
            '--nohttpinterface',
            '--noprealloc',
            '--smallfiles',
            '--nssize', '1',
            '--oplogSize', '150',  # 150MB oplog, not 5% of disk
            '-vvvvv',
            # Ensure that Mongo starts with all its features turned on so we
            # can test them!
            '--ipv6',
        ]
        # If auth is enabled (default), add auth mode
        if not noauth:
            cmd.extend(['--auth',
                        '--keyFile', keyFilePath])

        assert not (master and slave), (
            "Conflicting arguments: can't be both master and slave"
        )

        assert bool(source_port) == bool(slave), (
            "Conflicting arguments: must provide 'slave' and 'source_port' or"
            " neither"
        )
        if master:
            cmd.append('--master')
        elif slave:
            cmd += ['--slave', '--source', 'localhost:' + str(source_port)]
        else:
            cmd += ['--replSet', self.name]

        logging.info('Starting %s' % ' '.join(cmd))
        proc = subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        res = self.wait_for(proc, port)
        if not res:
            raise Exception("Couldn't execute %s" % ' '.join(cmd))

        # Do TWO things!!
        # 1) Add member's info to members_info, in a format that rs_manager
        #    understands, so we can save it to disk at the end of this script
        #    and read it later
        # 2) Return member's info for use in replSetInitiate() in a format
        #    the Mongo server understands
        self.members_info[host] = {
            '_id': _id,
            'pid': proc.pid,
            'cmd': cmd,
            'logpath': member_logpath,
            'host': hostname,
            'port': port,
            'arbiter': arbiter,
            'master': master,
            'slave': slave,
            'source_port': source_port,
        }

        configitem = {'_id': _id, 'host': host, 'arbiterOnly': arbiter}
        configitem.update(extras)
        return configitem

    def start_set(self, fresh=False, restart=False, extra_options=None):
        """Start all members and (unless restart) replSetInitiate the set.
        Blocks until the expected primary/secondary/arbiter counts are seen.
        Returns the primary's 'host:port'."""
        # BUGFIX: extra_options was a mutable default argument.
        extra_options = {} if extra_options is None else extra_options
        if fresh:
            try:
                logging.info("Deleting dbpath '%s'" % self.dbpath)
                shutil.rmtree(self.dbpath)
            except OSError:
                pass  # dbpath doesn't exist yet

            try:
                logging.info("Deleting logpath '%s'" % self.logpath)
                shutil.rmtree(self.logpath)
            except OSError:
                pass  # logpath doesn't exist yet

        # If we just deleted the working dir (because it was equal to, or
        # beneath, the dbpath or logpath), then recreate the working directory
        # and change to the new copy of it.
        if not os.path.exists(self.home):
            logging.info(
                "I deleted my own working directory '%s', recreating it and "
                "chdir'ing to it" % self.home
            )
            os.makedirs(self.home)
        os.chdir(self.home)

        self.start_time = time.time()
        self.members = []
        for i in range(self.num_members):
            cur_port = self.port + i

            # If this is the last member we're starting, maybe make it arbiter
            arbiter = (self.with_arbiter and (i == self.num_members - 1))
            master = (self.master_slave and i == 0)
            slave = (self.master_slave and i > 0)

            source_port = self.port if slave else None

            # start_member() records the member in members_info, and gives us
            # some JSON to include in replSetInitiate()
            member_info = self.start_member(
                port=cur_port, _id=i, arbiter=arbiter, master=master, slave=slave,
                source_port=source_port, noauth=self.noauth,
                extras=extra_options.get(i, {})
            )
            self.members.append(member_info)

        self.config = {'_id': self.name, 'members': self.members}
        self.primary = self.members[0]['host']

        if self.master_slave:
            # Is there any way to verify the pair is up?
            pass
        else:
            # Honestly, what's the hurry? members take a *long* time to start on EC2
            time.sleep(1)
            c = pymongo.Connection(self.primary)

            logging.info('Initiating replica set....')
            if not restart:
                c.admin.command('replSetInitiate', self.config)

            # Wait for all members to come online
            expected_secondaries = self.num_members - 1
            if self.with_arbiter:
                expected_secondaries -= 1
            expected_arbiters = 1 if self.with_arbiter else 0

            while True:
                time.sleep(2)

                try:
                    if (
                        len(self.get_primary()) == 1 and
                        len(self.get_secondaries()) == expected_secondaries and
                        len(self.get_arbiters()) == expected_arbiters
                    ):
                        break
                except pymongo.errors.AutoReconnect:
                    # Keep waiting
                    pass
            logging.info('Started %s members in %s seconds' % (
                self.num_members, int(time.time() - self.start_time)
            ))

        return self.primary

    def get_members_in_state(self, state):
        """
        @param state: A state constant (1, 2, 3, ...) or 'any'
        @return: RS members currently in that state, e.g.:
            ['localhost:4000', 'localhost:4001']
        """
        if not self.members:
            logging.warning('No running members!')
            return []

        num_retries = 10
        while True:
            try:
                c = pymongo.Connection([m['host'] for m in self.members])
                break
            except (pymongo.errors.AutoReconnect, pymongo.errors.ConnectionFailure) as e:
                # Special case: all nodes down, or only arbiter (to which we are
                # banned from connecting) is up
                if state == 'any':
                    # BUGFIX: referenced an undefined `global_state`; we track
                    # members in self.members_info.
                    return list(self.members_info.keys())
                print("failed %s" % e)
                num_retries -= 1
                if num_retries == 0:
                    raise
                time.sleep(5)

        try:
            status = c.admin.command('replSetGetStatus')
            members = status['members']
            return [
                k['name'] for k in members
                if (k['state'] == state or state == 'any')
            ]
        # BUGFIX: Py2-only `except X, e` syntax; now matches the `as e`
        # handler above.
        except pymongo.errors.PyMongoError as e:
            logging.warning(e)
            return []
# ===== testing/replsetmaker.py =====
# Stand up a 3-member replica set and load the treasury fixture into it.
import mongo_manager


def _main():
    """Start the set under /tmp/rs0 and mongoimport the sample data."""
    rs = mongo_manager.ReplicaSetManager(home="/tmp/rs0",
                                         with_arbiter=True, num_members=3)
    rs.start_set(fresh=True)
    primary = rs.get_primary()[0]
    mongo_manager.mongo_import(
        primary,
        "mongo_hadoop",
        "yield_historical.in",
        "/Users/mike/projects/mongo-hadoop/examples/treasury_yield/src/main/resources/yield_historical_in.json")


# IMPROVEMENT: guard the side effects so importing this module (e.g. from a
# test runner) doesn't start servers.
if __name__ == "__main__":
    _main()


# ===== testing/run_treasury.py (prelude) =====
#!/bin/env python
import unittest
import mongo_manager
import subprocess
import os
import time

# IMPROVEMENT: overridable via the environment; falls back to the old
# hard-coded developer path.
HADOOP_HOME = os.environ.get("HADOOP_HOME", "/Users/mike/hadoop/hadoop-1.0.4")

# -D properties passed to every treasury-yield hadoop job below.
parameters = {
    "mongo.job.verbose": "true",
    "mongo.job.background": "false",
    # "mongo.input.key":"",
    # "mongo.input.query":"",
    "mongo.job.mapper": "com.mongodb.hadoop.examples.treasury.TreasuryYieldMapper",
    "mongo.job.reducer": "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducer",
    "mongo.job.input.format": "com.mongodb.hadoop.MongoInputFormat",
    "mongo.job.output.format": "com.mongodb.hadoop.MongoOutputFormat",
    "mongo.job.output.key": "org.apache.hadoop.io.IntWritable",
    "mongo.job.output.value": "org.apache.hadoop.io.DoubleWritable",
    "mongo.job.mapper.output.key": "org.apache.hadoop.io.IntWritable",
    "mongo.job.mapper.output.value": "org.apache.hadoop.io.DoubleWritable",
    "mongo.job.combiner": "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducer",
    "mongo.job.partitioner": "",
    "mongo.job.sort_comparator": "",
}
# Shared fixtures/constants, previously duplicated verbatim in both test cases.
TREASURY_JAR = "/Users/mike/projects/mongo-hadoop/examples/treasury_yield/target/treasury-example_1.0.3-1.1.0-SNAPSHOT.jar"
JSON_FIXTURE = "/Users/mike/projects/mongo-hadoop/examples/treasury_yield/src/main/resources/yield_historical_in.json"


def _treasury_job_cmd(hadoop_home, jar_path, params, input_uri, output_uri):
    """Build the `hadoop jar ... -D k=v ...` argv for the treasury job.

    Pure function so the command construction is testable; previously this
    logic was copy-pasted in TestBasic and TestSharded.
    """
    cmd = [os.path.join(hadoop_home, "bin", "hadoop"), "jar", jar_path]
    # cmd.append("com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig")
    for key, val in params.items():
        cmd.append("-D")
        cmd.append(key + "=" + val)
    cmd.extend(["-D", "mongo.input.uri=" + input_uri,
                "-D", "mongo.output.uri=" + output_uri])
    return cmd


class TestBasic(unittest.TestCase):
    """Run the treasury-yield job against a standalone mongod."""

    def setUp(self):
        self.server = mongo_manager.StandaloneManager(home="/tmp/standalone1")
        self.server_hostname = self.server.start_server(fresh=True)
        self.server.connection().drop_database('mongo_hadoop')
        mongo_manager.mongo_import(self.server_hostname,
                                   "mongo_hadoop",
                                   "yield_historical.in",
                                   JSON_FIXTURE)
        print("server is ready.")

    def test_treasury(self):
        print("running treasury example test.")
        cmd = _treasury_job_cmd(
            HADOOP_HOME, TREASURY_JAR, parameters,
            "mongodb://%s/mongo_hadoop.yield_historical.in" % self.server_hostname,
            "mongodb://%s/mongo_hadoop.yield_historical.out" % self.server_hostname)

        subprocess.call(cmd)
        # The fixture yields exactly 21 output documents (one per year).
        self.assertEqual(
            self.server.connection()['mongo_hadoop']['yield_historical.out'].count(),
            21)
        print(' '.join(cmd))

    def tearDown(self):
        print("killing it!")
        self.server.kill_all_members()


class TestSharded(unittest.TestCase):
    """Run the treasury-yield job against a 2-shard cluster of replica sets."""

    def setUp(self):
        self.shard1 = mongo_manager.ReplicaSetManager(home="/tmp/rs0",
                                                      with_arbiter=True,
                                                      num_members=3)
        self.shard1.start_set(fresh=True)
        self.shard2 = mongo_manager.ReplicaSetManager(home="/tmp/rs1",
                                                      with_arbiter=True,
                                                      num_members=3)
        self.shard2.start_set(fresh=True)
        self.configdb = mongo_manager.StandaloneManager(home="/tmp/config_db")
        self.confighost = self.configdb.start_server(fresh=True)

        self.mongos = mongo_manager.MongosManager(home="/tmp/mongos")
        self.mongos_hostname = self.mongos.start_mongos(
            self.confighost,
            [h.get_shard_string() for h in (self.shard1, self.shard2)],
            noauth=False, fresh=True, addShards=True)

        self.mongos_connection = self.mongos.connection()
        self.mongos_connection.drop_database('mongo_hadoop')
        mongo_manager.mongo_import(self.mongos_hostname,
                                   "mongo_hadoop",
                                   "yield_historical.in",
                                   JSON_FIXTURE)
        self.mongos_connection['admin'].command("enablesharding", "mongo_hadoop")
        self.mongos_connection['admin'].command(
            "shardCollection", "mongo_hadoop.yield_historical.in", key={"_id": 1})
        self.mongos_connection['admin'].command(
            "split", "mongo_hadoop.yield_historical.in", find={"_id": 1})

    def test_treasury(self):
        print("running treasury example test.")
        cmd = _treasury_job_cmd(
            HADOOP_HOME, TREASURY_JAR, parameters,
            "mongodb://%s/mongo_hadoop.yield_historical.in?readPreference=primary" % self.mongos_hostname,
            "mongodb://%s/mongo_hadoop.yield_historical.out" % self.mongos_hostname)

        subprocess.call(cmd)
        self.assertEqual(
            self.mongos_connection['mongo_hadoop']['yield_historical.out'].count(),
            21)
        print(' '.join(cmd))

    def tearDown(self):
        print("killing servers!")
        self.mongos.kill_all_members()
        self.shard1.kill_all_members()
        self.shard2.kill_all_members()
        self.configdb.kill_all_members()


if __name__ == '__main__':
    # BUGFIX: previously called testtreasury(), a function that only exists
    # in commented-out form above -> NameError on every direct run.
    unittest.main()
# NOTE(review): this span of the patch also adds testing/run_treasury.sh, a
# bash driver that builds the same hadoop job_args as run_treasury.py and
# invokes $HADOOP_HOME/bin/hadoop with them; it is unchanged here.

# ===== testing/shardmaker.py =====
# Stand up a 2-shard cluster (two replica sets + config server + mongos),
# load the treasury fixture and shard the collection.
import sys
import traceback

import mongo_manager


def _build_cluster():
    """Start the full sharded topology; raises on any failure."""
    shard1 = mongo_manager.ReplicaSetManager(home="/tmp/rs0",
                                             with_arbiter=True, num_members=3)
    shard1.start_set(fresh=True)

    shard2 = mongo_manager.ReplicaSetManager(home="/tmp/rs1",
                                             with_arbiter=True, num_members=3)
    shard2.start_set(fresh=True)

    # config server
    config = mongo_manager.StandaloneManager(home="/tmp/config_db")
    confighost = config.start_server(fresh=True)

    router = mongo_manager.MongosManager(home="/tmp/mongos")
    router.start_mongos(confighost,
                        [h.get_shard_string() for h in (shard1, shard2)],
                        noauth=False, fresh=True, addShards=True)

    # BUGFIX: mongo_import takes a host string; passing the bare port number
    # made mongoimport treat e.g. "4001" as a hostname and fail.
    mongo_manager.mongo_import(
        router.host, "testdb", "testcoll",
        "/Users/mike/projects/mongo-hadoop/examples/treasury_yield/src/main/resources/yield_historical_in.json")

    s_client = router.connection()
    s_client['admin'].command("enablesharding", "testdb")
    s_client['admin'].command("shardCollection", "testdb.testcoll",
                              key={"_id": 1})


if __name__ == "__main__":
    try:
        _build_cluster()
    except Exception:
        # BUGFIX: the original bare `except:` also swallowed the SystemExit
        # raised by sys.exit(0) inside the try-block, so the script *always*
        # exited 1 -- and it hid the actual error. Print it and fail.
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0)