Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-14068: fixed key-error exception #38

Merged
merged 7 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions tap_s3_csv/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ def sync_csv_file(config, file_handle, s3_path, table_spec, stream):
# memory consumption but that's acceptable as well.
csv.field_size_limit(sys.maxsize)

iterator = csv_helper.get_row_iterator(
file_handle, table_spec, stream["schema"]["properties"].keys(), True)
if "properties" in stream["schema"]:
iterator = csv_helper.get_row_iterator(
file_handle, table_spec, stream["schema"]["properties"].keys(), True)
else:
iterator = csv_helper.get_row_iterator(file_handle, table_spec, None, True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When all five CSV files used in sampling contain no records, the catalog is generated without a properties field, so all records are emitted when properties are not found.


records_synced = 0

Expand Down
15 changes: 15 additions & 0 deletions tests/resources/tap-s3-csv/catalog_without_properties_case.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"start_date": "2017-01-01T00:00:00Z",
"bucket": "com-stitchdata-prod-circleci-assets",
"account_id": "218546966473",
"tables": [
{
"table_name": "catalog_without_properties",
"search_prefix": "tap_tester",
"search_pattern": ".*?/test_empty_catalog.*?csv$",
"key_properties": "",
"date_overrides": ""
}
]
}

1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_4.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_5.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_6.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
5 changes: 5 additions & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_7.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,name,name
1,John,carl
2,Bob
3
4,Alice,Barak,Ben,5
120 changes: 120 additions & 0 deletions tests/test_catalog_without_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import boto3
import unittest
import utils_for_test as utils
import os

import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner

TAP_S3_CSV_PATH = "tap-s3-csv"

def get_resources_path(path):
    """Return the absolute path of *path* inside this test suite's resources directory."""
    tests_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(tests_dir, 'resources', path)


class S3CatalogWithoutProperties(unittest.TestCase):
    """Integration test for catalogs discovered from header-only CSV files.

    When every sampled file contains only a header row, discovery produces a
    schema without any field metadata; a sync must still emit all records.
    """

    def resource_names(self):
        # Seven fixtures: files 1-6 are header-only, only the 7th has data rows.
        return ["test_empty_catalog_{}.csv".format(num) for num in range(1, 8)]

    def tap_name(self):
        return "tap-s3-csv"

    def name(self):
        return "tap_tester_s3_catalog_without_properties_csv"

    def get_type(self):
        return "platform.s3-csv"

    def get_credentials(self):
        return {}

    def expected_streams(self):
        return {
            'catalog_without_properties'
        }

    def expected_pks(self):
        return {}

    def get_properties(self):
        """Load the connection properties, serializing 'tables' to a JSON string."""
        config_path = get_resources_path("tap-s3-csv/catalog_without_properties_case.json")
        with open(config_path, encoding='utf-8') as config_file:
            properties = json.load(config_file)
            properties["tables"] = json.dumps(properties["tables"])

        return properties

    def setUp(self):
        self.conn_id = connections.ensure_connection(self)

    def setUpTestEnvironment(self):
        # Push every fixture file to S3, replacing any stale copy first.
        for resource in self.resource_names():
            utils.delete_and_push_file(self.get_properties(), [
                resource], TAP_S3_CSV_PATH)

    def test_catalog_without_properties(self):

        self.setUpTestEnvironment()

        runner.run_check_job_and_check_status(self)

        found_catalogs = menagerie.get_catalogs(self.conn_id)
        self.assertEqual(
            len(found_catalogs), 1,
            msg="unable to locate schemas for connection {}".format(self.conn_id))

        discovered_stream_ids = {catalog['tap_stream_id'] for catalog in found_catalogs}
        self.assertTrue(
            self.expected_streams().issubset(discovered_stream_ids),
            msg="Expected check streams are not subset of discovered catalog")

        our_catalogs = [catalog for catalog in found_catalogs
                        if catalog.get('tap_stream_id') in self.expected_streams()]

        # Select each expected catalog without choosing any individual fields.
        for catalog in our_catalogs:
            annotated_schema = menagerie.get_annotated_schema(
                self.conn_id, catalog['stream_id'])
            connections.select_catalog_and_fields_via_metadata(
                self.conn_id, catalog, annotated_schema, [], [])

            # All 5 files considered in sampling contain headers only, so the
            # schema carries zero field-level (breadcrumb) metadata entries.
            field_metadata = [entry for entry in annotated_schema["metadata"]
                              if entry.get("breadcrumb") != []]
            self.assertEqual(len(field_metadata), 0)

        # Start the sync from a clean slate.
        menagerie.set_state(self.conn_id, {})

        # Run a sync job using orchestrator and verify tap/target exit codes.
        sync_job_name = runner.run_sync_mode(self, self.conn_id)
        exit_status = menagerie.get_exit_status(self.conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        synced_records = runner.get_records_from_target_output()
        messages = synced_records.get('catalog_without_properties').get('messages')
        upsert_messages = [message for message in messages
                           if message['action'] == 'upsert']
        records = [message.get('data') for message in upsert_messages]

        # With no fields in the catalog, every column of test_empty_catalog_7.csv
        # is emitted, exercising duplicate-header and missing-header handling.
        expected_records = [
            {'id': '1', 'name': 'John', '_sdc_extra': [{'name': 'carl'}], '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
                '_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 2},
            {'id': '2', 'name': 'Bob', '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
                '_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 3},
            {'id': '3', '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
                '_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 4},
            {'id': '4', 'name': 'Alice', '_sdc_extra': [{'no_headers': ['Ben', '5']}, {
                'name': 'Barak'}], '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets', '_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 5}
        ]

        self.assertListEqual(expected_records, records)

41 changes: 41 additions & 0 deletions tests/unittests/test_key_error_for_catalog_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest
from unittest import mock
from tap_s3_csv import sync

def mockTransformer():
    """Build a stand-in for singer.Transformer whose transform() returns the record unchanged."""
    class _IdentityTransformer:
        def transform(self, record, schema, metadata):
            # Identity pass-through: schema and metadata are ignored.
            return record
    return _IdentityTransformer()

@mock.patch("tap_s3_csv.sync.csv_helper.get_row_iterator")
@mock.patch("singer.Transformer",side_effect=mockTransformer)
class TestKeyErrorFOrCatalogPropeties(unittest.TestCase):

def test_catalog_with_properties(self, mockTransformer, mocked_get_row_iterator):
config = {'bucket': 'test'}
table_spec = {'table_name': 'test_table'}
stream = {"schema": {"properties": {"columnA": {"type": ["integer"]}}}}
file_handle = [
b"columnA,columnB,columnC",
b"1,2,3,4"
]
s3_path = "unittest/sample.csv"

sync.sync_csv_file(config, file_handle, s3_path, table_spec, stream)
mocked_get_row_iterator.assert_called_with(file_handle, table_spec, stream["schema"]["properties"].keys(), True)

def test_catalog_with_no_properties(self, mockTransformer, mocked_get_row_iterator):
config = {'bucket': 'test'}
table_spec = {'table_name': 'test_table'}
stream = {"schema": {}}
file_handle = [
b"columnA,columnB,columnC",
b"1,2,3,4"
]
s3_path = "unittest/sample.csv"

sync.sync_csv_file(config, file_handle, s3_path, table_spec, stream)
mocked_get_row_iterator.assert_called_with(file_handle, table_spec, None, True)