Skip to content

Commit

Permalink
Merge pull request #673 from splitgraph/bugfix/fix-socrata-data-source
Browse files Browse the repository at this point in the history
Fix Socrata data source introspection
  • Loading branch information
mildbyte committed Apr 27, 2022
2 parents e4e4fe0 + c13b5a2 commit d1e74d6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 60 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build_and_test_and_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ jobs:
shell: bash
run: |
pip install dist/splitgraph-*-py3-none-any.whl
pip install pyinstaller
pip install pyinstaller==4.10
pyinstaller -F splitgraph.spec
dist/sgr.exe --version
- name: Upload binary as artifact
Expand All @@ -175,7 +175,7 @@ jobs:
- name: Build the binary
run: |
pip install dist/splitgraph-*-py3-none-any.whl
pip install pyinstaller
pip install pyinstaller==4.10
pyinstaller -F splitgraph.spec
dist/sgr --version
- name: Smoke test the binary
Expand Down Expand Up @@ -213,7 +213,7 @@ jobs:
- name: Build the single-file binary
run: |
pip install dist/splitgraph-*-py3-none-any.whl
pip install pyinstaller
pip install pyinstaller==4.10
pyinstaller -F splitgraph.spec
dist/sgr --version
- name: Upload single-file binary as artifact
Expand Down
10 changes: 9 additions & 1 deletion splitgraph/ingestion/socrata/mount.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from psycopg2.sql import SQL, Identifier

Expand Down Expand Up @@ -129,6 +129,14 @@ def _create_foreign_tables(
self.engine.run_sql(SQL(";").join(mount_statements), mount_args)
return []

def _get_foreign_table_options(self, schema: str) -> List[Tuple[str, Dict[str, Any]]]:
# We rename the "socrata_id" param into "table" when mounting, so rename it back
# here

options = super()._get_foreign_table_options(schema)

return [(t, {"socrata_id": d["table"]}) for t, d in options]

def get_raw_url(
self, tables: Optional[TableInfo] = None, expiry: int = 3600
) -> Dict[str, List[Tuple[str, str]]]:
Expand Down
141 changes: 85 additions & 56 deletions test/splitgraph/ingestion/test_socrata.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,36 @@ class S(NamedTuple):
_long_name_col_sg = "name_long_column_to_test_that_we_map_to_socrata_column_names_co"
_col_map = {_long_name_col_sg: _long_name_col}

_EXPECTED_SCHEMA = [
TableColumn(ordinal=1, name=":id", pg_type="text", is_pk=False, comment="Socrata column ID"),
TableColumn(
ordinal=2,
name="full_or_part_time",
pg_type="text",
is_pk=False,
comment="Whether the employee was employed full- (F) or part-time (P).",
),
TableColumn(ordinal=3, name="hourly_rate", pg_type="numeric", is_pk=False, comment=mock.ANY),
TableColumn(ordinal=4, name="salary_or_hourly", pg_type="text", is_pk=False, comment=mock.ANY),
TableColumn(
ordinal=5,
name="job_titles",
pg_type="text",
is_pk=False,
comment="Title of employee at the time when the data was updated.",
),
TableColumn(ordinal=6, name="typical_hours", pg_type="numeric", is_pk=False, comment=mock.ANY),
TableColumn(ordinal=7, name="annual_salary", pg_type="numeric", is_pk=False, comment=mock.ANY),
TableColumn(ordinal=8, name=_long_name_col_sg, pg_type="text", is_pk=False, comment=mock.ANY),
TableColumn(
ordinal=9,
name="department",
pg_type="text",
is_pk=False,
comment="Department where employee worked.",
),
]


@pytest.mark.parametrize(
("quals", "expected"),
Expand Down Expand Up @@ -129,47 +159,9 @@ def test_socrata_mounting(local_engine_empty):
},
)

