Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
ARG with_postgis
RUN test -z "${with_postgis}" || (\
export POSTGIS_MAJOR=3 && \
export POSTGIS_VERSION=3.1.4+dfsg-1.pgdg100+1 && \
export POSTGIS_VERSION=3.1.4+dfsg-3.pgdg100+1 && \
apt-get update \
&& apt-cache showpkg postgresql-$PG_MAJOR-postgis-$POSTGIS_MAJOR \
&& apt-get install -y --no-install-recommends \
Expand Down
31 changes: 29 additions & 2 deletions splitgraph/ingestion/csv/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import io
from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Tuple
import logging
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Tuple

from minio import Minio

Expand All @@ -10,6 +11,13 @@
import chardet
from splitgraph.core.output import ResettableStream

# Multicorn is only importable when running inside the FDW host (the engine);
# for local runs and tests, fall back to plain stdout logging.
try:
    from multicorn.utils import log_to_postgres
except ImportError:

    def log_to_postgres(*args, **kwargs):
        """Stand-in for Multicorn's logger: print the message and swallow
        keyword arguments such as ``level`` that ``print`` cannot accept."""
        print(*args)

class CSVOptions(NamedTuple):
autodetect_header: bool = True
Expand Down Expand Up @@ -135,7 +143,7 @@ def make_csv_reader(
errors="ignore" if csv_options.ignore_decode_errors else "strict",
)

reader = csv.reader(io_stream, **csv_options.to_csv_kwargs())
reader = csv.reader(io_stream, **csv_options.to_csv_kwargs(), skipinitialspace=True)
return csv_options, reader


Expand All @@ -155,3 +163,22 @@ def get_s3_params(fdw_options: Dict[str, Any]) -> Tuple[Minio, str, str]:
s3_object_prefix = fdw_options.get("s3_object_prefix", "")

return s3_client, s3_bucket, s3_object_prefix


def pad_csv_row(row: List[str], num_cols: int, row_number: int) -> List[str]:
    """Truncate or pad a parsed CSV row to exactly ``num_cols`` fields.

    Malformed rows (extra or missing separators) are coerced to the header's
    width rather than erroring out: we'd rather return a CSV file full of
    varchars and empty values than fail the whole scan.

    :param row: Parsed CSV fields for one row.
    :param num_cols: Expected number of columns (matching the header).
    :param row_number: Row index, used only in log messages.
    :return: A list of exactly ``num_cols`` fields. The input list is never
        mutated; a new list is returned whenever an adjustment is needed.
    """
    row_len = len(row)
    if row_len == num_cols:
        return row
    if row_len > num_cols:
        log_to_postgres(
            "Row %d has %d column(s), truncating" % (row_number, row_len),
            level=logging.WARNING,
        )
        # Slicing copies, so the caller's list stays untouched.
        return row[:num_cols]
    log_to_postgres(
        "Row %d has %d column(s), padding" % (row_number, row_len),
        level=logging.WARNING,
    )
    # Build a new list instead of row.extend() so we don't mutate the
    # caller's data in place (the original implementation padded in place,
    # inconsistently with the truncation branch).
    return row + [""] * (num_cols - row_len)
26 changes: 23 additions & 3 deletions splitgraph/ingestion/csv/fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from splitgraph.config import get_singleton
from splitgraph.exceptions import get_exception_name
from splitgraph.ingestion.common import generate_column_names
from splitgraph.ingestion.csv.common import CSVOptions, get_s3_params, make_csv_reader
from splitgraph.ingestion.csv.common import (
CSVOptions,
get_s3_params,
make_csv_reader,
pad_csv_row,
)
from splitgraph.ingestion.inference import infer_sg_schema
from urllib3 import HTTPResponse

Expand All @@ -28,7 +33,10 @@
try:
from multicorn.utils import log_to_postgres
except ImportError:
log_to_postgres = print

def log_to_postgres(*args, **kwargs):
print(*args)


_PG_LOGLEVEL = logging.INFO

Expand All @@ -44,6 +52,9 @@ def _get_table_definition(response, fdw_options, table_name, table_options):
if not csv_options.header:
sample = [[""] * len(sample[0])] + sample

# Ignore empty lines (newlines at the end of file etc)
sample = [row for row in sample if len(row) > 0]

sg_schema = infer_sg_schema(sample, None, None)

# For nonexistent column names: replace with autogenerated ones (can't have empty column names)
Expand Down Expand Up @@ -108,6 +119,7 @@ def __init__(self, fdw_options, fdw_columns):

# The foreign datawrapper columns (name -> ColumnDefinition).
self.fdw_columns = fdw_columns
self._num_cols = len(fdw_columns)

self.csv_options = CSVOptions.from_fdw_options(fdw_options)

Expand Down Expand Up @@ -142,15 +154,23 @@ def explain(self, quals, columns, sortkeys=None, verbose=False):

def _read_csv(self, csv_reader, csv_options):
    """Yield data rows from ``csv_reader``, normalized to the FDW schema.

    Skips the header (when present) and blank lines, pads/truncates each
    row to the expected column count and maps empty strings to None.
    """
    numbered_rows = enumerate(csv_reader)

    # Consume the header up front instead of flag-checking on every row.
    if csv_options.header:
        next(numbered_rows, None)

    for row_number, raw_row in numbered_rows:
        # Ignore empty rows too (e.g. trailing newlines at the end of file).
        if not raw_row:
            continue

        padded = pad_csv_row(raw_row, row_number=row_number, num_cols=self._num_cols)

        # CSVs don't really distinguish NULLs and empty strings well. We know
        # that empty strings should be NULLs when coerced into non-strings but we
        # can't easily access type information here. Do a minor hack and treat
        # all empty strings as NULLs.
        yield [field if field != "" else None for field in padded]

def execute(self, quals, columns, sortkeys=None):
Expand Down
24 changes: 19 additions & 5 deletions splitgraph/ingestion/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from splitgraph.core.output import parse_date, parse_dt, parse_time
from splitgraph.core.types import TableColumn, TableSchema
from splitgraph.ingestion.csv.common import pad_csv_row


def parse_boolean(boolean: str):
Expand All @@ -27,6 +28,14 @@ def parse_bigint(integer: str):
return result


def parse_json(json_s: str):
    """Validate that ``json_s`` is a JSON container suitable for a json column.

    Only top-level objects and arrays are accepted: scalar strings like
    " 123" are valid JSON to json.loads but we'd rather infer them as
    numbers/varchars than as json columns, matching what PostgreSQL users
    expect. The original check (``"{" not in json_s``) rejected valid
    top-level arrays like ``[1, 2]`` and accepted any string merely
    containing a brace; checking the first non-space character fixes both.

    :param json_s: Candidate string value from a CSV sample.
    :raises ValueError: If the string is not a JSON object/array
        (json.JSONDecodeError is a ValueError subclass).
    """
    if not json_s.lstrip().startswith(("{", "[")):
        raise ValueError("Not a JSON object or array")
    json.loads(json_s)


_CONVERTERS: List[Tuple[str, Callable]] = [
("timestamp", parse_dt),
("date", parse_date),
Expand All @@ -35,7 +44,7 @@ def parse_bigint(integer: str):
("bigint", parse_bigint),
("numeric", float),
("boolean", parse_boolean),
("json", json.loads),
("json", parse_json),
]


Expand All @@ -44,7 +53,7 @@ def _infer_column_schema(column_sample: Sequence[str]) -> str:
try:
seen_value = False
for c in column_sample:
if c == "":
if c == "" or c is None:
continue

seen_value = True
Expand All @@ -53,15 +62,15 @@ def _infer_column_schema(column_sample: Sequence[str]) -> str:
# columns that are just empty strings (they'll be a string).
if seen_value:
return candidate
except ValueError:
except (ValueError, TypeError):
continue

# No suitable conversion, fall back to varchar
return "character varying"


def infer_sg_schema(
sample: Sequence[Sequence[str]],
sample: Sequence[List[str]],
override_types: Optional[Dict[str, str]] = None,
primary_keys: Optional[List[str]] = None,
):
Expand All @@ -70,7 +79,12 @@ def infer_sg_schema(
result: TableSchema = []

header = sample[0]
columns = list(zip(*sample[1:]))

sample = [
pad_csv_row(row, num_cols=len(sample[0]), row_number=i) for i, row in enumerate(sample[1:])
]

columns = list(zip(*sample))
if len(columns) != len(header):
raise ValueError(
"Malformed CSV: header has %d columns, rows have %d columns"
Expand Down
17 changes: 17 additions & 0 deletions test/resources/ingestion/csv/grades.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"Last name", "First name", "SSN", "Test1", "Test2", "Test3", "Test4", "Final", "Grade"
"Alfalfa", "Aloysius", "123-45-6789", 40.0, 90.0, 100.0, 83.0, 49.0, "D-"
"Alfred", "University", "123-12-1234", 41.0, 97.0, 96.0, 97.0, 48.0, "D+"
"Gerty", "Gramma", "567-89-0123", 41.0, 80.0, 60.0, 40.0, 44.0, "C"
"Android", "Electric", "087-65-4321", 42.0, 23.0, 36.0, 45.0, 47.0, "B-"
"Bumpkin", "Fred", "456-78-9012", 43.0, 78.0, 88.0, 77.0, 45.0, "A-"
"Rubble", "Betty", "234-56-7890", 44.0, 90.0, 80.0, 90.0, 46.0, "C-"
"Noshow", "Cecil", "345-67-8901", 45.0, 11.0, -1.0, 4.0, 43.0, "F"
"Buff", "Bif", "632-79-9939", 46.0, 20.0, 30.0, 40.0, 50.0, "B+"
"Airpump", "Andrew", "223-45-6789", 49.0 1.0, 90.0, 100.0, 83.0, "A"
"Backus", "Jim", "143-12-1234", 48.0, 1.0, 97.0, 96.0, 97.0, "A+"
"Carnivore", "Art", "565-89-0123", 44.0, 1.0, 80.0, 60.0, 40.0, "D+"
"Dandy", "Jim", "087-75-4321", 47.0, 1.0, 23.0, 36.0, 45.0, "C+"
"Elephant", "Ima", "456-71-9012", 45.0, 1.0, 78.0, 88.0, 77.0, "B-"
"Franklin", "Benny", "234-56-2890", 50.0, 1.0, 90.0, 80.0, 90.0, "B-"
"George", "Boy", "345-67-3901", 40.0, 1.0, 11.0, -1.0, 4.0, "B"
"Heffalump", "Harvey", "632-79-9439", 30.0, 1.0, 20.0, 30.0, 40.0, "C"
60 changes: 60 additions & 0 deletions test/splitgraph/ingestion/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,63 @@ def test_csv_ignore_decoding_errors():
assert len(data) == 2
assert data[0] == ["name", "number"]
assert data[1] == ["TA��ÇÃOº", "17"]


def test_csv_grades():
    """Ingest the grades.csv sample (from
    https://people.sc.fsu.edu/~jburkardt/data/csv/csv.html), which exercises
    several parser features at once:

    * line 10 of the file is missing a comma, so two fields merge into one
      ("49.0 1.0"): check that we pad the row and still do best-effort
      type inference;
    * the delimiter is a comma followed by a space and text fields are quoted.
    """
    with open(os.path.join(INGESTION_RESOURCES_CSV, "grades.csv"), "rb") as f:
        csv_options, reader = make_csv_reader(f, CSVOptions())

        assert csv_options.encoding == "utf-8"
        assert csv_options.header is True
        assert csv_options.delimiter == ","
        assert csv_options.quotechar == '"'

        data = list(reader)

    assert len(data) == 17
    header, rows = data[0], data[1:]
    assert header == [
        "Last name",
        "First name",
        "SSN",
        "Test1",
        "Test2",
        "Test3",
        "Test4",
        "Final",
        "Grade",
    ]
    assert len(header) == 9
    # The error on line 10 merges two fields, leaving that row one short.
    assert len(rows[8]) == 8

    # Expected inferred types, in header order. "Test1" has to be a varchar
    # because of line 10 (the merged field becomes "49.0 1.0"); the same
    # leftward shift puts the letter grade under "Final", so it degrades too.
    expected_types = [
        "character varying",  # Last name
        "character varying",  # First name
        "character varying",  # SSN
        "character varying",  # Test1
        "numeric",  # Test2
        "numeric",  # Test3
        "numeric",  # Test4
        "character varying",  # Final
        "character varying",  # Grade
    ]
    expected_schema = [
        TableColumn(ordinal=i, name=name, pg_type=pg_type, is_pk=False, comment=None)
        for i, (name, pg_type) in enumerate(zip(header, expected_types), start=1)
    ]
    assert generate_column_names(infer_sg_schema(data)) == expected_schema