Skip to content

Commit

Permalink
#166: added testing for memory storage
Browse files Browse the repository at this point in the history
- need to keep commands as they are executing!
  • Loading branch information
ypujante committed Nov 7, 2012
1 parent 8c8f202 commit 5f2370f
Show file tree
Hide file tree
Showing 4 changed files with 672 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
int maxNumberOfElements = 25

/**
* Keeps a maximum number of elements.
* Completed commands: keeps a maximum number of elements.
*/
Map<String, CommandExecution> commands =
Map<String, CommandExecution> completedCommands =
new EvictingWithLRUPolicyMap<String, CommandExecution>(maxNumberOfElements)

/**
* The commands that are currently executing */
Map<String, CommandExecution> executingCommands = [:]

/**
* For the compile to stop bugging me with commands being non final... */
private final Object _lock = new Object()
Expand All @@ -48,16 +52,20 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
void setMaxNumberOfElements(int maxNumberOfElements)
{
this.maxNumberOfElements = maxNumberOfElements
commands = new EvictingWithLRUPolicyMap<String, CommandExecution>(maxNumberOfElements)
completedCommands = new EvictingWithLRUPolicyMap<String, CommandExecution>(maxNumberOfElements)
}

@Override
CommandExecution findCommandExecution(String commandId)
{
CommandExecution commandExecution
synchronized(_lock)
{
return commands[commandId]
commandExecution = executingCommands[commandId]
if(!commandExecution)
commandExecution = completedCommands[commandId]
}
return commandExecution
}

@Override
Expand All @@ -66,7 +74,8 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
{
synchronized(_lock)
{
if(commands.containsKey(commandExecution.id))
if(completedCommands.containsKey(commandExecution.id) ||
executingCommands.containsKey(commandExecution.id))
throw new IllegalArgumentException("duplicate command id [${commandExecution.id}]")

def storage = new MemoryStreamStorage(commandExecution: commandExecution)
Expand All @@ -80,15 +89,30 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
new BufferedInputStream(stdin).withStream { storage.stdin << it }
}

commands[commandExecution.id] = commandExecution

return storage
}
}

def captureIO(CommandExecution commandExecution, Closure closure)
{
// nothing to do here really since we are not really storing it anywhere else...
return closure(commandExecution.storage)
def commandId = commandExecution.id

synchronized(_lock)
{
executingCommands[commandId] = commandExecution
}

try
{
return closure(commandExecution.storage)
}
finally
{
synchronized(_lock)
{
executingCommands.remove(commandId)
completedCommands[commandId] = commandExecution
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.linkedin.glu.commands.impl.StreamType
import org.linkedin.util.clock.Timespan
import org.linkedin.util.io.resource.Resource
import org.linkedin.glu.commands.impl.CommandExecution
import org.linkedin.glu.utils.io.DemultiplexedOutputStream
import org.linkedin.glu.utils.io.MultiplexedInputStream

/**
Expand Down Expand Up @@ -371,93 +370,6 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
}
}

/**
* Test read while executing
*/
public void testReadWhileExecuting()
{
FileSystemCommandExecutionIOStorage ioStorage =
new FileSystemCommandExecutionIOStorage(clock: clock,
commandExecutionFileSystem: fs,
gluCommandFactory: factory)

def ce = ioStorage.createStorageForCommandExecution([command: 'c0',
xtra0: "x0",
stdin: new ByteArrayInputStream("in0".bytes)])

// commandId should start with the current time
def commandId = ce.id

assertTrue(commandId.startsWith("${Long.toHexString(clock.currentTimeMillis())}-"))

// check the file stored on the file system
def path = new SimpleDateFormat('yyyy/MM/dd/HH/z').format(clock.currentDate())
def commandResource = fs.toResource("${path}/${commandId}/${ioStorage.commandFileName}")
assertTrue(commandResource.exists())

long startTime = clock.currentTimeMillis()

assertTrue(fs.toResource("${path}/${commandId}/${ioStorage.stdinStreamFileName}").exists())
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").exists())
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists())

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->

storage.withStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->

stdout.write("o0".bytes)
stderr.write("e0".bytes)

// due to buffering, there should be anything written yet
assertEquals("", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)

// but if we use the api it should flush the output
ioStorage.withOrWithoutCommandExecutionAndStreams(commandId,
[stdinStream: true,
stdoutStream: true,
stderrStream: true]) { m ->
def res =
MultiplexedInputStream.demultiplexToString(m.stream,
[StreamType.STDIN, StreamType.STDOUT, StreamType.STDERR].collect { it.multiplexName } as Set,
null)

assertEquals("in0", res[StreamType.STDIN.multiplexName])
assertEquals("o0", res[StreamType.STDOUT.multiplexName])
assertEquals("e0", res[StreamType.STDERR.multiplexName])
}

assertEquals("o0", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("e0", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)


stdout << "o1"
stderr << "e1"

assertEquals("in0", stdin.text)

// simulate delay
clock.addDuration(Timespan.parse("1s"))

return [exitValue: 14]
}
}
}
}

