Skip to content

Commit

Permalink
Source S3: choose between data types when merging master schema (airb…
Browse files Browse the repository at this point in the history
…ytehq#16631)

* airbytehq#422 source s3: choose the broadest data type when there is a mismatch while merging JSON schemas

* airbytehq#422 source s3: upd changelog
  • Loading branch information
davydov-d authored and robbinhan committed Sep 29, 2022
1 parent a8c5656 commit 9275793
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-s3
Expand Up @@ -185,6 +185,21 @@ def get_json_schema(self) -> Mapping[str, Any]:
properties[self.ab_last_mod_col]["format"] = "date-time"
return {"type": "object", "properties": properties}

@staticmethod
def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
non_comparable_types = ["object", "array", "null"]
if type_1 in non_comparable_types or type_2 in non_comparable_types:
return None
types = {type_1, type_2}
if types == {"boolean", "string"}:
return "string"
if types == {"integer", "number"}:
return "number"
if types == {"integer", "string"}:
return "string"
if types == {"number", "string"}:
return "string"

def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
"""
In order to auto-infer a schema across many files and/or allow for additional properties (columns),
Expand Down Expand Up @@ -223,22 +238,27 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
# this compares datatype of every column that the two schemas have in common
for col in column_superset:
if (col in master_schema.keys()) and (col in this_schema.keys()) and (master_schema[col] != this_schema[col]):
# If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error
# this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to
# provided schema state. If not, then the read will error anyway
if col in self._schema.keys():
# If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error.
# This is to allow more leniency as we may be able to coerce this datatype mismatch on read according to
# provided schema state. Else we're inferring the schema (or at least this column) from scratch, and therefore
# we try to choose the broadest type among two if possible
broadest_of_types = self._broadest_type(master_schema[col], this_schema[col])
type_explicitly_defined = col in self._schema.keys()
override_type = broadest_of_types and not type_explicitly_defined
if override_type:
master_schema[col] = broadest_of_types
if override_type or type_explicitly_defined:
LOGGER.warn(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. "
+ f"Airbyte will attempt to coerce this to {master_schema[col]} on read."
)
# else we're inferring the schema (or at least this column) from scratch and therefore
# throw an error on mismatching datatypes
else:
raise RuntimeError(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
)
continue
# otherwise throw an error on mismatching datatypes
raise RuntimeError(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
)

# missing columns in this_schema doesn't affect our master_schema, so we don't check for it here

Expand Down
Expand Up @@ -593,6 +593,30 @@ def test_filepath_iterator(self, bucket, path_prefix, list_v2_objects, expected_
False,
False,
),
( # int becomes str in case of type mismatch in different files
"{}",
datetime(2020, 5, 5, 13, 5, 5),
[
FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128),
FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128),
],
[
{"pk": "string", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
"dob": "string"},
{"pk": "integer", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
"dob": "string"}
],
{
"pk": "string",
"full_name": "string",
"street_address": "string",
"customer_code": "integer",
"email": "string",
"dob": "string"
},
True,
False,
),
),
)
@patch("source_s3.stream.IncrementalFileStreamS3.storagefile_class", MagicMock())
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
Expand Up @@ -205,7 +205,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |
| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns appeared to be non-present in master schema |
| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |
Expand Down

0 comments on commit 9275793

Please sign in to comment.