Skip to content

Commit

Permalink
Merge pull request #68 from ucbrise/column_names
Browse files Browse the repository at this point in the history
Column names
  • Loading branch information
mwhittaker committed Jun 5, 2017
2 parents 0c01692 + 022c9e7 commit a3bf51f
Show file tree
Hide file tree
Showing 38 changed files with 386 additions and 254 deletions.
1 change: 1 addition & 0 deletions scripts/reset_database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ CREATE TABLE Collections (
node_id bigint NOT NULL,
collection_name text NOT NULL,
collection_type text NOT NULL,
column_names text[] NOT NULL,
PRIMARY KEY (node_id, collection_name)
);

Expand Down
12 changes: 0 additions & 12 deletions src/common/string_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,6 @@

namespace fluent {

std::string Join(const std::vector<std::string>& ss) {
std::string s = "";
if (ss.size() == 0) {
return s;
}
for (std::size_t i = 0; i < ss.size() - 1; ++i) {
s += ss[i] + ", ";
}
s += ss[ss.size() - 1];
return s;
}

std::string CrunchWhitespace(std::string s) {
// Replace newlines with spaces.
std::replace(s.begin(), s.end(), '\n', ' ');
Expand Down
29 changes: 28 additions & 1 deletion src/common/string_util.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
#ifndef COMMON_STRING_UTIL_H_
#define COMMON_STRING_UTIL_H_

#include <cstddef>

#include <array>
#include <string>
#include <vector>

namespace fluent {
namespace detail {

template <typename Iterator>
std::string JoinIterators(Iterator begin, Iterator end) {
std::string s = "";
while (begin != end) {
s += *begin;
begin++;
if (begin != end) {
s += ", ";
}
}
return s;
}

} // namespace detail

// Join() = ""
// Join(1) = "1"
// Join(1, "2") = "1, 2"
// Join(1, "2", '3') = "1, 2, 3"
// Join(std::vector<std::string>{"1", "2", "3"}) = "1, 2, 3"
// Join(std::array<std::string, 3>{"1", "2", "3"}) = "1, 2, 3"
inline std::string Join() { return ""; }

inline std::string Join(const std::string& x) { return x; }
Expand All @@ -30,7 +50,14 @@ std::string Join(const T& x, const Ts&... xs) {
return std::to_string(x) + ", " + Join(xs...);
}

std::string Join(const std::vector<std::string>& ss);
inline std::string Join(const std::vector<std::string>& ss) {
return detail::JoinIterators(ss.cbegin(), ss.cend());
}

template <std::size_t N>
std::string Join(const std::array<std::string, N>& ss) {
return detail::JoinIterators(ss.cbegin(), ss.cend());
}

// CrunchWhitespace converts newlines to spaces and then destutters spaces.
//
Expand Down
6 changes: 6 additions & 0 deletions src/common/string_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ TEST(StringUtil, Join) {
EXPECT_EQ(Join("a"s, "bc"s), "a, bc"s);
EXPECT_EQ(Join("a"s, "bc"s, "def"s), "a, bc, def"s);

EXPECT_EQ(Join(std::vector<std::string>{}), ""s);
EXPECT_EQ(Join(std::vector<std::string>{"a"}), "a"s);
EXPECT_EQ(Join(std::vector<std::string>{"a", "b"}), "a, b"s);
EXPECT_EQ(Join(std::vector<std::string>{"a", "b", "c"}), "a, b, c"s);

EXPECT_EQ(Join(std::array<std::string, 0>{}), ""s);
EXPECT_EQ(Join(std::array<std::string, 1>{{"a"}}), "a"s);
EXPECT_EQ(Join(std::array<std::string, 2>{{"a", "b"}}), "a, b"s);
EXPECT_EQ(Join(std::array<std::string, 3>{{"a", "b", "c"}}), "a, b, c"s);
}

TEST(StringUtil, CrunchWhitespace) {
Expand Down
24 changes: 18 additions & 6 deletions src/common/type_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,32 @@ template <typename... Ts>
struct TypeListLen<TypeList<Ts...>>
: public std::integral_constant<std::size_t, sizeof...(Ts)> {};

// All
template <typename TypeList, template <typename> class F>
struct TypeListAll;

template <template <typename> class F>
struct TypeListAll<TypeList<>, F> : public std::true_type {};

template <typename T, typename... Ts, template <typename> class F>
struct TypeListAll<TypeList<T, Ts...>, F> {
static constexpr bool value =
F<T>::value && TypeListAll<TypeList<Ts...>, F>::value;
};

// AllSame
template <typename TypeList>
struct TypeListAllSame;

template <>
struct TypeListAllSame<TypeList<>> : public std::true_type {};

template <typename T>
struct TypeListAllSame<TypeList<T>> : public std::true_type {};
template <typename T, typename... Ts>
struct TypeListAllSame<TypeList<T, Ts...>> {
template <typename U>
using is_t = std::is_same<T, U>;

template <typename T, typename U, typename... Us>
struct TypeListAllSame<TypeList<T, U, Us...>> {
static constexpr bool value =
std::is_same<T, U>::value && TypeListAllSame<TypeList<U, Us...>>::value;
static constexpr bool value = TypeListAll<TypeList<Ts...>, is_t>::value;
};

// TypeListTo<Template, TypeList<Ts...>> = Template<Ts...>
Expand Down
9 changes: 9 additions & 0 deletions src/common/type_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ TEST(TypeList, TypeListLen) {
static_assert(TypeListLen<four>::value == 4, "");
}

TEST(TypeList, TypeListAll) {
static_assert(TypeListAll<TypeList<>, std::is_void>::value, "");
static_assert(TypeListAll<TypeList<void>, std::is_void>::value, "");
static_assert(TypeListAll<TypeList<void, void>, std::is_void>::value, "");
static_assert(!TypeListAll<TypeList<int>, std::is_void>::value, "");
static_assert(!TypeListAll<TypeList<void, int>, std::is_void>::value, "");
static_assert(!TypeListAll<TypeList<int, void>, std::is_void>::value, "");
}

TEST(TypeList, TypeListAllSame) {
using a = TypeList<>;
using b = TypeList<int>;
Expand Down
1 change: 1 addition & 0 deletions src/common/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define COMMON_TYPE_TRAITS_H_

#include <set>
#include <string>
#include <type_traits>
#include <vector>

Expand Down
22 changes: 8 additions & 14 deletions src/examples/chat/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@

namespace ra = fluent::ra;

using address_t = std::string;
using server_address_t = std::string;
using client_address_t = std::string;
using nickname_t = std::string;
using message_t = std::string;

struct ClientArgs {
std::string server_address;
std::string client_address;
Expand All @@ -29,21 +23,21 @@ template <template <template <typename> class, template <typename> class>
class LineageDbClient>
int ClientMain(const ClientArgs& args,
const fluent::lineagedb::ConnectionConfig& connection_config) {
zmq::context_t context(1);

std::vector<std::tuple<server_address_t, client_address_t, nickname_t>>
connect_tuple = {std::make_tuple(args.server_address, args.client_address,
args.nickname)};
using connect_tuple_t = std::tuple<std::string, std::string, std::string>;
std::vector<connect_tuple_t> connect_tuple = {
std::make_tuple(args.server_address, args.client_address, args.nickname)};

zmq::context_t context(1);
auto f =
fluent::fluent<LineageDbClient>("chat_client_" + args.nickname,
args.client_address, &context,
connection_config)
.stdin()
.stdout()
.template channel<server_address_t, client_address_t, nickname_t>(
"connect")
.template channel<address_t, message_t>("mcast")
.template channel<std::string, std::string, std::string>(
"connect", {{"server_addr", "client_addr", "nickname"}})
.template channel<std::string, std::string>("mcast",
{{"addr", "msg"}})
.RegisterBootstrapRules([&](auto&, auto&, auto& connect, auto&) {
using namespace fluent::infix;
return std::make_tuple(
Expand Down
22 changes: 8 additions & 14 deletions src/examples/chat/ping_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@
namespace lineagedb = fluent::lineagedb;
namespace ra = fluent::ra;

using address_t = std::string;
using server_address_t = std::string;
using client_address_t = std::string;
using nickname_t = std::string;
using message_t = std::string;

struct PingClientArgs {
std::string server_address;
std::string client_address;
Expand All @@ -32,18 +26,18 @@ template <template <template <typename> class, template <typename> class>
class LineageDbClient>
int PingClientMain(const PingClientArgs& args,
const lineagedb::ConnectionConfig& connection_config) {
zmq::context_t context(1);

std::vector<std::tuple<server_address_t, client_address_t, nickname_t>>
connect_tuple = {std::make_tuple(args.server_address, args.client_address,
args.nickname)};
using connect_tuple_t = std::tuple<std::string, std::string, std::string>;
std::vector<connect_tuple_t> connect_tuple = {
std::make_tuple(args.server_address, args.client_address, args.nickname)};

zmq::context_t context(1);
auto f =
fluent::fluent<LineageDbClient>("chat_ping_client", args.client_address,
&context, connection_config)
.template channel<server_address_t, client_address_t, nickname_t>(
"connect")
.template channel<address_t, message_t>("mcast")
.template channel<std::string, std::string, std::string>(
"connect", {{"server_addr", "client_addr", "nickname"}})
.template channel<std::string, std::string>("mcast",
{{"addr", "msg"}})
.periodic("p", std::chrono::milliseconds(1000))
.RegisterBootstrapRules([&](auto& connect, auto&, auto&) {
using namespace fluent::infix;
Expand Down
52 changes: 24 additions & 28 deletions src/examples/chat/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@

namespace ra = fluent::ra;

using address_t = std::string;
using server_address_t = std::string;
using client_address_t = std::string;
using nickname_t = std::string;
using message_t = std::string;

struct ServerArgs {
std::string server_address;
};
Expand All @@ -28,28 +22,30 @@ template <template <template <typename> class, template <typename> class>
int ServerMain(const ServerArgs& args,
const fluent::lineagedb::ConnectionConfig& connection_config) {
zmq::context_t context(1);
auto f =
fluent::fluent<LineageDbClient>("chat_server", args.server_address,
&context, connection_config)
.template channel<server_address_t, client_address_t, nickname_t>(
"connect")
.template channel<address_t, message_t>("mcast")
.template table<client_address_t, nickname_t>("nodelist")
.RegisterRules([&](auto& connect, auto& mcast, auto& nodelist) {
using namespace fluent::infix;

auto subscribe =
nodelist <= (connect.Iterable() | ra::project<1, 2>());

// TODO(mwhittaker): Currently, nicknames are not being used. We
// should prepend the nickaname of the sender to the message before
// broadcasting it.
auto multicast = mcast <= (ra::make_cross(mcast.Iterable(),
nodelist.Iterable()) |
ra::project<2, 1>());

return std::make_tuple(subscribe, multicast);
});
auto f = fluent::fluent<LineageDbClient>("chat_server", args.server_address,
&context, connection_config)
.template channel<std::string, std::string, std::string>(
"connect", {{"server_addr", "client_addr", "nickname"}})
.template channel<std::string, std::string>("mcast",
{{"addr", "msg"}})
.template table<std::string, std::string>(
"nodelist", {{"client_addr", "nickname"}})
.RegisterRules([&](auto& connect, auto& mcast, auto& nodelist) {
using namespace fluent::infix;

auto subscribe =
nodelist <= (connect.Iterable() | ra::project<1, 2>());

// TODO(mwhittaker): Currently, nicknames are not being used.
// We should prepend the nickaname of the sender to the
// message before broadcasting it.
auto multicast =
mcast <=
(ra::make_cross(mcast.Iterable(), nodelist.Iterable()) |
ra::project<2, 1>());

return std::make_tuple(subscribe, multicast);
});

f.Run();
return 0;
Expand Down
24 changes: 12 additions & 12 deletions src/fluent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ zmq::context_t context(1);
ConnectionConfig conn;
std::set<std::tuple<int, char, float>> ts = {{42, 'a', 9001.}};
auto f = fluent<NoopClient>("name", "tcp://0.0.0.0:8000", &context, conn)
.table<int, char, float>("t1")
.table<float, int>("t2")
.scratch<int, int, float>("s")
.channel<std::string, float, char>("c")
.table<int, char, float>("t1", {{"x", "y", "z"}})
.table<float, int>("t2" {{"x", "y"}})
.scratch<int, int, float>("s", {{"x", "y"}})
.channel<std::string, float, char>("c", {{"addr", "x", "y"}})
.RegisterBootstrapRules([&](auto& t1, auto& t2, auto& s, auto& c) {
return std::make_tuple(t1 <= ra::make_iterable("ts", &ts));
})
Expand All @@ -38,18 +38,18 @@ that you can ignore for now. Next, we declare the collections our program will
use:

```c++
.table<int, char, float>("t1")
.table<float, int>("t2")
.scratch<int, int, float>("s")
.channel<std::string, float, char>("c")
.table<int, char, float>("t1", {{"x", "y", "z"}})
.table<float, int>("t2" {{"x", "y"}})
.scratch<int, int, float>("s", {{"x", "y"}})
.channel<std::string, float, char>("c", {{"addr", "x", "y"}})
```
This code declares that our fluent program will use
1. a 3-column table named `t1` with types `int`, `char`, and `float`;
2. a 2-column table named `t2` with types `float` and `int`;
3. a 3-column scratch named `s` with types `int`, `int`, and `float`; and
4. a 3-column channel named `c` with types `string`, `float`, and `char`.
1. a 3-column table `t1(x:int, y:char, z:float)`;
2. a 2-column table `t2(x:float, y:int)`;
3. a 3-column scratch `s(x:int, y:int, z:float)`; and
4. a 3-column channel `c(addr:string, x:float, y:char)`;
Next, we register a single bootstrap rule.
Expand Down
19 changes: 17 additions & 2 deletions src/fluent/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstddef>

#include <algorithm>
#include <array>
#include <set>
#include <type_traits>
#include <utility>
Expand All @@ -12,6 +13,7 @@
#include "gtest/gtest.h"
#include "range/v3/all.hpp"

#include "common/macros.h"
#include "common/type_traits.h"
#include "fluent/rule_tags.h"
#include "fluent/serialization.h"
Expand Down Expand Up @@ -58,11 +60,23 @@ class Channel {
"ZeroMQ address (e.g. tcp://localhost:9999).");

public:
Channel(std::size_t id, std::string name, SocketCache* socket_cache)
: id_(id), name_(std::move(name)), socket_cache_(socket_cache) {}
Channel(std::size_t id, std::string name,
std::array<std::string, 1 + sizeof...(Ts)> column_names,
SocketCache* socket_cache)
: id_(id),
name_(std::move(name)),
column_names_(std::move(column_names)),
socket_cache_(socket_cache) {}
Channel(Channel&&) = default;
Channel& operator=(Channel&&) = default;
DISALLOW_COPY_AND_ASSIGN(Channel);

const std::string& Name() const { return name_; }

const std::array<std::string, 1 + sizeof...(Ts)>& ColumnNames() const {
return column_names_;
}

const std::set<std::tuple<T, Ts...>>& Get() const { return ts_; }

ra::Iterable<std::set<std::tuple<T, Ts...>>> Iterable() const {
Expand Down Expand Up @@ -116,6 +130,7 @@ class Channel {
private:
const std::size_t id_;
const std::string name_;
const std::array<std::string, 1 + sizeof...(Ts)> column_names_;
std::set<std::tuple<T, Ts...>> ts_;

// Whenever a tuple with address `a` is added to a Channel, the socket
Expand Down
Loading

0 comments on commit a3bf51f

Please sign in to comment.