From 759aab4778c082c11928dd7c523c6bd0f745841a Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 13 Jul 2021 15:54:59 +0200 Subject: [PATCH 01/14] implement client side bulk metadata upload --- splitgraph/cloud/__init__.py | 145 +++++++++++++++++++++++++++++++- splitgraph/commandline/cloud.py | 30 ++++--- 2 files changed, 164 insertions(+), 11 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 1ca147b9..c33465ea 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -13,6 +13,7 @@ from pydantic import BaseModel from requests import HTTPError from requests.models import Response +import click from splitgraph.__version__ import __version__ from splitgraph.cloud.models import ( @@ -57,6 +58,69 @@ def get_headers(): } } +_BULK_UPSERT_REPO_PROFILES_QUERY = """mutation BulkUpsertRepoProfilesMutation( + $namespaces: [String!] + $repositories: [String!] + $descriptions: [String] + $readmes: [String] + $licenses: [String] + $metadatas: [JSON] +) { + __typename + bulkUpsertRepoProfiles( + input: { + namespaces: $namespaces + repositories: $repositories + descriptions: $descriptions + readmes: $readmes + licenses: $licenses + metadatas: $metadatas + } + ) { + clientMutationId + __typename + } +} +""" + +_BULK_UPDATE_REPO_SOURCES_QUERY = """mutation BulkUpdateRepoSourcesMutation( + $namespaces: [String!] + $repositories: [String!] + $sources: [DatasetSourceInput] +) { + __typename + bulkUpdateRepoSources( + input: { + namespaces: $namespaces + repositories: $repositories + sources: $sources + } + ) { + clientMutationId + __typename + } +} +""" + +_BULK_UPSERT_REPO_TOPICS_QUERY = """mutation BulkUpsertRepoTopicsMutation( + $namespaces: [String!] + $repositories: [String!] + $topics: [String] +) { + __typename + bulkUpsertRepoTopics( + input: { + namespaces: $namespaces + repositories: $repositories + topics: $topics + } + ) { + clientMutationId + __typename + } +} +""" + _PROFILE_UPSERT_QUERY = """mutation UpsertRepoProfile( $namespace: String! $repository: String! @@ -633,7 +697,7 @@ def _gql(self, query: Dict, endpoint=None, handle_errors=False) -> requests.Resp return result @staticmethod - def _prepare_upsert_metadata_gql(namespace: str, repository: str, metadata: Metadata, v1=False): + def _validate_metadata(namespace: str, repository: str, metadata: Metadata): # Pre-flight validation if metadata.description and len(metadata.description) > 160: raise ValueError("The description should be 160 characters or shorter!") @@ -669,6 +733,12 @@ def _prepare_upsert_metadata_gql(namespace: str, repository: str, metadata: Meta if "readme" in variables and isinstance(variables["readme"], dict): variables["readme"] = variables["readme"]["text"] + return variables + + @staticmethod + def _prepare_upsert_metadata_gql(namespace: str, repository: str, metadata: Metadata, v1=False): + variables = GQLAPIClient._validate_metadata(namespace, repository, metadata) + gql_query = _PROFILE_UPSERT_QUERY if v1: gql_query = gql_query.replace("createRepoTopicsAgg", "createRepoTopic").replace( @@ -706,6 +776,79 @@ def upsert_metadata(self, namespace: str, repository: str, metadata: Metadata): ) return response + def bulk_upsert_metadata(self, namespace_list: List[str], repository_list: List[str], metadata_list: List[Metadata]): + repo_profiles: Dict[str, List[Any]] = dict( + namespaces=namespace_list, + repositories=repository_list, + descriptions=[], + readmes=[], + licenses=[], + metadata=[] + ) + repo_sources: Dict[str, List[Any]] = dict( + namespaces=[], + repositories=[], + sources=[] + ) + repo_topics: Dict[str, List[Any]] = dict( + namespaces=[], + repositories=[], + topics=[] + ) + + # populate mutation payloads + for ind, metadata in enumerate(metadata_list): + validated_metadata = GQLAPIClient._validate_metadata( + namespace_list[ind], + repository_list[ind], + metadata + ) + + repo_profiles["namespaces"].append(namespace_list[ind]) + repo_profiles["repositories"].append(repository_list[ind]) + repo_profiles["descriptions"].append(validated_metadata.get("description")) + repo_profiles["readmes"].append(validated_metadata.get("readme")) + repo_profiles["licenses"].append(validated_metadata.get("license")) + repo_profiles["metadata"].append(validated_metadata.get("metadata")) + + # flatten sources, which will be aggregated on the server side + if len(validated_metadata.get("sources", [])) > 0: + for source in validated_metadata["sources"]: + repo_sources["namespaces"].append(namespace_list[ind]) + repo_sources["repositories"].append(repository_list[ind]) + repo_sources["sources"].append(source) + + # flatten topics, which will be aggregated on the server side + if len(validated_metadata.get("topics", [])) > 0: + for topic in validated_metadata["topics"]: + repo_topics["namespaces"].append(namespace_list[ind]) + repo_topics["repositories"].append(repository_list[ind]) + repo_topics["topics"].append(topic) + + # bulk upsert repo profiles metadata + repo_profiles_query = { + "operationName": "BulkUpsertRepoProfilesMutation", + "variables": repo_profiles, + "query": _BULK_UPSERT_REPO_PROFILES_QUERY, + } + self._gql(repo_profiles_query) + + # bulk update sources on repo profiles + repo_sources_query = { + "operationName": "BulkUpdateRepoSourcesMutation", + "variables": repo_sources, + "query": _BULK_UPDATE_REPO_SOURCES_QUERY, + } + self._gql(repo_sources_query) + + # bulk upsert repo topics + repo_topics_query = { + "operationName": "BulkUpsertRepoTopicsMutation", + "variables": repo_topics, + "query": _BULK_UPSERT_REPO_TOPICS_QUERY, + } + self._gql(repo_topics_query) + def upsert_readme(self, namespace: str, repository: str, readme: str): return self.upsert_metadata(namespace, repository, Metadata(readme=readme)) diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index bcb0f1a5..80e60115 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -669,16 +669,26 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): r for r in repositories if f"{r.namespace}/{r.repository}" in limit_repositories ] - with tqdm(repositories) as t: - for repository in t: - t.set_description(f"{repository.namespace}/{repository.repository}") - if repository.external: - rest_client.upsert_external( - repository.namespace, repository.repository, repository.external, credential_map - ) - if repository.metadata: - metadata = _prepare_metadata(repository.metadata, readme_basedir=readme_dir) - gql_client.upsert_metadata(repository.namespace, repository.repository, metadata) + # with tqdm(repositories) as t: + # for repository in t: + # t.set_description(f"{repository.namespace}/{repository.repository}") + # if repository.external: + # rest_client.upsert_external( + # repository.namespace, repository.repository, repository.external, credential_map + # ) + + logging.info("Uploading metadata...") + namespace_list = [] + repository_list = [] + metadata_list = [] + for repository in repositories: + if repository.metadata: + namespace_list.append(repository.namespace) + repository_list.append(repository.repository) + + metadata = _prepare_metadata(repository.metadata, readme_basedir=readme_dir) + metadata_list.append(metadata) + gql_client.bulk_upsert_metadata(namespace_list, repository_list, metadata_list) def _build_credential_map(auth_client, credentials=None): From 1a2f24a634d211a4e6364bdcd354dce538e6183d Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 13 Jul 2021 16:02:26 +0200 Subject: [PATCH 02/14] uncomment external repo upload; remove unused import --- splitgraph/cloud/__init__.py | 1 - splitgraph/commandline/cloud.py | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index c33465ea..03f32876 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -13,7 +13,6 @@ from pydantic import BaseModel from requests import HTTPError from requests.models import Response -import click from splitgraph.__version__ import __version__ from splitgraph.cloud.models import ( diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index 80e60115..db0533ee 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -669,13 +669,13 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): r for r in repositories if f"{r.namespace}/{r.repository}" in limit_repositories ] - # with tqdm(repositories) as t: - # for repository in t: - # t.set_description(f"{repository.namespace}/{repository.repository}") - # if repository.external: - # rest_client.upsert_external( - # repository.namespace, repository.repository, repository.external, credential_map - # ) + with tqdm(repositories) as t: + for repository in t: + t.set_description(f"{repository.namespace}/{repository.repository}") + if repository.external: + rest_client.upsert_external( + repository.namespace, repository.repository, repository.external, credential_map + ) logging.info("Uploading metadata...") namespace_list = [] From ecba4a3d2f12fe1c0b5adcad5db0beaaa8f80e18 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 14 Jul 2021 10:06:01 +0200 Subject: [PATCH 03/14] hook new method bulk upsert into existing error handling --- splitgraph/cloud/__init__.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 03f32876..217f61a3 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -789,7 +789,7 @@ def bulk_upsert_metadata(self, namespace_list: List[str], repository_list: List[ repositories=[], sources=[] ) - repo_topics: Dict[str, List[Any]] = dict( + repo_topics: Dict[str, List[str]] = dict( namespaces=[], repositories=[], topics=[] @@ -803,8 +803,6 @@ def bulk_upsert_metadata(self, namespace_list: List[str], repository_list: List[ metadata ) - repo_profiles["namespaces"].append(namespace_list[ind]) - repo_profiles["repositories"].append(repository_list[ind]) repo_profiles["descriptions"].append(validated_metadata.get("description")) repo_profiles["readmes"].append(validated_metadata.get("readme")) repo_profiles["licenses"].append(validated_metadata.get("license")) @@ -824,29 +822,39 @@ def bulk_upsert_metadata(self, namespace_list: List[str], repository_list: List[ repo_topics["repositories"].append(repository_list[ind]) repo_topics["topics"].append(topic) - # bulk upsert repo profiles metadata + self._bulk_upsert_repo_profiles(repo_profiles) + self._bulk_upsert_repo_sources(repo_sources) + self._bulk_upsert_repo_topics(repo_topics) + + @handle_gql_errors + def _bulk_upsert_repo_profiles(self, repo_profiles: Dict[str, List[Any]]): repo_profiles_query = { "operationName": "BulkUpsertRepoProfilesMutation", "variables": repo_profiles, "query": _BULK_UPSERT_REPO_PROFILES_QUERY, } - self._gql(repo_profiles_query) + response = self._gql(repo_profiles_query) + return response - # bulk update sources on repo profiles + @handle_gql_errors + def _bulk_upsert_repo_sources(self, repo_sources: Dict[str, List[Any]]): repo_sources_query = { "operationName": "BulkUpdateRepoSourcesMutation", "variables": repo_sources, "query": _BULK_UPDATE_REPO_SOURCES_QUERY, } - self._gql(repo_sources_query) + response = self._gql(repo_sources_query) + return response - # bulk upsert repo topics + @handle_gql_errors + def _bulk_upsert_repo_topics(self, repo_topics: Dict[str, List[str]]): repo_topics_query = { "operationName": "BulkUpsertRepoTopicsMutation", "variables": repo_topics, "query": _BULK_UPSERT_REPO_TOPICS_QUERY, } - self._gql(repo_topics_query) + response = self._gql(repo_topics_query) + return response def upsert_readme(self, namespace: str, repository: str, readme: str): return self.upsert_metadata(namespace, repository, Metadata(readme=readme)) From d83ab8076243f11ba727fea71ef6bbd74b53b945 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 14 Jul 2021 10:27:25 +0200 Subject: [PATCH 04/14] fix leftover metadatas argument to mutation --- splitgraph/cloud/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 217f61a3..a80c6682 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -63,7 +63,7 @@ def get_headers(): $descriptions: [String] $readmes: [String] $licenses: [String] - $metadatas: [JSON] + $metadata: [JSON] ) { __typename bulkUpsertRepoProfiles( @@ -73,7 +73,7 @@ def get_headers(): descriptions: $descriptions readmes: $readmes licenses: $licenses - metadatas: $metadatas + metadata: $metadata } ) { clientMutationId From b0081fb26e7bc58391a78e9e98fd92deb812a306 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 16 Jul 2021 11:41:37 +0200 Subject: [PATCH 05/14] Serialize batches of repos externals for bulk upsert on server side --- splitgraph/cloud/__init__.py | 19 +++++++++---------- splitgraph/cloud/models.py | 25 +++++++++++++++++++++++-- splitgraph/commandline/cloud.py | 27 ++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 1ca147b9..713d84b7 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -7,7 +7,7 @@ import warnings from functools import wraps from json import JSONDecodeError -from typing import Callable, List, Union, Tuple, cast, Optional, Dict, Any, Type, TypeVar +from typing import Callable, List, Union, Tuple, cast, Optional, Dict, Any, Type, TypeVar, Set import requests from pydantic import BaseModel @@ -16,6 +16,8 @@ from splitgraph.__version__ import __version__ from splitgraph.cloud.models import ( + Credential, + CredentialID, Metadata, MetadataResponse, External, @@ -26,7 +28,8 @@ UpdateExternalCredentialRequest, AddExternalCredentialRequest, UpdateExternalCredentialResponse, - AddExternalRepositoryRequest, + ExternalRepository, + AddExternalRepositoriesRequest ) from splitgraph.commandline.engine import patch_and_save_config from splitgraph.config import create_config_dict, get_singleton, CONFIG @@ -567,16 +570,12 @@ def ensure_external_credential( assert credential return credential.credential_id - def upsert_external( + def bulk_upsert_external( self, - namespace: str, - repository: str, - external: External, - credentials_map: Optional[Dict[str, str]] = None, + repositories: List[ExternalRepository], + credential_ids: Set[CredentialID] ): - request = AddExternalRepositoryRequest.from_external( - namespace, repository, external, credentials_map - ) + request = AddExternalRepositoriesRequest(repositories=repositories, credential_ids=credential_ids) self._perform_request("/add", self.access_token, request, endpoint=self.externals_endpoint) diff --git a/splitgraph/cloud/models.py b/splitgraph/cloud/models.py index 39ea6a83..382fb743 100644 --- a/splitgraph/cloud/models.py +++ b/splitgraph/cloud/models.py @@ -2,7 +2,7 @@ Definitions for the repositories.yml format that's used to batch-populate a Splitgraph catalog with repositories and their metadata. """ -from typing import Dict, List, Optional, Any, Union +from typing import Dict, List, Optional, Any, Union, Set from pydantic import BaseModel, Field @@ -27,12 +27,26 @@ class Credential(BaseModel): data: Dict[str, Any] +class CredentialID(BaseModel): + credential_id: str + plugin: str + + def __hash__(self): + return hash((self.credential_id, self.plugin)) + + +class IngestionSchedule(BaseModel): + schedule: str + enabled = True + + class External(BaseModel): credential_id: Optional[str] credential: Optional[str] plugin: str params: Dict[str, Any] tables: Dict[str, Table] + schedule: Optional[IngestionSchedule] # Models for the catalog metadata (description, README, topics etc) @@ -218,7 +232,7 @@ class ExternalTableRequest(BaseModel): schema_: Optional[Dict[str, str]] = Field(alias="schema") -class AddExternalRepositoryRequest(BaseModel): +class ExternalRepository(BaseModel): namespace: str repository: str plugin_name: str @@ -226,6 +240,7 @@ class AddExternalRepositoryRequest(BaseModel): is_live: bool tables: Optional[Dict[str, ExternalTableRequest]] credential_id: Optional[str] + schedule: Optional[IngestionSchedule] @classmethod def from_external( @@ -259,4 +274,10 @@ def from_external( }, credential_id=credential_id, is_live=True, + schedule=external.schedule ) + + +class AddExternalRepositoriesRequest(BaseModel): + repositories: List[ExternalRepository] + credential_ids: Set[CredentialID] diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index bcb0f1a5..783422b0 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -14,7 +14,7 @@ from click import wrap_text from tqdm import tqdm -from splitgraph.cloud.models import Metadata, RepositoriesYAML +from splitgraph.cloud.models import Metadata, RepositoriesYAML, ExternalRepository, CredentialID from splitgraph.commandline.common import ( ImageType, RepositoryType, @@ -669,13 +669,30 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): r for r in repositories if f"{r.namespace}/{r.repository}" in limit_repositories ] + logging.info("Uploading images...") + external_repositories = [] + credential_ids = set() + for repository in repositories: + if repository.external: + external_repository = ExternalRepository.from_external( + repository.namespace, + repository.repository, + repository.external, + credential_map + ) + external_repositories.append(external_repository) + + if external_repository.credential_id is not None: + credential_id = CredentialID( + credential_id=external_repository.credential_id, + plugin=external_repository.plugin_name + ) + credential_ids.add(credential_id) + rest_client.bulk_upsert_external(repositories=external_repositories, credential_ids=credential_ids) + with tqdm(repositories) as t: for repository in t: t.set_description(f"{repository.namespace}/{repository.repository}") - if repository.external: - rest_client.upsert_external( - repository.namespace, repository.repository, repository.external, credential_map - ) if repository.metadata: metadata = _prepare_metadata(repository.metadata, readme_basedir=readme_dir) gql_client.upsert_metadata(repository.namespace, repository.repository, metadata) From abff94b158ddf5e92a960766683ef8e610f87812 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 16 Jul 2021 14:19:58 +0200 Subject: [PATCH 06/14] Remove obsolete CredentialID type --- splitgraph/cloud/__init__.py | 6 ++---- splitgraph/cloud/models.py | 9 --------- splitgraph/commandline/cloud.py | 11 ++--------- 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 713d84b7..0ca04b3c 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -17,7 +17,6 @@ from splitgraph.__version__ import __version__ from splitgraph.cloud.models import ( Credential, - CredentialID, Metadata, MetadataResponse, External, @@ -572,10 +571,9 @@ def ensure_external_credential( def bulk_upsert_external( self, - repositories: List[ExternalRepository], - credential_ids: Set[CredentialID] + repositories: List[ExternalRepository] ): - request = AddExternalRepositoriesRequest(repositories=repositories, credential_ids=credential_ids) + request = AddExternalRepositoriesRequest(repositories=repositories) self._perform_request("/add", self.access_token, request, endpoint=self.externals_endpoint) diff --git a/splitgraph/cloud/models.py b/splitgraph/cloud/models.py index 382fb743..395899b6 100644 --- a/splitgraph/cloud/models.py +++ b/splitgraph/cloud/models.py @@ -27,14 +27,6 @@ class Credential(BaseModel): data: Dict[str, Any] -class CredentialID(BaseModel): - credential_id: str - plugin: str - - def __hash__(self): - return hash((self.credential_id, self.plugin)) - - class IngestionSchedule(BaseModel): schedule: str enabled = True @@ -280,4 +272,3 @@ def from_external( class AddExternalRepositoriesRequest(BaseModel): repositories: List[ExternalRepository] - credential_ids: Set[CredentialID] diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index 783422b0..b38175ed 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -14,7 +14,7 @@ from click import wrap_text from tqdm import tqdm -from splitgraph.cloud.models import Metadata, RepositoriesYAML, ExternalRepository, CredentialID +from splitgraph.cloud.models import Metadata, RepositoriesYAML, ExternalRepository from splitgraph.commandline.common import ( ImageType, RepositoryType, @@ -681,14 +681,7 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): credential_map ) external_repositories.append(external_repository) - - if external_repository.credential_id is not None: - credential_id = CredentialID( - credential_id=external_repository.credential_id, - plugin=external_repository.plugin_name - ) - credential_ids.add(credential_id) - rest_client.bulk_upsert_external(repositories=external_repositories, credential_ids=credential_ids) + rest_client.bulk_upsert_external(repositories=external_repositories) with tqdm(repositories) as t: for repository in t: From ea7f41c2a2eeab46313a8798a8a0a7fd204b1cad Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 19 Jul 2021 11:44:46 +0200 Subject: [PATCH 07/14] Rename the bulk endpoint for BC --- splitgraph/cloud/__init__.py | 6 +++--- splitgraph/cloud/models.py | 6 +++--- splitgraph/commandline/cloud.py | 5 ++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 0ca04b3c..c158662a 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -27,7 +27,7 @@ UpdateExternalCredentialRequest, AddExternalCredentialRequest, UpdateExternalCredentialResponse, - ExternalRepository, + AddExternalRepositoryRequest, AddExternalRepositoriesRequest ) from splitgraph.commandline.engine import patch_and_save_config @@ -571,10 +571,10 @@ def ensure_external_credential( def bulk_upsert_external( self, - repositories: List[ExternalRepository] + repositories: List[AddExternalRepositoryRequest] ): request = AddExternalRepositoriesRequest(repositories=repositories) - self._perform_request("/add", self.access_token, request, endpoint=self.externals_endpoint) + self._perform_request("/bulk-add", self.access_token, request, endpoint=self.externals_endpoint) def AuthAPIClient(*args, **kwargs): diff --git a/splitgraph/cloud/models.py b/splitgraph/cloud/models.py index 395899b6..63525811 100644 --- a/splitgraph/cloud/models.py +++ b/splitgraph/cloud/models.py @@ -2,7 +2,7 @@ Definitions for the repositories.yml format that's used to batch-populate a Splitgraph catalog with repositories and their metadata. """ -from typing import Dict, List, Optional, Any, Union, Set +from typing import Dict, List, Optional, Any, Union from pydantic import BaseModel, Field @@ -224,7 +224,7 @@ class ExternalTableRequest(BaseModel): schema_: Optional[Dict[str, str]] = Field(alias="schema") -class ExternalRepository(BaseModel): +class AddExternalRepositoryRequest(BaseModel): namespace: str repository: str plugin_name: str @@ -271,4 +271,4 @@ def from_external( class AddExternalRepositoriesRequest(BaseModel): - repositories: List[ExternalRepository] + repositories: List[AddExternalRepositoryRequest] diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index b38175ed..a9e15a3f 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -14,7 +14,7 @@ from click import wrap_text from tqdm import tqdm -from splitgraph.cloud.models import Metadata, RepositoriesYAML, ExternalRepository +from splitgraph.cloud.models import Metadata, RepositoriesYAML, AddExternalRepositoryRequest from splitgraph.commandline.common import ( ImageType, RepositoryType, @@ -671,10 +671,9 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): logging.info("Uploading images...") external_repositories = [] - credential_ids = set() for repository in repositories: if repository.external: - external_repository = ExternalRepository.from_external( + external_repository = AddExternalRepositoryRequest.from_external( repository.namespace, repository.repository, repository.external, From bb9a14af97063d3c58a8fee6c97d50b3e1988801 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 19 Jul 2021 13:30:12 +0000 Subject: [PATCH 08/14] Make Json SQL param coercion global, even for nested objects --- splitgraph/engine/postgres/engine.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/splitgraph/engine/postgres/engine.py b/splitgraph/engine/postgres/engine.py index 2ec81503..0bb1d083 100644 --- a/splitgraph/engine/postgres/engine.py +++ b/splitgraph/engine/postgres/engine.py @@ -59,6 +59,8 @@ # the connection property otherwise from psycopg2._psycopg import connection as Connection +psycopg2.extensions.register_adapter(dict, Json) + _AUDIT_SCHEMA = "splitgraph_audit" _AUDIT_TRIGGER = "resources/static/audit_trigger.sql" _PUSH_PULL = "resources/static/splitgraph_api.sql" @@ -505,7 +507,7 @@ def run_sql( with connection.cursor(**cursor_kwargs) as cur: try: self.notices = [] - cur.execute(statement, _convert_vals(arguments) if arguments else None) + cur.execute(statement, arguments) if connection.notices: self.notices = connection.notices[:] del connection.notices[:] @@ -597,7 +599,7 @@ def run_sql_batch( batches = _paginate_by_size( cur, statement, - (_convert_vals(a) for a in arguments), + arguments, max_size=API_MAX_QUERY_LENGTH, ) for batch in batches: @@ -1597,13 +1599,6 @@ def _convert_audit_change( _KIND = {"I": 0, "D": 1, "U": 2} -def _convert_vals(vals: Any) -> Any: - """Psycopg returns jsonb objects as dicts/lists but doesn't actually accept them directly - as a query param (or in the case of lists coerces them into an array. - Hence, we have to wrap them in the Json datatype when doing a dump + load.""" - return [Json(v) if isinstance(v, dict) else v for v in vals] - - def _generate_where_clause(table: str, cols: List[str], table_2: str) -> Composed: return SQL(" AND ").join( SQL("{}.{} = {}.{}").format( From 7456de2bc4941a61e25dc2ba274a68468910c9b8 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 20 Jul 2021 12:12:26 +0200 Subject: [PATCH 09/14] Fix bulk upsert metadata tests --- test/splitgraph/commandline/http_fixtures.py | 88 +++++++------------ .../commandline/test_cloud_metadata.py | 15 +++- 2 files changed, 46 insertions(+), 57 deletions(-) diff --git a/test/splitgraph/commandline/http_fixtures.py b/test/splitgraph/commandline/http_fixtures.py index 1af12a28..84245b76 100644 --- a/test/splitgraph/commandline/http_fixtures.py +++ b/test/splitgraph/commandline/http_fixtures.py @@ -1,6 +1,7 @@ import json -from splitgraph.cloud import _PROFILE_UPSERT_QUERY +from splitgraph.cloud import _PROFILE_UPSERT_QUERY, _BULK_UPSERT_REPO_PROFILES_QUERY, _BULK_UPDATE_REPO_SOURCES_QUERY, \ + _BULK_UPSERT_REPO_TOPICS_QUERY REMOTE = "remote_engine" AUTH_ENDPOINT = "http://some-auth-service.example.com" @@ -379,66 +380,45 @@ def add_external_repo(request, uri, response_headers): ] -def upsert_repository_metadata(request, uri, response_headers): +def assert_repository_profiles(request): data = json.loads(request.body) - assert data["operationName"] == "UpsertRepoProfile" - assert data["query"] == _PROFILE_UPSERT_QUERY + assert data["operationName"] == "BulkUpsertRepoProfilesMutation" + assert data["query"] == _BULK_UPSERT_REPO_PROFILES_QUERY variables = data["variables"] - if variables["namespace"] == "someuser" and variables["repository"] == "somerepo_1": - assert variables == { - "namespace": "someuser", - "repository": "somerepo_1", - "readme": "# Readme 1", - "description": "Repository Description 1", - "topics": [], - "sources": [ - { - "anchor": "test data source", - "href": "https://example.com", - "isCreator": True, - "isSameAs": False, - } - ], - "license": "Public Domain", - } - elif variables["namespace"] == "someuser" and variables["repository"] == "somerepo_2": - assert variables == { - "description": "Another Repository", - "namespace": "someuser", - "repository": "somerepo_2", - } - elif variables["namespace"] == "otheruser" and variables["repository"] == "somerepo_2": - assert variables == { - "description": "Repository Description 2", - "namespace": "otheruser", - "readme": "# Readme 2", - "repository": "somerepo_2", - "sources": [{"anchor": "test data source", "href": "https://example.com"}], - "topics": ["topic_1", "topic_2"], - } - else: - raise AssertionError( - "Unknown repository %s/%s!" % (variables["namespace"], variables["repository"]) - ) - - success_response = { - "data": { - "__typename": "Mutation", - "upsertRepoProfileByNamespaceAndRepository": { - "clientMutationId": None, - "__typename": "UpsertRepoProfilePayload", - }, - } - } + assert variables["namespaces"] == ["otheruser", "someuser", "someuser"] + assert variables["repositories"] == ["somerepo_2", "somerepo_1", "somerepo_2"] + assert variables["readmes"] == ["# Readme 2", "# Readme 1", None] + assert variables["descriptions"] == ["Repository Description 2", "Repository Description 1", "Another Repository"] + assert variables["licenses"] == [None, "Public Domain", None] + assert variables["metadata"] == [None, None, None] - return [ - 200, - response_headers, - json.dumps(success_response), + +def assert_repository_sources(request): + data = json.loads(request.body) + assert data["operationName"] == "BulkUpdateRepoSourcesMutation" + assert data["query"] == _BULK_UPDATE_REPO_SOURCES_QUERY + + variables = data["variables"] + assert variables["namespaces"] == ["otheruser", "someuser"] + assert variables["repositories"] == ["somerepo_2", "somerepo_1"] + assert variables["sources"] == [ + {"anchor": "test data source", "href": "https://example.com"}, + {"anchor": "test data source", "href": "https://example.com", "isCreator": True, "isSameAs": False} ] +def assert_repository_topics(request): + data = json.loads(request.body) + assert data["operationName"] == "BulkUpsertRepoTopicsMutation" + assert data["query"] == _BULK_UPSERT_REPO_TOPICS_QUERY + + variables = data["variables"] + assert variables["namespaces"] == ["otheruser", "otheruser"] + assert variables["repositories"] == ["somerepo_2", "somerepo_2"] + assert variables["topics"] == ["topic_1", "topic_2"] + + def register_user(request, uri, response_headers): assert json.loads(request.body) == { "username": "someuser", diff --git a/test/splitgraph/commandline/test_cloud_metadata.py b/test/splitgraph/commandline/test_cloud_metadata.py index c96ea283..9bf67624 100644 --- a/test/splitgraph/commandline/test_cloud_metadata.py +++ b/test/splitgraph/commandline/test_cloud_metadata.py @@ -34,7 +34,9 @@ update_external_credential, add_external_credential, add_external_repo, - upsert_repository_metadata, + assert_repository_profiles, + assert_repository_sources, + assert_repository_topics, AUTH_ENDPOINT, ) from test.splitgraph.conftest import RESOURCES @@ -368,7 +370,7 @@ def test_commandline_load(): ) httpretty.register_uri( - httpretty.HTTPretty.POST, GQL_ENDPOINT + "/", body=upsert_repository_metadata + httpretty.HTTPretty.POST, GQL_ENDPOINT + "/" ) def get_remote_param(remote, param): @@ -398,4 +400,11 @@ def get_remote_param(remote, param): catch_exceptions=False, ) assert result.exit_code == 0 - assert "someuser/somerepo_1" in result.output + + reqs = httpretty.latest_requests() + + assert_repository_topics(reqs.pop()) + reqs.pop() # discard duplicate request + assert_repository_sources(reqs.pop()) + reqs.pop() # discard duplicate request + assert_repository_profiles(reqs.pop()) From b70e06386f87c39dd6cb1acb3fe6eca44ee0c8a3 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 20 Jul 2021 14:32:55 +0200 Subject: [PATCH 10/14] Fix formatting with pre-commit --- splitgraph/cloud/__init__.py | 22 ++++++------------- splitgraph/commandline/cloud.py | 4 +++- test/splitgraph/commandline/http_fixtures.py | 21 ++++++++++++++---- .../commandline/test_cloud_metadata.py | 4 +--- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index a80c6682..26469d36 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -775,32 +775,24 @@ def upsert_metadata(self, namespace: str, repository: str, metadata: Metadata): ) return response - def bulk_upsert_metadata(self, namespace_list: List[str], repository_list: List[str], metadata_list: List[Metadata]): + def bulk_upsert_metadata( + self, namespace_list: List[str], repository_list: List[str], metadata_list: List[Metadata] + ): repo_profiles: Dict[str, List[Any]] = dict( namespaces=namespace_list, repositories=repository_list, descriptions=[], readmes=[], licenses=[], - metadata=[] - ) - repo_sources: Dict[str, List[Any]] = dict( - namespaces=[], - repositories=[], - sources=[] - ) - repo_topics: Dict[str, List[str]] = dict( - namespaces=[], - repositories=[], - topics=[] + metadata=[], ) + repo_sources: Dict[str, List[Any]] = dict(namespaces=[], repositories=[], sources=[]) + repo_topics: Dict[str, List[str]] = dict(namespaces=[], repositories=[], topics=[]) # populate mutation payloads for ind, metadata in enumerate(metadata_list): validated_metadata = GQLAPIClient._validate_metadata( - namespace_list[ind], - repository_list[ind], - metadata + namespace_list[ind], repository_list[ind], metadata ) repo_profiles["descriptions"].append(validated_metadata.get("description")) diff --git a/splitgraph/commandline/cloud.py b/splitgraph/commandline/cloud.py index db0533ee..152a396e 100644 --- a/splitgraph/commandline/cloud.py +++ b/splitgraph/commandline/cloud.py @@ -22,6 +22,7 @@ Color, ) from splitgraph.commandline.engine import patch_and_save_config, inject_config_into_engines +from splitgraph.core.output import pluralise # Hardcoded database name for the Splitgraph DDN (ddn instead of sgregistry) _DDN_DBNAME = "ddn" @@ -677,7 +678,7 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): repository.namespace, repository.repository, repository.external, credential_map ) - logging.info("Uploading metadata...") + logging.info("Updating metadata...") namespace_list = [] repository_list = [] metadata_list = [] @@ -689,6 +690,7 @@ def load_c(remote, readme_dir, repositories_file, limit_repositories): metadata = _prepare_metadata(repository.metadata, readme_basedir=readme_dir) metadata_list.append(metadata) gql_client.bulk_upsert_metadata(namespace_list, repository_list, metadata_list) + logging.info(f"Updated metadata for {pluralise('repository', len(repository_list))}") def _build_credential_map(auth_client, credentials=None): diff --git a/test/splitgraph/commandline/http_fixtures.py b/test/splitgraph/commandline/http_fixtures.py index 84245b76..7c54a5ba 100644 --- a/test/splitgraph/commandline/http_fixtures.py +++ b/test/splitgraph/commandline/http_fixtures.py @@ -1,7 +1,11 @@ import json -from splitgraph.cloud import _PROFILE_UPSERT_QUERY, _BULK_UPSERT_REPO_PROFILES_QUERY, _BULK_UPDATE_REPO_SOURCES_QUERY, \ - _BULK_UPSERT_REPO_TOPICS_QUERY +from splitgraph.cloud import ( + _PROFILE_UPSERT_QUERY, + _BULK_UPSERT_REPO_PROFILES_QUERY, + _BULK_UPDATE_REPO_SOURCES_QUERY, + _BULK_UPSERT_REPO_TOPICS_QUERY, +) REMOTE = "remote_engine" AUTH_ENDPOINT = "http://some-auth-service.example.com" @@ -389,7 +393,11 @@ def assert_repository_profiles(request): assert variables["namespaces"] == ["otheruser", "someuser", "someuser"] assert variables["repositories"] == ["somerepo_2", "somerepo_1", "somerepo_2"] assert variables["readmes"] == ["# Readme 2", "# Readme 1", None] - assert variables["descriptions"] == ["Repository Description 2", "Repository Description 1", "Another Repository"] + assert variables["descriptions"] == [ + "Repository Description 2", + "Repository Description 1", + "Another Repository", + ] assert variables["licenses"] == [None, "Public Domain", None] assert variables["metadata"] == [None, None, None] @@ -404,7 +412,12 @@ def assert_repository_sources(request): assert variables["repositories"] == ["somerepo_2", "somerepo_1"] assert variables["sources"] == [ {"anchor": "test data source", "href": "https://example.com"}, - {"anchor": "test data source", "href": "https://example.com", "isCreator": True, "isSameAs": False} + { + "anchor": "test data source", + "href": "https://example.com", + "isCreator": True, + "isSameAs": False, + }, ] diff --git a/test/splitgraph/commandline/test_cloud_metadata.py b/test/splitgraph/commandline/test_cloud_metadata.py index 9bf67624..f87eaa66 100644 --- a/test/splitgraph/commandline/test_cloud_metadata.py +++ b/test/splitgraph/commandline/test_cloud_metadata.py @@ -369,9 +369,7 @@ def test_commandline_load(): body=add_external_repo, ) - httpretty.register_uri( - httpretty.HTTPretty.POST, GQL_ENDPOINT + "/" - ) + httpretty.register_uri(httpretty.HTTPretty.POST, GQL_ENDPOINT + "/") def get_remote_param(remote, param): if param == "SG_AUTH_API": From afa3150551182b417d2e31c0ee1dbe8383a0ab0d Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 21 Jul 2021 13:02:58 +0200 Subject: [PATCH 11/14] Fix test for bulk external image upload --- test/splitgraph/commandline/http_fixtures.py | 53 ++++++++++--------- .../commandline/test_cloud_metadata.py | 2 +- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/test/splitgraph/commandline/http_fixtures.py b/test/splitgraph/commandline/http_fixtures.py index 7c54a5ba..7a4df032 100644 --- a/test/splitgraph/commandline/http_fixtures.py +++ b/test/splitgraph/commandline/http_fixtures.py @@ -337,28 +337,9 @@ def add_external_credential(request, uri, response_headers): def add_external_repo(request, uri, response_headers): data = json.loads(request.body) - if data["namespace"] == "someuser" and data["repository"] == "somerepo_1": - assert data == { - "namespace": "someuser", - "repository": "somerepo_1", - "plugin_name": "plugin_2", - "params": {}, - "is_live": True, - "tables": {}, - "credential_id": "123e4567-e89b-12d3-a456-426655440000", - } - elif data["namespace"] == "someuser" and data["repository"] == "somerepo_2": - assert data == { - "namespace": "someuser", - "repository": "somerepo_2", - "plugin_name": "plugin_3", - "params": {}, - "is_live": True, - "tables": {}, - "credential_id": "00000000-0000-0000-0000-000000000000", - } - elif data["namespace"] == "otheruser" and data["repository"] == "somerepo_2": - assert data == { + assert data["repositories"] is not None + assert data["repositories"] == [ + { "credential_id": "98765432-aaaa-bbbb-a456-000000000000", "is_live": True, "namespace": "otheruser", @@ -373,14 +354,34 @@ def add_external_repo(request, uri, response_headers): "table_2": {"options": {"param_1": "val_2"}, "schema": {}}, "table_3": {"options": {}, "schema": {"id": "text", "val": "text"}}, }, - } - else: - raise AssertionError("Unknown repository %s/%s!" % (data["namespace"], data["repository"])) + "schedule": None, + }, + { + "namespace": "someuser", + "repository": "somerepo_1", + "plugin_name": "plugin_2", + "params": {}, + "is_live": True, + "tables": {}, + "credential_id": "123e4567-e89b-12d3-a456-426655440000", + "schedule": None, + }, + { + "namespace": "someuser", + "repository": "somerepo_2", + "plugin_name": "plugin_3", + "params": {}, + "is_live": True, + "tables": {}, + "credential_id": "00000000-0000-0000-0000-000000000000", + "schedule": None, + }, + ] return [ 200, response_headers, - json.dumps({"live_image_hash": "abcdef12" * 8}), + json.dumps({"live_image_hashes": ["abcdef12" * 8, "ghijkl34" * 8, "mnoprs56" * 8]}), ] diff --git a/test/splitgraph/commandline/test_cloud_metadata.py b/test/splitgraph/commandline/test_cloud_metadata.py index f87eaa66..4b298f0b 100644 --- a/test/splitgraph/commandline/test_cloud_metadata.py +++ b/test/splitgraph/commandline/test_cloud_metadata.py @@ -365,7 +365,7 @@ def test_commandline_load(): httpretty.register_uri( httpretty.HTTPretty.POST, - QUERY_ENDPOINT + "/api/external/add", + QUERY_ENDPOINT + "/api/external/bulk-add", body=add_external_repo, ) From cb24f1417a921ae8f6a32d05d635aa4fd4f2557d Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 22 Jul 2021 17:22:17 +0200 Subject: [PATCH 12/14] Reformat models with black --- splitgraph/cloud/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitgraph/cloud/models.py b/splitgraph/cloud/models.py index 63525811..266a59f6 100644 --- a/splitgraph/cloud/models.py +++ b/splitgraph/cloud/models.py @@ -266,7 +266,7 @@ def from_external( }, credential_id=credential_id, is_live=True, - schedule=external.schedule + schedule=external.schedule, ) From 6c6a581a5585b5108b7537af04bc1c4904a9df87 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 26 Jul 2021 09:40:18 +0200 Subject: [PATCH 13/14] Make pluralise work for nouns that end in 'y' --- splitgraph/cloud/__init__.py | 2 +- splitgraph/core/output.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/splitgraph/cloud/__init__.py b/splitgraph/cloud/__init__.py index 00ce2a34..4843c6d9 100644 --- a/splitgraph/cloud/__init__.py +++ b/splitgraph/cloud/__init__.py @@ -7,7 +7,7 @@ import warnings from functools import wraps from json import JSONDecodeError -from typing import Callable, List, Union, Tuple, cast, Optional, Dict, Any, Type, TypeVar, Set +from typing import Callable, List, Union, Tuple, cast, Optional, Dict, Any, Type, TypeVar import requests from pydantic import BaseModel diff --git a/splitgraph/core/output.py b/splitgraph/core/output.py index 2d9064a2..fc2f750a 100644 --- a/splitgraph/core/output.py +++ b/splitgraph/core/output.py @@ -21,6 +21,8 @@ def pretty_size(size: Union[int, float]) -> str: def pluralise(word: str, number: int) -> str: """1 banana, 2 bananas""" + if word.endswith("y"): + return "%d %s" % (number, word if number == 1 else word[:-1] + "ies") return "%d %s%s" % (number, word, "" if number == 1 else "s") From db3b0fc67e4eb9d30f9e21ff515b730b67541035 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Mon, 26 Jul 2021 09:32:11 +0100 Subject: [PATCH 14/14] Fix PostgREST example test (new image expects explicit command/args) --- examples/postgrest/docker-compose.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/postgrest/docker-compose.yml b/examples/postgrest/docker-compose.yml index af400656..0d5adb3f 100644 --- a/examples/postgrest/docker-compose.yml +++ b/examples/postgrest/docker-compose.yml @@ -13,6 +13,9 @@ services: - 5432 postgrest: image: postgrest/postgrest:latest + command: + - postgrest + - /etc/postgrest.conf ports: - '0.0.0.0:8080:8080' volumes: