diff --git a/src/main/java/picoded/dstack/FileWorkspace.java b/src/main/java/picoded/dstack/FileWorkspace.java index d72af525..d3b8b8c6 100755 --- a/src/main/java/picoded/dstack/FileWorkspace.java +++ b/src/main/java/picoded/dstack/FileWorkspace.java @@ -4,11 +4,16 @@ import picoded.core.conv.StringConv; import java.io.File; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.List; import java.util.Set; +import org.apache.commons.io.IOUtils; + /** * Represent a file storage backend for a workspace * @@ -133,16 +138,46 @@ default void writeString(final String filepath, String content, String encoding) //-------------------------------------------------------------------------- /** - * Get the input stream representation of a given filepath + * Get the input stream representation of a given filepath. + * + * You are expected to close, the stream on your own, to avoid memory leaks * * @param filePath in the workspace to extract - * @return the file contents, null if file does not exists + * @return the file contents as an input stream, null if file does not exists */ default InputStream readInputStream(final String filePath) { byte[] byteArr = readByteArray(filePath); return new ByteArrayInputStream(byteArr); } + /** + * Reads an input stream, and writes it to a fil, creating the file if it does not exist. + * the parent directories of the file will be created if they do not exist. + * + * Note that depending on the implementaiton, this may not be optimized, + * and may only return after the OutputStream is fully processedd. + * + * @param filepath in the workspace to extract + * @param data the content to write to the file + **/ + default void writeInputStream(final String filepath, final InputStream data) { + // Converts it to bytearray respectively + byte[] rawBytes = null; + try { + rawBytes = IOUtils.toByteArray(data); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + data.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + // Does the bytearray writes + writeByteArray(filepath, rawBytes); + } + // // Folder Pathing support //-------------------------------------------------------------------------- diff --git a/src/main/java/picoded/dstack/KeyLongMap.java b/src/main/java/picoded/dstack/KeyLongMap.java index 7ea870c8..3303f47a 100755 --- a/src/main/java/picoded/dstack/KeyLongMap.java +++ b/src/main/java/picoded/dstack/KeyLongMap.java @@ -180,39 +180,10 @@ default Long removeValue(Object key) { **/ default Long addAndGet(Object key, Object delta) { // - // NOTE : The default implmentation of addAndGet, - // or getAndAdd relies on repetaed tries using - // weakCompareAndSet, while functional. - // Is highly inefficent in most cases + // We simply use get and add, with the delta, + // this reduce the amount of permutation needed to support // - - // Validate and convert the key to String - if (key == null) { - throw new IllegalArgumentException("key cannot be null in addAndGet"); - } - String keyAsString = key.toString(); - - // Attempt to update the key for 5 times before throwing exception - for (int tries = 0; tries < 5; tries++) { - // Retrieve value from key - Long value = getValue(keyAsString); - - // Assume value as 0 if not exist - if (value == null) { - value = new Long(0); - } - - // Calculate the updated value - Long updatedValue = GenericConvert.toLong(delta) + value; - - // Update the value with weakCompareAndSet and return - if (weakCompareAndSet(keyAsString, value, updatedValue)) { - return updatedValue; - } - } - - // Throw exception due to number of retries exceeded the limit - throw new RuntimeException("Number of retries exceeded limit for addAndGet"); + return getAndAdd(key, delta)+GenericConvert.toLong(delta); } /** @@ -233,7 +204,7 @@ default Long getAndAdd(Object key, Object delta) { // Validate and convert the key to String if (key == null) { - throw new IllegalArgumentException("key cannot be null in addAndGet"); + throw new IllegalArgumentException("key cannot be null in"); } String keyAsString = key.toString(); @@ -257,7 +228,7 @@ default Long getAndAdd(Object key, Object delta) { } // Throw exception due to number of retries exceeded the limit - throw new RuntimeException("Number of retries exceeded limit for addAndGet"); + throw new RuntimeException("Number of retries exceeded limit"); } /** diff --git a/src/main/java/picoded/dstack/core/Core_FileWorkspace.java b/src/main/java/picoded/dstack/core/Core_FileWorkspace.java index 95ac8832..b59a757c 100755 --- a/src/main/java/picoded/dstack/core/Core_FileWorkspace.java +++ b/src/main/java/picoded/dstack/core/Core_FileWorkspace.java @@ -2,6 +2,8 @@ // Java imports import java.io.File; +import java.io.InputStream; +import java.io.OutputStream; import java.util.*; // Picoded imports @@ -35,6 +37,12 @@ public class Core_FileWorkspace implements FileWorkspace { **/ protected String _oid = null; + /** + * Boolean flag, if true, indicates that the current FileWorkspace is uninitialized + * if so, we will setup the workspace if needed. + */ + protected boolean _isUninitialized = false; + // Constructor //---------------------------------------------- @@ -60,6 +68,7 @@ public Core_FileWorkspace(Core_FileWorkspaceMap inMain, String inOID) { // Issue a GUID if (_oid == null) { _oid = GUID.base58(); + _isUninitialized = true; } if (_oid.length() < 4) { @@ -94,6 +103,16 @@ public String _oid() { @Override public void setupWorkspace() { main.setupWorkspace(_oid()); + _isUninitialized = false; + } + + /** + * Calls setupWorkspace if _isUninitialized is true + */ + protected void setupUninitializedWorkspace() { + if( _isUninitialized ) { + setupWorkspace(); + } } // File / Folder string normalization @@ -103,38 +122,16 @@ public void setupWorkspace() { * @param filePath * @return filePath normalized to remove ending "/" */ - private static String normalizeFilePathString(final String filePath) { - if (filePath == null) { - throw new IllegalArgumentException("Invalid null filePath"); - } - - String res = FileUtil.normalize(filePath, true); - if (res.startsWith("/")) { - res = res.substring(1); - } - if (res.endsWith("/")) { - res = res.substring(0, res.length() - 1); - } - return res; + protected static String normalizeFilePathString(final String filePath) { + return Core_FileWorkspaceMap.normalizeFilePathString(filePath); } /** * @param folderPath * @return folderPath normalized with ending "/" */ - private static String normalizeFolderPathString(final String folderPath) { - if (folderPath == null || folderPath.length() <= 0) { - return "/"; - } - - String res = FileUtil.normalize(folderPath, true); - if (res.startsWith("/")) { - res = res.substring(1); - } - if (!res.endsWith("/")) { - res = res + "/"; - } - return res; + protected static String normalizeFolderPathString(final String folderPath) { + return Core_FileWorkspaceMap.normalizeFolderPathString(folderPath); } // File exists checks @@ -148,6 +145,9 @@ private static String normalizeFolderPathString(final String folderPath) { * @return true, if file exists (and writable), false if it does not. Possible a folder */ public boolean fileExist(final String filepath) { + if( _isUninitialized ) { + return false; + } return main.backend_fileExist(_oid, normalizeFilePathString(filepath)); } @@ -157,9 +157,42 @@ public boolean fileExist(final String filepath) { * @param filepath in the workspace to delete */ public void removeFile(final String filepath) { + if( _isUninitialized ) { + return; + } main.backend_removeFile(_oid, normalizeFilePathString(filepath)); } + // Read/write input/output stream + //-------------------------------------------------------------------------- + + /** + * Reads the contents of a file into a byte array. + * + * @param filepath in the workspace to extract + * + * @return the file contents, null if file does not exists + */ + public InputStream readInputStream(final String filepath) { + if( _isUninitialized ) { + return null; + } + return main.backend_fileReadInputStream(_oid, normalizeFilePathString(filepath)); + } + + /** + * Writes an output array to a file creating the file if it does not exist. + * + * the parent directories of the file will be created if they do not exist. + * + * @param filepath in the workspace to extract + * @param data the content to write to the file + **/ + public void writeInputStream(final String filepath, final InputStream data) { + setupUninitializedWorkspace(); + main.backend_fileWriteInputStream(_oid, normalizeFilePathString(filepath), data); + } + // Read / write byteArray information //-------------------------------------------------------------------------- @@ -171,6 +204,9 @@ public void removeFile(final String filepath) { * @return the file contents, null if file does not exists */ public byte[] readByteArray(final String filepath) { + if( _isUninitialized ) { + return null; + } return main.backend_fileRead(_oid, normalizeFilePathString(filepath)); } @@ -183,6 +219,7 @@ public byte[] readByteArray(final String filepath) { * @param data the content to write to the file **/ public void writeByteArray(final String filepath, final byte[] data) { + setupUninitializedWorkspace(); main.backend_fileWrite(_oid, normalizeFilePathString(filepath), data); } @@ -190,26 +227,14 @@ public void writeByteArray(final String filepath, final byte[] data) { * Appends a byte array to a file creating the file if it does not exist. * * NOTE that by default this DOES NOT perform any file locks. As such, - * if used in a concurrent access situation. Segmentys may get out of sync. + * if used in a concurrent access situation. Segments may get out of sync. * * @param file the file to write to * @param data the content to write to the file **/ public void appendByteArray(final String filepath, final byte[] data) { - // Normalize the file path - String path = normalizeFilePathString(filepath); - - // Get existing data - byte[] read = readByteArray(path); - if (read == null) { - writeByteArray(path, data); - } - - // Append new data to existing data - byte[] jointData = ArrayConv.addAll(read, data); - - // Write the new joint data - writeByteArray(path, jointData); + setupUninitializedWorkspace(); + main.backend_fileAppendByteArray(_oid, normalizeFilePathString(filepath), data); } // Folder Pathing support @@ -222,6 +247,9 @@ public void appendByteArray(final String filepath, final byte[] data) { * @param folderPath in the workspace (note, folderPath is normalized to end with "/") */ public void removeFolderPath(final String folderPath) { + if( _isUninitialized ) { + return; + } main.backend_removeFolderPath(_oid, normalizeFolderPathString(folderPath)); } @@ -232,6 +260,9 @@ public void removeFolderPath(final String folderPath) { * @return true if folderPath is valid */ public boolean folderPathExist(final String folderPath) { + if( _isUninitialized ) { + return false; + } return main.backend_folderPathExist(_oid, normalizeFolderPathString(folderPath)); } @@ -241,6 +272,7 @@ public boolean folderPathExist(final String folderPath) { * @param folderPath in the workspace (note, folderPath is normalized to end with "/") */ public void ensureFolderPath(final String folderPath) { + setupUninitializedWorkspace(); main.backend_ensureFolderPath(_oid, normalizeFolderPathString(folderPath)); } @@ -262,6 +294,9 @@ public void ensureFolderPath(final String folderPath) { * @return DataObject created timestamp in ms */ public long createdTimestamp(final String filepath) { + if( _isUninitialized ) { + return -1; + } return main.backend_createdTimestamp(_oid, normalizeFilePathString(filepath)); } @@ -274,6 +309,9 @@ public long createdTimestamp(final String filepath) { * @return DataObject created timestamp in ms */ public long modifiedTimestamp(final String filepath) { + if( _isUninitialized ) { + return -1; + } return main.backend_modifiedTimestamp(_oid, normalizeFilePathString(filepath)); } @@ -296,6 +334,9 @@ public long modifiedTimestamp(final String filepath) { * @param destinationFile */ public void moveFile(final String sourceFile, final String destinationFile) { + if( _isUninitialized ) { + return; + } main.backend_moveFile(_oid, normalizeFilePathString(sourceFile), normalizeFilePathString(destinationFile)); } @@ -317,6 +358,9 @@ public void moveFile(final String sourceFile, final String destinationFile) { * @param destinationFolder */ public void moveFolderPath(final String sourceFolder, final String destinationFolder) { + if( _isUninitialized ) { + return; + } main.backend_moveFolderPath(_oid, normalizeFolderPathString(sourceFolder), normalizeFolderPathString(destinationFolder)); } @@ -340,6 +384,9 @@ public void moveFolderPath(final String sourceFolder, final String destinationFo * @param destinationFile */ public void copyFile(final String sourceFile, final String destinationFile) { + if( _isUninitialized ) { + return; + } main.backend_copyFile(_oid, normalizeFilePathString(sourceFile), normalizeFilePathString(destinationFile)); } @@ -361,6 +408,9 @@ public void copyFile(final String sourceFile, final String destinationFile) { * @param destinationFolder */ public void copyFolderPath(final String sourceFolder, final String destinationFolder) { + if( _isUninitialized ) { + return; + } main.backend_copyFolderPath(_oid, normalizeFolderPathString(sourceFolder), normalizeFolderPathString(destinationFolder)); } @@ -378,6 +428,9 @@ public void copyFolderPath(final String sourceFolder, final String destinationFo */ public Set getFileAndFolderPathSet(final String folderPath, final int minDepth, final int maxDepth) { + if( _isUninitialized ) { + return new HashSet<>(); + } return main.backend_getFileAndFolderPathSet(_oid, normalizeFolderPathString(folderPath), minDepth, maxDepth); } @@ -391,6 +444,9 @@ public Set getFileAndFolderPathSet(final String folderPath, final int mi * @return list of path strings - relative to the given folderPath */ public Set getFilePathSet(final String folderPath, final int minDepth, final int maxDepth) { + if( _isUninitialized ) { + return new HashSet<>(); + } return main.backend_getFilePathSet(_oid, normalizeFolderPathString(folderPath), minDepth, maxDepth); } @@ -405,6 +461,9 @@ public Set getFilePathSet(final String folderPath, final int minDepth, f */ public Set getFolderPathSet(final String folderPath, final int minDepth, final int maxDepth) { + if( _isUninitialized ) { + return new HashSet<>(); + } return main.backend_getFolderPathSet(_oid, normalizeFolderPathString(folderPath), minDepth, maxDepth); } diff --git a/src/main/java/picoded/dstack/core/Core_FileWorkspaceMap.java b/src/main/java/picoded/dstack/core/Core_FileWorkspaceMap.java index 28bfb886..2d28e38a 100755 --- a/src/main/java/picoded/dstack/core/Core_FileWorkspaceMap.java +++ b/src/main/java/picoded/dstack/core/Core_FileWorkspaceMap.java @@ -1,14 +1,24 @@ package picoded.dstack.core; +import picoded.core.conv.ArrayConv; +import picoded.core.file.FileUtil; + // Java imports // Picoded imports import picoded.dstack.*; import java.util.HashSet; -import java.util.List; import java.util.Set; +import org.apache.commons.io.IOUtils; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + /** * Common base utility class of FileWorkspaceMap * @@ -95,6 +105,54 @@ public void setupWorkspace(String oid) { */ abstract public void backend_setupWorkspace(String oid); + //-------------------------------------------------------------------------- + // File / Folder string normalization + //-------------------------------------------------------------------------- + + /** + * @param filePath + * @return filePath normalized to remove ending "/" + */ + protected static String normalizeFilePathString(final String filePath) { + if (filePath == null) { + throw new IllegalArgumentException("Invalid null filePath"); + } + + String res = FileUtil.normalize(filePath, true); + if (res.startsWith("/")) { + res = res.substring(1); + } + if (res.endsWith("/")) { + res = res.substring(0, res.length() - 1); + } + + // Block empty filepath + if( res.isEmpty() ) { + throw new RuntimeException("Empty file path is not allowed"); + } + + return res; + } + + /** + * @param folderPath + * @return folderPath normalized with ending "/" + */ + protected static String normalizeFolderPathString(final String folderPath) { + if (folderPath == null || folderPath.length() <= 0) { + return "/"; + } + + String res = FileUtil.normalize(folderPath, true); + if (res.startsWith("/")) { + res = res.substring(1); + } + if (!res.endsWith("/")) { + res = res + "/"; + } + return res; + } + //-------------------------------------------------------------------------- // // Functions, used by FileWorkspace @@ -160,6 +218,100 @@ public void setupWorkspace(String oid) { **/ abstract public void backend_fileWrite(final String oid, final String filepath, final byte[] data); + // File read and write using byte stream + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * + * Get and return the stored data as a byte stream. + * + * This overwrite is useful for backends which supports this flow. + * Else it would simply be a wrapper over the non-stream version. + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * + * @return the stored byte stream of the file + **/ + public InputStream backend_fileReadInputStream(final String oid, final String filepath) { + // Get the byte data + byte[] rawBytes = backend_fileRead(oid, filepath); + if (rawBytes == null) { + return null; + } + return new ByteArrayInputStream(rawBytes); + } + + /** + * [Internal use, to be extended in future implementation] + * + * Writes the full byte array of a file in the backend + * + * This overwrite is useful for backends which supports this flow. + * Else it would simply be a wrapper over the non-stream version. + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + public void backend_fileWriteInputStream(final String oid, final String filepath, + final InputStream data) { + + // forward the null, and let the error handling below settle it + if (data == null) { + backend_fileWrite(oid, filepath, null); + } + + // Converts it to bytearray respectively + byte[] rawBytes = null; + try { + rawBytes = IOUtils.toByteArray(data); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + data.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Does the bytearray writes + backend_fileWrite(oid, filepath, rawBytes); + } + + /** + * [Internal use, to be extended in future implementation] + * + * Writes the full byte array of a file in the backend + * + * This overwrite is useful for backends which supports this flow. + * Else it would simply be a wrapper over the non-stream version. + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + public void backend_fileAppendByteArray(final String oid, final String filepath, + final byte[] data) { + + // Get the existing byte array + byte[] read = backend_fileRead(oid, filepath); + + // Just write it as it is (read is null) + if (read == null) { + backend_fileWrite(oid, filepath, data); + return; + } + + // Append new data to existing data + byte[] jointData = ArrayConv.addAll(read, data); + + // Write the new joint data + backend_fileWrite(oid, filepath, jointData); + } + // Folder Pathing support //-------------------------------------------------------------------------- @@ -229,7 +381,8 @@ public void backend_ensureFolderPath(final String oid, final String folderPath) */ public void backend_moveFile(final String oid, final String sourceFile, final String destinationFile) { - throw new RuntimeException("Missing backend implementation"); + backend_copyFile(oid, sourceFile, destinationFile); + backend_removeFile(oid, sourceFile); } /** @@ -254,7 +407,23 @@ public void backend_moveFile(final String oid, final String sourceFile, */ public void backend_moveFolderPath(final String oid, final String sourceFolder, final String destinationFolder) { - throw new RuntimeException("Missing backend implementation"); + // Get the list of valid sub paths in the sourceFolder + Set subPath = backend_getFileAndFolderPathSet(oid, sourceFolder, -1, -1); + + // Lets sync up all the folders first + for(String dir : subPath) { + if(dir.endsWith("/")) { + backend_ensureFolderPath(oid, destinationFolder+dir); + } + } + // Lets sync up all the files next + for(String file : subPath) { + if(!file.endsWith("/")) { + backend_copyFile(oid, sourceFolder+file, destinationFolder+file); + } + } + // Lets remove the original folders + backend_removeFolderPath(oid, sourceFolder); } // Copy support @@ -280,7 +449,7 @@ public void backend_moveFolderPath(final String oid, final String sourceFolder, */ public void backend_copyFile(final String oid, final String sourceFile, final String destinationFile) { - throw new RuntimeException("Missing backend implementation"); + backend_fileWriteInputStream(oid, destinationFile, backend_fileReadInputStream(oid, sourceFile)); } /** @@ -305,7 +474,21 @@ public void backend_copyFile(final String oid, final String sourceFile, */ public void backend_copyFolderPath(final String oid, final String sourceFolder, final String destinationFolder) { - throw new RuntimeException("Missing backend implementation"); + // Get the list of valid sub paths in the sourceFolder + Set subPath = backend_getFileAndFolderPathSet(oid, sourceFolder, -1, -1); + + // Lets sync up all the folders first + for(String dir : subPath) { + if(dir.endsWith("/")) { + backend_ensureFolderPath(oid, destinationFolder+dir); + } + } + // Lets sync up all the files next + for(String file : subPath) { + if(file.endsWith("/") == false) { + backend_copyFile(oid, sourceFolder+file, destinationFolder+file); + } + } } // @@ -351,20 +534,21 @@ public long backend_modifiedTimestamp(final String oid, final String filepath) { //-------------------------------------------------------------------------- /** - * Internal utility function used to filter a path set, and remove items that does not match + * Internal utility function used to filter a path set, and remove items that does not match. + * This is used to help filter raw results, from existing implementation * * - its folderPath prefix * - min/max depth * - any / file / folder * - * @param rawSet - * @param folderPath - * @param minDepth + * @param rawSet (note this expect the full RAW paths, without removing the folderPath prefix) + * @param folderPath the folder path prefix to search and match against, and truncate + * @param minDepth (0 = all items, 1 = must be in atleast a folder, 2 = folder, inside a folder) * @param maxDepth * @param pathType (0 = any, 1 = file, 2 = folder) * @return */ - protected Set backend_filtterPathSet(final Set rawSet, final String folderPath, + protected Set backend_filterPathSet(final Set rawSet, final String folderPath, final int minDepth, final int maxDepth, final int pathType) { // Normalize the folder path @@ -416,12 +600,21 @@ protected Set backend_filtterPathSet(final Set rawSet, final Str } // Alrighto - lets check file / folder type - and add it in + + // Ignore empty, or root path + if(subPath.isEmpty() || subPath.equals("/")) { + continue; + } + + // Expect a folder, reject files if (pathType == 1) { if (subPath.endsWith("/")) { // Not a file - abort! continue; } } + + // Expect files, reject folders if (pathType == 2) { if (!subPath.endsWith("/")) { // Not a folder - abort! diff --git a/src/main/java/picoded/dstack/file/simple/FileSimple_FileWorkspaceMap.java b/src/main/java/picoded/dstack/file/simple/FileSimple_FileWorkspaceMap.java index d6d52409..58d7af31 100755 --- a/src/main/java/picoded/dstack/file/simple/FileSimple_FileWorkspaceMap.java +++ b/src/main/java/picoded/dstack/file/simple/FileSimple_FileWorkspaceMap.java @@ -6,16 +6,9 @@ import java.io.File; import java.io.IOException; import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.FileVisitOption; import java.nio.file.Files; -import java.nio.file.Path; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import javax.management.RuntimeErrorException; /** * Reference class for Core_FileWorkspaceMap @@ -631,7 +624,7 @@ public Set backend_getFileAndFolderPathSet(final String oid, final Strin recusively_populatePathSet(retSet, folderObj, "", maxDepth); // Return with minDepth filtering - return backend_filtterPathSet(retSet, "", minDepth, maxDepth, 0); + return backend_filterPathSet(retSet, "", minDepth, maxDepth, 0); } } diff --git a/src/main/java/picoded/dstack/jsql/JSql_FileWorkspaceMap.java b/src/main/java/picoded/dstack/jsql/JSql_FileWorkspaceMap.java index 34a96c8b..c59d76d6 100755 --- a/src/main/java/picoded/dstack/jsql/JSql_FileWorkspaceMap.java +++ b/src/main/java/picoded/dstack/jsql/JSql_FileWorkspaceMap.java @@ -6,18 +6,8 @@ import picoded.dstack.connector.jsql.JSql; import picoded.dstack.connector.jsql.JSqlResult; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import javax.management.RuntimeErrorException; - -import org.apache.commons.io.FileUtils; public class JSql_FileWorkspaceMap extends Core_FileWorkspaceMap { @@ -664,7 +654,7 @@ public Set backend_getFileAndFolderPathSet(final String oid, final Strin } // Filter and return it accordingly - return backend_filtterPathSet(rawSet, folderPath, minDepth, maxDepth, 0); + return backend_filterPathSet(rawSet, folderPath, minDepth, maxDepth, 0); } } diff --git a/src/main/java/picoded/dstack/mongodb/MongoDBStack.java b/src/main/java/picoded/dstack/mongodb/MongoDBStack.java index e7dddda2..6e835143 100644 --- a/src/main/java/picoded/dstack/mongodb/MongoDBStack.java +++ b/src/main/java/picoded/dstack/mongodb/MongoDBStack.java @@ -126,6 +126,12 @@ protected Core_DataStructure initDataStructure(String name, String type) { Core_DataStructure ret = null; if (type.equalsIgnoreCase("DataObjectMap")) { ret = new MongoDB_DataObjectMap(this, name); + } else if (type.equalsIgnoreCase("KeyValueMap")) { + ret = new MongoDB_KeyValueMap(this, name); + } else if (type.equalsIgnoreCase("KeyLongMap")) { + ret = new MongoDB_KeyLongMap(this, name); + } else if (type.equalsIgnoreCase("FileWorkspaceMap")) { + ret = new MongoDB_FileWorkspaceMap(this, name); } // If datastrucutre initialized, setup name diff --git a/src/main/java/picoded/dstack/mongodb/MongoDB_DataObjectMap.java b/src/main/java/picoded/dstack/mongodb/MongoDB_DataObjectMap.java index b08fbe0a..27d2a8ee 100644 --- a/src/main/java/picoded/dstack/mongodb/MongoDB_DataObjectMap.java +++ b/src/main/java/picoded/dstack/mongodb/MongoDB_DataObjectMap.java @@ -40,10 +40,10 @@ * ## Purpose * Support MongoDB implementation of DataObjectMap data structure. * - * Built ontop of the Core_DataObjectMap_struct implementation. + * Built ontop of the Core_DataObjectMap implementation. * * ## Dev Notes - * Developers of this class would need to reference the following + * Developers of this class would need to reference the following in MongoDB * * - Collection API : https://mongodb.github.io/mongo-java-driver/4.6/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html * - Filter API: https://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/Filters.html#where-java.lang.String- @@ -57,7 +57,6 @@ public class MongoDB_DataObjectMap extends Core_DataObjectMap { //-------------------------------------------------------------------------- /** MongoDB instance representing the backend connection */ - MongoDBStack hazelcastStack = null; MongoCollection collection = null; /** @@ -91,13 +90,20 @@ public void systemSetup() { IndexOptions opt = new IndexOptions(); opt = opt.unique(true); opt = opt.name("_oid"); - opt = opt.background(true); + + // Due to the need for _oid to ensure consistency, we would not be creating it in the background + // opt = opt.background(true); + + // Lets create the index collection.createIndex(Indexes.ascending("_oid"), opt); + // // Wildcard indexing // // This helps improve general performance for arbitary data - // at a huge cost of write performance + // at a huge cost of write performance. Useful for general purpose DataObjectMap + // but not useful when fine hand-tunning is requried for some use cases + // if (configMap.getBoolean("setupWildcardIndex", true)) { opt = new IndexOptions(); opt = opt.name("wildcard"); diff --git a/src/main/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap.java b/src/main/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap.java new file mode 100755 index 00000000..7019f068 --- /dev/null +++ b/src/main/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap.java @@ -0,0 +1,879 @@ +package picoded.dstack.mongodb; + +// Java imports +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Date; + +// JavaCommons imports +import picoded.core.common.EmptyArray; +import picoded.core.file.FileUtil; +import picoded.dstack.FileWorkspace; +import picoded.dstack.core.Core_FileWorkspaceMap; + +import org.apache.commons.io.IOUtils; +// MongoDB imports +import org.bson.Document; +import org.bson.types.Binary; +import org.bson.types.ObjectId; +import org.bson.conversions.Bson; +import com.mongodb.client.*; +import com.mongodb.client.gridfs.*; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.model.Indexes; + +/** + * ## Purpose + * Support MongoDB implementation of FileWorkspaceMap + * + * Built ontop of the Core_FileWorkspaceMap implementation. + * + * ## Dev Notes + * Developers of this class would need to reference the following in MongoDB + * + * - GridFS : https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/gridfs/ + * - API: https://mongodb.github.io/mongo-java-driver/4.7/apidocs/mongodb-driver-sync/com/mongodb/client/gridfs/GridFSBucket.html + **/ +public class MongoDB_FileWorkspaceMap extends Core_FileWorkspaceMap { + + // -------------------------------------------------------------------------- + // + // Constructor + // + // -------------------------------------------------------------------------- + + /** MongoDB instance representing gridFS */ + GridFSBucket gridFSBucket = null; + + /** MongoDB instance representing the files and chunks collection (internal to the gridFSBucket) */ + MongoCollection filesCollection = null; + MongoCollection chunksCollection = null; + + /** + * Constructor, with name constructor + * + * @param inStack hazelcast stack to use + * @param name of data object map to use + */ + public MongoDB_FileWorkspaceMap(MongoDBStack inStack, String name) { + super(); + + // Initialize the gridfs bucket, + // with the relevent DB, name, and config + gridFSBucket = GridFSBuckets.create(inStack.db_conn, name) // + .withChunkSizeBytes(8 * 1000 * 1000); + + // + // Note that we intentionally chose 8*1000*1000 chunk sizes + // As this will give about 1-4kb space for chunk headers to + // help ensure overall efficent chunk storage usage. + // + // This is due to the underlying storage rounding up to power + // of 2 : https://jira.mongodb.org/browse/SERVER-13331 + // + // Meaning a full "8 * 1000 * 1000" chunk would use "8 * 1024 * 1024" + // worth of space, after adding the unknown headers (<=4kb of space : 8*24*24) + // + + filesCollection = inStack.db_conn.getCollection(name+".files"); + chunksCollection = inStack.db_conn.getCollection(name+".chunks"); + } + + //-------------------------------------------------------------------------- + // + // Backend system setup / teardown (DStackCommon) + // + //-------------------------------------------------------------------------- + + /** + * Setsup the backend storage table, etc. If needed + **/ + @Override + public void systemSetup() { + + // We insert a "root" object, to ensure the tables are initialized + // --- + if(!fullRawPathExist("root")) { + setupAnchorFile_withFullRawPath("root", "root", "root"); + } + + // Lets setup the index for the metadata fields (which is not enabled by default) + // --- + + // Lets create the index for the oid + IndexOptions opt = new IndexOptions(); + opt = opt.name("metadata.oid"); + filesCollection.createIndex(Indexes.ascending("oid"), opt); + + } + + /** + * Teardown and delete the backend storage table, etc. If needed + **/ + public void systemDestroy() { + gridFSBucket.drop(); + } + + /** + * Removes all data, without tearing down setup + **/ + @Override + public void clear() { + gridFSBucket.drop(); + } + + //-------------------------------------------------------------------------- + // + // Workspace setup / exist funcitons + // [Internal use, to be extended in future implementation] + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * + * Checks and return of a workspace exists + * + * @param Object ID of workspace to get + * + * @return boolean to check if workspace exists + **/ + @Override + public boolean backend_workspaceExist(String oid) { + // The folder root, will only contain the "oid" + return fullRawPathExist(oid); + } + + /** + * Setup the current fileWorkspace within the fileWorkspaceMap, + * + * This ensures the workspace oid is registered within the map, + * even if there is 0 files. + * + * Does not throw any error if workspace was previously setup + */ + @Override + public void backend_setupWorkspace(String oid) { + // We setup a blank file with type root + if(!fullRawPathExist(oid)) { + setupAnchorFile_withFullRawPath(oid, oid, "space"); + } + } + + /** + * [Internal use, to be extended in future implementation] + * + * Removes the FileWorkspace, used to nuke an entire workspace + * + * @param ObjectID of workspace to remove + **/ + @Override + public void backend_workspaceRemove(String oid) { + removeFilePathRecursively(oid, null); + } + + //-------------------------------------------------------------------------- + // + // Utility functions + // + //-------------------------------------------------------------------------- + + /** Utility function used, to check if a workspace, or file exists **/ + protected boolean fullRawPathExist(String fullpath) { + // Lets build the query for the "root file" + Bson query = Filters.eq("filename", fullpath); + + // Lets prepare the search + GridFSFindIterable search = gridFSBucket.find(query).limit(1); + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = search.iterator()) { + if (cursor.hasNext()) { + // ret.add(cursor.next().getString("oid")); + return true; + } + } + + // Fail, as the search found no iterations + return false; + } + + /** Utility function used, to check if a folder, or file with folder prefix exists **/ + protected boolean prefixPathExist(String oid, String path) { + // Lets build the query for the "root file" + Bson query = null; + + // Get the full prefixpath + String fullPrefixPath = oid+"/"+path; + + // Remove matching path + query = Filters.or( + Filters.eq("filename", fullPrefixPath), + Filters.and( + Filters.eq("metadata.oid", oid), + Filters.regex("filename", "^"+Pattern.quote(fullPrefixPath)+".*") + ) + ); + + // Lets prepare the search + GridFSFindIterable search = gridFSBucket.find(query).limit(1); + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = search.iterator()) { + if (cursor.hasNext()) { + return true; + } + } + + // No match found, fail + return false; + } + + /** + * Setup an empty file, used for various use cases + * The extended funciton name is intentional to avoid confusion of "full path" with "path" + */ + public void setupAnchorFile_withFullRawPath(String oid, String fullPath, String type) { + // In general we will upload a blank file + // with the relevent oid, that can be easily lookedup + // + // This is done using a closable input stream, with an empty byte array + try (ByteArrayInputStream emptyStream = new ByteArrayInputStream(EmptyArray.BYTE)) { + // Setup the metadata for the file + Document metadata = new Document(); + metadata.append("oid", oid); + metadata.append("type", type); + + // Prepare the upload options + GridFSUploadOptions opt = (new GridFSUploadOptions()).metadata(metadata); + ObjectId objID = gridFSBucket.uploadFromStream(fullPath, emptyStream, opt); + + // Flush it? + objID.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Utility function used, to recursively delete all files within a specific path + **/ + protected void removeFilePathRecursively(String oid, String path) { + // Lets build the query for the "root file" + Bson query = null; + + if( path == null || path.equals("/") || path.isEmpty() ) { + // Remove everything under the oid + query = Filters.eq("metadata.oid", oid); + } else { + // Remove matching path + query = Filters.and( + Filters.eq("metadata.oid", oid), + Filters.regex("filename", "^"+Pattern.quote(oid+"/"+path)+".*") + ); + } + + // Lets prepare the search + GridFSFindIterable search = gridFSBucket.find(query); + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + GridFSFile fileObj = cursor.next(); + gridFSBucket.delete(fileObj.getId()); + } + } + } + + /** + * Utility function used, to remove a specific file + **/ + protected boolean removeFilePath(String oid, String path) { + // Lets build the query for the "root file" + Bson query = null; + + // Remove matching path + query = Filters.eq("filename", oid+"/"+path); + + // Lets prepare the search (removes all versions) + GridFSFindIterable search = gridFSBucket.find(query); + + // Lets iterate the search result, and return true on an item + boolean rmFlag = false; + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + GridFSFile fileObj = cursor.next(); + gridFSBucket.delete(fileObj.getId()); + rmFlag = true; + } + } + + // Return the remove status + return rmFlag; + } + + /** + * Given the current path, enforce the parent pathing dir + * Used mainly to ensure "parent" folder exists on file write/rm + **/ + protected void ensureParentPath(String oid, String path) { + // Does nothing if path is empty + if( path == null || path.equals("/") || path.isEmpty() ) { + return; + } + + // Cleanup ending slash + if( path.endsWith("/") ) { + path = path.substring(0, path.length() - 1); + } + + // Get the parent path + String parPath = normalizeFolderPathString( FileUtil.getParentPath(path) ); + + // Does nothing if folder path is "blank" + if( parPath == null || parPath.equals("/") || parPath.isEmpty() ) { + return; + } + + // Path enforcement + backend_ensureFolderPath(oid, parPath); + } + + /** + * Because mongoDB does file versioining on each save, we would need to cleanup + * older file versions where applicable, in a safe way + * + * In general, due to the difficulty of possible race conditions that may occur + * when removing an "old version" immediately, that could be "read" mid-way. + * + * First we scan for the list of all the file versions. + * + * We find the latest that is at-least 10 seconds old (what we consider a safe window) + * and delete all version before it. + * + * If after the above, we found that there are still "10 versions", as there were + * 10 writes in the past 10 seconds. We force a thread.sleep in increments of 1 second, + * and remove any versions that matches the above criteria. Up to a full 10 seconds of delay. + * + * This will forcefully throttle down any write heavy flows, to avoid contentions. + * + * This safety measure is used in addition, to the checks performed on file write + */ + protected void performVersionedFileCleanup(String oid, String path) { + + // Lets get the list of files and their respective versions + // We query the file table directly, to reduce the required + // back and forth queries + + // Get the full filename + String filename = oid+"/"+path; + + // Get the current timestamp + long now = System.currentTimeMillis(); + long tenSecondsAgo = now - (10 * 1000); + + // Lets fetch the full list in descending date order + FindIterable search = filesCollection.find( Filters.eq("filename", filename) ); + search = search.sort( (new Document()).append("uploadDate", -1) ); + + // Lets remap from cursor to list + List searchList = new ArrayList<>(); + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + searchList.add(cursor.next()); + } + } + + // Safe anchor point, all items after this is "safe to be deleted" + // if this is detected properly (do not delete the safeAnchorPoint file itself) + int safeAnchorPoint = -1; + + // Lets find the document thats atleast 10 seconds old + for( int i=1; i= 1 ) { + // Lets loop through all items after the safeAnchorPoint + while( searchList.size() > (safeAnchorPoint + 1) ) { + // Get and remove the last item + Document doc = searchList.remove( searchList.size() - 1 ); + ObjectId objID = doc.getObjectId("_id"); + + // Lets remove the file (and its chunks) + try { + gridFSBucket.delete(objID); + } catch(Exception e) { + // do nothing, as there could be a race condition delete + // (2 delete by seperate write commands happenign together) + } + } + } + + // If the list is less then 10, lets return + if( searchList.size() <= 10 ) { + return; + } + + // We have more then 10 files, that is less then 10 seconds old + // Lets do a forced 10 seconds halt, so we can forcefully clear the files + try { + Thread.sleep(10 * 1000); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + + // And clear the various outdated files + // after the latest, and its immediate previous version + while( searchList.size() > 2 ) { + // Get and remove the last item + Document doc = searchList.remove( searchList.size() - 1 ); + ObjectId objID = doc.getObjectId("_id"); + + // Lets remove the file (and its chunks) + try { + gridFSBucket.delete(objID); + } catch(Exception e) { + // do nothing, as there could be a race condition delete + // (2 delete by seperate write commands happenign together) + } + } + + } + + //-------------------------------------------------------------------------- + // + // File write + // [Internal use, to be extended in future implementation] + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * + * Writes the full byte array of a file in the backend + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + @Override + public void backend_fileWrite(String oid, String filepath, byte[] data) { + + // Build the full path + String fullPath = oid + "/" + filepath; + + // + // Due to the rather huge penalty of writing files, without actual content changes, + // and the performance implications of a high number of back to back file changes. + // + // We will employ the following throttling safeguards + // + // 1) Throttling file writes, when the existing file is less then 2 seconds old + // 2) Check against the current values, and skip the write if they match. + // + // This prevents the creation of a "new version" unless its needed. And slow down + // any flooding of back to back file writes. + // + + // 1) Lets check the previous write timing, and throttle it if needed + // --- + + // Lets get the time "NOW" + long now = System.currentTimeMillis(); + + // Lets build the query for the file involved + Bson query = Filters.eq("filename", fullPath); + + // Read timestamp, and objectid + ObjectId readObjId = null; + long readUploadTimestamp = -1; + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = gridFSBucket.find(query).limit(1).iterator()) { + if (cursor.hasNext()) { + GridFSFile fileObj = cursor.next(); + readUploadTimestamp = fileObj.getUploadDate().getTime(); + readObjId = fileObj.getObjectId(); + } + } + + // Check if the current file is less then 2 seconds old + // If so, we induce a wait for it to occur (if file exists) + if( readObjId != null && readUploadTimestamp + 2000 >= now ) { + try { + Thread.sleep( Math.min( Math.max( readUploadTimestamp + 2000 - now, 500), 2000 ) ); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + + // And get the latest objectID again (in case of any changes) + try (MongoCursor cursor = gridFSBucket.find(query).limit(1).iterator()) { + if (cursor.hasNext()) { + GridFSFile fileObj = cursor.next(); + readUploadTimestamp = fileObj.getUploadDate().getTime(); + readObjId = fileObj.getObjectId(); + } + } + } + + // 2) Lets check against current value + // --- + + // Handle null byte[] + if( data == null ) { + data = EmptyArray.BYTE; + } + + // Lets map the current value to an inputstream, in closable blocks + // We intentionally use inputstream, to avoid needing 2 byte[] blocks in memory + // (if file exists) + if( readObjId == null ) { + // does nothing if the object does not exists + } else { + try (ByteArrayInputStream inBuffer = new ByteArrayInputStream(data) ) { + try(InputStream existingValue = gridFSBucket.openDownloadStream(readObjId)) { + if(IOUtils.contentEquals(inBuffer, existingValue)) { + // They are the same, skip the write + return; + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Finally, lets write the update + // --- + + try (ByteArrayInputStream inBuffer = new ByteArrayInputStream(data) ) { + // Setup the metadata for the file + Document metadata = new Document(); + metadata.append("oid", oid); + metadata.append("type", "file"); + + // Prepare the upload options + GridFSUploadOptions opt = (new GridFSUploadOptions()).metadata(metadata); + ObjectId objID = gridFSBucket.uploadFromStream(fullPath, inBuffer, opt); + objID.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Perform post file write cleanup (if there was a previous version) + if( readObjId != null ) { + performVersionedFileCleanup(oid, filepath); + } + } + + /** + * [Internal use, to be extended in future implementation] + * + * Writes the full byte array of a file in the backend + * + * This overwrite is useful for backends which supports this flow. + * Else it would simply be a wrapper over the non-stream version. + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + @Override + public void backend_fileWriteInputStream(final String oid, final String filepath, InputStream data) { + // Converts it to bytearray respectively + byte[] rawBytes = null; + try { + rawBytes = IOUtils.toByteArray(data); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + data.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + // Does the bytearray writes + backend_fileWrite(oid, filepath, rawBytes); + } + + //-------------------------------------------------------------------------- + // + // File read / exists + // [Internal use, to be extended in future implementation] + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * + * Get and return the stored data as a byte[] + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * + * @return the stored byte array of the file + **/ + @Override + public byte[] backend_fileRead(String oid, String filepath) { + InputStream buffer = backend_fileReadInputStream(oid, filepath); + byte[] ret = null; + try { + ret = IOUtils.toByteArray(buffer); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + buffer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return ret; + } + + /** + * [Internal use, to be extended in future implementation] + * + * Get and return the stored data as a byte stream. + * + * This overwrite is useful for backends which supports this flow. + * Else it would simply be a wrapper over the non-stream version. + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * + * @return the stored byte array of the file + **/ + public InputStream backend_fileReadInputStream(String oid, String filepath) { + return gridFSBucket.openDownloadStream(oid + "/" + filepath); + } + + @Override + public boolean backend_fileExist(String oid, String filepath) { + // Check against the full file path + return fullRawPathExist(oid + "/" + filepath); + } + + @Override + public void backend_removeFile(String oid, String filepath) { + ensureParentPath(oid, filepath); + removeFilePath(oid, filepath); + } + + // Folder Pathing support + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * + * Delete an existing path from the workspace. + * This recursively removes all file content under the given path prefix + * + * @param ObjectID of workspace + * @param folderPath in the workspace (note, folderPath is normalized to end with "/") + * + * @return the stored byte array of the file + **/ + public void backend_removeFolderPath(final String oid, final String folderPath) { + ensureParentPath(oid, folderPath); + removeFilePathRecursively(oid, folderPath); + } + + /** + * [Internal use, to be extended in future implementation] + * + * Validate the given folder path exists. + * + * @param ObjectID of workspace + * @param folderPath in the workspace (note, folderPath is normalized to end with "/") + * + * @return the stored byte array of the file + **/ + public boolean backend_folderPathExist(final String oid, final String folderPath) { + // Note that this passes if any of the files were created directly without folders + return prefixPathExist(oid, folderPath); + } + + /** + * [Internal use, to be extended in future implementation] + * + * Automatically generate a given folder path if it does not exist + * + * @param ObjectID of workspace + * @param folderPath in the workspace (note, folderPath is normalized to end with "/") + * + * @return the stored byte array of the file + **/ + public void backend_ensureFolderPath(final String oid, final String folderPath) { + // We setup a blank file with type root, this checks only for the anchor file + // if it does not exists, we will make it + if(fullRawPathExist(oid+"/"+folderPath) == false) { + setupAnchorFile_withFullRawPath(oid, oid+"/"+folderPath, "dir"); + } + } + + //-------------------------------------------------------------------------- + // + // Create and updated timestamp support + // + // Note that this feature does not have "normalized" support across + // backend implementation, and is provided "as-it-is" for applicable + // backend implementations. + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + + * The created timestamp of the map in ms, + * note that -1 means the current backend does not support this feature + * + * @param ObjectID of workspace + * @param filepath in the workspace to check + * + * @return DataObject created timestamp in ms + */ + public long backend_createdTimestamp(final String oid, final String filepath) { + + // Currently only modified timestamp is supported + return backend_modifiedTimestamp(oid, filepath); + } + + /** + * [Internal use, to be extended in future implementation] + + * The modified timestamp of the map in ms, + * note that -1 means the current backend does not support this feature + * + * @param ObjectID of workspace + * @param filepath in the workspace to check + * + * @return DataObject created timestamp in ms + */ + public long backend_modifiedTimestamp(final String oid, final String filepath) { + // Lets build the query for the "root file" + Bson query = Filters.eq("filename", filepath); + + // Lets prepare the search + GridFSFindIterable search = gridFSBucket.find(query).limit(1); + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = search.iterator()) { + if (cursor.hasNext()) { + GridFSFile fileObj = cursor.next(); + return fileObj.getUploadDate().getTime(); + } + } + + // Fail, as the search found no iterations + return -1; + } + + //-------------------------------------------------------------------------- + // + // Query, and listing support + // + //-------------------------------------------------------------------------- + + /** + * List all the various files and folders found in the given folderPath + * + * @param ObjectID of workspace + * @param folderPath in the workspace (note, folderPath is normalized to end with "/") + * @param minDepth minimum depth count, before outputing the listing (uses a <= match) + * @param maxDepth maximum depth count, to stop the listing (-1 for infinite, uses a >= match) + * + * @return list of path strings - relative to the given folderPath (folders end with "/") + */ + @Override + public Set backend_getFileAndFolderPathSet(final String oid, String folderPath, + final int minDepth, final int maxDepth) { + + // Lets build the query for the "root file" + Bson query = null; + + // The fulle prefix path + String fullPrefixPath = oid+"/"; + + if( folderPath == null || folderPath.equals("/") || folderPath.isEmpty() ) { + // Query everything (using only the oid) + query = Filters.eq("metadata.oid", oid); + } else { + // Query using oid and the path + fullPrefixPath = fullPrefixPath+folderPath; + + // Filter for matching path + query = Filters.and( + Filters.eq("metadata.oid", oid), + Filters.regex("filename", "^"+Pattern.quote(fullPrefixPath)+".*") + ); + } + + // The return set + Set ret = new HashSet<>(); + + // Lets prepare the search + GridFSFindIterable search = gridFSBucket.find(query); + + // Lets iterate the search result, and return true on an item + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + // Get the fileobj and filename + GridFSFile fileObj = cursor.next(); + String fullFilename = fileObj.getFilename(); + + // Skip the oid anchor + if( fullFilename.equals(oid) ) { + continue; + } + + // Remove the oid prefix + String filepath = fullFilename.substring( oid.length()+1 ); + + // Register the validpath + ret.add(filepath); + + // Prepare a clean path without ending slash + String cleanPath = filepath; + if( cleanPath.endsWith("/") ) { + cleanPath = cleanPath.substring(0, cleanPath.length()-1); + } + + // Lets split the filepath + String[] cleanPathArr = cleanPath.split("/"); + List cleanPathList = Arrays.asList(cleanPathArr); + + // Lets handle parent folders, note that i collection = null; + + /** + * Constructor, with name constructor + * + * @param inStack hazelcast stack to use + * @param name of data object map to use + */ + public MongoDB_KeyLongMap(MongoDBStack inStack, String name) { + super(); + collection = inStack.db_conn.getCollection(name); + } + + @Override + public void systemSetup() { + // + // By mongodb default we use its native _id implementation + // and handle our _oid seperately. + // + // We intentionally DO NOT use mongodb _id, allowing it retain optimal performance. + // + + // Lets create the unique key index + IndexOptions opt = new IndexOptions().unique(true).name("key"); + collection.createIndex(Indexes.ascending("key"), opt); + + // Expirary key support + opt = new IndexOptions().expireAfter(0L, TimeUnit.SECONDS); + collection.createIndex(Indexes.ascending("expireAt"), opt); + } + + /** + * Teardown and delete the backend storage table, etc. If needed + **/ + public void systemDestroy() { + collection.drop(); + } + + /** + * Removes all data, without tearing down setup + **/ + @Override + public void clear() { + // Delete all items + // + // Due to the lack of an all * wildcard + // we are using a exists OR condition, which is true + // for all objects + collection.deleteMany( // + Filters.or( // + Filters.exists("key", true), // + Filters.exists("key", false) // + ) // + ); // + } + + //-------------------------------------------------------------------------- + // + // Internal functions, used by DataObject + // + //-------------------------------------------------------------------------- + + /** + * Generate a BSON filter set, for unexpired items + * this should be used in combination with an AND clause filter + **/ + protected Bson filterForUnexpired(Date now) { + // the or array to join + return Filters.or( // + Filters.exists("expireAt", false), // + Filters.gt("expireAt", now), // + Filters.lte("expireAt", 0) // + ); // + } + + /** + * Search using the value, all the relevent key mappings + * + * Handles re-entrant lock where applicable + * + * @param key, note that null matches ALL + * + * @return array of keys + **/ + @Override + public Set keySet(Long value) { + // The return hashset + HashSet ret = new HashSet(); + + // Search result + FindIterable search = null; + + // Lets either fetch with a value, or everything + if (value == null) { + // Lets fetch everything ... D= + search = collection.find(filterForUnexpired(new Date())); + } else { + search = collection.find(Filters.and( // + filterForUnexpired(new Date()), // + Filters.eq("val", value) // + )); // + } + + // Get all the various keys + search = search.projection(Projections.include("key")); + + // Lets iterate the search + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + ret.add(cursor.next().getString("key")); + } + } + + // Return the full keyset + return ret; + } + + //-------------------------------------------------------------------------- + // + // Fundemental set/get value (core) + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * Sets the value, with validation + * + * @param key + * @param value, null means removal + * @param expire timestamp, 0 means not timestamp + * + * @return null + **/ + @Override + public Long setValueRaw(String key, Long value, long expireAt) { + // Configure this to be an "upsert" query + FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + opt.upsert(true); + + // Generate the document of changes + // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/ + + // Generate the "update" doc + Document updateDoc = new Document(); + Document set_doc = new Document(); + + // Expire timestamp if its configured, else it should be removed + if (expireAt > 0) { + set_doc.append("expireAt", new Date(expireAt)); + } else { + Document unset_doc = new Document(); + unset_doc.append("expireAt", ""); + updateDoc.append("$unset", unset_doc); + } + + // Setup the value on update/insert/upsert + set_doc.append("val", value); + updateDoc.append("$set", set_doc); + + // Set the key on insert + Document setOnInsert_doc = new Document(); + setOnInsert_doc.append("key", key); + updateDoc.append("$setOnInsert", setOnInsert_doc); + + // Upsert the document + collection.findOneAndUpdate(Filters.eq("key", key), updateDoc, opt); + return null; + } + + /** + * [Internal use, to be extended in future implementation] + * + * Returns the value and expiry, with validation against the current timestamp + * + * @param key as String + * @param now timestamp + * + * @return String value + **/ + @Override + public MutablePair getValueExpiryRaw(String key, long now) { + // Get the find result + FindIterable res = collection.find(Filters.eq("key", key)); + + // Get the Document object + Document resObj = res.first(); + if (resObj == null) { + return null; + } + + // Lets get all the key values + Long val = resObj.getLong("val"); + Date expireAt_date = resObj.getDate("expireAt"); + long expireAt_long = 0; + + // Check if expireAt date is set + if (expireAt_date != null) { + expireAt_long = expireAt_date.getTime(); + } + + // Check for null objects + if (val == null) { + return null; + } + + // No valid value found, return null + if (expireAt_long < 0) { + return null; + } + + // Expired value, return null + if (expireAt_long != 0 && expireAt_long < now) { + return null; + } + + // Get the value, and return the pair + return new MutablePair(val, expireAt_long); + } + + /** + * [Internal use, to be extended in future implementation] + * Sets the expire time stamp value, raw without validation + * + * @param key as String + * @param expireAt timestamp in seconds, 0 means NO expire + **/ + @Override + public void setExpiryRaw(String key, long expireAt) { + // Configure this to be an "update" query + FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + + // Generate the document of changes + // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/ + + // Generate the "update" doc + Document updateDoc = new Document(); + + // Expire timestamp if its configured, else it should be ignored + if (expireAt > 0) { + Document set_doc = new Document(); + set_doc.append("expireAt", new Date(expireAt)); + updateDoc.append("$set", set_doc); + } else { + Document unset_doc = new Document(); + unset_doc.append("expireAt", ""); + updateDoc.append("$unset", unset_doc); + } + + // Upsert the document + collection.findOneAndUpdate(Filters.eq("key", key), updateDoc, opt); + } + + //-------------------------------------------------------------------------- + // + // Incremental operations + // + //-------------------------------------------------------------------------- + + /** + * Stores (and overwrites if needed) key, value pair + * + * Important note: It does not return the previously stored value + * + * @param key as String + * @param expect as Long + * @param update as Long + * + * @return true if successful + **/ + public boolean weakCompareAndSet(String key, Long expect, Long update) { + // now timestamp + Date now = new Date(); + + // Configure this to be an "update" query + FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + + // Lets generate the mongodb "update" document rule + Document updateDoc = new Document(); + + // Disable expire timestamp when using weakCompareAndSet + Document unset_doc = new Document(); + unset_doc.append("expireAt", ""); + updateDoc.append("$unset", unset_doc); + + // Setup the value on update/insert/upsert + Document set_doc = new Document(); + set_doc.append("val", update); + updateDoc.append("$set", set_doc); + + // + // In general there are the following compare and set scenerios to handle + // + // 1) expecting value is 0 + // a) existing record is expired + // b) existing record does not exist + // c) existing record is NOT expired, and is 0 + // 2) expecting value is non-zero + // a) existing record is NOT expired, and is expected value. + // + + // Potentially a new upsert, ensure there is something to "update" atleast + // initializing an empty row if it does not exist + if (expect == null || expect == 0l) { + // Expect is now atleast 0 + expect = 0l; + } + + // + // We update any existing values + // this handle scenerio 1a, 1c & 2a + // + // We can do this safely here, as mongodb handles the expire + // natively, so we do not need to worry about race conditions. + // + + // + // Upsert the document + // + Object ret = collection.findOneAndUpdate(Filters.and(Filters.eq("key", key), // + Filters.or( // + // Handles an expired record + Filters.and(Filters.gt("expireAt", new Date(0l)), Filters.lt("expireAt", now)), + // Handles an non-expired record + Filters.and(Filters.eq("val", expect), filterForUnexpired(now)))), updateDoc, opt); + + // Return true on succesful update + if (ret != null) { + return true; + } + + // + // We insert a record if possible, this handle sceneric 1b + // + if (expect == 0l) { + try { + InsertOneResult res = collection.insertOne( // + new Document().append("key", key).append("val", update) // + ); + + if (res.wasAcknowledged()) { + return true; + } + } catch (Exception e) { + // This is probably due to a conflicting index + return false; + } + } + + // All Failed + return false; + } + + //-------------------------------------------------------------------------- + // + // @TODO : Optimized getAndAdd + // + //-------------------------------------------------------------------------- + + // /** + // * Returns the value, given the key + // * + // * @param key param find the meta key + // * @param delta value to add + // * + // * @return value of the given key after adding + // **/ + // public Long getAndAdd(Object key, Object delta) { + + // // This is the more optimized varient for weakCompareAndSet + // // --- + + // // Validate and convert the key to String + // if (key == null) { + // throw new IllegalArgumentException("key cannot be null in"); + // } + // String keyAsString = key.toString(); + + // // Normalize the delta to a long + // Long deltaLong = GenericConvert.toLong(delta); + + // // Configure this to be an "upsert" query + // FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + // opt.upsert(true); + + // // !! THE FOLLOWING BELOW IS INCOMPLETE CODE (AND IS BROKEN) + // // --- + + // // now timestamp + // Date now = new Date(); + + // // Lets generate the mongodb "update" document rule + // Document updateDoc = new Document(); + + // // Disable expire timestamp when using + // // weakCompareAndSet/getAndAdd/addAndGet + // Document unset_doc = new Document(); + // unset_doc.append("expireAt", ""); + // updateDoc.append("$unset", unset_doc); + + // // Setup the value on update/insert/upsert + // Document inc_doc = new Document(); + // inc_doc.append("val", deltaLong); + // updateDoc.append("$set", set_doc); + + // // + // // In general there are the following compare and set scenerios to handle + // // + // // 1) expecting value is 0 + // // a) existing record is expired + // // b) existing record does not exist + // // c) existing record is NOT expired, and is 0 + // // 2) expecting value is non-zero + // // a) existing record is NOT expired, and is expected value. + // // + + // // Potentially a new upsert, ensure there is something to "update" atleast + // // initializing an empty row if it does not exist + // if (expect == null || expect == 0l) { + // // Expect is now atleast 0 + // expect = 0l; + // } + + // // + // // We update any existing values + // // this handle scenerio 1a, 1c & 2a + // // + // // We can do this safely here, as mongodb handles the expire + // // natively, so we do not need to worry about race conditions. + // // + + // // + // // Upsert the document + // // + // Object ret = collection.findOneAndUpdate(Filters.and(Filters.eq("key", key), // + // Filters.or( // + // // Handles an expired record + // Filters.and(Filters.gt("expireAt", new Date(0l)), Filters.lt("expireAt", now)), + // // Handles an non-expired record + // Filters.and(Filters.eq("val", expect), filterForUnexpired(now)))), updateDoc, opt); + + // // Return true on succesful update + // if (ret != null) { + // return true; + // } + + // // + // // We insert a record if possible, this handle sceneric 1b + // // + // if (expect == 0l) { + // try { + // InsertOneResult res = collection.insertOne( // + // new Document().append("key", key).append("val", update) // + // ); + + // if (res.wasAcknowledged()) { + // return true; + // } + // } catch (Exception e) { + // // This is probably due to a conflicting index + // return false; + // } + // } + + // // All Failed + // return false; + // } + + //-------------------------------------------------------------------------- + // + // Remove call + // + //-------------------------------------------------------------------------- + + /** + * Remove the value, given the key + * + * @param key param find the thae meta key + * + * @return null + **/ + @Override + public KeyLong remove(Object key) { + removeValue(key); + return null; + } + + /** + * Remove the value, given the key + * + * Important note: It does not return the previously stored value + * Its return String type is to maintain consistency with Map interfaces + * + * @param key param find the thae meta key + * + * @return null + **/ + @Override + public Long removeValue(Object key) { + if (key == null) { + throw new IllegalArgumentException("delete 'key' cannot be null"); + } + + // Delete the data + collection.deleteOne(Filters.eq("key", key)); + return null; + } + + //-------------------------------------------------------------------------- + // + // Maintenance calls + // + //-------------------------------------------------------------------------- + + /** + * Incremental maintainance should not trigger maintenance. + * As its potentially blocking with a very long call + **/ + public void incrementalMaintenance() { + // does nothing + } + + @Override + public void maintenance() { + // @TODO : something? (not sure what needs to be done) + } + +} \ No newline at end of file diff --git a/src/main/java/picoded/dstack/mongodb/MongoDB_KeyValueMap.java b/src/main/java/picoded/dstack/mongodb/MongoDB_KeyValueMap.java new file mode 100644 index 00000000..3458d52f --- /dev/null +++ b/src/main/java/picoded/dstack/mongodb/MongoDB_KeyValueMap.java @@ -0,0 +1,370 @@ +package picoded.dstack.mongodb; + +// Java imports +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +// JavaCommons imports +import picoded.core.conv.ConvertJSON; +import picoded.core.conv.GenericConvert; +import picoded.core.conv.NestedObjectFetch; +import picoded.core.conv.NestedObjectUtil; +import picoded.core.conv.StringEscape; +import picoded.core.struct.MutablePair; +import picoded.core.struct.query.OrderBy; +import picoded.core.struct.query.Query; +import picoded.core.struct.query.QueryType; +import picoded.core.common.ObjectToken; +import picoded.dstack.*; +import picoded.dstack.core.*; + +// MongoDB imports +import org.bson.Document; +import org.bson.types.Binary; +import org.bson.conversions.Bson; +import com.mongodb.client.*; +import com.mongodb.client.model.Projections; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.model.Indexes; +import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.Aggregates; + +/** + * ## Purpose Support MongoDB implementation of KeyValueMap + * + * Built ontop of the Core_KeyValueMap implementation. + **/ +public class MongoDB_KeyValueMap extends Core_KeyValueMap { + + // -------------------------------------------------------------------------- + // + // Constructor + // + // -------------------------------------------------------------------------- + + /** MongoDB instance representing the backend connection */ + MongoCollection collection = null; + + /** + * Constructor, with name constructor + * + * @param inStack hazelcast stack to use + * @param name of data object map to use + */ + public MongoDB_KeyValueMap(MongoDBStack inStack, String name) { + super(); + collection = inStack.db_conn.getCollection(name); + } + + @Override + public void systemSetup() { + // + // By mongodb default we use its native _id implementation + // and handle our _oid seperately. + // + // We intentionally DO NOT use mongodb _id, allowing it retain optimal performance. + // + + // Lets create the unique key index + IndexOptions opt = new IndexOptions().unique(true).name("key"); + collection.createIndex(Indexes.ascending("key"), opt); + + // Expirary key support + opt = new IndexOptions().expireAfter(0L, TimeUnit.SECONDS); + collection.createIndex(Indexes.ascending("expireAt"), opt); + } + + /** + * Teardown and delete the backend storage table, etc. If needed + **/ + public void systemDestroy() { + collection.drop(); + } + + /** + * Removes all data, without tearing down setup + **/ + @Override + public void clear() { + // Delete all items + // + // Due to the lack of an all * wildcard + // we are using a exists OR condition, which is true + // for all objects + collection.deleteMany( // + Filters.or( // + Filters.exists("key", true), // + Filters.exists("key", false) // + ) // + ); // + } + + //-------------------------------------------------------------------------- + // + // Internal functions, used by DataObject + // + //-------------------------------------------------------------------------- + + /** + * Generate a BSON filter set, for unexpired items + * this should be used in combination with an AND clause filter + **/ + protected Bson filterForUnexpired() { + // Current timestamp + Date now = new Date(); + + // the or array to join + return Filters.or( // + Filters.exists("expireAt", false), // + Filters.gt("expireAt", now), // + Filters.lte("expireAt", 0) // + ); // + } + + /** + * Search using the value, all the relevent key mappings + * + * Handles re-entrant lock where applicable + * + * @param key, note that null matches ALL + * + * @return array of keys + **/ + @Override + public Set keySet(String value) { + // The return hashset + HashSet ret = new HashSet(); + + // Search result + FindIterable search = null; + + // Lets either fetch with a value, or everything + if (value == null) { + // Lets fetch everything ... D= + search = collection.find(filterForUnexpired()); + } else { + search = collection.find(Filters.and( // + filterForUnexpired(), // + Filters.eq("val", value) // + )); // + } + + // Get all the various keys + search = search.projection(Projections.include("key")); + + // Lets iterate the search + try (MongoCursor cursor = search.iterator()) { + while (cursor.hasNext()) { + ret.add(cursor.next().getString("key")); + } + } + + // Return the full keyset + return ret; + } + + //-------------------------------------------------------------------------- + // + // Fundemental set/get value (core) + // + //-------------------------------------------------------------------------- + + /** + * [Internal use, to be extended in future implementation] + * Sets the value, with validation + * + * @param key + * @param value, null means removal + * @param expire timestamp, 0 means not timestamp + * + * @return null + **/ + @Override + public String setValueRaw(String key, String value, long expireAt) { + // Configure this to be an "upsert" query + FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + opt.upsert(true); + + // Generate the document of changes + // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/ + + // Generate the "update" doc + Document updateDoc = new Document(); + Document set_doc = new Document(); + + // Expire timestamp if its configured, else it should be removed + if (expireAt > 0) { + set_doc.append("expireAt", new Date(expireAt)); + } else { + Document unset_doc = new Document(); + unset_doc.append("expireAt", ""); + updateDoc.append("$unset", unset_doc); + } + + // Setup the value on update/insert/upsert + set_doc.append("val", value); + updateDoc.append("$set", set_doc); + + // Set the key on insert + Document setOnInsert_doc = new Document(); + setOnInsert_doc.append("key", key); + updateDoc.append("$setOnInsert", setOnInsert_doc); + + // Upsert the document + collection.findOneAndUpdate(Filters.eq("key", key), updateDoc, opt); + return null; + } + + /** + * [Internal use, to be extended in future implementation] + * + * Returns the value and expiry, with validation against the current timestamp + * + * @param key as String + * @param now timestamp + * + * @return String value + **/ + @Override + public MutablePair getValueExpiryRaw(String key, long now) { + // Get the find result + FindIterable res = collection.find(Filters.eq("key", key)); + + // Get the Document object + Document resObj = res.first(); + if (resObj == null) { + return null; + } + + // Lets get all the key values + String val = GenericConvert.toString(resObj.get("val"), null); + Date expireAt_date = resObj.getDate("expireAt"); + long expireAt_long = 0; + + // Check if expireAt date is set + if (expireAt_date != null) { + expireAt_long = expireAt_date.getTime(); + } + + // Check for null objects + if (val == null || val.isEmpty()) { + return null; + } + + // No valid value found, return null + if (expireAt_long < 0) { + return null; + } + + // Expired value, return null + if (expireAt_long != 0 && expireAt_long < now) { + return null; + } + + // Get the value, and return the pair + return new MutablePair(val, expireAt_long); + } + + /** + * [Internal use, to be extended in future implementation] + * Sets the expire time stamp value, raw without validation + * + * @param key as String + * @param expireAt timestamp in seconds, 0 means NO expire + **/ + @Override + public void setExpiryRaw(String key, long expireAt) { + // Configure this to be an "update" query + FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions(); + + // Generate the document of changes + // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/ + + // Generate the "update" doc + Document updateDoc = new Document(); + + // Expire timestamp if its configured, else it should be ignored + if (expireAt > 0) { + Document set_doc = new Document(); + set_doc.append("expireAt", new Date(expireAt)); + updateDoc.append("$set", set_doc); + } else { + Document unset_doc = new Document(); + unset_doc.append("expireAt", ""); + updateDoc.append("$unset", unset_doc); + } + + // Upsert the document + collection.findOneAndUpdate(Filters.eq("key", key), updateDoc, opt); + } + + //-------------------------------------------------------------------------- + // + // Remove call + // + //-------------------------------------------------------------------------- + + /** + * Remove the value, given the key + * + * @param key param find the thae meta key + * + * @return null + **/ + @Override + public KeyValue remove(Object key) { + removeValue(key); + return null; + } + + /** + * Remove the value, given the key + * + * Important note: It does not return the previously stored value + * Its return String type is to maintain consistency with Map interfaces + * + * @param key param find the thae meta key + * + * @return null + **/ + @Override + public String removeValue(Object key) { + if (key == null) { + throw new IllegalArgumentException("delete 'key' cannot be null"); + } + + // Delete the data + collection.deleteOne(Filters.eq("key", key)); + return null; + } + + //-------------------------------------------------------------------------- + // + // Maintenance calls + // + //-------------------------------------------------------------------------- + + /** + * Incremental maintainance should not trigger maintenance. + * As its potentially blocking with a very long call + **/ + public void incrementalMaintenance() { + // does nothing + } + + @Override + public void maintenance() { + // @TODO : something? (not sure what needs to be done) + } + +} \ No newline at end of file diff --git a/src/main/java/picoded/dstack/stack/Stack_FileWorkspaceMap.java b/src/main/java/picoded/dstack/stack/Stack_FileWorkspaceMap.java index 61cd347a..0bce15cf 100755 --- a/src/main/java/picoded/dstack/stack/Stack_FileWorkspaceMap.java +++ b/src/main/java/picoded/dstack/stack/Stack_FileWorkspaceMap.java @@ -3,9 +3,16 @@ import picoded.dstack.CommonStructure; import picoded.dstack.core.Core_FileWorkspaceMap; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.List; import java.util.Set; +import org.apache.commons.io.IOUtils; + /** * Stacked implementation of KeyValueMap data structure. * @@ -74,6 +81,10 @@ public CommonStructure[] commonStructureStack() { // [Internal use, to be extended in future implementation] // //-------------------------------------------------------------------------- + + // Workspace operations + //-------------------------------------------------------------------------- + /** * [Internal use, to be extended in future implementation] * @@ -111,8 +122,24 @@ public boolean backend_workspaceExist(String oid) { } /** - * [Internal use, to be extended in future implementation] + * Setup the current fileWorkspace within the fileWorkspaceMap, * + * This ensures the workspace _oid is registered within the map, + * even if there is 0 files. + * + * Does not throw any error if workspace was previously setup + */ + @Override + public void backend_setupWorkspace(String oid) { + for (int i = dataLayers.length - 1; i >= 0; --i) { + dataLayers[i].backend_setupWorkspace(oid); + } + } + + // File read and write using byte array + //-------------------------------------------------------------------------- + + /** * Get and return the stored data as a byte[] * * @param ObjectID of workspace @@ -141,6 +168,95 @@ public byte[] backend_fileRead(String oid, String filepath) { } + /** + * Writes the full byte array of a file in the backend + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + @Override + public void backend_fileWrite(String oid, String filepath, byte[] data) { + // Write the data starting from the lowest layer + for (int i = dataLayers.length - 1; i >= 0; --i) { + dataLayers[i].backend_fileWrite(oid, filepath, data); + } + } + + // File read and write using byte stream + //-------------------------------------------------------------------------- + + /** + * Get and return the stored data as a InputStream + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * + * @return the stored byte array of the file + **/ + @Override + public InputStream backend_fileReadInputStream(final String oid, final String filepath) { + + // Due to the behaviour of how the file data needs to be handled across multiple layers + // we only use an optimized "readStream" call if the filesystem is a single stack layer + if (dataLayers.length == 1) { + return dataLayers[0].backend_fileReadInputStream(oid, filepath); + } + + // Fallback behaviour, polyfill the byte[] implementation + //------------------------------------------------------------ + byte[] rawBytes = backend_fileRead(oid, filepath); + if (rawBytes == null) { + return null; + } + return new ByteArrayInputStream(rawBytes); + } + + /** + * Writes the full by of a file in the backend + * + * @param ObjectID of workspace + * @param filepath to use for the workspace + * @param data to write the file with + **/ + @Override + public void backend_fileWriteInputStream(final String oid, final String filepath, + final InputStream data) { + + // + // Due to the behaviour of how the file data needs to be handled across multiple layers + // we only use an optimized "writeStream" call ONLY if the filesystem is a single stack layer + // + // Else we will revert to byte[] that can be applied multiple times across the stack + // + if (dataLayers.length == 1) { + dataLayers[0].backend_fileWriteInputStream(oid, filepath, data); + return; + } + + // Fallback behaviour, polyfill the byte[] implementation + //------------------------------------------------------------ + + // forward the null, and let the error handling below settle it + if (data == null) { + backend_fileWrite(oid, filepath, null); + } + + // Converts it to bytearray respectively + byte[] rawBytes = null; + try { + rawBytes = IOUtils.toByteArray(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Does the bytearray writes + backend_fileWrite(oid, filepath, rawBytes); + } + + // File exist / removal + //-------------------------------------------------------------------------- + /** * [Internal use, to be extended in future implementation] * @@ -167,23 +283,6 @@ public boolean backend_fileExist(final String oid, final String filepath) { return false; } - /** - * [Internal use, to be extended in future implementation] - * - * Writes the full byte array of a file in the backend - * - * @param ObjectID of workspace - * @param filepath to use for the workspace - * @param data to write the file with - **/ - @Override - public void backend_fileWrite(String oid, String filepath, byte[] data) { - // Write the data starting from the lowest layer - for (int i = dataLayers.length - 1; i >= 0; --i) { - dataLayers[i].backend_fileWrite(oid, filepath, data); - } - } - /** * [Internal use, to be extended in future implementation] * @@ -200,21 +299,6 @@ public void backend_removeFile(String oid, String filepath) { } } - /** - * Setup the current fileWorkspace within the fileWorkspaceMap, - * - * This ensures the workspace _oid is registered within the map, - * even if there is 0 files. - * - * Does not throw any error if workspace was previously setup - */ - @Override - public void backend_setupWorkspace(String oid) { - for (int i = dataLayers.length - 1; i >= 0; --i) { - dataLayers[i].backend_setupWorkspace(oid); - } - } - //-------------------------------------------------------------------------- // // Folder Pathing support diff --git a/src/main/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap.java b/src/main/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap.java index a5c4e6c0..67e742cd 100755 --- a/src/main/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap.java +++ b/src/main/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap.java @@ -614,7 +614,7 @@ public Set backend_getFileAndFolderPathSet(final String oid, final Strin } // Return a filtered set - return backend_filtterPathSet(workspace.keySet(), searchPath, minDepth, maxDepth, 0); + return backend_filterPathSet(workspace.keySet(), searchPath, minDepth, maxDepth, 0); } finally { accessLock.readLock().unlock(); } diff --git a/src/test/README.md b/src/test/README.md new file mode 100644 index 00000000..cff49382 --- /dev/null +++ b/src/test/README.md @@ -0,0 +1,22 @@ +# Docker Commands to setup "testing DB" locally + +## MongoDB + +DB Setup + +``` +sudo docker run --name dstack-mongodb -p 27017:27017 -d mongo:5 +``` + +Run Tests + +``` +./gradlew test -Ptest_all --tests picoded.dstack.mongodb.* +``` + +Cleanup DB + +``` +sudo docker stop dstack-mongodb; +sudo docker rm dstack-mongodb; +``` \ No newline at end of file diff --git a/src/test/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap_test.java b/src/test/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap_test.java new file mode 100755 index 00000000..00b034eb --- /dev/null +++ b/src/test/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap_test.java @@ -0,0 +1,47 @@ +package picoded.dstack.mongodb; + +import picoded.core.struct.*; +import picoded.dstack.*; +import picoded.dstack.struct.simple.*; + +/** + * ## Purpose + * This class is meant to test the MongoDB_FileWorkspaceMap implementation, + * and ensure that it passes all the test layed out in StructSImple_FileWorkspaceMap_test + * + */ +public class MongoDB_FileWorkspaceMap_test extends StructSimple_FileWorkspaceMap_test { + + // Hazelcast stack instance + protected static volatile MongoDBStack instance = null; + + // To override for implementation + //----------------------------------------------------- + + /// Impomentation constructor + public FileWorkspaceMap implementationConstructor() { + + // Initialize server + synchronized (MongoDB_FileWorkspaceMap_test.class) { + if (instance == null) { + // The default config uses localhost, 27017 + GenericConvertMap mongodbConfig = new GenericConvertHashMap<>(); + mongodbConfig.put("host", DStackTestConfig.MONGODB_HOST()); + mongodbConfig.put("port", DStackTestConfig.MONGODB_PORT()); + + // Use a random DB name + mongodbConfig.put("name", DStackTestConfig.randomTablePrefix()); + + GenericConvertMap stackConfig = new GenericConvertHashMap<>(); + stackConfig.put("name", "MongoDB_FileWorkspaceMap_test"); + stackConfig.put("mongodb", mongodbConfig); + + instance = new MongoDBStack(stackConfig); + } + } + + // Load the FileWorkspaceMap + return instance.fileWorkspaceMap(DStackTestConfig.randomTablePrefix()); + } + +} diff --git a/src/test/java/picoded/dstack/mongodb/MongoDB_KeyLongMap_test.java b/src/test/java/picoded/dstack/mongodb/MongoDB_KeyLongMap_test.java new file mode 100755 index 00000000..27a05252 --- /dev/null +++ b/src/test/java/picoded/dstack/mongodb/MongoDB_KeyLongMap_test.java @@ -0,0 +1,47 @@ +package picoded.dstack.mongodb; + +import picoded.core.struct.*; +import picoded.dstack.*; +import picoded.dstack.struct.simple.*; + +/** + * ## Purpose + * This class is meant to test the MongoDB_KeyLongMap implementation, + * and ensure that it passes all the test layed out in StructSImple_KeyLongMap_test + * + */ +public class MongoDB_KeyLongMap_test extends StructSimple_KeyLongMap_test { + + // Hazelcast stack instance + protected static volatile MongoDBStack instance = null; + + // To override for implementation + //----------------------------------------------------- + + /// Impomentation constructor + public KeyLongMap implementationConstructor() { + + // Initialize server + synchronized (MongoDB_KeyLongMap_test.class) { + if (instance == null) { + // The default config uses localhost, 27017 + GenericConvertMap mongodbConfig = new GenericConvertHashMap<>(); + mongodbConfig.put("host", DStackTestConfig.MONGODB_HOST()); + mongodbConfig.put("port", DStackTestConfig.MONGODB_PORT()); + + // Use a random DB name + mongodbConfig.put("name", DStackTestConfig.randomTablePrefix()); + + GenericConvertMap stackConfig = new GenericConvertHashMap<>(); + stackConfig.put("name", "MongoDB_KeyLongMap_test"); + stackConfig.put("mongodb", mongodbConfig); + + instance = new MongoDBStack(stackConfig); + } + } + + // Load the KeyLongMap + return instance.keyLongMap(DStackTestConfig.randomTablePrefix()); + } + +} diff --git a/src/test/java/picoded/dstack/mongodb/MongoDB_KeyValueMap_test.java b/src/test/java/picoded/dstack/mongodb/MongoDB_KeyValueMap_test.java new file mode 100755 index 00000000..5976b97d --- /dev/null +++ b/src/test/java/picoded/dstack/mongodb/MongoDB_KeyValueMap_test.java @@ -0,0 +1,47 @@ +package picoded.dstack.mongodb; + +import picoded.core.struct.*; +import picoded.dstack.*; +import picoded.dstack.struct.simple.*; + +/** + * ## Purpose + * This class is meant to test the MongoDB_KeyValueMap implementation, + * and ensure that it passes all the test layed out in StructSImple_KeyValueMap_test + * + */ +public class MongoDB_KeyValueMap_test extends StructSimple_KeyValueMap_test { + + // Hazelcast stack instance + protected static volatile MongoDBStack instance = null; + + // To override for implementation + //----------------------------------------------------- + + /// Impomentation constructor + public KeyValueMap implementationConstructor() { + + // Initialize server + synchronized (MongoDB_KeyValueMap_test.class) { + if (instance == null) { + // The default config uses localhost, 27017 + GenericConvertMap mongodbConfig = new GenericConvertHashMap<>(); + mongodbConfig.put("host", DStackTestConfig.MONGODB_HOST()); + mongodbConfig.put("port", DStackTestConfig.MONGODB_PORT()); + + // Use a random DB name + mongodbConfig.put("name", DStackTestConfig.randomTablePrefix()); + + GenericConvertMap stackConfig = new GenericConvertHashMap<>(); + stackConfig.put("name", "MongoDB_KeyValueMap_test"); + stackConfig.put("mongodb", mongodbConfig); + + instance = new MongoDBStack(stackConfig); + } + } + + // Load the KeyValueMap + return instance.keyValueMap(DStackTestConfig.randomTablePrefix()); + } + +} diff --git a/src/test/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap_test.java b/src/test/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap_test.java index 32da49c2..8d5ad24b 100755 --- a/src/test/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap_test.java +++ b/src/test/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap_test.java @@ -7,9 +7,13 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.util.Arrays; import java.util.HashSet; +import org.apache.commons.io.IOUtils; // Test Case include import org.junit.After; import org.junit.Before; @@ -156,9 +160,51 @@ public void fileWrite_andProperlySetupFolder() { // Remove and assert fileWorkspace.removeFolderPath("test/folder"); + assertFalse(fileWorkspace.fileExist("test/folder/file.txt")); assertFalse(fileWorkspace.folderPathExist("test/folder")); + assertTrue(fileWorkspace.folderPathExist("test")); - assertFalse(fileWorkspace.fileExist("test/folder/file.txt")); + } + + //----------------------------------------------------------------------------------- + // + // Multiple Writes + // + //----------------------------------------------------------------------------------- + + @Test + public void fileWrite_fiveTimes() { + // Get the file workspace to use + FileWorkspace fileWorkspace = testObj.newEntry(); + assertNotNull(fileWorkspace); + + // Folder does not exist first + assertFalse(fileWorkspace.folderPathExist("test/folder")); + + // Write and read file + for(int i=0; i < 5; ++i) { + fileWorkspace.writeString("test/folder/file.txt", "ver-"+i); + assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt")); + fileWorkspace.writeString("test/folder/file.txt", "ver-"+i); + assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt")); + } + } + @Test + public void fileWrite_twentyTimes() { + // Get the file workspace to use + FileWorkspace fileWorkspace = testObj.newEntry(); + assertNotNull(fileWorkspace); + + // Folder does not exist first + assertFalse(fileWorkspace.folderPathExist("test/folder")); + + // Write and read file + for(int i=0; i < 20; ++i) { + fileWorkspace.writeString("test/folder/file.txt", "ver-"+i); + assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt")); + fileWorkspace.writeString("test/folder/file.txt", "ver-"+i); + assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt")); + } } //----------------------------------------------------------------------------------- @@ -329,6 +375,20 @@ public void readNonExistenceFile() { } + @Test + public void writeAndReadToFile_stream() throws Exception { + // Output stream to use for content + ByteArrayInputStream buffer = new ByteArrayInputStream("data to write".getBytes()); + + FileWorkspace fileWorkspace = testObj.newEntry(); + fileWorkspace.writeInputStream("testPath", buffer); + assertNotNull(testObj.get(fileWorkspace._oid()).readByteArray("testPath")); + + InputStream readData = testObj.get(fileWorkspace._oid()).readInputStream("testPath"); + byte[] readArray = IOUtils.toByteArray(readData); + assertEquals(new String(readArray), "data to write"); + } + @Test public void deleteExistingFile() { FileWorkspace fileWorkspace = testObj.newEntry();