Skip to content

Commit

Permalink
Fixes for SCAMPserver
Browse files Browse the repository at this point in the history
  • Loading branch information
zpzim committed Jul 22, 2019
1 parent 10dd3f9 commit 9f44602
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 34 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -166,6 +166,7 @@ jobs:
- make -j4
- ./kubernetes/SCAMPserver &
- ./kubernetes/SCAMPclient &
- sleep 10
- cd ../test
- ./run_tests.sh ../build/kubernetes/SCAMP_distributed

Expand Down
48 changes: 14 additions & 34 deletions kubernetes/scamp_server.cc
Expand Up @@ -38,7 +38,7 @@ constexpr uint64_t DISTRIBUTED_TILE_SIZE = 500000;
constexpr uint64_t MAX_TILE_RETRIES = 3;
constexpr uint64_t GIGAFLOP = 1000000000;
constexpr uint64_t MIN_THROUGHPUT = 3 * GIGAFLOP;
constexpr uint64_t SLEEP_TILE_TIME = 100;
constexpr uint64_t TIMEOUT_CHECK_FREQUENCY = 20;
constexpr uint64_t TILE_TIMEOUT_SECONDS =
DISTRIBUTED_TILE_SIZE * DISTRIBUTED_TILE_SIZE / MIN_THROUGHPUT;

Expand Down Expand Up @@ -392,10 +392,10 @@ class Job {
SCAMPArgs job_args;
};

// chad
void check_time_out() {
while (true) {
usleep(SLEEP_TILE_TIME * 1000000);
std::this_thread::sleep_for(std::chrono::seconds(TIMEOUT_CHECK_FREQUENCY));
std::lock_guard<std::mutex> lockGuard(jobVecLock);
for (int i = 0; i < jobVec.size(); i++) {
if (!jobVec[i].job_done()) {
jobVec[i].check_time_tile();
Expand All @@ -404,6 +404,8 @@ void check_time_out() {
}
}

/*
// Creates a dummy test job
void createTestJob() {
int window = 100;
Expand Down Expand Up @@ -446,6 +448,7 @@ void createTestJob() {
filestream << args.DebugString();
filestream.close();
}
*/

template <typename T>
void elementwise_sum(T *mp_full, uint64_t merge_start, uint64_t tile_sz,
Expand Down Expand Up @@ -479,10 +482,7 @@ void elementwise_max(T *mp_full, uint64_t merge_start, uint64_t tile_sz,
}
}

// TODO(zpzim): move this back into SCAMP_Operation, we shouldn't have the
// merging be functionality of the individual tile
// Merges a local result "tile_profile" with the global matrix profile
// "full_profile"

void MergeTileIntoFullProfile(Profile *tile_profile, uint64_t position,
uint64_t length, Profile *full_profile,
uint64_t index_start) {
Expand Down Expand Up @@ -559,29 +559,6 @@ void MergeProfile(const SCAMPArgs &tile_args, SCAMPArgs *job_args,

// Logic and data behind the server's behavior.
class SCAMPServiceImpl final : public SCAMPService::Service {
public:
int counter = 0;
int arrpos = 0;
int combine = 0;
int idcnt = 0;

int reload = 1;
int generate = 100;

std::vector<std::vector<int>> vec1;

static const int globarrsize = 1000;
double globarr[globarrsize];

SCAMPServiceImpl() {
counter = 0;
arrpos = 0;
idcnt = 0;
for (int i = 0; i < globarrsize; i++) {
globarr[i] = i;
}
}

public:
private:
// Takes a SCAMPArgs proto and tries to create a SCAMP job and add it to the
Expand Down Expand Up @@ -689,24 +666,30 @@ class SCAMPServiceImpl final : public SCAMPService::Service {
};

void RunServer() {
std::string server_address("0.0.0.0:30078");
std::string server_address("localhost:30078");

SCAMPServiceImpl service;

ServerBuilder builder;

// Listen on the given address without any authentication mechanism.
std::cout << "Add Listening Port" << std::endl;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

// Do not limit input size
builder.SetMaxReceiveMessageSize(INT_MAX);

// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
std::cout << "Register Service" << std::endl;
builder.RegisterService(&service);

// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
if (server == nullptr) {
std::cout << "Error building server." << std::endl;
exit(1);
}
std::cout << "Server listening on " << server_address << std::endl;

// Wait for the server to shutdown. Note that some other thread must be
Expand All @@ -715,9 +698,6 @@ void RunServer() {
}

int main(int argc, char **argv) {
// TODO: move this into an asynchronous rpc which appends a new job to jobVec
createTestJob();

std::thread check_time_out();

RunServer();
Expand Down

0 comments on commit 9f44602

Please sign in to comment.