Skip to content

Commit

Permalink
Use new kake to generate the general Makefile.
Browse files Browse the repository at this point in the history
  • Loading branch information
kinuxroot committed Sep 14, 2016
1 parent d2dc5d4 commit 9e09c97
Show file tree
Hide file tree
Showing 196 changed files with 3,399 additions and 843 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@
*.a

*.swp

msvc/**/Debug
msvc/**/Release
*.opensdf
*.sdf
target/bin/linux/x64/Release/*
target/lib/linux/x64/Release/*
target/build/linux/x64/Release/*
22 changes: 0 additions & 22 deletions Kakefile

This file was deleted.

7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ It's very simple to build with Makefile, simply type the following command:

make

### Build Hurricane usin gmake
Enter target/build/linux/x64/Release and type:

make install

The nimbus and supervisor will be built into target/bin/linux/x64/Release. The demo project will submit a sample word count topololgy.

### Build Hurricane using Kake
#### Brief Introduction to Kake
Kake is a building system following the the "convention over configuration" paradigm
Expand Down
2 changes: 2 additions & 0 deletions conf/nimbus.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nimbus.host = 127.0.0.1
nimbus.port = 6009
9 changes: 9 additions & 0 deletions conf/supervisor1.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
nimbus.host = 127.0.0.1
nimbus.port = 6009

supervisor.host = 127.0.0.1
supervisor.port = 7001

supervisor.name = s1
supervisor.spout.num = 2
supervisor.bolt.num = 3
9 changes: 9 additions & 0 deletions conf/supervisor2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
nimbus.host = 127.0.0.1
nimbus.port = 6009

supervisor.host = 127.0.0.1
supervisor.port = 7002

supervisor.name = s2
supervisor.spout.num = 2
supervisor.bolt.num = 3
9 changes: 9 additions & 0 deletions conf/supervisor3.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
nimbus.host = 127.0.0.1
nimbus.port = 6009

supervisor.host = 127.0.0.1
supervisor.port = 7003

supervisor.name = s3
supervisor.spout.num = 2
supervisor.bolt.num = 3
File renamed without changes.
181 changes: 181 additions & 0 deletions include/hurricane/base/ByteArray.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* licensed to the apache software foundation (asf) under one
* or more contributor license agreements. see the notice file
* distributed with this work for additional information
* regarding copyright ownership. the asf licenses this file
* to you under the apache license, version 2.0 (the
* "license"); you may not use this file except in compliance
* with the license. you may obtain a copy of the license at
*
* http://www.apache.org/licenses/license-2.0
*
* unless required by applicable law or agreed to in writing, software
* distributed under the license is distributed on an "as is" basis,
* without warranties or conditions of any kind, either express or implied.
* see the license for the specific language governing permissions and
* limitations under the license.
*/

#pragma once

#include <cstring>
#include <vector>
#include <cstdint>

