Skip to content

Commit

Permalink
Mfs (#73)
Browse files Browse the repository at this point in the history
* * Added madara::utility::file_crc
* Added madara::utility::file_size for file streams
* Added internals of FragmentsToFilesFilter to look for MFS file fragments
* Added tests for Fragments filter and new utility functions
* [skip ci]

* * [skip ci]
* FragmentsToFilesFilter now takes fragments and saves them to disk in the format {filename}.{frag}.{crc}.frag
  * Still need to piece everything together when the fragments are done
* Updated test_filters.cpp to test changes to FragmentsToFilesFilter
* Added filename_has_redirects to madara::utility for checking for .., ~, etc.

* * Added file_from_fragments and the ability to merge fragments and delete them through the utility function
* Added calls to new utility function from FragmentsToFilesFilter.cpp

* * Added FragmentsToFilesFilter implementation for piecing together files from MFS transfers
* Updated python port documentation
* Added read_thread_hertz to Python TransportSettings
* Added AggregateFilter to Python port
* Added FragmentsToFilesFilter to Python port
* Added utility functions to Python port
* Added read thread hertz setting capability to karl and mfs
* Updated test_filters to properly test for MFS file transfer
* Added FileReceiver python test for MFS transfer

* * Moved python test into same folder that David put the test_any.py into

* * Using same naming convention as other test

* * correcting issue with the file_receiver test being saved to wrong dir

* * Removed ALWAYS logging in FragmentsToFilesFilter
* Removed boost::filesystem usage in karl
* Removed boost::filesystem usage in Utility
* Updated test_filters to test for SUCCESS/FAIL with FragmentsToFilesFilter

* * Cleaned up tools/karl segfault and style issues

* * Implementing equivalent of path.filename from boost::filesystem in karl.cpp

* * Added FileStreamer
* Updated MFS to use FileStreamer
* Added reset to containers::Map
* Added clear by VariableReference to ThreadSafeContent
* Segfault occurs if I try to clear map entries in any way

* * Fixed memory corruption issue with containers::Map and the clear/reset methods
* Fixed heap related issues for FileStreamer. FileStreamer should fix the issue with instability with large files that was being experienced by FileFragmenter in MFS
  • Loading branch information
jredmondson committed Sep 14, 2018
1 parent 1cc4d47 commit 16d02f7
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 76 deletions.
186 changes: 186 additions & 0 deletions include/madara/knowledge/FileStreamer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#ifndef _MADARA_KNOWLEDGE_FILESTREAMER_H_
#define _MADARA_KNOWLEDGE_FILESTREAMER_H_

#include <string>
#include <fstream>
#include <vector>
#include "madara/knowledge/KnowledgeUpdateSettings.h"
#include "madara/knowledge/KnowledgeRecord.h"
#include "madara/utility/Utility.h"
#include "madara/utility/ScopedArray.h"
#include "madara/knowledge/containers/FlexMap.h"
#include "madara/logger/GlobalLogger.h"

namespace madara
{
namespace knowledge
{
/**
* @class FileStreamer
* @brief Splits files into fragments that can be saved to and loaded from
* a knowledge base
*/
class FileStreamer
{
public:
/**
* Constructor
**/
FileStreamer ()
{
}

/**
* Constructor
* @param key the location in the knowledge base to save to
* @param filename the file to open and read from
* @param kb the knowledge base to stream to
**/
FileStreamer (const std::string & key, const std::string & filename,
KnowledgeBase kb)
{
init (key, filename, kb);
}

/**
* Initializes the file stream and all containers
* @param key the location in the knowledge base to save to
* @param filename the file to open and read from
* @param kb the knowledge base to stream to
**/
inline void init (
const std::string & key, const std::string & filename,
KnowledgeBase kb)
{
// setup containers
file_space.set_name (key, kb);
file_space["size"].to_container (file_size);
file_space["crc"].to_container (file_crc);
file_space["contents"].to_container (file_fragments);
file_fragments.clear ();

file_crc.set_name (file_space["crc"].get_name (), kb);
file_size.set_name (file_space["size"].get_name (), kb);

// setup file information
filename_ = filename;
file_crc = (KnowledgeRecord::Integer) utility::file_crc (filename);
file_size = (KnowledgeRecord::Integer) utility::file_size (filename);

stream_.open (filename, std::ios::in | std::ios::binary);
}

/**
* Loads a fragment into the knowledge base
**/
inline size_t load (size_t index, size_t frag_size = 60000)
{
size_t bytes_read = 0;

if (stream_)
{
// seek to the index position in the file
stream_.seekg ((std::streampos)index * frag_size, std::ios::beg);

// read the fragment into a local buffer
std::vector<unsigned char> frag (frag_size);
// char * frag = new char [frag_size];
stream_.read ((char *)frag.data (), frag_size);
bytes_read = stream_.gcount ();
frag.resize (bytes_read);

KnowledgeRecord record;
record.emplace_file (std::move (frag));

// set the file fragment into the KB
file_fragments.set (
std::to_string ((unsigned long long)index), record);
// file_fragments.emplace_file ()
}

return bytes_read;
}

/**
* Returns the crc of the file
* @return the crc of the file
**/
inline uint32_t get_crc (void)
{
return (uint32_t) file_crc.to_integer ();
}

/**
* Returns the size of the file
* @return the size of the file
**/
inline size_t get_size (void)
{
return (size_t) file_size.to_integer ();
}

/**
* Returns the name of the file
* @return the name of the file being streamed
**/
inline std::string get_filename (void)
{
return filename_;
}

/**
* Clears the fragment list
**/
inline void clear_fragments (void)
{
// option 1. seg fault
file_fragments.clear ();

// option 2. seg fault
// knowledge::KnowledgeRules keys;
// file_fragments.keys (keys);

// for (auto key : keys)
// {
// file_fragments.erase (key);
// }

// option 3. seg fault
//file_fragments.reset ();

// option 4. nothing doesn't segfault
}

/**
* Remodifies the file size and crc
**/
inline void modify (void)
{
file_size.modify ();
file_crc.modify ();
}

/// records that contain the file fragments
containers::FlexMap file_space;

/// the size of the file contents
containers::Integer file_size;

/// the crc of the file
containers::Integer file_crc;

/// the crc of the file
containers::Map file_fragments;

private:

/// the name of the file
std::string filename_;

/// the stream to read from
std::ifstream stream_;
};
}
}

#endif // _MADARA_KNOWLEDGE_FILESTREAMER_H_
9 changes: 9 additions & 0 deletions include/madara/knowledge/ThreadSafeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,15 @@ namespace madara
const KnowledgeReferenceSettings & settings =
KnowledgeReferenceSettings ());

