Skip to content

Commit

Permalink
TDL-14068: Fixed KeyError exception (#38)
Browse files Browse the repository at this point in the history
* TDL-14068: Fixed KeyError exception

* Added unit test cases and integration tests

* Running one integration test for debugging

* Debugging integration test case

* Updated integration test

* Updated integration test expected output

* Updated config.yml for running all integration test again
  • Loading branch information
savan-chovatiya committed Jul 1, 2021
1 parent a008877 commit 352a926
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 2 deletions.
7 changes: 5 additions & 2 deletions tap_s3_csv/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ def sync_csv_file(config, file_handle, s3_path, table_spec, stream):
# memory consumption but that's acceptable as well.
csv.field_size_limit(sys.maxsize)

iterator = csv_helper.get_row_iterator(
file_handle, table_spec, stream["schema"]["properties"].keys(), True)
if "properties" in stream["schema"]:
iterator = csv_helper.get_row_iterator(
file_handle, table_spec, stream["schema"]["properties"].keys(), True)
else:
iterator = csv_helper.get_row_iterator(file_handle, table_spec, None, True)

records_synced = 0

Expand Down
15 changes: 15 additions & 0 deletions tests/resources/tap-s3-csv/catalog_without_properties_case.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"start_date": "2017-01-01T00:00:00Z",
"bucket": "com-stitchdata-prod-circleci-assets",
"account_id": "218546966473",
"tables": [
{
"table_name": "catalog_without_properties",
"search_prefix": "tap_tester",
"search_pattern": ".*?/test_empty_catalog.*?csv$",
"key_properties": "",
"date_overrides": ""
}
]
}

1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_4.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_5.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
1 change: 1 addition & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_6.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id,name
5 changes: 5 additions & 0 deletions tests/resources/tap-s3-csv/test_empty_catalog_7.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,name,name
1,John,carl
2,Bob
3
4,Alice,Barak,Ben,5
120 changes: 120 additions & 0 deletions tests/test_catalog_without_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import boto3
import unittest
import utils_for_test as utils
import os

import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner

# Fixture directory name (under tests/resources/) holding this tap's CSV files.
TAP_S3_CSV_PATH = "tap-s3-csv"

def get_resources_path(path):
    """Return the absolute path of *path* inside this test suite's resources directory."""
    resources_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'resources')
    return os.path.join(resources_dir, path)


class S3CatalogWithoutProperties(unittest.TestCase):

def resource_names(self):
return ["test_empty_catalog_1.csv", "test_empty_catalog_2.csv", "test_empty_catalog_3.csv", "test_empty_catalog_4.csv", "test_empty_catalog_5.csv", "test_empty_catalog_6.csv", "test_empty_catalog_7.csv"]

def tap_name(self):
return "tap-s3-csv"

def name(self):
return "tap_tester_s3_catalog_without_properties_csv"

def get_type(self):
return "platform.s3-csv"

def get_credentials(self):
return {}

def expected_streams(self):
return {
'catalog_without_properties'
}

def expected_pks(self):
return {}

def get_properties(self):
with open(get_resources_path("tap-s3-csv/catalog_without_properties_case.json"), encoding='utf-8') as file:
data = json.load(file)
data["tables"] = json.dumps(data["tables"])

return data

def setUp(self):
self.conn_id = connections.ensure_connection(self)

def setUpTestEnvironment(self):
for resource in self.resource_names():
utils.delete_and_push_file(self.get_properties(), [
resource], TAP_S3_CSV_PATH)

def test_catalog_without_properties(self):

self.setUpTestEnvironment()

runner.run_check_job_and_check_status(self)

found_catalogs = menagerie.get_catalogs(self.conn_id)
self.assertEqual(len(found_catalogs), 1,
msg="unable to locate schemas for connection {}".format(self.conn_id))

found_catalog_names = set(
map(lambda c: c['tap_stream_id'], found_catalogs))
subset = self.expected_streams().issubset(found_catalog_names)
self.assertTrue(
subset, msg="Expected check streams are not subset of discovered catalog")

