diff --git a/src/main/com/mongodb/gridfs/GridFS.java b/src/main/com/mongodb/gridfs/GridFS.java index 5d70ec806a2..30ff6b23793 100644 --- a/src/main/com/mongodb/gridfs/GridFS.java +++ b/src/main/com/mongodb/gridfs/GridFS.java @@ -196,6 +196,37 @@ public GridFSInputFile createFile( InputStream in , String filename ){ return new GridFSInputFile( this , in , filename ); } + /** + * This method creates an empty {@link GridFSInputFile} instance. On this + * instance an {@link java.io.OutputStream} can be obtained using the + * {@link GridFSInputFile#getOutputStream()} method. You can still call + * {@link GridFSInputFile#setContentType(String)} and + * {@link GridFSInputFile#setFilename(String)}. The file will be completely + * written and closed after calling the {@link java.io.OutputStream#close()} + * method on the output stream. + * + * @param filename + * Name of the file to be created. + * @return GridFS file handle instance. + */ + public GridFSInputFile createFile(String filename) { + return new GridFSInputFile( this , filename ); + } + + /** + * This method creates an empty {@link GridFSInputFile} instance. On this + * instance an {@link java.io.OutputStream} can be obtained using the + * {@link GridFSInputFile#getOutputStream()} method. You can still call + * {@link GridFSInputFile#setContentType(String)} and + * {@link GridFSInputFile#setFilename(String)}. The file will be completely + * written and closed after calling the {@link java.io.OutputStream#close()} + * method on the output stream. + * + * @return GridFS file handle instance. + */ + public GridFSInputFile createFile() { + return new GridFSInputFile( this ); + } diff --git a/src/main/com/mongodb/gridfs/GridFSInputFile.java b/src/main/com/mongodb/gridfs/GridFSInputFile.java index 2ec42476b48..d2370185ac0 100644 --- a/src/main/com/mongodb/gridfs/GridFSInputFile.java +++ b/src/main/com/mongodb/gridfs/GridFSInputFile.java @@ -27,122 +27,350 @@ import com.mongodb.*; import com.mongodb.util.*; +/** + * Class implementation for writing data to GridFS. + * + * @author Eliot Horowitz and Guy K. Kloss + */ public class GridFSInputFile extends GridFSFile { - GridFSInputFile( GridFS fs , InputStream in , String filename ){ + /** + * Default constructor setting the GridFS file name and providing an input + * stream containing data to be written to the file. + * + * @param fs + * The GridFS connection handle. + * @param in + * Stream used for reading data from. + * @param filename + * Name of the file to be created. + */ + GridFSInputFile( GridFS fs , InputStream in , String filename ) { _fs = fs; _in = in; - _filename = filename; _id = new ObjectId(); _chunkSize = GridFS.DEFAULT_CHUNKSIZE; _uploadDate = new Date(); + _messageDigester = _md5Pool.get(); + _messageDigester.reset(); + _buffer = new byte[(int) _chunkSize]; } - - public DBObject getMetaData(){ - if ( _metadata == null ) + + /** + * Constructor that only provides a file name, but does not rely on the + * presence of an {@link java.io.InputStream}. An + * {@link java.io.OutputStream} can later be obtained for writing using the + * {@link #getOutputStream()} method. + * + * @param fs + * The GridFS connection handle. + * @param filename + * Name of the file to be created. + */ + GridFSInputFile( GridFS fs , String filename ) { + this( fs , null , filename ); + } + + /** + * Minimal constructor that does not rely on the presence of an + * {@link java.io.InputStream}. An {@link java.io.OutputStream} can later be + * obtained for writing using the {@link #getOutputStream()} method. + * + * @param fs + * The GridFS connection handle. + */ + GridFSInputFile( GridFS fs ) { + this( fs , null , null ); + } + + /** + * {@inheritDoc} + * + * @see com.mongodb.gridfs.GridFSFile#getMetaData() + */ + public DBObject getMetaData() { + if ( _metadata == null ) { _metadata = new BasicDBObject(); + } return _metadata; } - - public void setFilename( String fn ){ + + /** + * Sets the file name on the GridFS entry. + * + * @param fn + * File name. + */ + public void setFilename( String fn ) { _filename = fn; } - - public void setContentType( String ct ){ + + /** + * Sets the content type (MIME type) on the GridFS entry. + * + * @param ct + * Content type. + */ + public void setContentType( String ct ) { _contentType = ct; } + + /** + * {@inheritDoc} + * + * @see com.mongodb.gridfs.GridFSFile#save() + */ public void save() { - save(GridFS.DEFAULT_CHUNKSIZE); + save( GridFS.DEFAULT_CHUNKSIZE ); } + /** + * Saves the new GridFS entry with a non-default chunk size. + * + * @param chunkSize + * Size of chunks for file in bytes. + */ public void save( int chunkSize ) { - if ( ! _saved ){ + if ( ! _saved ) { try { - saveChunks(chunkSize); - } - catch ( IOException ioe ){ + saveChunks( chunkSize ); + } catch ( IOException ioe ) { throw new MongoException( "couldn't save chunks" , ioe ); } } - super.save(); + + if ( _outputStream == null ) { + _close(); + } } - - public int saveChunks() - throws IOException { - return saveChunks(GridFS.DEFAULT_CHUNKSIZE); + + /** + * Saves all data from configured {@link java.io.InputStream} input stream + * to GridFS. If an {@link java.io.OutputStream} has been obtained (with the + * {@link #getOutputStream()} method), the last, partial chunk is not + * written. It is written upon the call to + * {@link java.io.OutputStream#close()} on that stream. + * + * @return Number of the next chunk. + * @throws IOException + * on problems reading the new entry's + * {@link java.io.InputStream}. + */ + public int saveChunks() throws IOException { + return saveChunks( GridFS.DEFAULT_CHUNKSIZE ); } - public int saveChunks( int chunkSize ) - throws IOException { - if ( _saved ) + /** + * Saves all data from configured {@link java.io.InputStream} input stream + * to GridFS. For writing a non-default chunk size is used. If an + * {@link java.io.OutputStream} has been obtained (with the + * {@link #getOutputStream()} method), the last, partial chunk is not + * written. It is written upon the call to + * {@link java.io.OutputStream#close()} on that stream. + * + * @param chunkSize + * Size of chunks for file in bytes. + * @return Number of the next chunk. + * @throws IOException + * on problems reading the new entry's + * {@link java.io.InputStream}. + */ + public int saveChunks( int chunkSize ) throws IOException { + if ( _chunkSize != chunkSize ) { + _chunkSize = chunkSize; + _buffer = new byte[(int) _chunkSize]; + } + if ( _saved ) { throw new RuntimeException( "already saved!" ); + } - if ( chunkSize > 3.5 * 1000 * 1000 ) + if ( chunkSize > 3.5 * 1000 * 1000 ) { throw new RuntimeException( "chunkSize must be less than 3.5MiB!" ); + } - byte[] b = new byte[chunkSize]; - - long total = 0; - int cn = 0; + int bytesRead = 0; + while ( bytesRead >= 0 ) { + _currentBufferPosition = 0; + bytesRead = _readStream2Buffer(); + _dumpBuffer( _outputStream == null ); + } - MessageDigest md = _md5Pool.get(); - md.reset(); - DigestInputStream in = new DigestInputStream( _in , md ); + if ( _outputStream == null ) { + _close(); + } + return _currentChunkNumber; + } + + /** + * After retrieving this {@link java.io.OutputStream}, this object will be + * capable of accepting successively written data to the output stream. All + * operations proceed as usual, only the {@link #save()}, {@link #save(int)} + * , {@link #saveChunks()} and {@link #saveChunks(int)} methods will + * not finalise the and close writing the GridFS file. They will + * only read out the potentially used {@link java.io.InputStream} and + * flush it to the internal write buffer. To completely persist this GridFS + * object, you must finally call the {@link java.io.OutputStream#close()} + * method on the output stream. + * + * @return Writable stream object. + */ + public OutputStream getOutputStream() { + if ( _outputStream == null ) { + _outputStream = new MyOutputStream(); + } + return _outputStream; + } + + /** + * Dumps a new chunk into the chunks collection. Depending on the flag, also + * partial buffers (at the end) are going to be written immediately. + * + * @param data + * Data for chunk. + * @param writePartial + * Write also partial buffers full. + */ + private void _dumpBuffer( boolean writePartial ) { + if ( ( _currentBufferPosition < _chunkSize ) && !writePartial ) { + // Bail out, nothing to write (yet). + return; + } + byte[] writeBuffer = _buffer; + if ( _currentBufferPosition != _chunkSize ) { + writeBuffer = new byte[_currentBufferPosition]; + System.arraycopy( _buffer, 0, writeBuffer, 0, _currentBufferPosition ); + } - while ( true ){ - int start =0; - - while ( start < b.length ){ - int r = in.read( b , start , b.length - start ); - if ( r == 0 ) - throw new RuntimeException( "i'm doing something wrong" ); - if ( r < 0 ) - break; - start += r; + DBObject chunk = BasicDBObjectBuilder.start() + .add( "files_id", _id ) + .add( "n", _currentChunkNumber ) + .add( "data", writeBuffer ).get(); + _fs._chunkCollection.save( chunk ); + _currentChunkNumber++; + _totalBytes += writeBuffer.length; + _messageDigester.update( writeBuffer ); + _currentBufferPosition = 0; + } + + /** + * Reads a buffer full from the {@link java.io.InputStream}. + * + * @return Number of bytes read from stream. + * @throws IOException + * if the reading from the stream fails. + */ + private int _readStream2Buffer() throws IOException { + int bytesRead = 0; + while ( _currentBufferPosition < _chunkSize && bytesRead >= 0 ) { + bytesRead = _in.read( _buffer, _currentBufferPosition, + (int) _chunkSize - _currentBufferPosition ); + if ( bytesRead > 0 ) { + _currentBufferPosition += bytesRead; + } else if ( bytesRead == 0 ) { + throw new RuntimeException( "i'm doing something wrong" ); } - - total += start; - - byte[] mine = b; - - if ( start != b.length ){ - mine = new byte[start]; - System.arraycopy( b , 0 , mine , 0 , start ); + } + return bytesRead; + } + + /** + * Persist the GridFS object by writing finally also the object in the file + * collection. Calls the super class save() method. + */ + private void _close() { + if (!_saved) { + _md5 = Util.toHex( _messageDigester.digest() ); + _md5Pool.done( _messageDigester ); + _length = _totalBytes; + _saved = true; + } + super.save(); + } + + private final InputStream _in; + private boolean _saved = false; + private byte[] _buffer = null; + private int _currentChunkNumber = 0; + private int _currentBufferPosition = 0; + private long _totalBytes = 0; + private MessageDigest _messageDigester = null; + private OutputStream _outputStream = null; + + /** + * A pool of {@link java.security.MessageDigest} objects. + */ + static SimplePool _md5Pool + = new SimplePool( "md5" , 10 , -1 , false , false ) { + /** + * {@inheritDoc} + * + * @see com.mongodb.util.SimplePool#createNew() + */ + protected MessageDigest createNew() { + try { + return MessageDigest.getInstance( "MD5" ); + } catch ( java.security.NoSuchAlgorithmException e ) { + throw new RuntimeException( "your system doesn't have md5!" ); } - - DBObject chunk = BasicDBObjectBuilder.start() - .add( "files_id" , _id ) - .add( "n" , cn++ ) - .add( "data" , mine ) - .get(); - - _fs._chunkCollection.save( chunk ); - - if ( start < b.length ) - break; } + }; + + /** + * An output stream implementation that can be used to successively write to + * a GridFS file. + * + * @author Guy K. Kloss + */ + class MyOutputStream extends OutputStream { - _md5 = Util.toHex( md.digest() ); - _md5Pool.done( md ); + /** + * {@inheritDoc} + * + * @see java.io.OutputStream#write(int) + */ + @Override + public void write( int b ) throws IOException { + byte[] byteArray = new byte[1]; + byteArray[0] = (byte) (b & 0xff); + write( byteArray, 0, 1 ); + } - _length = total; - _saved = true; - return cn; - } - - final InputStream _in; - boolean _saved = false; - - static SimplePool _md5Pool = new SimplePool( "md5" , 10 , -1 , false , false ){ - protected MessageDigest createNew(){ - try { - return MessageDigest.getInstance("MD5"); + /** + * {@inheritDoc} + * + * @see java.io.OutputStream#write(byte[], int, int) + */ + @Override + public void write( byte[] b , int off , int len ) throws IOException { + int offset = off; + int length = len; + int toCopy = 0; + while ( length > 0 ) { + toCopy = length; + if ( toCopy > _chunkSize - _currentBufferPosition ) { + toCopy = (int) _chunkSize - _currentBufferPosition; } - catch ( java.security.NoSuchAlgorithmException e ){ - throw new RuntimeException( "your system doesn't have md5!" ); + System.arraycopy( b, offset, _buffer, _currentBufferPosition, toCopy ); + _currentBufferPosition += toCopy; + offset += toCopy; + length -= toCopy; + if ( _currentBufferPosition == _chunkSize ) { + _dumpBuffer( false ); } } - }; + } + + /** + * Processes/saves all data from {@link java.io.InputStream} and closes + * the potentially present {@link java.io.OutputStream}. The GridFS file + * will be persisted afterwards. + */ + @Override + public void close() { + _dumpBuffer( true ); + _close(); + } + } } diff --git a/src/test/com/mongodb/gridfs/GridFSTest.java b/src/test/com/mongodb/gridfs/GridFSTest.java index 7ce668d74ae..fd738575b7d 100644 --- a/src/test/com/mongodb/gridfs/GridFSTest.java +++ b/src/test/com/mongodb/gridfs/GridFSTest.java @@ -29,9 +29,9 @@ public GridFSTest() throws IOException , MongoException { super(); try { - cleanupMongo = new Mongo( "127.0.0.1" ); - cleanupDB = "com_mongodb_unittest_GridFSTest"; - _db = cleanupMongo.getDB( cleanupDB ); + cleanupMongo = new Mongo( "127.0.0.1" ); + cleanupDB = "com_mongodb_unittest_GridFSTest"; + _db = cleanupMongo.getDB( cleanupDB ); _fs = new GridFS( _db ); } catch ( MongoException e ){ @@ -98,6 +98,45 @@ public void testBig() testInOut( s ); } + void testOutStream( String s ) throws Exception { + + int[] start = _get(); + + GridFSInputFile in = _fs.createFile(); + OutputStream writeStream = in.getOutputStream(); + writeStream.write( s.getBytes(), 0, s.length() ); + writeStream.close(); + GridFSDBFile out = _fs.findOne( new BasicDBObject( "_id" , in.getId() ) ); + assert ( out.getId().equals( in.getId() ) ); + assert ( out.getChunkSize() == (long) GridFS.DEFAULT_CHUNKSIZE ); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + out.writeTo( bout ); + String outString = new String( bout.toByteArray() ); + assert (outString.equals( s )); + + out.remove(); + int[] end = _get(); + assertEquals( start[0], end[0] ); + assertEquals( start[1], end[1] ); + } + + @Test(groups = { "basic" }) + public void testOutStreamSmall() throws Exception { + testOutStream( "this is a simple test" ); + } + + @Test(groups = { "basic" }) + public void testOutStreamBig() throws Exception { + int target = (int) (GridFS.DEFAULT_CHUNKSIZE * 3.5); + StringBuilder buf = new StringBuilder( target ); + while ( buf.length() < target ) { + buf.append( "asdasdkjasldkjasldjlasjdlajsdljasldjlasjdlkasjdlaskjdlaskjdlsakjdlaskjdasldjsad" ); + } + String s = buf.toString(); + testOutStream( s ); + } + @Test(groups = {"basic"}) public void testMetadata() throws Exception {