/**
* Clears a variable. This is much safer than @see delete_variable.
* It clears the memory used in the variable and marks it as UNCREATED,
* meaning that it is effectively deleted, will not show up in
* @see print statements or @see save_checkpoint.
* @param variable reference to a variable (@see get_ref)
**/
bool clear (const VariableReference & variable);

/**
* Deletes the key. Note that this is extremely unsafe. You
* can cause illegal operations in the knowledge base by using
Expand Down
22 changes: 22 additions & 0 deletions include/madara/knowledge/ThreadSafeContext.inl
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,28 @@ const KnowledgeReferenceSettings & settings)
return found;
}

inline bool
ThreadSafeContext::clear (
const VariableReference & variable)
{
if (variable.is_valid ())
{
MADARA_GUARD_TYPE guard (mutex_);

// erase any changed or local changed map entries
// changed_map_.erase (variable.entry_->first.c_str ());
// local_changed_map_.erase (variable.entry_->first.c_str ());

variable.entry_->second.clear_value ();

return true;
}
else
{
return false;
}
}

// return whether or not the key exists
inline bool
ThreadSafeContext::delete_variable (
Expand Down
59 changes: 57 additions & 2 deletions include/madara/knowledge/containers/Map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ madara::knowledge::containers::Map::exchange (
void
madara::knowledge::containers::Map::clear (bool clear_knowledge)
{
if (clear_knowledge)
if (context_ && clear_knowledge)
{
ContextGuard context_guard (*context_);
MADARA_GUARD_TYPE guard (mutex_);
Expand All @@ -385,10 +385,27 @@ madara::knowledge::containers::Map::clear (bool clear_knowledge)

for (size_t i = 0; i < keys.size (); ++i)
this->erase (keys[i]);

map_.clear (); }

else if (context_)
{
MADARA_GUARD_TYPE guard (mutex_);
map_.clear ();
}
else
}

void
madara::knowledge::containers::Map::reset (void)
{
if (context_)
{
ContextGuard context_guard (*context_);
MADARA_GUARD_TYPE guard (mutex_);

for (auto entry : map_)
context_->clear (entry.first);

map_.clear ();
}
}
Expand All @@ -399,6 +416,7 @@ madara::knowledge::containers::Map::erase (const std::string & key)
if (context_)
{
ContextGuard context_guard (*context_);
MADARA_GUARD_TYPE guard (mutex_);

// find the key in the internal map
InternalMap::iterator found = map_.find (key);
Expand All @@ -407,6 +425,7 @@ madara::knowledge::containers::Map::erase (const std::string & key)
if (found != map_.end ())
{
context_->delete_variable (found->second.get_name ());
map_.erase (found);
}
}
}
Expand Down Expand Up @@ -671,6 +690,42 @@ madara::knowledge::containers::Map::set (const std::string & key,
return result;
}

int
madara::knowledge::containers::Map::set (const std::string & key,
const KnowledgeRecord & value,
const KnowledgeUpdateSettings & settings)
{
int result = -1;

if (context_ && key != "")
{
ContextGuard context_guard (*context_);
MADARA_GUARD_TYPE guard (mutex_);

std::stringstream buffer;
buffer << name_;
buffer << delimiter_;
buffer << key;

std::string final_key = buffer.str ();
std::map <std::string, VariableReference>::iterator entry =
map_.find (final_key);

if (entry == map_.end ())
{
VariableReference ref = context_->get_ref (final_key, settings);
map_[key] = ref;
result = context_->set (ref, value, settings);
}
else
{
result = context_->set (entry->second, value, settings);
}
}

return result;
}

int
madara::knowledge::containers::Map::set_index (
const std::string & key,
Expand Down
20 changes: 20 additions & 0 deletions include/madara/knowledge/containers/Map.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ namespace madara
**/
void clear (bool clear_knowledge = true);