our_catalogs = [c for c in found_catalogs if c.get(
'tap_stream_id') in self.expected_streams()]

# Select our catalogs
for c in our_catalogs:
c_annotated = menagerie.get_annotated_schema(
self.conn_id, c['stream_id'])
connections.select_catalog_and_fields_via_metadata(
self.conn_id, c, c_annotated, [], [])

# Stream properties should be zero as all 5 files considered in sampling are containing headers only.
# No fields with breadcumb will be present in schema
metadata = c_annotated["metadata"]
stream_properties = [item for item in metadata if item.get("breadcrumb") != []]
self.assertEqual(len(stream_properties), 0)

# Clear state before our run
menagerie.set_state(self.conn_id, {})

# Run a sync job using orchestrator
sync_job_name = runner.run_sync_mode(self, self.conn_id)

# Verify tap and target exit codes
exit_status = menagerie.get_exit_status(self.conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

synced_records = runner.get_records_from_target_output()
upsert_messages = [m for m in synced_records.get(
'catalog_without_properties').get('messages') if m['action'] == 'upsert']

records = [message.get('data') for message in upsert_messages]

#All fields from file test_empty_catalog_7.csv should be emitted with duplicate & no header handling
#as catalog is without any fields.

expected_records = [
{'id': '1', 'name': 'John', '_sdc_extra': [{'name': 'carl'}], '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
'_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 2},
{'id': '2', 'name': 'Bob', '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
'_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 3},
{'id': '3', '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets',
'_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 4},
{'id': '4', 'name': 'Alice', '_sdc_extra': [{'no_headers': ['Ben', '5']}, {
'name': 'Barak'}], '_sdc_source_bucket': 'com-stitchdata-prod-circleci-assets', '_sdc_source_file': 'tap_tester/test_empty_catalog_7.csv', '_sdc_source_lineno': 5}
]

self.assertListEqual(expected_records, records)

41 changes: 41 additions & 0 deletions tests/unittests/test_key_error_for_catalog_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest
from unittest import mock
from tap_s3_csv import sync

def mockTransformer():
    """Build a stand-in for singer.Transformer whose transform() is the identity."""
    class _IdentityTransformer:
        def transform(self, rec, schema, matadata):
            # Pass the record through untouched; schema and metadata are ignored.
            return rec
    return _IdentityTransformer()

@mock.patch("tap_s3_csv.sync.csv_helper.get_row_iterator")
@mock.patch("singer.Transformer", side_effect=mockTransformer)
class TestKeyErrorFOrCatalogPropeties(unittest.TestCase):
    """Verify sync_csv_file tolerates catalogs with and without schema properties.

    NOTE(review): class name contains typos ("FOr", "Propeties"); kept as-is so
    external test selection by name keeps working.
    """

    def _run_sync(self, stream):
        """Invoke sync_csv_file on a tiny in-memory CSV; return (file_handle, table_spec)."""
        config = {'bucket': 'test'}
        table_spec = {'table_name': 'test_table'}
        file_handle = [
            b"columnA,columnB,columnC",
            b"1,2,3,4"
        ]
        sync.sync_csv_file(config, file_handle, "unittest/sample.csv", table_spec, stream)
        return file_handle, table_spec

    def test_catalog_with_properties(self, mocked_transformer, mocked_get_row_iterator):
        # When the schema has properties, their keys are passed to the iterator.
        stream = {"schema": {"properties": {"columnA": {"type": ["integer"]}}}}
        file_handle, table_spec = self._run_sync(stream)
        mocked_get_row_iterator.assert_called_with(
            file_handle, table_spec, stream["schema"]["properties"].keys(), True)

    def test_catalog_with_no_properties(self, mocked_transformer, mocked_get_row_iterator):
        # When "properties" is absent, None must be passed instead of raising KeyError.
        stream = {"schema": {}}
        file_handle, table_spec = self._run_sync(stream)
        mocked_get_row_iterator.assert_called_with(file_handle, table_spec, None, True)

0 comments on commit 352a926

Please sign in to comment.