namespace hurricane {
namespace base {
class ByteArray : public std::vector<char> {
public:
ByteArray() = default;

ByteArray(int32_t size) :
std::vector<char>(size) {
}

ByteArray(const char* buffer, int32_t size) :
std::vector<char>(buffer, buffer + size) {
}

std::string toStdString() const {
std::string result(this->cbegin(), this->cend());

return result;
}

ByteArray& concat(const ByteArray& buffer2) {
size_t oldSize = size();
size_t newSize = oldSize + buffer2.size();
resize(newSize);
memcpy(this->data() + oldSize, buffer2.data(), buffer2.size());

return *this;
}

ByteArray operator+(const ByteArray& buffer2) const {
ByteArray buffer1(this->size() + buffer2.size());
memcpy(buffer1.data(), this->data(), this->size());
memcpy(buffer1.data() + this->size(), buffer2.data(), buffer2.size());

return buffer1;
}
};

class IODevice {
public:
enum class SeekMode {
Set,
Forward,
Backward
};

~IODevice() {}
};

class ByteArrayReader : public IODevice {
public:
ByteArrayReader(const ByteArray& buffer) :
_buffer(buffer), _pos(0) {}

template <class T>
int32_t read(T* buffer, int32_t count) {
if ( _pos >= static_cast<int32_t>(_buffer.size()) ) {
return 0;
}

int32_t sizeToRead = sizeof(T) * count;
if ( _pos + sizeToRead > static_cast<int32_t>(_buffer.size()) ) {
sizeToRead = _buffer.size() - _pos;
}

memcpy(buffer, _pos + _buffer.data(), sizeToRead);
_pos += sizeToRead;

return sizeToRead;
}

template <class T>
T read() {
T t;
read(&t, 1);

return t;
}

ByteArray readData(int32_t size) {
if ( _pos >= static_cast<int32_t>(_buffer.size()) ) {
return 0;
}

int32_t sizeToRead = size;
if ( _pos + sizeToRead > static_cast<int32_t>(_buffer.size()) ) {
sizeToRead = _buffer.size() - _pos;
}

ByteArray result(sizeToRead);
memcpy(result.data(), _buffer.data() + _pos, sizeToRead);
_pos += sizeToRead;

return result;
}

int32_t tell() const {
return _pos;
}

void seek(SeekMode mode, int32_t size) {
int32_t dest = _pos;
if ( mode == SeekMode::Set ) {
dest = size;
}
else if ( mode == SeekMode::Forward ) {
dest += size;
}
else if ( mode == SeekMode::Backward ) {
dest -= size;
}
}
private:
ByteArray _buffer;
int32_t _pos;
};

class ByteArrayWriter {
public:
ByteArrayWriter() {
}

template <class T>
int32_t write(const T* buffer, int32_t count) {
int32_t sizeToWrite = sizeof(T) * count;
ByteArray buffer2((const char*)(buffer), sizeToWrite);
_buffer.concat(buffer2);

return sizeToWrite;
}

template <class T>
int32_t write(const T& value) {
return write(&value, 1);
}

int32_t write(const ByteArray& buffer) {
_buffer.concat(buffer);

return buffer.size();
}

const ByteArray& ToByteArray() const {
return _buffer;
}


int32_t tell() const {
return _buffer.size();
}

private:
ByteArray _buffer;
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,49 +32,59 @@ namespace hurricane {
namespace base {
class Writable {
public:
virtual std::string GetType() const = 0;
virtual int32_t Read(ByteArrayReader& reader, Variant& variant) = 0;
virtual int32_t Write(ByteArrayWriter& writer, const Variant& variant) = 0;
};

class IntWritable : public Writable {
public:
std::string GetType() const {
return "int";
}

int32_t Read(ByteArrayReader& reader, Variant& variant) override {
int32_t intValue = reader.Read<int32_t>();
int32_t intValue = reader.read<int32_t>();
variant.SetIntValue(intValue);

return sizeof(int32_t);
}

int32_t Write(ByteArrayWriter& writer, const Variant& variant) override {
int value = variant.GetIntValue();
writer.Write(value);
writer.write(value);

return sizeof(int32_t);
}
};

class StringWritable : public Writable {
public:
public:
std::string GetType() const {
return "string";
}

int32_t Read(ByteArrayReader& reader, Variant& variant) override {
int32_t size = reader.Read<int32_t>();
int32_t size = reader.read<int32_t>();

ByteArray bytes = reader.ReadData(size);
ByteArray bytes = reader.readData(size);

variant.SetStringValue(bytes.ToStdString());
variant.SetStringValue(bytes.toStdString());

return sizeof(int32_t) + bytes.size();
}

int32_t Write(ByteArrayWriter& writer, const Variant& variant) override {
std::string value = variant.GetStringValue();

writer.Write(int32_t(value.size()));
writer.Write(value.c_str(), value.size());
writer.write(int32_t(value.size()));
writer.write(value.c_str(), value.size());
return sizeof(int32_t) + value.size();
}
};

extern std::map<int8_t, std::shared_ptr<Writable>> Writables;
//extern std::map<int8_t, std::shared_ptr<Writable>> Writables;
std::map<int8_t, std::shared_ptr<Writable>>& GetWritables();

class DataPackage {
public:
Expand Down Expand Up @@ -114,20 +124,20 @@ namespace hurricane {
ByteArray SerializeHead(int32_t bodySize) {
ByteArrayWriter headWriter;
_length = sizeof(int32_t) + sizeof(_version) + bodySize;
headWriter.Write(_length);
headWriter.Write(_version);
headWriter.write(_length);
headWriter.write(_version);
ByteArray head = headWriter.ToByteArray();

return head;
}

void DeserializeHead(ByteArrayReader& reader) {
_length = reader.Read<int32_t>();
_version = reader.Read<int8_t>();
_length = reader.read<int32_t>();
_version = reader.read<int8_t>();
}

void DeserializeBody(ByteArrayReader& reader) {
while ( reader.Tell() < _length ) {
while ( reader.tell() < _length ) {
Variant variant = DeserializeVariant(reader);
_variants.push_back(variant);
}
Expand All @@ -136,12 +146,13 @@ namespace hurricane {
Variant DeserializeVariant(ByteArrayReader& reader) {
Variant variant;

if ( reader.Tell() >= _length ) {
if ( reader.tell() >= _length ) {
return variant;
}

int8_t typeCode = reader.Read<int8_t>();
std::shared_ptr<Writable> writable = Writables[typeCode];
int8_t typeCode = reader.read<int8_t>();
std::map<int8_t, std::shared_ptr<Writable>>& writables = GetWritables();
std::shared_ptr<Writable> writable = writables[typeCode];
writable->Read(reader, variant);

return variant;
Expand All @@ -150,9 +161,10 @@ namespace hurricane {
void SerializeVariant(ByteArrayWriter& writer, const Variant& variant) {
Variant::Type type = variant.GetType();
int8_t typeCode = Variant::TypeCodes[type];
std::shared_ptr<Writable> writable = Writables[typeCode];
std::map<int8_t, std::shared_ptr<Writable>>& writables = GetWritables();
std::shared_ptr<Writable> writable = writables[typeCode];

writer.Write<int8_t>(typeCode);
writer.write<int8_t>(typeCode);
writable->Write(writer, variant);
}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 9e09c97

Please sign in to comment.