/**
* Resets the map. This will reset all the map variables to empty
* in the underlying context and also clear the containers keys
**/
void reset (void);

/**
* Erases a variable from the map
* @param key the variable to delete from the map
Expand Down Expand Up @@ -254,6 +260,20 @@ namespace madara
madara::knowledge::KnowledgeRecord::Integer value,
const KnowledgeUpdateSettings & settings);

/**
* Sets a value to a knowledge record in the map
*
* @param key location within the map
* @param value value to set at location
* @param settings settings for applying the update
* @return 0 if successful, -1 if key is null, and
* -2 if quality isn't high enough
**/
int set (const std::string & key,
const KnowledgeRecord & value,
const KnowledgeUpdateSettings & settings =
KnowledgeUpdateSettings ());

/**
* Sets an index within an array to a specified value
*
Expand Down
22 changes: 0 additions & 22 deletions tests/test_filters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,7 @@ void test_fragments_to_files_filter (void)
std::cerr << "FAIL\n";
++madara_fails;
}

utility::file_from_fragments ("files/images/manaus.jpg", crc);

filter.filter (args_less_1, context, vars);

std::cerr << "Testing get_file_progress... ";

size_t received = utility::get_file_progress ("files/images/manaus.jpg",
crc, fragmenter.file_size);

if (received == fragmenter.file_size - 60000)
{
std::cerr << "SUCCESS\n";
}
else
{
std::cerr << "FAIL." << received << " bytes out of " <<
fragmenter.file_size << ", instead of " <<
(fragmenter.file_size - received) << "\n";
++madara_fails;
}
}

void test_dynamic_predicate_filter (void)
{
madara::knowledge::KnowledgeBase kb;
Expand Down

0 comments on commit 16d02f7

Please sign in to comment.