Skip to content

Commit

Permalink
output management for emr
Browse files Browse the repository at this point in the history
  • Loading branch information
erh committed May 25, 2012
1 parent 99d6bfd commit 087253b
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 18 deletions.
32 changes: 32 additions & 0 deletions buildscripts/emr/IOUtil.java
Expand Up @@ -2,6 +2,7 @@

import java.io.*;
import java.net.*;
import java.util.*;

public class IOUtil {

Expand Down Expand Up @@ -68,6 +69,37 @@ public static String readStringFully( InputStream in )

}

public static Map<String,Object> readPythonSettings( File file )
throws IOException {

String all = readStringFully( new FileInputStream( file ) );

Map<String,Object> map = new TreeMap<String,Object>();

for ( String line : all.split( "\n" ) ) {
line = line.trim();
if ( line.length() == 0 )
continue;

String[] pcs = line.split( "=" );
if ( pcs.length != 2 )
continue;

String name = pcs[0].trim();
String value = pcs[1].trim();

if ( value.startsWith( "\"" ) ) {
map.put( name , value.substring( 1 , value.length() - 1 ) );
}
else {
map.put( name , Long.parseLong( value ) );
}

}

return map;
}

public static String[] runCommand( String cmd , File dir )
throws IOException {

Expand Down
78 changes: 70 additions & 8 deletions buildscripts/emr/emr.java
Expand Up @@ -23,7 +23,9 @@ void copy( MongoSuite c ) {
mongo = c.mongo;
code = c.code;
workingDir = c.workingDir;

suite = c.suite;

}

void downloadTo( File localDir )
Expand All @@ -40,10 +42,12 @@ boolean runTest()
dir.mkdirs();

// download
System.out.println( "going to download" );
downloadTo( dir );


// explode
System.out.println( "going to explode" );
IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( code ) , dir );
String[] res = IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( mongo ) , dir );
for ( String x : res[0].split( "\n" ) ) {
Expand All @@ -54,31 +58,69 @@ boolean runTest()
throw new RuntimeException( "rename failed" );
}

List<String> cmd = new ArrayList<String>();
cmd.add( "/usr/bin/python" );
cmd.add( "buildscripts/smoke.py" );

File log_config = new File( dir , "log_config.py" );
System.out.println( "log_config: " + log_config.exists() );
if ( log_config.exists() ) {

java.util.Map<String,Object> properties = IOUtil.readPythonSettings( log_config );

cmd.add( "--buildlogger-builder" );
cmd.add( properties.get( "name" ).toString() );

cmd.add( "--buildlogger-buildnum" );
cmd.add( properties.get( "number" ).toString() );

cmd.add( "--buildlogger-credentials" );
cmd.add( "log_config.py" );

cmd.add( "--buildlogger-phase" );
{
int idx = suite.lastIndexOf( "/" );
if ( idx < 0 )
cmd.add( suite );
else
cmd.add( suite.substring( 0 , idx ) );
}

}

cmd.add( suite );

System.out.println( cmd );

Process p = Runtime.getRuntime().exec( cmd.toArray( new String[cmd.size()] ) , new String[]{} , dir );

Process p = Runtime.getRuntime().exec( new String[]{ "/usr/bin/python" , "buildscripts/smoke.py" , suite } , new String[]{} , dir );
List<Thread> threads = new ArrayList<Thread>();
threads.add( new IOUtil.PipingThread( p.getInputStream() , System.out ) );
threads.add( new IOUtil.PipingThread( p.getErrorStream() , System.out ) );

for ( Thread t : threads ) t.start();

for ( Thread t : threads )
t.start();

try {
for ( Thread t : threads ) t.join();
for ( Thread t : threads ) {
t.join();
}
int rc = p.waitFor();
System.out.println( "\n\nResult: " + rc );

return rc == 0;
}
catch ( InterruptedException ie ) {
ie.printStackTrace();
throw new RuntimeException( "sad" , ie );
}

}

public void readFields( DataInput in )
throws IOException {
mongo = in.readUTF();
code = in.readUTF();
workingDir = in.readUTF();

suite = in.readUTF();
}

Expand All @@ -87,6 +129,7 @@ public void write( final DataOutput out )
out.writeUTF( mongo );
out.writeUTF( code );
out.writeUTF( workingDir );

out.writeUTF( suite );
}

Expand Down Expand Up @@ -119,6 +162,14 @@ public void map( Text key, MongoSuite value, OutputCollector<Text,IntWritable> o
ip = ip.substring( ip.indexOf( ":" ) + 1 ).trim();
output.collect( new Text( ip ) , new IntWritable(1) );
}
catch ( RuntimeException re ) {
re.printStackTrace();
throw re;
}
catch ( IOException ioe ) {
ioe.printStackTrace();
throw ioe;
}
finally {
lock.unlock();
}
Expand Down Expand Up @@ -254,6 +305,7 @@ public static void main( String[] args ) throws Exception{

String workingDir = "/data/db/emr/";


// parse args

int pos = 0;
Expand Down Expand Up @@ -291,6 +343,16 @@ public static void main( String[] args ) throws Exception{
System.out.println( "output\t: " + output );
System.out.println( "suites\t: " + suites );

if ( false ) {
MongoSuite s = new MongoSuite();
s.mongo = mongo;
s.code = code;
s.workingDir = workingDir;
s.suite = suites;
s.runTest();
return;
}

// main hadoop set
conf.set( "mongo" , mongo );
conf.set( "code" , code );
Expand Down

0 comments on commit 087253b

Please sign in to comment.