assert local_engine_empty.get_full_table_schema("test/pg_mount", "some_table") == [
TableColumn(
ordinal=1, name=":id", pg_type="text", is_pk=False, comment="Socrata column ID"
),
TableColumn(
ordinal=2,
name="full_or_part_time",
pg_type="text",
is_pk=False,
comment="Whether the employee was employed full- (F) or part-time (P).",
),
TableColumn(
ordinal=3, name="hourly_rate", pg_type="numeric", is_pk=False, comment=mock.ANY
),
TableColumn(
ordinal=4, name="salary_or_hourly", pg_type="text", is_pk=False, comment=mock.ANY
),
TableColumn(
ordinal=5,
name="job_titles",
pg_type="text",
is_pk=False,
comment="Title of employee at the time when the data was updated.",
),
TableColumn(
ordinal=6, name="typical_hours", pg_type="numeric", is_pk=False, comment=mock.ANY
),
TableColumn(
ordinal=7, name="annual_salary", pg_type="numeric", is_pk=False, comment=mock.ANY
),
TableColumn(
ordinal=8, name=_long_name_col_sg, pg_type="text", is_pk=False, comment=mock.ANY
),
TableColumn(
ordinal=9,
name="department",
pg_type="text",
is_pk=False,
comment="Department where employee worked.",
),
]
assert (
local_engine_empty.get_full_table_schema("test/pg_mount", "some_table") == _EXPECTED_SCHEMA
)

assert local_engine_empty.run_sql(
"SELECT option_value FROM information_schema.foreign_table_options "
Expand Down Expand Up @@ -354,6 +346,50 @@ def test_socrata_column_deduplication():
]


def test_socrata_data_source_raw_url():
engine = Mock()
data_source = SocrataDataSource(
engine=engine,
params={"domain": "data.healthcare.gov"},
tables={"dataset": ([], {"socrata_id": "7h6f-vws8"})},
credentials={},
)

assert data_source.get_raw_url() == {
"dataset": [
(
"text/csv",
"https://data.healthcare.gov/api/views/7h6f-vws8/rows.csv?accessType=DOWNLOAD",
)
]
}


def test_socrata_data_source_introspection_full_domain(local_engine_empty):
with open(os.path.join(INGESTION_RESOURCES, "socrata/find_datasets.json"), "r") as f:
socrata_meta = json.load(f)

socrata = MagicMock(spec=Socrata)
socrata.datasets.return_value = socrata_meta

data_source = SocrataDataSource(
engine=local_engine_empty,
params={"domain": "data.cityofchicago.org"},
tables={},
credentials={},
)

with mock.patch("sodapy.Socrata", return_value=socrata):
result = data_source.introspect()
assert len(result) == 1
assert "current_employee_names_salaries_and_position_xzkq_xp2w" in result
schema, params = result["current_employee_names_salaries_and_position_xzkq_xp2w"]
assert schema == _EXPECTED_SCHEMA
assert params == {"socrata_id": "xzkq-xp2w"}


# These smoke tests rely on the Socrata API being available / datasets not going away, but good to
# test some popular datasets to make sure the mounting works end-to-end.
@pytest.mark.parametrize(
("domain", "dataset_id"),
[
Expand All @@ -363,8 +399,6 @@ def test_socrata_column_deduplication():
],
)
def test_socrata_smoke(domain, dataset_id, local_engine_empty):
# This relies on the Socrata API being available, but good to smoke test some popular datasets
# to make sure the mounting works end-to-end.
try:
mount(
"socrata_mount",
Expand All @@ -377,20 +411,15 @@ def test_socrata_smoke(domain, dataset_id, local_engine_empty):
local_engine_empty.delete_schema("socrata_mount")


def test_socrata_data_source_raw_url():
engine = Mock()
def test_socrata_data_source_introspection_smoke(local_engine_empty):
data_source = SocrataDataSource(
engine=engine,
params={"domain": "data.healthcare.gov"},
tables={"dataset": ([], {"socrata_id": "7h6f-vws8"})},
engine=local_engine_empty,
params={"domain": "data.cityofnewyork.us"},
tables={"some_table": ([], {"socrata_id": "8wbx-tsch"})},
credentials={},
)

assert data_source.get_raw_url() == {
"dataset": [
(
"text/csv",
"https://data.healthcare.gov/api/views/7h6f-vws8/rows.csv?accessType=DOWNLOAD",
)
]
}
result = data_source.introspect()
schema, params = result["some_table"]
assert len(schema) > 1
assert params == {"socrata_id": "8wbx-tsch"}

0 comments on commit d1e74d6

Please sign in to comment.