Skip to content

Commit

Permalink
vdk-core: Remove @table column from base ingestion code (#170)
Browse files Browse the repository at this point in the history
While the @table column is necessary for HTTP ingestion, it
is not needed for database ingestion. There is therefore no need to
always set this value, since doing so makes it mandatory to create an
@table column for all ingestion targets.

Remove the code that sets the @table column in ingester_base's
send_object_for_ingestion.

Tested by unit tests.

Signed-off-by: Yana Zhivkova <yzhivkova@vmware.com>
  • Loading branch information
YanaZhivkova committed Sep 1, 2021
1 parent d277e15 commit 1422e23
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,7 @@ def send_tabular_data_for_ingestion(
# fetch data in chunks to prevent running out of memory
for page_number, page in enumerate(ingester_utils.get_page_generator(rows)):
ingester_utils.validate_column_count(page, column_names)
converted_rows = ingester_utils.convert_table(
page, column_names, destination_table
)
converted_rows = ingester_utils.convert_table(page, column_names)
log.debug(
"Posting page {number} with {size} rows for ingestion.".format(
number=page_number, size=len(converted_rows)
Expand Down Expand Up @@ -325,15 +323,22 @@ def _payload_aggregator_thread(self):
continue

# First payload will determine the target, collection_id and destination_table
if not current_target and not current_collection_id and not current_destination_table:
if (
not current_target
and not current_collection_id
and not current_destination_table
):
current_target = target
current_collection_id = collection_id
current_destination_table = destination_table


# When we get a payload whose target/collection_id/destination_table differs from the current one,
# send the current payload and start aggregating for the new one.
if current_target != target or current_collection_id != collection_id or current_destination_table != destination_table:
if (
current_target != target
or current_collection_id != collection_id
or current_destination_table != destination_table
):
(
aggregated_payload,
number_of_payloads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,19 @@ def validate_column_count(data: iter, column_names: iter):
)


def convert_table(table: iter, column_names: iter, table_name: str) -> List[dict]:
def convert_table(table: iter, column_names: iter) -> List[dict]:
"""
Converts tabular data into dictionary objects
:param table: iter
A representation of a two-dimensional array that allows iteration over rows.
:param column_names: iter
Names of the table columns.
:param table_name: string
Value of the `table_name` key, that is mandatory to identify the
destination table.
:return: list of dicts containing the converted table objects.
"""
converted_rows = []
for row in table:
cdf_row = dict()
cdf_row["@table"] = table_name
for index, value in enumerate(row):
if value is not None:
value = _handle_special_types(value)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
from unittest.mock import MagicMock
from unittest.mock import patch
from datetime import datetime

import pytest
from taurus.api.plugin.plugin_input import IIngesterPlugin
Expand Down Expand Up @@ -62,10 +62,12 @@ def test_send_object_for_ingestion(mocked_send):
assert exc_info.type == errors.UserCodeError

with pytest.raises(errors.UserCodeError) as exc_info:
ingester_base.send_object_for_ingestion(payload=test_unserializable_payload,
destination_table=destination_table,
method=method,
target=target)
ingester_base.send_object_for_ingestion(
payload=test_unserializable_payload,
destination_table=destination_table,
method=method,
target=target,
)
assert exc_info.type == errors.UserCodeError


Expand All @@ -75,7 +77,6 @@ def test_send_tabular_data_for_ingestion():
destination_table = "a_destination_table"
converted_row = [
{
"@table": destination_table,
"testcol0": "testrow0testcol0",
"testcol1": 42,
}
Expand Down

0 comments on commit 1422e23

Please sign in to comment.