Skip to content

Commit

Permalink
Source SFTP-Bulk: Support custom CSV separators (airbytehq#19224)
Browse files Browse the repository at this point in the history
* manage different separators

* tests

* update tests

* add optional parameter for separator

* add default separator

* typo

* add trailing newline

* fix spec.json order

* bump dockerfile version

* update changelog

* auto-bump connector version

* retrigger checks

* retrigger checks

---------

Co-authored-by: Sunny Hashmi <6833405+sh4sh@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored and marcosmarxm committed Jun 8, 2023
1 parent 414acb5 commit 4e02185
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28243,7 +28243,7 @@
"sourceDefinitionId": "31e3242f-dee7-4cdc-a4b8-8e06c5458517",
"name": "SFTP Bulk",
"dockerRepository": "airbyte/source-sftp-bulk",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/sftp-bulk",
"icon": "sftp.svg",
"sourceType": "file",
Expand Down Expand Up @@ -28307,28 +28307,36 @@
"order": 6,
"examples": [ "csv", "json" ]
},
"separator": {
"title": "CSV Separator (Optional)",
"description": "The separator used in the CSV files. Define None if you want to use the Sniffer functionality",
"type": "string",
"default": ",",
"examples": [ "," ],
"order": 7
},
"folder_path": {
"title": "Folder Path (Optional)",
"description": "The directory to search files for sync",
"type": "string",
"default": "",
"examples": [ "/logs/2022" ],
"order": 7
"order": 8
},
"file_pattern": {
"title": "File Pattern (Optional)",
"description": "The regular expression to specify files for sync in a chosen Folder Path",
"type": "string",
"default": "",
"examples": [ "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`" ],
"order": 8
"order": 9
},
"file_most_recent": {
"title": "Most recent file (Optional)",
"description": "Sync only the most recent file for the configured folder path and file pattern",
"type": "boolean",
"default": false,
"order": 9
"order": 10
},
"start_date": {
"type": "string",
Expand All @@ -28337,7 +28345,7 @@
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": [ "2017-01-25T00:00:00Z" ],
"description": "The date from which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"order": 10
"order": 11
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2497,7 +2497,7 @@
- name: SFTP Bulk
sourceDefinitionId: 31e3242f-dee7-4cdc-a4b8-8e06c5458517
dockerRepository: airbyte/source-sftp-bulk
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp-bulk
icon: sftp.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18186,7 +18186,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-sftp-bulk:0.1.1"
- dockerImage: "airbyte/source-sftp-bulk:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/source/ftp"
connectionSpecification:
Expand Down Expand Up @@ -18255,14 +18255,23 @@
examples:
- "csv"
- "json"
separator:
title: "CSV Separator (Optional)"
description: "The separator used in the CSV files. Define None if you want\
\ to use the Sniffer functionality"
type: "string"
default: ","
examples:
- ","
order: 7
folder_path:
title: "Folder Path (Optional)"
description: "The directory to search files for sync"
type: "string"
default: ""
examples:
- "/logs/2022"
order: 7
order: 8
file_pattern:
title: "File Pattern (Optional)"
description: "The regular expression to specify files for sync in a chosen\
Expand All @@ -18271,14 +18280,14 @@
default: ""
examples:
- "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`"
order: 8
order: 9
file_most_recent:
title: "Most recent file (Optional)"
description: "Sync only the most recent file for the configured folder path\
\ and file pattern"
type: "boolean"
default: false
order: 9
order: 10
start_date:
type: "string"
title: "Start Date"
Expand All @@ -18289,7 +18298,7 @@
description: "The date from which you'd like to replicate data for all incremental\
\ streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after\
\ this date will be replicated."
order: 10
order: 11
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-sftp-bulk
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
string_col;int_col
"hello";1
"foo";2
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,17 @@ def test_get_files_pattern_json(config: Mapping, configured_catalog: ConfiguredA
assert res.record.data["int_col"] == 2


def test_get_files_pattern_json_new_separator(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
    """Reading JSON files matching ``test_2.*`` yields exactly one expected record."""
    source = SourceFtp()
    messages = list(source.read(logger, {**config, "file_pattern": "test_2.+"}, configured_catalog, None))
    assert len(messages) == 1
    message = messages[0]
    assert message.type == Type.RECORD
    assert message.record.data["string_col"] == "hello"
    assert message.record.data["int_col"] == 1


def test_get_files_pattern_no_match_json(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result = source.read(logger, {**config, "file_pattern": "bad_pattern.+"}, configured_catalog, None)
Expand All @@ -197,7 +208,7 @@ def test_get_files_no_pattern_csv(config: Mapping, configured_catalog: Configure
source = SourceFtp()
result_iter = source.read(logger, {**config, "file_type": "csv", "folder_path": "files/csv"}, configured_catalog, None)
result = list(result_iter)
assert len(result) == 2
assert len(result) == 4
for res in result:
assert res.type == Type.RECORD
assert res.record.data["string_col"] in ["foo", "hello"]
Expand All @@ -217,6 +228,33 @@ def test_get_files_pattern_csv(config: Mapping, configured_catalog: ConfiguredAi
assert res.record.data["int_col"] in [1, 2]


def test_get_files_pattern_csv_new_separator(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
    """CSV files with a non-default separator are read without an explicit config (Sniffer path)."""
    source = SourceFtp()
    overrides = {**config, "file_type": "csv", "folder_path": "files/csv", "file_pattern": "test_2.+"}
    messages = list(source.read(logger, overrides, configured_catalog, None))
    assert len(messages) == 2
    for message in messages:
        assert message.type == Type.RECORD
        assert message.record.data["string_col"] in ["foo", "hello"]
        assert message.record.data["int_col"] in [1, 2]


def test_get_files_pattern_csv_new_separator_with_config(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
    """CSV files are read correctly when the separator is given explicitly in the config."""
    source = SourceFtp()
    overrides = {**config, "file_type": "csv", "folder_path": "files/csv", "separator": ";", "file_pattern": "test_2.+"}
    messages = list(source.read(logger, overrides, configured_catalog, None))
    assert len(messages) == 2
    for message in messages:
        assert message.type == Type.RECORD
        assert message.record.data["string_col"] in ["foo", "hello"]
        assert message.record.data["int_col"] in [1, 2]


def test_get_files_pattern_no_match_csv(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result = source.read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import io
import logging
import os
Expand Down Expand Up @@ -168,15 +169,25 @@ def get_files(self, prefix, search_pattern=None, modified_since=None, most_recen

return sorted_files

def peek_line(self, f):
    """Return the next line of *f* without consuming it (stream position is restored)."""
    start = f.tell()
    next_line = f.readline()
    f.seek(start)
    return next_line

@backoff.on_exception(backoff.expo, (socket.timeout), max_tries=5, factor=2)
def fetch_file(self, fn: Mapping[str, Any], file_type="csv") -> pd.DataFrame:
def fetch_file(self, fn: Mapping[str, Any], separator, file_type="csv") -> pd.DataFrame:
try:
with self._connection.open(fn["filepath"], "rb") as f:
with self._connection.open(fn["filepath"], "r") as f:
df: pd.DataFrame = None

if not separator:
dialect = csv.Sniffer().sniff(self.peek_line(f=f))
separator = dialect.delimiter

# Using pandas to make reading files in different formats easier
if file_type == "csv":
df = pd.read_csv(f)
df = pd.read_csv(f, engine="python", sep=separator)
elif file_type == "json":
df = pd.read_json(f, lines=True)
else:
Expand All @@ -196,10 +207,10 @@ def fetch_file(self, fn: Mapping[str, Any], file_type="csv") -> pd.DataFrame:

raise Exception("Unable to read file: %s" % e) from e

def fetch_files(self, files, file_type="csv") -> Tuple[datetime, Dict[str, Any]]:
def fetch_files(self, files, separator, file_type="csv") -> Tuple[datetime, Dict[str, Any]]:
    """Yield ``(last_modified, records)`` for each remote file, then close the connection.

    Each file is parsed via :meth:`fetch_file` and its DataFrame is converted to a
    list of row dicts before being yielded alongside the file's modification time.
    """
    logger.info("Fetching %s files", len(files))
    for file_info in files:
        frame = self.fetch_file(fn=file_info, separator=separator, file_type=file_type)
        yield file_info["last_modified"], frame.to_dict("records")
    self.close()
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _infer_json_schema(self, config: Mapping[str, Any], connection: SFTPClient)

# Get last file to infer schema
# Use pandas `infer_objects` to infer dtypes
df = connection.fetch_file(files[-1], config["file_type"])
df = connection.fetch_file(fn=files[-1], file_type=config["file_type"], separator=config.get("separator"))
df = df.infer_objects()

# Default column used for incremental sync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@
"order": 6,
"examples": ["csv", "json"]
},
"separator": {
"title": "CSV Separator (Optional)",
"description": "The separator used in the CSV files. Define None if you want to use the Sniffer functionality",
"type": "string",
"default": ",",
"examples": [","],
"order": 7
},
"folder_path": {
"title": "Folder Path (Optional)",
"description": "The directory to search files for sync",
"type": "string",
"default": "",
"examples": ["/logs/2022"],
"order": 7
"order": 8
},
"file_pattern": {
"title": "File Pattern (Optional)",
Expand All @@ -81,14 +89,14 @@
"examples": [
"log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`"
],
"order": 8
"order": 9
},
"file_most_recent": {
"title": "Most recent file (Optional)",
"description": "Sync only the most recent file for the configured folder path and file pattern",
"type": "boolean",
"default": false,
"order": 9
"order": 10
},
"start_date": {
"type": "string",
Expand All @@ -97,7 +105,7 @@
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-25T00:00:00Z"],
"description": "The date from which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"order": 10
"order": 11
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def read_records(
most_recent_only=self._only_most_recent_file,
)

for cursor, records in self.connection.fetch_files(files, self.config["file_type"]):
for cursor, records in self.connection.fetch_files(
files=files, file_type=self.config["file_type"], separator=self.config.get("separator")
):
if cursor and sync_mode == SyncMode.incremental:
if self._cursor_value and cursor > self._cursor_value:
self._cursor_value = cursor
Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:2.1.2 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/s3) | [connectors/source/s3](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/s3) | [source-s3](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **SAP Fieldglass** | <img alt="SAP Fieldglass icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/sapfieldglass.svg" height="30" height="30"/> | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [connectors/source/sap-fieldglass](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sap-fieldglass) | [source-sap-fieldglass](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | <small>`ec5f3102-fb31-4916-99ae-864faf8e7e25`</small> |
| **SFTP** | <img alt="SFTP icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp) | [connectors/source/sftp](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp) | [source-sftp](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | <small>`a827c52e-791c-4135-a245-e233c5255199`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.1 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [connectors/source/sftp-bulk](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp-bulk) | [source-sftp-bulk](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [connectors/source/sftp-bulk](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp-bulk) | [source-sftp-bulk](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
| **SalesLoft** | <img alt="SalesLoft icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/salesloft.svg" height="30" height="30"/> | Source | airbyte/source-salesloft:1.0.0 | beta | [docs](https://docs.airbyte.com/integrations/sources/salesloft) | [connectors/source/salesloft](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesloft) | [source-salesloft](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesloft) | <small>`41991d12-d4b5-439e-afd0-260a31d4c53f`</small> |
| **Salesforce** | <img alt="Salesforce icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/salesforce.svg" height="30" height="30"/> | Source | airbyte/source-salesforce:2.0.9 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/salesforce) | [connectors/source/salesforce](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesforce) | [source-salesforce](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | <small>`b117307c-14b6-41aa-9422-947e34922962`</small> |
| **Sample Data (Faker)** | <img alt="Sample Data (Faker) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte /master/airbyte-config-oss/init-oss/src/main/resources/icons/faker.svg" height="30" height="30"/> | Source | airbyte/source-faker:2.0.3 | beta | [docs](https://docs.airbyte.com/integrations/sources/faker) | [connectors/source/faker](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/faker) | [source-faker](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-faker) | <small>`dfd88b22-b603-4c3d-aad7-3701784586b1`</small> |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/sftp-bulk.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ More formats \(e.g. Apache Avro\) will be supported in the future.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------|:----------------|
| 0.1.2 | 2023-04-19 | [#19224](https://github.com/airbytehq/airbyte/pull/19224) | Support custom CSV separators |
| 0.1.1 | 2023-03-17 | [#24180](https://github.com/airbytehq/airbyte/pull/24180) | Fix field order |
| 0.1.0 | 2021-05-24 | | Initial version |

0 comments on commit 4e02185

Please sign in to comment.