Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality so Starcoder can send back arbitrary PMTs, not just blobs #78

Merged
merged 31 commits into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
93c33b5
rename convert_pmt_proto to convert_proto_to_pmt
Aug 29, 2018
68e50c0
init commit of pmt_to_proto
Aug 29, 2018
3b353b5
finish pmt_to_proto?
Aug 29, 2018
c7b6653
clang
Aug 29, 2018
044f5ad
update proto
Aug 29, 2018
c873e2d
convert in enqueue_message_sink
Aug 29, 2018
e62c48d
update starcoder to parse the protobufs
Aug 30, 2018
dc1308a
modify waterfall plotter to pass up protobuf
Aug 30, 2018
2efc6e4
update noaa
Aug 30, 2018
359b1c1
introduce blob_value
Aug 30, 2018
f8c4b65
add blob to proto_to_pmt
Aug 30, 2018
2c33d2e
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Aug 30, 2018
a610968
test for proto_to_pmt
Aug 30, 2018
dc5fc29
use pmt in meteor decoder
Aug 30, 2018
044768e
experimental drop payload
Aug 30, 2018
e4fbdbc
fix closing
Aug 30, 2018
bf3e2ee
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Aug 31, 2018
c6e7ab7
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 7, 2018
ad27a4d
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 10, 2018
db77770
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 10, 2018
df2b028
check if queue is closed() to exit goroutine
Sep 10, 2018
5686c67
revert
Sep 10, 2018
1215f26
factor out function
Sep 10, 2018
8c1522d
dont expose in header file
Sep 10, 2018
26a5a50
use back inserter
Sep 10, 2018
55f1eea
use repeatedptrfieldbackinsertiterator
Sep 10, 2018
d9d22b9
clang
Sep 10, 2018
3e95525
use std::copy
Sep 11, 2018
c2b222e
rename request to respose
Sep 11, 2018
d0df957
change signature of pmt_to_proto
Sep 11, 2018
a9b4413
changes to test.grc
Sep 11, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/starcoder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ message BlockMessage {
UniformVector uniform_vector_value = 9;

Dict dict_value = 10;

bytes blob_value = 11;
}
}

Expand Down Expand Up @@ -203,7 +205,10 @@ message RunFlowgraphResponse {
string block_id = 1;

// Serialized arbitrary PMT (GNURadio Polymorphic Message Type)
bytes payload = 2;
bytes payload = 2 [deprecated=true];

// PMT (GNURadio Polymorphic Message Type) response sent from the block
BlockMessage pmt = 3;
}

// Complex number
Expand Down
4 changes: 4 additions & 0 deletions cqueue/c_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (q *CStringQueue) Close() {
q.queue.Close()
}

func (q *CStringQueue) Closed() bool {
return q.queue.Closed()
}

func (q *CStringQueue) Push(str string) {
q.queue.Push(str)
}
Expand Down
5 changes: 5 additions & 0 deletions cqueue/string_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ void string_queue::close() {
condition_var_.notify_one();
}

bool string_queue::closed() {
std::unique_lock<std::mutex> lock(mutex_);
return closed_;
}

uint64_t string_queue::get_ptr() const {
return reinterpret_cast<uint64_t>(this);
}
Expand Down
1 change: 1 addition & 0 deletions cqueue/string_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class string_queue {
std::string blocking_pop();
unsigned long get_ptr() const;
void close();
bool closed();
reiinakano marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Make this uint64_t
static string_queue *queue_from_pointer(unsigned long long ptr);
private:
Expand Down
2 changes: 1 addition & 1 deletion flowgraphs/test.grc
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@
</param>
<param>
<key>msg</key>
<value>pmt.make_u8vector(6, ord('b'))</value>
<value>pmt.make_c64vector(2, -7+3.2j)</value>
</param>
<param>
<key>minoutbuf</key>
Expand Down
1 change: 1 addition & 0 deletions gr-starcoder/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ list(APPEND starcoder_sources
${CMAKE_CURRENT_SOURCE_DIR}/../../cqueue/string_queue.cc
${hw_proto_srcs}
proto_to_pmt.cc
pmt_to_proto.cc
init.c
driver.c
firmware.c
Expand Down
2 changes: 1 addition & 1 deletion gr-starcoder/lib/command_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void command_source_impl::readloop() {
GR_LOG_WARN(d_logger, "Failed to deserialize gRPC");
continue;
}
pmt::pmt_t m = convert_pmt_proto(grpc_pmt);
pmt::pmt_t m = convert_proto_to_pmt(grpc_pmt);
message_port_pub(port_, m);
}
}
Expand Down
5 changes: 4 additions & 1 deletion gr-starcoder/lib/enqueue_message_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <gnuradio/io_signature.h>
#include "enqueue_message_sink_impl.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand All @@ -51,7 +53,8 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {}

