Skip to content

Commit

Permalink
Merge f72767a into 505f163
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonacat committed May 3, 2022
2 parents 505f163 + f72767a commit 86e9005
Show file tree
Hide file tree
Showing 10 changed files with 1,064 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ tonic = { version = "0.6.1", default-features = false, features = [
"transport",
"tls",
] }
prost = "0.9.0"
prost-types = "0.9.0"
tremor-otelapis = { version = "0.2.4" }

# aws-s3
Expand All @@ -207,6 +209,7 @@ aws-smithy-http = "0.41"
# gcp
googapis = { version = "0.6", default-features = false, features = [
"google-pubsub-v1",
"google-cloud-bigquery-storage-v1"
] }
gouth = { version = "0.2" }
http = "0.2.7"
Expand Down
1 change: 1 addition & 0 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ pub(crate) fn builtin_connector_types() -> Vec<Box<dyn ConnectorBuilder + 'stati
Box::new(impls::http::server::Builder::default()),
Box::new(impls::otel::client::Builder::default()),
Box::new(impls::otel::server::Builder::default()),
Box::new(impls::gbq::writer::Builder::default()),
]
}

Expand Down
2 changes: 2 additions & 0 deletions src/connectors/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub(crate) mod elastic;
pub(crate) mod exit;
/// file connector implementation
pub(crate) mod file;
/// Google Big Query
pub(crate) mod gbq;
/// HTTP
pub(crate) mod http;
/// Kafka consumer and producer
Expand Down
15 changes: 15 additions & 0 deletions src/connectors/impls/gbq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod writer;
69 changes: 69 additions & 0 deletions src/connectors/impls/gbq/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod sink;

use crate::connectors::impls::gbq::writer::sink::GbqSink;
use crate::connectors::prelude::*;
use crate::connectors::{Connector, ConnectorBuilder, ConnectorConfig, ConnectorType};
use serde::Deserialize;
use tremor_pipeline::ConfigImpl;

#[derive(Deserialize, Clone)]
pub(crate) struct Config {
pub table_id: String,
pub connect_timeout: u64,
pub request_timeout: u64,
}
impl ConfigImpl for Config {}

#[derive(Debug, Default)]
pub(crate) struct Builder {}

struct Gbq {
config: Config,
}

#[async_trait::async_trait]
impl Connector for Gbq {
async fn create_sink(
&mut self,
sink_context: SinkContext,
builder: SinkManagerBuilder,
) -> Result<Option<SinkAddr>> {
let sink = GbqSink::new(self.config.clone());

builder.spawn(sink, sink_context).map(Some)
}

fn codec_requirements(&self) -> CodecReq {
CodecReq::Structured
}
}

#[async_trait::async_trait]
impl ConnectorBuilder for Builder {
fn connector_type(&self) -> ConnectorType {
"gbq".into()
}

async fn build(&self, alias: &str, config: &ConnectorConfig) -> Result<Box<dyn Connector>> {
if let Some(raw_config) = &config.config {
let config = Config::new(raw_config)?;
Ok(Box::new(Gbq { config }))
} else {
Err(ErrorKind::MissingConfiguration(alias.to_string()).into())
}
}
}

0 comments on commit 86e9005

Please sign in to comment.