Preliminary sqlite ingestion support (continued)
neumark committed Feb 27, 2023
1 parent 9a4efdc commit bd9c723
Showing 3 changed files with 69 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test_and_release.yml
@@ -20,10 +20,10 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: "recursive"
- name: Setup Python 3.8
- name: Setup Python 3.9
uses: actions/setup-python@v2
with:
python-version: "3.8"
python-version: "3.9"
- uses: actions/cache@v1
with:
path: ~/.cache/pip
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -31,5 +31,5 @@ repos:
language_version: python3
types_or: [cython, pyi, python]
args: ["--profile", "black", "--filter-files"]
minimum_pre_commit_version: '2.9.2'
additional_dependencies: ['isort==5.10.1']
minimum_pre_commit_version: "2.9.2"
additional_dependencies: ["isort==5.10.1"]
124 changes: 65 additions & 59 deletions splitgraph/ingestion/sqlite/__init__.py
@@ -1,11 +1,10 @@
import contextlib
import json
import itertools
import os
import sqlite3
import tempfile
from collections.abc import Generator
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast

import requests
@@ -14,25 +13,15 @@
from splitgraph.core.types import (
Credentials,
IntrospectionResult,
MountError,
Params,
TableColumn,
TableInfo,
TableParams,
TableSchema,
)
from splitgraph.hooks.data_source.base import LoadableDataSource
from splitgraph.hooks.data_source.fdw import (
ForeignDataWrapperDataSource,
import_foreign_schema,
)
from splitgraph.hooks.data_source.utils import merge_jsonschema
from splitgraph.ingestion.common import IngestionAdapter, build_commandline_help
from splitgraph.ingestion.csv.common import dump_options, load_options

if TYPE_CHECKING:
from splitgraph.engine.postgres.engine import PostgresEngine
from splitgraph.engine.postgres.psycopg import PsycopgEngine

LIST_TABLES_QUERY = """
SELECT * FROM
@@ -44,38 +33,47 @@
ORDER BY 1,2;
"""

# from https://stackoverflow.com/a/16696317
def download_file(url: str, local_fh: str) -> None:
# NOTE the stream=True parameter below
# based on https://stackoverflow.com/a/16696317
def download_file(url: str, local_fh: tempfile._TemporaryFileWrapper) -> int:
total_bytes_written = 0
with requests.get(url, stream=True, verify=os.environ["SSL_CERT_FILE"]) as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=8192):
# If the response is chunk-encoded, uncomment the `if` below
# and set chunk_size to None.
# if chunk:
total_bytes_written += local_fh.write(chunk)
return total_bytes_written
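
For reference, a minimal sketch of how this streaming helper would be exercised; the URL is a placeholder and the SSL_CERT_FILE value is an assumption, since the function reads it from the environment:

    import os
    import tempfile

    os.environ.setdefault("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt")  # assumed path
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as fh:
        # iter_content(chunk_size=8192) streams the body in 8 KiB pieces,
        # so the whole file never has to fit in memory.
        written = download_file("https://example.com/data.sqlite", fh)
    print("wrote %d bytes to %s" % (written, fh.name))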


@contextmanager
def minio_file(url: str) -> str:
def minio_file(url: str) -> Generator[str, None, None]:
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as local_fh:
size = download_file(url, local_fh)
print("Downloaded %s bytes" % size)
download_file(url, local_fh)
yield local_fh.name
os.remove(local_fh.name)
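
A usage sketch for the context manager (URL hypothetical): the remote object is materialized as a local temporary file for the duration of the with block.

    with minio_file("https://minio.example.com/bucket/data.sqlite") as path:
        # path points at the downloaded temporary copy
        print(os.path.getsize(path))
    # the file has been os.remove()d here

Note that as written the cleanup only runs on a clean exit; wrapping the yield in try/finally would also remove the temporary file when the body raises.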


def query(sqlite_filename: str, query: str) -> List[Any]:
with contextlib.closing(sqlite3.connect(sqlite_filename)) as con:
with contextlib.closing(con.cursor()) as cursor:
cursor.execute(query)
return cursor.fetchall()
def query_connection(
con: sqlite3.Connection, sql: str, parameters: Optional[Dict[str, str]] = None
) -> List[Any]:
with contextlib.closing(con.cursor()) as cursor:
cursor.execute(sql, parameters or {})
return cursor.fetchall()
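
A sketch of calling the helper with sqlite3's named-parameter style, which the Dict[str, str] parameters argument is shaped for:

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE t (name TEXT)")
    con.execute("INSERT INTO t VALUES ('alice')")
    rows = query_connection(con, "SELECT name FROM t WHERE name = :n", {"n": "alice"})
    # rows == [('alice',)]

Closing the cursor is handled inside the helper; the connection itself is left to the caller (or to db_from_minio below).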


@contextmanager
def db_from_minio(url: str) -> Generator[sqlite3.Connection, None, None]:
with minio_file(url) as f:
with contextlib.closing(sqlite3.connect(f)) as con:
yield con
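
Composing the two context managers gives a one-liner for ad-hoc queries against a remote file (URL hypothetical):

    with db_from_minio("https://minio.example.com/bucket/data.sqlite") as con:
        tables = query_connection(
            con, "SELECT name FROM sqlite_master WHERE type = 'table'"
        )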


def sql_quote_str(s: str) -> str:
return s.replace("'", "''").replace('"', '""')
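
The helper doubles both quote characters, so the result is safe to splice into either a single-quoted SQL literal or a double-quoted identifier:

    sql_quote_str("O'Brien")    # -> "O''Brien"
    sql_quote_str('say "hi"')   # -> 'say ""hi""'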


# partly based on https://stackoverflow.com/questions/1942586/comparison-of-database-column-types-in-mysql-postgresql-and-sqlite-cross-map
def sqlite_to_postgres_type(sqlite_type: str) -> str:
if sqlite_type == "DATETIME":
return "TIMESTAMP WITHOUT TIME ZONE"
# from: https://www.sqlite.org/datatype3.html#determination_of_column_affinity
# If the declared type contains the string "INT" then it is assigned INTEGER affinity.
if "INT" in sqlite_type:
@@ -89,10 +87,29 @@ def sqlite_to_postgres_type(sqlite_type: str) -> str:
# If the declared type for a column contains any of the strings "REAL", "FLOA", or "DOUB" then the column has REAL affinity.
if "REAL" in sqlite_type or "FLOA" in sqlite_type or "DOUB" in sqlite_type:
return "REAL"
# Otherwise, the affinity is NUMERIC.
# Otherwise, the affinity is NUMERIC. TODO: Precision and scale
return "NUMERIC"


def sqlite_connection_to_introspection_result(con: sqlite3.Connection) -> IntrospectionResult:
schema: IntrospectionResult = cast(IntrospectionResult, {})
for (
table_name,
column_id,
column_name,
column_type,
_notnull,
_default_value,
pk,
) in query_connection(con, LIST_TABLES_QUERY, {}):
table = schema.get(table_name, ([], TableParams({})))
cast(Tuple[List[TableColumn], TableParams], table)[0].append(
TableColumn(column_id + 1, column_name, sqlite_to_postgres_type(column_type), pk != 0)
)
schema[table_name] = table
return schema
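
The rows unpacked here match SQLite's pragma_table_info() shape (cid, name, type, notnull, dflt_value, pk), which is presumably what LIST_TABLES_QUERY selects per table; cid is zero-based, hence the column_id + 1. Using pk != 0 rather than the previous pk == 1 also flags every member of a composite primary key, since SQLite numbers pk columns from 1. For a table users(id INTEGER PRIMARY KEY, name TEXT), the result would look roughly like:

    {
        "users": (
            [
                TableColumn(1, "id", "INTEGER", True),
                TableColumn(2, "name", "TEXT", False),
            ],
            TableParams({}),
        )
    }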


class SQLiteDataSource(LoadableDataSource):

table_params_schema: Dict[str, Any] = {"type": "object", "properties": {}}
@@ -115,35 +132,28 @@ class SQLiteDataSource(LoadableDataSource):
_icon_file = "sqlite.svg" # TODO

def _load(self, schema: str, tables: Optional[TableInfo] = None):
import pprint

pprint.pprint("_load", schema, tables, self.params.get("url"))

def introspect(self) -> IntrospectionResult:
with minio_file(self.params.get("url")) as f:
schema: IntrospectionResult = {}
for (
table_name,
column_id,
column_name,
column_type,
_notnull,
_default_value,
pk,
) in query(f, LIST_TABLES_QUERY):
table = schema.get(table_name, ([], {}))
# TODO: convert column_type to postgres type
table[0].append(
TableColumn(
column_id + 1, column_name, sqlite_to_postgres_type(column_type), pk == 1
)
with db_from_minio(str(self.params.get("url"))) as con:
introspection_result = sqlite_connection_to_introspection_result(con)
for table_name, table_definition in introspection_result.items():
schema_spec = cast(Tuple[List[TableColumn], TableParams], table_definition)[0]
self.engine.create_table(
schema=schema,
table=table_name,
schema_spec=schema_spec,
)
schema[table_name] = table
import pprint
table_contents = query_connection(
con, 'SELECT * FROM "{}"'.format(sql_quote_str(table_name)) # nosec
)
self.engine.run_sql_batch(
SQL("INSERT INTO {0}.{1} ").format(Identifier(schema), Identifier(table_name))
+ SQL(" VALUES (" + ",".join(itertools.repeat("%s", len(schema_spec))) + ")"),
# TODO: break this up into multiple batches for larger sqlite files
table_contents,
) # nosec

pprint.pprint("introspect %s " % f)
pprint.pprint(schema)
return schema
def introspect(self) -> IntrospectionResult:
with db_from_minio(str(self.params.get("url"))) as con:
return sqlite_connection_to_introspection_result(con)
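
For the bulk load in _load above, the composed statement is a single multi-placeholder INSERT that run_sql_batch executes once per row of table_contents. A sketch of what it renders to for a two-column table t in schema s (assuming SQL and Identifier come from psycopg2.sql, as elsewhere in splitgraph):

    from psycopg2.sql import SQL, Identifier

    stmt = SQL("INSERT INTO {0}.{1} ").format(Identifier("s"), Identifier("t")) + SQL(
        " VALUES (" + ",".join(itertools.repeat("%s", 2)) + ")"
    )
    # renders as: INSERT INTO "s"."t"  VALUES (%s,%s)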

def __init__(
self,
@@ -152,10 +162,6 @@ def __init__(
params: Params,
tables: Optional[TableInfo] = None,
):
import json
import pprint

pprint.pprint("SQLiteDataSource.__init__ %s " % json.dumps(params))
super().__init__(engine, credentials, params, tables)

@classmethod