void enqueue_message_sink_impl::handler(pmt::pmt_t msg) {
if (string_queue_ != NULL) {
std::string serialized = pmt::serialize_str(msg);
std::string serialized = convert_pmt_to_proto(msg).SerializeAsString();

if (serialized.length() > 10485760) {
GR_LOG_ERROR(d_logger,
boost::format("Received large packet of length %d in "
Expand Down
26 changes: 21 additions & 5 deletions gr-starcoder/lib/meteor_decoder_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "meteor/meteor_decoder.h"
#include "meteor/meteor_packet.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand Down Expand Up @@ -94,7 +96,8 @@ bool meteor_decoder_sink_impl::stop() {
int ok = 0;
while (decoder.pos() < total_size_ - meteor::SOFT_FRAME_LEN) {
total++;
bool res = decoder.decode_one_frame(raw.get(), total_size_, error_corrected_data.get());
bool res = decoder.decode_one_frame(raw.get(), total_size_,
error_corrected_data.get());
if (res) {
ok++;
std::cout << std::dec << 100. * decoder.pos() / total_size_ << "% "
Expand All @@ -114,10 +117,23 @@ bool meteor_decoder_sink_impl::stop() {
std::string png_b = packeter.dump_gray_image(meteor::BLUE_APID);

if (string_queue_ != NULL) {
if (!png_img.empty()) string_queue_->push(png_img);
if (!png_r.empty()) string_queue_->push(png_r);
if (!png_g.empty()) string_queue_->push(png_g);
if (!png_b.empty()) string_queue_->push(png_b);
::starcoder::BlockMessage grpc_pmt;
if (!png_img.empty()) {
grpc_pmt.set_blob_value(png_img);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_r.empty()) {
grpc_pmt.set_blob_value(png_r);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_g.empty()) {
grpc_pmt.set_blob_value(png_g);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_b.empty()) {
grpc_pmt.set_blob_value(png_b);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}

if (!filename_.empty()) {
Expand Down
10 changes: 8 additions & 2 deletions gr-starcoder/lib/noaa_apt_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <cmath>

#include "gil_util.h"
#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {
Expand Down Expand Up @@ -128,9 +129,14 @@ void noaa_apt_sink_impl::write_image(std::string filename) {
if (!d_flip)
image_string = (store_gray_to_png_string(image_received_view_));
else
image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_));
image_string =
store_gray_to_png_string(flipped_up_down_view(image_received_view_));

if (!image_string.empty()) string_queue_->push(image_string);
if (!image_string.empty()) {
::starcoder::BlockMessage grpc_pmt;
grpc_pmt.set_blob_value(image_string);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}
}

Expand Down
201 changes: 201 additions & 0 deletions gr-starcoder/lib/pmt_to_proto.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/* -*- c++ -*- */
/*
* Copyright 2018 Infostellar, Inc.
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
* Boston, MA 02110-1301, USA.
*/

#include "pmt_to_proto.h"

void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg,
starcoder::UniformVector *uni_vector) {
if (pmt::is_u8vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size8);
const std::vector<uint8_t> vector_elements =
pmt::u8vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s8vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size8);
const std::vector<int8_t> vector_elements = pmt::s8vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u16vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size16);
const std::vector<uint16_t> vector_elements =
pmt::u16vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s16vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size16);
const std::vector<int16_t> vector_elements =
pmt::s16vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u32vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size32);
const std::vector<uint32_t> vector_elements =
pmt::u32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s32vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size32);
const std::vector<int32_t> vector_elements =
pmt::s32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u64vector(pmt_msg)) {
starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value();
const std::vector<uint64_t> vector_elements =
pmt::u64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint64_t>(
u64_vector->mutable_value()));
} else if (pmt::is_s64vector(pmt_msg)) {
starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value();
const std::vector<int64_t> vector_elements =
pmt::s64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int64_t>(
i64_vector->mutable_value()));
} else if (pmt::is_f32vector(pmt_msg)) {
starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value();
const std::vector<float> vector_elements = pmt::f32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<float>(
f32_vector->mutable_value()));
} else if (pmt::is_f64vector(pmt_msg)) {
starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value();
const std::vector<double> vector_elements =
pmt::f64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<double>(
f64_vector->mutable_value()));
} else if (pmt::is_c32vector(pmt_msg)) {
starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value();
const std::vector<std::complex<float>> vector_elements =
pmt::c32vector_elements(pmt_msg);
std::transform(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedPtrFieldBackInsertIterator<
starcoder::Complex32>(c32_vector->mutable_value()),
[](std::complex<float> c)->starcoder::Complex32 {
starcoder::Complex32 new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
} else if (pmt::is_c64vector(pmt_msg)) {
starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value();
const std::vector<std::complex<double>> vector_elements =
pmt::c64vector_elements(pmt_msg);
std::transform(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedPtrFieldBackInsertIterator<
starcoder::Complex>(c64_vector->mutable_value()),
[](std::complex<double> c)->starcoder::Complex {
starcoder::Complex new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
}
}

starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg) {
starcoder::BlockMessage proto_msg;
if (pmt::is_blob(pmt_msg)) {
proto_msg.set_blob_value(pmt::blob_data(pmt_msg),
pmt::blob_length(pmt_msg));
} else if (pmt::is_uniform_vector(pmt_msg)) {
convert_proto_uniform_vector(pmt_msg,
proto_msg.mutable_uniform_vector_value());
} else if (pmt::is_bool(pmt_msg)) {
proto_msg.set_boolean_value(pmt::to_bool(pmt_msg));
} else if (pmt::is_symbol(pmt_msg)) {
proto_msg.set_symbol_value(pmt::symbol_to_string(pmt_msg));
} else if (pmt::is_integer(pmt_msg)) {
proto_msg.set_integer_value(pmt::to_long(pmt_msg));
} else if (pmt::is_uint64(pmt_msg)) {
proto_msg.set_integer_value(pmt::to_uint64(pmt_msg));
} else if (pmt::is_real(pmt_msg)) {
proto_msg.set_double_value(pmt::to_double(pmt_msg));
} else if (pmt::is_complex(pmt_msg)) {
std::complex<double> val = pmt::to_complex(pmt_msg);
starcoder::Complex *complex = proto_msg.mutable_complex_value();
complex->set_real_value(val.real());
complex->set_imaginary_value(val.imag());
} else if (pmt::is_pair(pmt_msg)) {
starcoder::Pair *pair = proto_msg.mutable_pair_value();
starcoder::BlockMessage car = convert_pmt_to_proto(pmt::car(pmt_msg));
starcoder::BlockMessage cdr = convert_pmt_to_proto(pmt::cdr(pmt_msg));
pair->mutable_car()->Swap(&car);
pair->mutable_cdr()->Swap(&cdr);
} else if (pmt::is_tuple(pmt_msg)) {
starcoder::List *list = proto_msg.mutable_list_value();
list->set_type(starcoder::List::TUPLE);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage element =
convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i));
list->add_value()->Swap(&element);
}
} else if (pmt::is_vector(pmt_msg)) {
starcoder::List *list = proto_msg.mutable_list_value();
list->set_type(starcoder::List::VECTOR);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage element =
convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i));
list->add_value()->Swap(&element);
}
} else if (pmt::is_dict(pmt_msg)) {
starcoder::Dict *dict = proto_msg.mutable_dict_value();
pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg);
for (int i = 0; i < pmt::length(key_value_pairs_list); i++) {
starcoder::Dict_Entry *entry = dict->add_entry();

starcoder::BlockMessage key =
convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)));
starcoder::BlockMessage value =
convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)));

entry->mutable_key()->Swap(&key);
entry->mutable_value()->Swap(&value);
}
}
return proto_msg;
}
Loading