assertEquals(14, ce.syncCaptureIO(processing))

long completionTime = clock.currentTimeMillis()
assertEquals(completionTime, startTime + Timespan.parse("1s").durationInMilliseconds)

assertEquals("in0", fs.toResource("${path}/${commandId}/${ioStorage.stdinStreamFileName}").file.text)
assertEquals("o0o1", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("e0e1", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)
}

/**
* Test redirect stderr
*/
Expand Down Expand Up @@ -607,6 +519,93 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
}
}

/**
* Test read while executing
*/
public void testReadWhileExecuting()
{
FileSystemCommandExecutionIOStorage ioStorage =
new FileSystemCommandExecutionIOStorage(clock: clock,
commandExecutionFileSystem: fs,
gluCommandFactory: factory)

def ce = ioStorage.createStorageForCommandExecution([command: 'c0',
xtra0: "x0",
stdin: new ByteArrayInputStream("in0".bytes)])

// commandId should start with the current time
def commandId = ce.id

assertTrue(commandId.startsWith("${Long.toHexString(clock.currentTimeMillis())}-"))

// check the file stored on the file system
def path = new SimpleDateFormat('yyyy/MM/dd/HH/z').format(clock.currentDate())
def commandResource = fs.toResource("${path}/${commandId}/${ioStorage.commandFileName}")
assertTrue(commandResource.exists())

long startTime = clock.currentTimeMillis()

assertTrue(fs.toResource("${path}/${commandId}/${ioStorage.stdinStreamFileName}").exists())
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").exists())
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists())

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->

storage.withStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->

stdout.write("o0".bytes)
stderr.write("e0".bytes)

// due to buffering, there should be anything written yet
assertEquals("", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)

// but if we use the api it should flush the output
ioStorage.withOrWithoutCommandExecutionAndStreams(commandId,
[stdinStream: true,
stdoutStream: true,
stderrStream: true]) { m ->
def res =
MultiplexedInputStream.demultiplexToString(m.stream,
[StreamType.STDIN, StreamType.STDOUT, StreamType.STDERR].collect { it.multiplexName } as Set,
null)

assertEquals("in0", res[StreamType.STDIN.multiplexName])
assertEquals("o0", res[StreamType.STDOUT.multiplexName])
assertEquals("e0", res[StreamType.STDERR.multiplexName])
}

assertEquals("o0", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("e0", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)


stdout << "o1"
stderr << "e1"

assertEquals("in0", stdin.text)

// simulate delay
clock.addDuration(Timespan.parse("1s"))

return [exitValue: 14]
}
}
}
}

assertEquals(14, ce.syncCaptureIO(processing))

long completionTime = clock.currentTimeMillis()
assertEquals(completionTime, startTime + Timespan.parse("1s").durationInMilliseconds)

assertEquals("in0", fs.toResource("${path}/${commandId}/${ioStorage.stdinStreamFileName}").file.text)
assertEquals("o0o1", fs.toResource("${path}/${commandId}/${ioStorage.stdoutStreamFileName}").file.text)
assertEquals("e0e1", fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").file.text)
}

/**
* Convenient call to compare and ignore type
*/
Expand Down
Loading

0 comments on commit 5f2370f

Please sign in to comment.