Skip to content

Commit

Permalink
Moved rack specific logic from the data layer to the DFS.
Browse files Browse the repository at this point in the history
The DFSs require access to rack-related information. Previously,
we had no way to access this data because the DFS is a field
of the DataLayerManager which was storing the rack information.

Affected modules: scheduling/flow sim/dfs

Change-Id: I9fd063e3f3a5628c2dd4e65eb910ffce93e1eae1
  • Loading branch information
ICGog committed Apr 22, 2016
1 parent 7eed57a commit e39ef2a
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 153 deletions.
2 changes: 0 additions & 2 deletions src/scheduling/flow/quincy_cost_model.cc
Expand Up @@ -19,8 +19,6 @@
#include "scheduling/knowledge_base.h"
#include "scheduling/flow/cost_model_interface.h"

// Racks contain "between 29 and 31 computers" in Quincy test setup
DEFINE_uint64(machines_per_rack, 30, "Number of machines per rack");
DEFINE_double(quincy_wait_time_factor, 0.5, "The Quincy wait time factor");
DEFINE_double(quincy_preferred_machine_data_fraction, 0.1,
"Threshold of proportion of data stored on machine for it to be "
Expand Down
1 change: 1 addition & 0 deletions src/sim/CMakeLists.txt
Expand Up @@ -4,6 +4,7 @@ set(SIM_DFS_SRC
sim/dfs/simulated_data_layer_manager.cc
sim/dfs/google_block_distribution.cc
sim/dfs/simulated_bounded_dfs.cc
sim/dfs/simulated_dfs.cc
sim/dfs/simulated_uniform_dfs.cc
)

Expand Down
6 changes: 3 additions & 3 deletions src/sim/dfs/simulated_bounded_dfs.cc
Expand Up @@ -48,9 +48,9 @@ void SimulatedBoundedDFS::AddBlocksForTask(const TaskDescriptor& td,
FindOrNull(tasks_on_machine_, machine_res_id);
CHECK_NOTNULL(tasks_machine);
tasks_machine->insert(task_id);
// TODO(ionel): Update the code so that DataLocation's rack_id_ is
// correctly initialized.
DataLocation data_location(machine_res_id, 0, block_id,
DataLocation data_location(machine_res_id,
GetRackForMachine(machine_res_id),
block_id,
FLAGS_simulated_block_size);
task_to_data_locations_.insert(
pair<TaskID_t, DataLocation>(task_id, data_location));
Expand Down
63 changes: 5 additions & 58 deletions src/sim/dfs/simulated_data_layer_manager.cc
Expand Up @@ -27,29 +27,26 @@ DEFINE_uint64(simulated_dfs_replication_factor, 3,
DEFINE_string(simulated_dfs_type, "bounded", "The type of DFS to simulated. "
"Options: uniform | bounded");

DECLARE_uint64(machines_per_rack);

namespace firmament {
namespace sim {

SimulatedDataLayerManager::SimulatedDataLayerManager(
TraceGenerator* trace_generator)
: trace_generator_(trace_generator), unique_rack_id_(0) {
TraceGenerator* trace_generator) {
input_block_dist_ = new GoogleBlockDistribution();
runtime_dist_ =
new GoogleRuntimeDistribution(FLAGS_simulated_quincy_runtime_factor,
FLAGS_simulated_quincy_runtime_power);
if (!FLAGS_simulated_dfs_type.compare("uniform")) {
dfs_ = new SimulatedUniformDFS(trace_generator_);
dfs_ = new SimulatedUniformDFS(trace_generator);
} else if (!FLAGS_simulated_dfs_type.compare("bounded")) {
dfs_ = new SimulatedBoundedDFS(trace_generator_);
dfs_ = new SimulatedBoundedDFS(trace_generator);
} else {
LOG(FATAL) << "Unexpected simulated DFS type: " << FLAGS_simulated_dfs_type;
}
}

SimulatedDataLayerManager::~SimulatedDataLayerManager() {
// trace_generator_ is not owned by SimulatedDataLayerManager.
delete input_block_dist_;
delete runtime_dist_;
delete dfs_;
Expand All @@ -59,71 +56,21 @@ EquivClass_t SimulatedDataLayerManager::AddMachine(
const string& hostname,
ResourceID_t machine_res_id) {
CHECK(InsertIfNotPresent(&hostname_to_res_id_, hostname, machine_res_id));
EquivClass_t rack_ec;
if (racks_with_spare_links_.size() > 0) {
// Assign the machine to a rack that has spare links.
rack_ec = *(racks_with_spare_links_.begin());
} else {
// Add a new rack.
rack_ec = unique_rack_id_;
unique_rack_id_++;
CHECK(InsertIfNotPresent(
&rack_to_machine_res_, rack_ec,
unordered_set<ResourceID_t, boost::hash<ResourceID_t>>()));
racks_with_spare_links_.insert(rack_ec);
}
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
machines_in_rack->insert(machine_res_id);
// Erase the rack from the spare_links set if the rack is now full.
if (machines_in_rack->size() == FLAGS_machines_per_rack) {
racks_with_spare_links_.erase(rack_ec);
}
CHECK(InsertIfNotPresent(&machine_to_rack_ec_, machine_res_id, rack_ec));
dfs_->AddMachine(machine_res_id);
return rack_ec;
return dfs_->AddMachine(machine_res_id);
}

void SimulatedDataLayerManager::GetFileLocations(
const string& file_path, list<DataLocation>* locations) {
CHECK_NOTNULL(locations);
dfs_->GetFileLocations(file_path, locations);
// TODO(ionel): Remove the following code once rack_id_ is correctly set by
// the file system.
for (list<DataLocation>::iterator it = locations->begin();
it != locations->end();
++it) {
EquivClass_t* rack_ec =
FindOrNull(machine_to_rack_ec_, it->machine_res_id_);
CHECK_NOTNULL(rack_ec);
it->rack_id_ = *rack_ec;
}
}

bool SimulatedDataLayerManager::RemoveMachine(const string& hostname) {
bool rack_removed = false;
ResourceID_t* machine_res_id = FindOrNull(hostname_to_res_id_, hostname);
CHECK_NOTNULL(machine_res_id);
EquivClass_t rack_ec = GetRackForMachine(*machine_res_id);
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
ResourceID_t res_id_tmp = *machine_res_id;
machines_in_rack->erase(res_id_tmp);
if (machines_in_rack->size() == 0) {
// The rack doesn't have any machines left. Delete it!
// We have to delete empty racks because we're using the number
// of racks to efficiently find if there's a rack on which a task has no
// data.
rack_to_machine_res_.erase(rack_ec);
rack_removed = true;
} else {
racks_with_spare_links_.insert(rack_ec);
rack_removed = false;
}
machine_to_rack_ec_.erase(*machine_res_id);
dfs_->RemoveMachine(*machine_res_id);
hostname_to_res_id_.erase(hostname);
return rack_removed;
return dfs_->RemoveMachine(res_id_tmp);
}

uint64_t SimulatedDataLayerManager::AddFilesForTask(
Expand Down
32 changes: 6 additions & 26 deletions src/sim/dfs/simulated_data_layer_manager.h
Expand Up @@ -9,7 +9,7 @@

#include "misc/trace_generator.h"
#include "sim/dfs/google_block_distribution.h"
#include "sim/dfs/simulated_dfs_interface.h"
#include "sim/dfs/simulated_dfs.h"
#include "sim/google_runtime_distribution.h"

namespace firmament {
Expand Down Expand Up @@ -37,43 +37,23 @@ class SimulatedDataLayerManager : public DataLayerManagerInterface {
bool RemoveMachine(const string& hostname);
inline const unordered_set<ResourceID_t, boost::hash<ResourceID_t>>&
GetMachinesInRack(EquivClass_t rack_ec) {
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
return *machines_in_rack;
return dfs_->GetMachinesInRack(rack_ec);
}

inline uint64_t GetNumRacks() {
return rack_to_machine_res_.size();
return dfs_->GetNumRacks();
}
inline void GetRackIDs(vector<EquivClass_t>* rack_ids) {
for (auto& rack_to_machines : rack_to_machine_res_) {
rack_ids->push_back(rack_to_machines.first);
}
return dfs_->GetRackIDs(rack_ids);
}
inline EquivClass_t GetRackForMachine(ResourceID_t machine_res_id) {
EquivClass_t* rack_ec =
FindOrNull(machine_to_rack_ec_, machine_res_id);
CHECK_NOTNULL(rack_ec);
return *rack_ec;
return dfs_->GetRackForMachine(machine_res_id);
}

private:
// Set storing the racks to which we can still connect machines.
unordered_set<EquivClass_t> racks_with_spare_links_;
// Map storing the machine resource ids associated with each rack.
unordered_map<EquivClass_t,
unordered_set<ResourceID_t, boost::hash<ResourceID_t>>>
rack_to_machine_res_;
// Map storing the rack EC associated with each machine.
unordered_map<ResourceID_t, EquivClass_t, boost::hash<ResourceID_t>>
machine_to_rack_ec_;
GoogleBlockDistribution* input_block_dist_;
GoogleRuntimeDistribution* runtime_dist_;
SimulatedDFSInterface* dfs_;
SimulatedDFS* dfs_;
unordered_map<string, ResourceID_t> hostname_to_res_id_;
TraceGenerator* trace_generator_;
// Counter used to generate unique rack ids.
EquivClass_t unique_rack_id_;
};

} // namespace sim
Expand Down
63 changes: 63 additions & 0 deletions src/sim/dfs/simulated_dfs.cc
@@ -0,0 +1,63 @@
// The Firmament project
// Copyright (c) 2016 Ionel Gog <ionel.gog@cl.cam.ac.uk>

#include "sim/dfs/simulated_dfs.h"

namespace firmament {
namespace sim {

// Racks contain "between 29 and 31 computers" in Quincy test setup
DEFINE_uint64(machines_per_rack, 30, "Number of machines per rack");

SimulatedDFS::SimulatedDFS() : unique_rack_id_(0) {
}

EquivClass_t SimulatedDFS::AddMachine(ResourceID_t machine_res_id) {
EquivClass_t rack_ec;
if (racks_with_spare_links_.size() > 0) {
// Assign the machine to a rack that has spare links.
rack_ec = *(racks_with_spare_links_.begin());
} else {
// Add a new rack.
rack_ec = unique_rack_id_;
unique_rack_id_++;
CHECK(InsertIfNotPresent(
&rack_to_machine_res_, rack_ec,
unordered_set<ResourceID_t, boost::hash<ResourceID_t>>()));
racks_with_spare_links_.insert(rack_ec);
}
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
machines_in_rack->insert(machine_res_id);
// Erase the rack from the spare_links set if the rack is now full.
if (machines_in_rack->size() == FLAGS_machines_per_rack) {
racks_with_spare_links_.erase(rack_ec);
}
CHECK(InsertIfNotPresent(&machine_to_rack_ec_, machine_res_id, rack_ec));
return rack_ec;
}

bool SimulatedDFS::RemoveMachine(ResourceID_t machine_res_id) {
bool rack_removed = false;
EquivClass_t rack_ec = GetRackForMachine(machine_res_id);
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
ResourceID_t res_id_tmp = machine_res_id;
machines_in_rack->erase(res_id_tmp);
if (machines_in_rack->size() == 0) {
// The rack doesn't have any machines left. Delete it!
// We have to delete empty racks because we're using the number
// of racks to efficiently find if there's a rack on which a task has no
// data.
rack_to_machine_res_.erase(rack_ec);
rack_removed = true;
} else {
racks_with_spare_links_.insert(rack_ec);
rack_removed = false;
}
machine_to_rack_ec_.erase(machine_res_id);
return rack_removed;
}

} // namespace sim
} // namespace firmament
90 changes: 90 additions & 0 deletions src/sim/dfs/simulated_dfs.h
@@ -0,0 +1,90 @@
// The Firmament project
// Copyright (c) 2016 Ionel Gog <ionel.gog@cl.cam.ac.uk>

#ifndef FIRMAMENT_SIM_DFS_SIMULATED_DFS_H
#define FIRMAMENT_SIM_DFS_SIMULATED_DFS_H

#include "base/common.h"
#include "base/types.h"
#include "misc/map-util.h"
#include "scheduling/data_layer_manager_interface.h"

namespace firmament {
namespace sim {

class SimulatedDFS {
public:
SimulatedDFS();
virtual ~SimulatedDFS() {};
/**
* Add num_blocks for a new task.
* @param td the descriptor of the new task
* @param num_blocks the number of blocks to add
* @param max_machine_spread the maximum number of machines over which
* the task's inputs should be spread.
*/
virtual void AddBlocksForTask(const TaskDescriptor& td,
uint64_t num_blocks,
uint64_t max_machine_spread) = 0;

/**
* Add a new machine to the DFS.
* @param machine_res_id the resource id of the new machine
* @return the id of the rack in which the machine is located
*/
virtual EquivClass_t AddMachine(ResourceID_t machine_res_id);
virtual void GetFileLocations(const string& file_path,
list<DataLocation>* locations) = 0;
/**
* Remove all the blocks of a task.
* @param task_id the id of the task for which to remove the blocks
*/
virtual void RemoveBlocksForTask(TaskID_t task_id) = 0;

/**
* Remove a machine from the DFS. This method also removes all the blocks from
* the machine and makes sure they're again replicated.
* @param machine_res_id the resource id of the machine to be removed
* @return true if the machine's rack no longer contains machines
*/
virtual bool RemoveMachine(ResourceID_t machine_res_id);

inline const unordered_set<ResourceID_t, boost::hash<ResourceID_t>>&
GetMachinesInRack(EquivClass_t rack_ec) {
auto machines_in_rack = FindOrNull(rack_to_machine_res_, rack_ec);
CHECK_NOTNULL(machines_in_rack);
return *machines_in_rack;
}
inline uint64_t GetNumRacks() {
return rack_to_machine_res_.size();
}
inline void GetRackIDs(vector<EquivClass_t>* rack_ids) {
for (auto& rack_to_machines : rack_to_machine_res_) {
rack_ids->push_back(rack_to_machines.first);
}
}
inline EquivClass_t GetRackForMachine(ResourceID_t machine_res_id) {
EquivClass_t* rack_ec =
FindOrNull(machine_to_rack_ec_, machine_res_id);
CHECK_NOTNULL(rack_ec);
return *rack_ec;
}

private:
// Set storing the racks to which we can still connect machines.
unordered_set<EquivClass_t> racks_with_spare_links_;
// Map storing the machine resource ids associated with each rack.
unordered_map<EquivClass_t,
unordered_set<ResourceID_t, boost::hash<ResourceID_t>>>
rack_to_machine_res_;
// Map storing the rack EC associated with each machine.
unordered_map<ResourceID_t, EquivClass_t, boost::hash<ResourceID_t>>
machine_to_rack_ec_;
// Counter used to generate unique rack ids.
EquivClass_t unique_rack_id_;
};

} // namespace sim
} // namespace firmament

#endif // FIRMAMENT_SIM_DFS_SIMULATED_DFS_H

0 comments on commit e39ef2a

Please sign in to comment.