From e11728ae1788db78f0115f20e28bc2cf61c97346 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Hombashi Date: Sun, 21 Feb 2016 23:08:19 +0900 Subject: [PATCH] initial commit --- .gitignore | 65 +++ .travis.yml | 22 + MANIFEST.in | 11 + README.rst | 194 ++++++++ docs_requirements.txt | 1 + requirements.txt | 2 + setup.cfg | 5 + setup.py | 46 ++ simplesqlite/__init__.py | 865 +++++++++++++++++++++++++++++++++ test/test_simplesqlite.py | 987 ++++++++++++++++++++++++++++++++++++++ test_requirements.txt | 2 + tox.ini | 9 + 12 files changed, 2209 insertions(+) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 MANIFEST.in create mode 100644 README.rst create mode 100644 docs_requirements.txt create mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 simplesqlite/__init__.py create mode 100644 test/test_simplesqlite.py create mode 100644 test_requirements.txt create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..10d97cc --- /dev/null +++ b/.gitignore @@ -0,0 +1,65 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ +/.idea +convert_readme.sh +desktop.ini +README.md +*.ipynb +readme_converter.py +README_HEADER.rst +upgrade.sh diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..52de459 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,22 @@ +language: python + +env: + - TOXENV=py25 + - TOXENV=py26 + - TOXENV=py27 + - TOXENV=py33 + - TOXENV=py34 + +os: + - linux + +install: + - pip install tox + - pip install coveralls + +script: + - tox + - python setup.py test --addopts "-v --cov simplesqlite --cov-report term-missing" + +after_success: + - coveralls diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..20c183b --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,11 @@ +include LICENSE +include README.rst +include setup.cfg +include tox.ini +include requirements.txt +include test_requirements.txt +include docs_requirements.txt +recursive-include test * + +global-exclude __pycache__/* +global-exclude *.pyc diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..317899d --- /dev/null +++ b/README.rst @@ -0,0 +1,194 @@ +**SimpleSQLite** + +.. contents:: Table of contents + :backlinks: top + :local: + +About +===== + +SimpleSQLite is a python library to simplify the table creation and data +insertion in SQLite database. + +Feature +======= + +- Automatic table creation from data +- Support various data type for insertion : dictionary, namedtuple, + list and tuple + +Usage +===== + +Create table +------------ + +Sample +~~~~~~ + +.. code:: python + + from simplesqlite import SimpleSQLite + + con = SimpleSQLite("sample.sqlite") + + # create table ----- + data_matrix = [ + [1, 1.1, "aaa", 1, 1], + [2, 2.2, "bbb", 2.2, 2.2], + [3, 3.3, "ccc", 3, "ccc"], + ] + + con.create_table_with_data( + table_name="sample_table", + attribute_name_list=["attr_a", "attr_b", "attr_c", "attr_d", "attr_e"], + data_matrix=data_matrix, + index_attribute_list=["attr_a"] + ) + + # display values ----- + result = con.select(select="*", table="sample_table") + for record in result.fetchall(): + print record + + # display type for each column ----- + query = "SELECT DISTINCT TYPEOF(attr_a),TYPEOF(attr_b),TYPEOF(attr_c),TYPEOF(attr_d),TYPEOF(attr_e) FROM sample_table" + result = con.execute_query(query) + print result.fetchall() + +Output +~~~~~~ + +.. code:: console + + (1, 1.1, u'aaa', 1.0, u'1') + (2, 2.2, u'bbb', 2.2, u'2.2') + (3, 3.3, u'ccc', 3.0, u'ccc') + [(u'integer', u'real', u'text', u'real', u'text')] + +insert +------ + +Dictionary +~~~~~~~~~~ + +.. code:: python + + con.insert( + "sample_table", + { + "attr_a": 4, + "attr_b": 4.4, + "attr_c": "ddd", + "attr_d": 4.44, + "attr_e": "hoge", + } + ) + con.insert_many( + "sample_table", + [ + { + "attr_a": 5, + "attr_b": 5.5, + "attr_c": "eee", + "attr_d": 5.55, + "attr_e": "foo", + }, + { + "attr_a": 6, + "attr_c": "fff", + }, + ] + ) + result = con.select(select="*", table="sample_table") + for record in result.fetchall(): + print record + +.. code:: console + + (1, 1.1, u'aaa', 1.0, u'1') + (2, 2.2, u'bbb', 2.2, u'2.2') + (3, 3.3, u'ccc', 3.0, u'ccc') + (4, 4.4, u'ddd', 4.44, u'hoge') + (5, 5.5, u'eee', 5.55, u'foo') + (6, u'NULL', u'fff', u'NULL', u'NULL') + +list/tuple/namedtuple +~~~~~~~~~~~~~~~~~~~~~ + +.. code:: python + + from collections import namedtuple + + SampleTuple = namedtuple( + "SampleTuple", "attr_a attr_b attr_c attr_d attr_e") + + con.insert("sample_table", [7, 7.7, "fff", 7.77, "bar"]) + con.insert_many( + "sample_table", + [ + (8, 8.8, "ggg", 8.88, "foobar"), + SampleTuple(9, 9.9, "ggg", 9.99, "hogehoge"), + ] + ) + + result = con.select(select="*", table="sample_table") + for record in result.fetchall(): + print record + +.. code:: console + + (1, 1.1, u'aaa', 1.0, u'1') + (2, 2.2, u'bbb', 2.2, u'2.2') + (3, 3.3, u'ccc', 3.0, u'ccc') + (4, 4.4, u'ddd', 4.44, u'hoge') + (5, 5.5, u'eee', 5.55, u'foo') + (6, u'NULL', u'fff', u'NULL', u'NULL') + (7, 7.7, u'fff', 7.77, u'bar') + (8, 8.8, u'ggg', 8.88, u'foobar') + (9, 9.9, u'ggg', 9.99, u'hogehoge') + +Misc +---- + +In default ``__table_configuration__`` table will automatically +create/insert-data each time of table creation. +``__table_configuration__`` table contains each table information, such +as the value type of columns, columns has index or not. + +Sample value of ``__table_configuration__`` table is as follows. + ++-----------------+-------------------+---------------+--------------+ +| table\_name | attribute\_name | value\_type | has\_index | ++=================+===================+===============+==============+ +| sample\_table | attr\_a | INTEGER | 1 | ++-----------------+-------------------+---------------+--------------+ +| sample\_table | attr\_b | REAL | 0 | ++-----------------+-------------------+---------------+--------------+ +| sample\_table | attr\_c | TEXT | 0 | ++-----------------+-------------------+---------------+--------------+ +| sample\_table | attr\_d | REAL | 0 | ++-----------------+-------------------+---------------+--------------+ +| sample\_table | attr\_e | TEXT | 0 | ++-----------------+-------------------+---------------+--------------+ + +Dependencies +============ + +Python 2.5+ or 3.3+ + +- `DataPropery `__ +- Used to extract data types. +- `six `__ + +Test dependencies +----------------- + +- `pytest `__ +- `pytest-runner `__ +- `tox `__ + +Documentation +============= + +Under construction diff --git a/docs_requirements.txt b/docs_requirements.txt new file mode 100644 index 0000000..483a4e9 --- /dev/null +++ b/docs_requirements.txt @@ -0,0 +1 @@ +sphinx_rtd_theme diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d1e1638 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +DataProperty +six<=1.8.0 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..2bc7ce7 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[aliases] +test=pytest + +[wheel] +universal = 1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9a13b1c --- /dev/null +++ b/setup.py @@ -0,0 +1,46 @@ +from __future__ import with_statement +import setuptools + + +with open("README.rst") as fp: + long_description = fp.read() + +with open("requirements.txt") as f: + install_requires = [line.strip() for line in f if line.strip()] + +with open("test_requirements.txt") as f: + tests_require = [line.strip() for line in f if line.strip()] + +setuptools.setup( + name="SimpleSQLite", + version="0.1.0", + author="Tsuyoshi Hombashi", + author_email="gogogo.vm@gmail.com", + url="https://github.com/thombashi/SimpleSQLite", + description="Python library to simplify the table creation/insertion in SQLite database", + long_description=long_description, + license="MIT License", + include_package_data=True, + packages=setuptools.find_packages(exclude=['test*']), + install_requires=install_requires, + setup_requires=["pytest-runner"], + tests_require=tests_require, + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.5", + "Programming Language :: Python :: 2.6", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Topic :: Database", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/simplesqlite/__init__.py b/simplesqlite/__init__.py new file mode 100644 index 0000000..73323e1 --- /dev/null +++ b/simplesqlite/__init__.py @@ -0,0 +1,865 @@ +# encoding: utf-8 + +''' +@author: Tsuyoshi Hombashi +''' + +import logging +import os +import re +import sys +import sqlite3 + +import dataproperty +import six +from six.moves import range +from six.moves import map + + +MEMORY_DB_NAME = ":memory:" +__INVALID_PATH_CHAR = '\:*?"<>|' + + +def validate_file_path(file_path): + if dataproperty.is_empty_string(file_path): + raise ValueError("path is null") + + if file_path == MEMORY_DB_NAME: + return + + match = re.search("[%s]" % ( + re.escape(__INVALID_PATH_CHAR)), os.path.basename(file_path)) + if match is not None: + raise ValueError( + "invalid char found in file name: '%s'" % ( + re.escape(match.group()))) + + +def validate_table_name(name): + """ + :param str name: Table name to validate. + :raises ValueError: If ``name`` is empty. + """ + + if dataproperty.is_empty_string(name): + raise ValueError("table name is empty") + + +class SqlQuery: + __RE_SANITIZE = re.compile("[%s]" % (re.escape("%/()[]<>.:;'!\# -+=\n\r"))) + __RE_TABLE_STR = re.compile("[%s]" % (re.escape("%()-+/."))) + __RE_TO_ATTR_STR = re.compile("[%s0-9\s#]" % (re.escape("[%()-+/.]"))) + __RE_SPACE = re.compile("[\s]+") + + __VALID_WHERE_OPERATION_LIST = [ + "=", "==", "!=", "<>", ">", ">=", "<", "<=", + ] + + @classmethod + def sanitize(cls, query_item): + """ + :return: String that exclude invalid char. + :rtype: str + """ + + return cls.__RE_SANITIZE.sub("", query_item) + + @classmethod + def to_table_str(cls, name): + """ + :return: String that suitable for table name. + :rtype: str + """ + + if cls.__RE_TABLE_STR.search(name): + return "[%s]" % (name) + + if cls.__RE_SPACE.search(name): + return "'%s'" % (name) + + return name + + @classmethod + def to_attr_str(cls, name, operation_query=""): + if cls.__RE_TO_ATTR_STR.search(name): + sql_name = "[%s]" % (name) + elif name == "join": + sql_name = "[%s]" % (name) + else: + sql_name = name + + if dataproperty.is_not_empty_string(operation_query): + sql_name = "%s(%s)" % (operation_query, sql_name) + + return sql_name + + @classmethod + def to_attr_str_list(cls, name_list, operation_query=None): + if dataproperty.is_empty_string(operation_query): + return map(cls.to_attr_str, name_list) + + return [ + "%s(%s)" % (operation_query, cls.to_attr_str(name)) + for name in name_list + ] + + @classmethod + def to_value_str(cls, value): + if value is None: + return "NULL" + + if dataproperty.is_integer(value) or dataproperty.is_float(value): + return str(value) + + return "'%s'" % (value) + + @classmethod + def to_value_str_list(cls, value_list): + return map(cls.to_value_str, value_list) + + @classmethod + def make_select(cls, select, table, where=None, extra=None): + """ + SQLite query作成補助関数 + + :return: SQLite query string + :rtype: str + """ + + validate_table_name(table) + if dataproperty.is_empty_string(select): + raise ValueError("SELECT query is null") + + query_list = [ + "SELECT " + select, + "FROM " + cls.to_table_str(table), + ] + if dataproperty.is_not_empty_string(where): + query_list.append("WHERE " + where) + if dataproperty.is_not_empty_string(extra): + query_list.append(extra) + + return " ".join(query_list) + + @classmethod + def make_insert(cls, table_name, insert_tuple, is_insert_many=False): + """ + Make INSERT query. + + :param str table_name: Table name to insert data. + :param list/tuple insert_tuple: Insertion data. + :param bool is_insert_many: ``True`` if inserting multiple data. + :return: SQLite query. + :rtype: str + """ + + validate_table_name(table_name) + + table_name = cls.to_table_str(table_name) + + if dataproperty.is_empty_list_or_tuple(insert_tuple): + raise ValueError("empty insert list/tuple") + + if is_insert_many: + value_list = ['?' for _i in insert_tuple] + else: + value_list = [ + "'%s'" % (value) + if isinstance(value, six.string_types) and value != "NULL" + else str(value) + for value in insert_tuple + ] + + return "INSERT INTO %s VALUES (%s)" % ( + table_name, ",".join(value_list)) + + @classmethod + def make_update(cls, table, set_query, where=None): + validate_table_name(table) + if dataproperty.is_empty_string(set_query): + raise ValueError("SET query is null") + + query_list = [ + "UPDATE " + cls.to_table_str(table), + "SET " + set_query, + ] + if dataproperty.is_not_empty_string(where): + query_list.append("WHERE " + where) + + return " ".join(query_list) + + @classmethod + def make_where(cls, key, value, operation="="): + if operation not in cls.__VALID_WHERE_OPERATION_LIST: + raise ValueError("operation not supported: " + str(operation)) + + return "%s %s %s" % ( + cls.to_attr_str(key), operation, cls.to_value_str(value)) + + @classmethod + def make_where_in(cls, key, value_list): + return "%s IN (%s)" % ( + cls.to_attr_str(key), ", ".join(cls.to_value_str_list(value_list))) + + @classmethod + def make_where_not_in(cls, key, value_list): + return "%s NOT IN (%s)" % ( + cls.to_attr_str(key), ", ".join(cls.to_value_str_list(value_list))) + + +def copy_table(con_src, con_dst, table_name): + con_src.verify_table_existence(table_name) + + result = con_src.select(select="*", table=table_name) + if result is None: + return False + + value_matrix = result.fetchall() + + return con_dst.create_table_with_data( + table_name, + con_src.get_attribute_name_list(table_name), + value_matrix) + + +def append_table(con_src, con_dst, table_name): + con_src.verify_table_existence(table_name) + + result = con_src.select(select="*", table=table_name) + if result is None: + return False + + value_matrix = [value_list for value_list in result.fetchall()] + + if not con_dst.has_table(table_name): + con_dst.create_table_with_data( + table_name, con_src.get_attribute_name_list(table_name), + value_matrix) + else: + con_dst.insert_many(table_name, value_matrix) + + return True + + +# class --- + +class NullDatabaseConnectionError(Exception): + pass + + +class TableNotFoundError(Exception): + pass + + +class AttributeNotFoundError(Exception): + pass + + +class SimpleSQLite(object): + ''' + wrapper class of sqlite3 + ''' + + class TableConfiguration: + TABLE_NAME = "__table_configuration__" + ATTRIBUTE_NAME_LIST = [ + "table_name", "attribute_name", "value_type", "has_index", + ] + + @property + def database_path(self): + return self.__database_path + + @property + def connection(self): + return self.__connection + + @property + def mode(self): + return self.__mode + + def __init__( + self, database_path, mode="a", + is_create_table_config=True, profile=False): + self.__initialize_connection() + self.__is_profile = profile + self.__is_create_table_config = is_create_table_config + + self.connect(database_path, mode) + + def __del__(self): + self.close() + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def is_connected(self): + """ + :return: ``True`` if the connection to a database is valid. + :rtype: bool + """ + + try: + self.check_connection() + except NullDatabaseConnectionError: + return False + + return True + + def check_connection(self): + """ + :raises NullDatabaseConnectionError: + if not connected to a SQLite database file. + """ + + if self.connection is None: + raise NullDatabaseConnectionError("null database connection") + + if dataproperty.is_empty_string(self.database_path): + raise NullDatabaseConnectionError("null database file path") + + def connect(self, database_path, mode="a"): + """ + :param str mode: + "r": Open for read only. + "w": Open for read/write. Delete existing tables. + "a": Open for read/write. Append to the existing tables. + :raises ValueError: + - If ``mode`` is invalid. + + See also + :py:func:`__verify_sqlite_db_file() ` + :py:func:`validate_file_path() ` + """ + + self.close() + + if mode == "r": + self.__verify_sqlite_db_file(database_path) + elif mode in ["w", "a"]: + validate_file_path(database_path) + else: + raise ValueError("unknown connection mode: " + mode) + + self.__database_path = os.path.realpath(database_path) + self.__connection = sqlite3.connect(database_path) + self.__mode = mode + + if mode != "w": + return + + for table in self.get_table_name_list(): + self.drop_table(table) + + def execute_query(self, query, caller=None): + import time + + self.check_connection() + if dataproperty.is_empty_string(query): + return None + + if self.__is_profile: + exec_start_time = time.time() + + try: + result = self.connection.execute(query) + except sqlite3.OperationalError: + _, e, _ = sys.exc_info() # for python 2.5 compatibility + if caller is None: + caller = logging.getLogger().findCaller() + file_path, line_no, func_name = caller[:3] + message_list = [ + "failed to execute query at %s(%d) %s" % ( + file_path, line_no, func_name), + " - query: %s" % (query), + " - msg: %s" % (e), + " - db: %s" % (self.database_path), + ] + raise sqlite3.OperationalError(os.linesep.join(message_list)) + + if self.__is_profile: + self.__dict_query_count[query] = ( + self.__dict_query_count.get(query, 0) + 1) + + elapse_time = time.time() - exec_start_time + self.__dict_query_totalexectime[query] = ( + self.__dict_query_totalexectime.get(query, 0) + elapse_time) + + return result + + def select(self, select, table, where=None, extra=None): + query = SqlQuery.make_select(select, table, where, extra) + + return self.execute_query(query, logging.getLogger().findCaller()) + + def insert(self, table_name, insert_record): + self.__validate_access_permission(["w", "a"]) + + query = SqlQuery.make_insert(table_name, self.__to_record( + self.get_attribute_name_list(table_name), insert_record)) + self.execute_query(query, logging.getLogger().findCaller()) + + def insert_many(self, table_name, insert_record_list): + self.__validate_access_permission(["w", "a"]) + self.verify_table_existence(table_name) + + if dataproperty.is_empty_list_or_tuple(insert_record_list): + return + + record_list = self.__to_data_matrix( + self.get_attribute_name_list(table_name), insert_record_list) + + query = SqlQuery.make_insert( + table_name, record_list[0], is_insert_many=True) + + try: + self.connection.executemany(query, record_list) + except sqlite3.OperationalError: + _, e, _ = sys.exc_info() # for python 2.5 compatibility + caller = logging.getLogger().findCaller() + file_path, line_no, func_name = caller[:3] + raise sqlite3.OperationalError( + "%s(%d) %s: failed to execute query:\n" % ( + file_path, line_no, func_name) + + " query=%s\n" % (query) + + " msg='%s'\n" % (str(e)) + + " db=%s\n" % (self.database_path) + + " records=%s\n" % (record_list[:2]) + ) + + def update(self, table, set_query, where=None): + self.__validate_access_permission(["w", "a"]) + query = SqlQuery.make_update(table, set_query, where) + + return self.execute_query(query, logging.getLogger().findCaller()) + + def get_total_changes(self): + self.check_connection() + + return self.connection.total_changes + + def get_value(self, select, table, where=None, extra=None): + query = SqlQuery.make_select(select, table, where, extra) + result = self.execute_query(query, logging.getLogger().findCaller()) + if result is None: + return None + + return result.fetchone()[0] + + def get_table_name_list(self): + self.check_connection() + + query = "SELECT name FROM sqlite_master WHERE TYPE='table'" + result = self.execute_query(query, logging.getLogger().findCaller()) + if result is None: + return [] + + return self.__get_list_from_fetch(result.fetchall()) + + def get_attribute_name_list(self, table_name): + if not self.has_table(table_name): + raise TableNotFoundError("'%s' table not found in %s" % ( + table_name, self.database_path)) + + query = "SELECT * FROM '%s'" % (table_name) + result = self.execute_query(query, logging.getLogger().findCaller()) + + return self.__get_list_from_fetch(result.description) + + def get_attribute_type_list(self, table_name): + if not self.has_table(table_name): + raise TableNotFoundError("'%s' table not found in %s" % ( + table_name, self.database_path)) + + attribute_name_list = self.get_attribute_name_list(table_name) + query = "SELECT DISTINCT %s FROM '%s'" % ( + ",".join([ + "TYPEOF(%s)" % (SqlQuery.to_attr_str(attribute)) + for attribute in attribute_name_list]), + table_name) + result = self.execute_query(query, logging.getLogger().findCaller()) + + return result.fetchone() + + def get_profile(self, get_profile_count=50): + TN_SQL_PROFILE = "sql_profile" + + value_matrix = [] + for query, execute_time in six.iteritems(self.__dict_query_totalexectime): + call_count = self.__dict_query_count.get(query, 0) + value_list = [query, execute_time, call_count] + value_matrix.append(value_list) + + attribute_name_list = ["query", "execution_time", "count"] + con_tmp = connect_sqlite_db_mem() + try: + con_tmp.create_table_with_data( + TN_SQL_PROFILE, + attribute_name_list, + data_matrix=value_matrix) + except ValueError: + return [], [] + + try: + result = con_tmp.select( + select="%s,SUM(%s),SUM(%s)" % ( + "query", "execution_time", "count"), + table=TN_SQL_PROFILE, + extra="GROUP BY %s ORDER BY %s DESC LIMIT %d" % ( + "query", "execution_time", get_profile_count)) + except sqlite3.OperationalError: + return [], [] + if result is None: + return [], [] + + return attribute_name_list, result.fetchall() + + def has_table(self, table_name): + try: + validate_table_name(table_name) + except ValueError: + return False + + return table_name in self.get_table_name_list() + + def has_attribute(self, table_name, attribute_name): + self.verify_table_existence(table_name) + + if dataproperty.is_empty_string(attribute_name): + return False + + return attribute_name in self.get_attribute_name_list(table_name) + + def has_attribute_list(self, table_name, attribute_name_list): + if dataproperty.is_empty_list_or_tuple(attribute_name_list): + return False + + not_exist_field_list = [ + attribute_name + for attribute_name in attribute_name_list + if not self.has_attribute(table_name, attribute_name) + ] + + if len(not_exist_field_list) > 0: + return False + + return True + + def verify_table_existence(self, table_name): + """ + :raises TableNotFoundError: If table not found in the database + + See also + :py:func:`validate_table_name() ` + """ + + validate_table_name(table_name) + + if self.has_table(table_name): + return + + raise TableNotFoundError( + "'%s' table not found in %s" % (table_name, self.database_path)) + + def verify_attribute_existence(self, table_name, attribute_name): + """ + :raises AttributeNotFoundError: If attribute not found in the table + + See also + :py:func:`verify_table_existence() ` + """ + + self.verify_table_existence(table_name) + + if self.has_attribute(table_name, attribute_name): + return + + raise AttributeNotFoundError( + "'%s' attribute not found in '%s' table" % ( + attribute_name, table_name)) + + def drop_table(self, table_name): + self.__validate_access_permission(["w", "a"]) + + if self.has_table(table_name): + query = "DROP TABLE IF EXISTS '%s'" % (table_name) + self.execute_query(query, logging.getLogger().findCaller()) + self.commit() + + def create_table(self, table_name, attribute_description_list): + self.__validate_access_permission(["w", "a"]) + + table_name = table_name.strip() + if self.has_table(table_name): + return True + + query = "CREATE TABLE IF NOT EXISTS '%s' (%s)" % ( + table_name, ", ".join(attribute_description_list)) + if self.execute_query(query, logging.getLogger().findCaller()) is None: + return False + + return True + + def create_index(self, table_name, attribute_name): + """ + TODO: update table configuration + """ + + self.verify_table_existence(table_name) + self.__validate_access_permission(["w", "a"]) + + index_name = "%s_%s_index" % ( + SqlQuery.sanitize(table_name), SqlQuery.sanitize(attribute_name)) + query = "CREATE INDEX IF NOT EXISTS %s ON %s('%s')" % ( + index_name, SqlQuery.to_table_str(table_name), attribute_name) + self.execute_query(query, logging.getLogger().findCaller()) + + if self.__is_create_table_config: + where_list = [ + SqlQuery.make_where("table_name", table_name), + SqlQuery.make_where("attribute_name", attribute_name), + ] + self.update( + table=self.TableConfiguration.TABLE_NAME, + set_query="has_index = 1", + where=" AND ".join(where_list)) + + def create_index_list(self, table_name, attribute_name_list): + self.__validate_access_permission(["w", "a"]) + + if dataproperty.is_empty_list_or_tuple(attribute_name_list): + return + + for attribute in attribute_name_list: + self.create_index(table_name, attribute) + + def create_table_with_data( + self, table_name, attribute_name_list, data_matrix, + index_attribute_list=()): + """ + Create table if not exists. And insert data to the created table. + + :param str table_name: Table name to create. + :param list attribute_name_list: Attribute names of the table. + :param dict/namedtuple/list/tuple data_matrix: Data to be inserted. + :param tuple index_attribute_list: Attribute name list to create index. + :raises ValueError: If ``data_matrix`` is empty. + + See also + :py:func:`__verify_value_matrix() ` + :py:func:`create_table() ` + :py:func:`insert_many() ` + :py:func:`create_index_list() ` + """ + + validate_table_name(table_name) + + self.__validate_access_permission(["w", "a"]) + + if dataproperty.is_empty_list_or_tuple(data_matrix): + raise ValueError("input data is null: '%s (%s)'" % ( + table_name, ", ".join(attribute_name_list))) + + data_matrix = self.__to_data_matrix(attribute_name_list, data_matrix) + self.__verify_value_matrix(attribute_name_list, data_matrix) + + strip_index_attribute_list = list( + set(attribute_name_list).intersection(set(index_attribute_list))) + attr_description_list = [] + + table_config_matrix = [] + for col, value_type in sorted( + six.iteritems(self.__get_column_valuetype(data_matrix))): + attr_name = attribute_name_list[col] + attr_description_list.append( + "'%s' %s" % (attr_name, value_type)) + + table_config_matrix.append([ + table_name, + attr_name, + value_type, + attr_name in strip_index_attribute_list, + ]) + self.__create_table_config(table_config_matrix) + + self.create_table(table_name, attr_description_list) + self.insert_many(table_name, data_matrix) + self.create_index_list(table_name, strip_index_attribute_list) + self.commit() + + def rollback(self): + try: + self.check_connection() + except NullDatabaseConnectionError: + return + + self.connection.rollback() + + def commit(self): + try: + self.check_connection() + except NullDatabaseConnectionError: + return + + self.connection.commit() + + def close(self): + try: + self.check_connection() + except NullDatabaseConnectionError: + return + + self.commit() + self.connection.close() + self.__initialize_connection() + + @staticmethod + def __verify_sqlite_db_file(database_path): + """ + :raises sqlite3.OperationalError: unable to open database file + + See also + :py:func:`validate_file_path() ` + """ + + validate_file_path(database_path) + if not os.path.isfile(os.path.realpath(database_path)): + raise IOError("file not found: " + database_path) + + connection = sqlite3.connect(database_path) + connection.close() + + @staticmethod + def __verify_value_matrix(field_list, value_matrix): + miss_match_idx_list = [] + + for list_idx in range(len(value_matrix)): + if len(field_list) == len(value_matrix[list_idx]): + continue + + miss_match_idx_list.append(list_idx) + + if len(miss_match_idx_list) == 0: + return + + sample_miss_match_list = value_matrix[miss_match_idx_list[0]] + + raise ValueError( + "miss match header length and value length:" + + " header: %d %s\n" % (len(field_list), str(field_list)) + + " # of miss match line: %d ouf of %d\n" % ( + len(miss_match_idx_list), len(value_matrix)) + + " e.g. value at line=%d, len=%d: %s\n" % ( + miss_match_idx_list[0], + len(sample_miss_match_list), str(sample_miss_match_list)) + ) + + @staticmethod + def __get_list_from_fetch(result): + """ + :params tuple result: Return value from a Cursor.fetchall() + + :rtype: list + """ + + return [record[0] for record in result] + + def __initialize_connection(self): + self.__database_path = None + self.__connection = None + self.__cursur = None + self.__mode = None + + self.__dict_query_count = {} + self.__dict_query_totalexectime = {} + + def __validate_access_permission(self, valid_permission_list): + self.check_connection() + + if dataproperty.is_empty_string(self.mode): + raise ValueError("mode is not set") + + if self.mode not in valid_permission_list: + raise IOError(str(valid_permission_list)) + + def __get_column_valuetype(self, data_matrix): + """ + Get value type for each column. + + :return: { column_number : value_type } + :rtype: dictionary + """ + + TYPENAME_TABLE = { + dataproperty.Typecode.INT: "INTEGER", + dataproperty.Typecode.FLOAT: "REAL", + dataproperty.Typecode.STRING: "TEXT", + } + + col_prop_list = dataproperty.PropertyExtractor.extract_column_property_list( + [], data_matrix) + + return dict([ + [col, TYPENAME_TABLE[col_prop.typecode]] + for col, col_prop in enumerate(col_prop_list) + ]) + + def __convert_none(self, value): + if value is None: + return "NULL" + + return value + + def __to_record(self, attr_name_list, value): + try: + # dictionary to list + return [ + self.__convert_none(value.get(attr_name)) + for attr_name in attr_name_list + ] + except AttributeError: + pass + + try: + # namedtuple to list + dict_value = value._asdict() + return [ + self.__convert_none(dict_value.get(attr_name)) + for attr_name in attr_name_list + ] + except AttributeError: + pass + + if dataproperty.is_list_or_tuple(value): + return value + + raise ValueError("cannot convert to list") + + def __to_data_matrix(self, attr_name_list, data_matrix): + return [ + self.__to_record(attr_name_list, record) + for record in data_matrix + ] + + def __create_table_config(self, table_config_matrix): + if not self.__is_create_table_config: + return + + attr_description_list = [] + for attr_name in self.TableConfiguration.ATTRIBUTE_NAME_LIST: + if attr_name == "has_index": + data_type = "INTEGER" + else: + data_type = "TEXT" + + attr_description_list.append("'%s' %s" % (attr_name, data_type)) + + table_name = self.TableConfiguration.TABLE_NAME + if not self.has_table(table_name): + self.create_table(table_name, attr_description_list) + + self.insert_many(table_name, table_config_matrix) + + +def connect_sqlite_db_mem(): + return SimpleSQLite(MEMORY_DB_NAME, "w") diff --git a/test/test_simplesqlite.py b/test/test_simplesqlite.py new file mode 100644 index 0000000..213c0bb --- /dev/null +++ b/test/test_simplesqlite.py @@ -0,0 +1,987 @@ +''' +@author: Tsuyoshi Hombashi + +:required: + https://pypi.python.org/pypi/pytest +''' + +import itertools +import os +import re +import sqlite3 + +from collections import namedtuple +import dataproperty +import pytest +from six.moves import range + +from simplesqlite import * + + +nan = float("nan") +inf = float("inf") +TEST_TABLE_NAME = "test_table" +TEST_DB_NAME = "test_db" +NOT_EXIT_FILE_PATH = "/not/existing/file/__path__" + +NamedTuple = namedtuple("NamedTuple", "attr_a attr_b") +NamedTupleEx = namedtuple("NamedTupleEx", "attr_a attr_b attr_c") + + +class Test_SqlQuery_sanitize: + SANITIZE_CHAR_LIST = [ + "%", "/", "(", ")", "[", "]", "<", ">", ".", ";", + "'", "!", "\\", "#", " ", "-", "+", "=", "\n" + ] + + @pytest.mark.parametrize( + ["value", "expected"], + [ + ["AAA%s" % (re.escape(c)), "AAA"] for c in SANITIZE_CHAR_LIST + ] + [ + ["%sBBB" % (re.escape(c)), "BBB"] for c in SANITIZE_CHAR_LIST + ] + [ + [ + "%a/b(c)d[e]fh.i;j'k!l\\m#n _o-p+q=r\nstrvwxyz" + + os.linesep, + "abcdefghijklmn_opqrstrvwxyz" + ] + ] + ) + def test_normal(self, value, expected): + assert SqlQuery.sanitize(value) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [None, TypeError], + [1, TypeError], + [nan, TypeError], + [True, TypeError], + ]) + def test_abnormal(self, value, expected): + with pytest.raises(expected): + SqlQuery.sanitize(value) + + +class Test_SqlQuery_to_table_str: + + @pytest.mark.parametrize(["value", "expected"], [ + ["test", "test"], + ["te%st", "[te%st]"], + ["te(st", "[te(st]"], + ["te)st", "[te)st]"], + ["te-st", "[te-st]"], + ["te+st", "[te+st]"], + ["te.st", "[te.st]"], + ["te st", "'te st'"], + ]) + def test_normal(self, value, expected): + assert SqlQuery.to_table_str(value) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [None, TypeError], + [1, TypeError], + [False, TypeError], + ]) + def test_exception(self, value, expected): + with pytest.raises(expected): + assert SqlQuery.to_table_str(value) + + +class Test_SqlQuery_to_attr_str: + + @pytest.mark.parametrize(["value", "operation", "expected"], [ + ["test", None, "test"], + ["test", "AVG", "AVG(test)"], + ["attr_a", 2, "attr_a"], + ["attr_a", True, "attr_a"], + ] + [ + ["te%sst" % (re.escape(c)), None, "[te%sst]" % (re.escape(c))] + for c in [ + "%", "(", ")", ".", " ", "-", "+", "#" + ] + [str(n) for n in range(10)] + ] + ) + def test_normal(self, value, operation, expected): + assert SqlQuery.to_attr_str(value, operation) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [None, TypeError], + [1, TypeError], + [False, TypeError], + ]) + def test_exception_1(self, value, expected): + with pytest.raises(expected): + SqlQuery.to_attr_str(value) + + +class Test_SqlQuery_to_attr_str_list: + + @pytest.mark.parametrize( + ["value", "operation", "expected"], + [ + [ + ["%aaa", "bbb", "ccc-ddd"], + None, + ["[%aaa]", "bbb", "[ccc-ddd]"], + ], + [ + ["%aaa", "bbb"], + "SUM", + ["SUM([%aaa])", "SUM(bbb)"], + ], + [[], None, []], + ] + ) + def test_normal(self, value, operation, expected): + assert list(SqlQuery.to_attr_str_list(value, operation)) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [None, TypeError], + [nan, TypeError], + [True, TypeError], + ]) + def test_exception(self, value, expected): + with pytest.raises(expected): + SqlQuery.to_attr_str_list(value) + + +class Test_SqlQuery_to_value_str: + + @pytest.mark.parametrize(["value", "expected"], [ + [0, "0"], + ["0", "0"], + [1.1, "1.1"], + ["test", "'test'"], + ["te st", "'te st'"], + [None, "NULL"], + [False, "'False'"], + ]) + def test_normal(self, value, expected): + assert SqlQuery.to_value_str(value) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [nan, "nan"], + [inf, "inf"], + ]) + def test_abnormal(self, value, expected): + assert SqlQuery.to_value_str(value) == expected + + +class Test_SqlQuery_to_value_str_list: + + @pytest.mark.parametrize( + ["value", "expected"], + [ + [[0, "bbb", None], ["0", "'bbb'", "NULL"]], + [[], []], + ] + ) + def test_normal(self, value, expected): + assert list(SqlQuery.to_value_str_list(value)) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + [None, TypeError], + [nan, TypeError], + ]) + def test_exception(self, value, expected): + with pytest.raises(expected): + assert SqlQuery.to_value_str_list(value) + + +class Test_SqlQuery_make_select: + + @pytest.mark.parametrize( + ["select", "table", "where", "extra", "expected"], + [ + [ + "A", "B", None, None, + "SELECT A FROM B" + ], + [ + "A", "B B", None, None, + "SELECT A FROM 'B B'" + ], + [ + "A", "B-B", SqlQuery.make_where("C", 1), None, + "SELECT A FROM [B-B] WHERE C = 1" + ], + [ + "A", "B-B", SqlQuery.make_where("C", 1, ">"), "ORDER BY D", + "SELECT A FROM [B-B] WHERE C > 1 ORDER BY D" + ], + ]) + def test_normal(self, select, table, where, extra, expected): + assert SqlQuery.make_select(select, table, where, extra) == expected + + @pytest.mark.parametrize( + ["select", "table", "where", "extra", "expected"], [ + ["A", None, None, None, ValueError], + ["", "B", None, None, ValueError], + [None, None, None, None, ValueError], + [None, "B", None, None, ValueError], + [nan, None, None, None, ValueError], + [nan, nan, None, None, ValueError], + [nan, nan, nan, None, ValueError], + [nan, nan, nan, nan, ValueError], + ]) + def test_exception(self, select, table, where, extra, expected): + with pytest.raises(expected): + SqlQuery.make_select(select, table, where, extra) + + +class Test_SqlQuery_make_insert: + + @pytest.mark.parametrize( + ["table", "insert_tuple", "is_isert_many", "expected"], [ + [ + "A", ["B"], False, + "INSERT INTO A VALUES ('B')" + ], + [ + "A", ["AAAA", 2], False, + "INSERT INTO A VALUES ('AAAA',2)" + ], + [ + "A", ["B"], True, + "INSERT INTO A VALUES (?)" + ], + [ + "A", ["B", "C"], True, + "INSERT INTO A VALUES (?,?)" + ], + ]) + def test_normal(self, table, insert_tuple, is_isert_many, expected): + assert SqlQuery.make_insert( + table, insert_tuple, is_isert_many) == expected + + @pytest.mark.parametrize( + ["table", "insert_tuple", "is_isert_many", "expected"], [ + ["", [], False, ValueError], + ["", None, True, ValueError], + ["", ["B"], False, ValueError], + ["A", [], True, ValueError], + ["A", None, False, ValueError], + [None, None, True, ValueError], + [None, ["B"], False, ValueError], + [None, [], True, ValueError], + ]) + def test_exception(self, table, insert_tuple, is_isert_many, expected): + with pytest.raises(expected): + SqlQuery.make_insert(table, insert_tuple) + + +class Test_SqlQuery_make_update: + + @pytest.mark.parametrize( + ["table", "set_query", "where", "expected"], [ + [ + "A", "B=1", None, + "UPDATE A SET B=1" + ], + [ + "A", "B=1", SqlQuery.make_where("C", 1, ">"), + "UPDATE A SET B=1 WHERE C > 1" + ], + ]) + def test_normal(self, table, set_query, where, expected): + assert SqlQuery.make_update(table, set_query, where) == expected + + @pytest.mark.parametrize( + ["table", "set_query", "where", "expected"], [ + [None, "B=1", None, ValueError], + ["", "B=1", None, ValueError], + ["A", None, None, ValueError], + ["A", "", None, ValueError], + ]) + def test_exception(self, table, set_query, where, expected): + with pytest.raises(expected): + SqlQuery.make_update(table, set_query, where) + + """ + @pytest.mark.parametrize( + ["select", "table", "where", "extra", "expected"], [ + ["A", None, None, None, ValueError], + [None, None, None, None, ValueError], + [None, "B", None, None, TypeError], + [nan, None, None, None, ValueError], + [nan, nan, None, None, ValueError], + [nan, nan, nan, None, ValueError], + [nan, nan, nan, nan, ValueError], + ]) + def test_exception(self, select, table, where, extra, expected): + with pytest.raises(expected): + SqlQuery.make_select(select, table, where, extra) + """ + + +class Test_SqlQuery_make_where: + + @pytest.mark.parametrize(["key", "value", "operation", "expected"], [ + ["key", "value", "=", "key = 'value'"], + ["key key", "value", "!=", "[key key] != 'value'"], + ["%key+key", 100, "<", "[%key+key] < 100"], + ["key.key", "555", ">", "[key.key] > 555"], + + ["key", None, "!=", "key != NULL"], + ]) + def test_normal(self, key, value, operation, expected): + assert SqlQuery.make_where(key, value, operation) == expected + + @pytest.mark.parametrize(["key", "value", "operation", "expected"], [ + ["key", "value", None, ValueError], + ["key", "value", "INVALID_VALUE", ValueError], + ]) + def test_exception(self, key, value, operation, expected): + with pytest.raises(expected): + SqlQuery.make_where(key, value, operation) + + +class Test_SqlQuery_make_where_in: + + @pytest.mark.parametrize(["key", "value", "expected"], [ + ["key", ["attr_a", "attr_b"], "key IN ('attr_a', 'attr_b')"], + ]) + def test_normal(self, key, value, expected): + assert SqlQuery.make_where_in(key, value) == expected + + @pytest.mark.parametrize(["key", "value", "expected"], [ + ["key", None, TypeError], + ["key", 1, TypeError], + [None, ["attr_a", "attr_b"], TypeError], + [None, None, TypeError], + ]) + def test_exception(self, key, value, expected): + with pytest.raises(expected): + SqlQuery.make_where_in(key, value) + + +class Test_SqlQuery_make_where_not_in: + + @pytest.mark.parametrize(["key", "value", "expected"], [ + ["key", ["attr_a", "attr_b"], "key NOT IN ('attr_a', 'attr_b')"], + ]) + def test_normal(self, key, value, expected): + assert SqlQuery.make_where_not_in(key, value) == expected + + @pytest.mark.parametrize(["key", "value", "expected"], [ + ["key", None, TypeError], + ["key", 1, TypeError], + [None, ["attr_a", "attr_b"], TypeError], + [None, None, TypeError], + ]) + def test_exception(self, key, value, expected): + with pytest.raises(expected): + SqlQuery.make_where_not_in(key, value) + + +@pytest.fixture +def con(tmpdir): + p = tmpdir.join("tmp.db") + con = SimpleSQLite(str(p), "w") + + con.create_table_with_data( + table_name=TEST_TABLE_NAME, + attribute_name_list=["attr_a", "attr_b"], + data_matrix=[ + [1, 2], + [3, 4], + ]) + + return con + + +@pytest.fixture +def con_mix(tmpdir): + p = tmpdir.join("tmp.db") + con = SimpleSQLite(str(p), "w") + + con.create_table_with_data( + table_name=TEST_TABLE_NAME, + attribute_name_list=["attr_i", "attr_f", "attr_s"], + data_matrix=[ + [1, 2.2, "aa"], + [3, 4.4, "bb"], + ]) + + return con + + +@pytest.fixture +def con_ro(tmpdir): + p = tmpdir.join("tmp.db") + con = SimpleSQLite(str(p), "w") + + con.create_table_with_data( + table_name=TEST_TABLE_NAME, + attribute_name_list=["attr_a", "attr_b"], + data_matrix=[ + [1, 2], + [3, 4], + ]) + con.close() + con.connect(str(p), "r") + + return con + + +@pytest.fixture +def con_profile(tmpdir): + p = tmpdir.join("tmp_profile.db") + con = SimpleSQLite(str(p), "w", profile=True) + + con.create_table_with_data( + table_name=TEST_TABLE_NAME, + attribute_name_list=["attr_a", "attr_b"], + data_matrix=[ + [1, 2], + [3, 4], + ]) + + return con + + +@pytest.fixture +def con_null(tmpdir): + p = tmpdir.join("tmp.db") + con = SimpleSQLite(str(p), "w") + con.close() + + return con + + +class Test_SimpleSQLite_init: + + @pytest.mark.parametrize(["mode"], [ + ["w"], + ["a"], + ]) + def test_normal(self, tmpdir, mode): + p = tmpdir.join("not_exist.db") + db_path = str(p) + con = SimpleSQLite(db_path, mode) + assert con.database_path == db_path + assert con.connection is not None + + @pytest.mark.parametrize(["value", "mode", "expected"], [ + [NOT_EXIT_FILE_PATH, "r", IOError], + [NOT_EXIT_FILE_PATH, "w", sqlite3.OperationalError], + [NOT_EXIT_FILE_PATH, "a", sqlite3.OperationalError], + + [NOT_EXIT_FILE_PATH, None, TypeError], + [NOT_EXIT_FILE_PATH, inf, TypeError], + [NOT_EXIT_FILE_PATH, "", ValueError], + [NOT_EXIT_FILE_PATH, "b", ValueError], + ] + [ + arg_list + for arg_list in itertools.product( + [None, nan, ""], ["r", "w", "a"], [ValueError]) + ]) + def test_exception_1(self, value, mode, expected): + with pytest.raises(expected): + SimpleSQLite(value, mode) + + @pytest.mark.parametrize(["mode", "expected"], [ + ["r", IOError], + ]) + def test_exception_2(self, tmpdir, mode, expected): + p = tmpdir.join("not_exist.db") + + with pytest.raises(expected): + SimpleSQLite(str(p), mode) + + +class Test_SimpleSQLite_is_connected: + + def test_normal(self, con): + assert con.is_connected() + + def test_null(self, con_null): + assert not con_null.is_connected() + + +class Test_SimpleSQLite_check_connection: + + def test_normal(self, con): + con.check_connection() + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.check_connection() + + +class Test_SimpleSQLite_select: + + def test_smoke(self, con): + result = con.select(select="*", table=TEST_TABLE_NAME) + assert result is not None + + @pytest.mark.parametrize(["attr", "table_name", "expected"], [ + ["not_exist_attr", TEST_TABLE_NAME, sqlite3.OperationalError], + ["", TEST_TABLE_NAME, ValueError], + [None, TEST_TABLE_NAME, ValueError], + ["attr_a", "not_exist_table", sqlite3.OperationalError], + ["attr_a", "", ValueError], + ["attr_a", None, ValueError], + ["", "", ValueError], + ["", None, ValueError], + [None, None, ValueError], + [None, "", ValueError], + ]) + def test_exception(self, con, attr, table_name, expected): + with pytest.raises(expected): + con.select(select=attr, table=table_name) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.select(select="*", table=TEST_TABLE_NAME) + + +class Test_SimpleSQLite_insert: + + @pytest.mark.parametrize(["value", "expeted"], [ + [[5, 6], (5, 6)], + [(5, 6), (5, 6)], + [ + {"attr_a": 5, "attr_b": 6}, + (5, 6) + ], + [ + {"attr_a": 5, "attr_b": 6, "not_exist_attr": 100}, + (5, 6) + ], + [{"attr_a": 5}, (5, None)], + [{"attr_b": 6}, (None, 6)], + [{}, (None, None)], + [NamedTuple(5, 6), (5, 6)], + [NamedTuple(5, None), (5, None)], + [NamedTuple(None, 6), (None, 6)], + [NamedTuple(None, None), (None, None)], + [NamedTupleEx(5, 6, 7), (5, 6)] + ]) + def test_normal(self, con, value, expeted): + assert con.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 2 + con.insert(TEST_TABLE_NAME, insert_record=value) + assert con.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 3 + result = con.select(select="*", table=TEST_TABLE_NAME) + result_tuple = result.fetchall()[2] + assert result_tuple == expeted + + @pytest.mark.parametrize(["value", "expeted"], [ + [[5, 6.6, "c"], (5, 6.6, "c")], + ]) + def test_mix(self, con_mix, value, expeted): + assert con_mix.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 2 + con_mix.insert(TEST_TABLE_NAME, insert_record=value) + assert con_mix.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 3 + result = con_mix.select(select="*", table=TEST_TABLE_NAME) + result_tuple = result.fetchall()[2] + assert result_tuple == expeted + + def test_read_only(self, con_ro): + with pytest.raises(IOError): + con_ro.insert( + TEST_TABLE_NAME, insert_record=[5, 6]) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.insert( + TEST_TABLE_NAME, insert_record=[5, 6]) + + +class Test_SimpleSQLite_insert_many: + + @pytest.mark.parametrize(["table_name", "value"], [ + [ + TEST_TABLE_NAME, + [ + [7, 8], + [9, 10], + [11, 12], + ], + ], + [ + TEST_TABLE_NAME, + [ + {"attr_a": 7, "attr_b": 8}, + {"attr_a": 9, "attr_b": 10}, + {"attr_a": 11, "attr_b": 12}, + ], + ], + [ + TEST_TABLE_NAME, + [ + NamedTuple(7, 8), + NamedTuple(9, 10), + NamedTuple(11, 12), + ], + ], + [ + TEST_TABLE_NAME, + [ + [7, 8], + {"attr_a": 9, "attr_b": 10}, + NamedTuple(11, 12), + ], + ], + ]) + def test_normal(self, con, table_name, value): + expected = [ + (7, 8), + (9, 10), + (11, 12), + ] + + assert con.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 2 + con.insert_many(TEST_TABLE_NAME, value) + assert con.get_value(select="COUNT(*)", table=TEST_TABLE_NAME) == 5 + result = con.select(select="*", table=TEST_TABLE_NAME) + result_tuple = result.fetchall()[2:] + assert result_tuple == expected + + @pytest.mark.parametrize(["table_name", "value"], [ + [TEST_TABLE_NAME, []], + [TEST_TABLE_NAME, None], + ]) + def test_empty(self, con, table_name, value): + con.insert_many(TEST_TABLE_NAME, value) + + @pytest.mark.parametrize(["table_name", "value", "expected"], [ + [None, None, ValueError], + [None, [], ValueError], + [TEST_TABLE_NAME, [None], ValueError], + ]) + def test_exception(self, con, table_name, value, expected): + with pytest.raises(expected): + con.insert_many(table_name, value) + + def test_read_only(self, con_ro): + with pytest.raises(IOError): + con_ro.insert( + TEST_TABLE_NAME, []) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.insert_many( + TEST_TABLE_NAME, []) + + +class Test_SimpleSQLite_update: + + def test_normal(self, con): + table_name = TEST_TABLE_NAME + where = SqlQuery.make_where("attr_b", 2) + con.update(table=table_name, set_query="attr_a = 100", where=where) + assert con.get_value( + select="attr_a", table=table_name, where=where) == 100 + + @pytest.mark.parametrize(["table_name", "set_query", "expected"], [ + [TEST_TABLE_NAME, "", ValueError], + [TEST_TABLE_NAME, None, ValueError], + ["not_exist_table", "attr_a = 1", sqlite3.OperationalError], + ["", "attr_a = 1", ValueError], + [None, "attr_a = 1", ValueError], + ["", "", ValueError], + ["", None, ValueError], + [None, None, ValueError], + [None, "", ValueError], + ]) + def test_exception(self, con, table_name, set_query, expected): + with pytest.raises(expected): + con.update(table=table_name, set_query=set_query) + + def test_read_only(self, con_ro): + with pytest.raises(IOError): + con_ro.update( + table=TEST_TABLE_NAME, set_query="attr_a = 100") + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.update(table=TEST_TABLE_NAME, set_query="hoge") + + +class Test_SimpleSQLite_get_total_changes: + + def test_smoke(self, con): + assert con.get_total_changes() > 0 + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.get_total_changes() + + +class Test_SimpleSQLite_get_table_name_list: + + def test_normal(self, con): + expected = set([ + SimpleSQLite.TableConfiguration.TABLE_NAME, TEST_TABLE_NAME + ]) + + assert set(con.get_table_name_list()) == expected + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.get_table_name_list() + + +class Test_SimpleSQLite_get_attribute_name_list: + + @pytest.mark.parametrize(["value", "expected"], [ + [ + TEST_TABLE_NAME, + ["attr_a", "attr_b"], + ], + ]) + def test_normal(self, con, value, expected): + assert con.get_attribute_name_list(value) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + ["not_exist_table", TableNotFoundError], + [None, TableNotFoundError], + ]) + def test_null(self, con, value, expected): + with pytest.raises(expected): + con.get_attribute_name_list(value) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.get_attribute_name_list("not_exist_table") + + +class Test_SimpleSQLite_get_attribute_type_list: + + @pytest.mark.parametrize(["value", "expected"], [ + [ + TEST_TABLE_NAME, + ("integer", "integer"), + ], + ]) + def test_normal(self, con, value, expected): + assert con.get_attribute_type_list(value) == expected + + @pytest.mark.parametrize(["value", "expected"], [ + ["not_exist_table", TableNotFoundError], + [None, TableNotFoundError], + ]) + def test_null(self, con, value, expected): + with pytest.raises(expected): + con.get_attribute_type_list(value) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.get_attribute_type_list(TEST_TABLE_NAME) + + +class Test_SimpleSQLite_has_table: + + @pytest.mark.parametrize(["value", "expected"], [ + [TEST_TABLE_NAME, True], + ["not_exist_table", False], + ["", False], + [None, False], + ]) + def test_normal(self, con, value, expected): + assert con.has_table(value) == expected + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.has_table(TEST_TABLE_NAME) + + +class Test_SimpleSQLite_has_attribute: + + @pytest.mark.parametrize(["table", "attr", "expected"], [ + [TEST_TABLE_NAME, "attr_a", True], + [TEST_TABLE_NAME, "not_exist_attr", False], + [TEST_TABLE_NAME, "", False], + [TEST_TABLE_NAME, None, False], + ]) + def test_normal(self, con, table, attr, expected): + assert con.has_attribute(table, attr) == expected + + @pytest.mark.parametrize(["value", "attr", "expected"], [ + ["not_exist_table", "attr_a", TableNotFoundError], + [None, "attr_a", ValueError], + ["", "attr_a", ValueError], + ]) + def test_exception(self, con, value, attr, expected): + with pytest.raises(expected): + con.has_attribute(value, attr) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.has_attribute(TEST_TABLE_NAME, "attr") + + +class Test_SimpleSQLite_has_attribute_list: + + @pytest.mark.parametrize(["table", "attr", "expected"], [ + [TEST_TABLE_NAME, ["attr_a"], True], + [TEST_TABLE_NAME, ["attr_a", "attr_b"], True], + [TEST_TABLE_NAME, ["attr_a", "attr_b", "not_exist_attr"], False], + [TEST_TABLE_NAME, ["not_exist_attr"], False], + [TEST_TABLE_NAME, [], False], + [TEST_TABLE_NAME, None, False], + ]) + def test_normal(self, con, table, attr, expected): + assert con.has_attribute_list(table, attr) == expected + + @pytest.mark.parametrize(["value", "attr", "expected"], [ + ["not_exist_table", ["attr_a"], TableNotFoundError], + [None, ["attr_a"], ValueError], + ["", ["attr_a"], ValueError], + ]) + def test_exception(self, con, value, attr, expected): + with pytest.raises(expected): + con.has_attribute_list(value, attr) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.has_attribute_list(TEST_TABLE_NAME, "attr") + + +class Test_SimpleSQLite_get_profile: + + def test_normal(self, con): + attribute_name_list, profile_list = con.get_profile() + assert dataproperty.is_empty_list_or_tuple(attribute_name_list) + assert dataproperty.is_empty_list_or_tuple(profile_list) + + def test_normal_profile(self, con_profile): + attribute_name_list, profile_list = con_profile.get_profile() + assert dataproperty.is_not_empty_list_or_tuple(attribute_name_list) + assert dataproperty.is_not_empty_list_or_tuple(profile_list) + + +class Test_SimpleSQLite_verify_table_existence: + + def test_normal(self, con): + con.verify_table_existence(TEST_TABLE_NAME) + + def test_exception(self, con): + with pytest.raises(TableNotFoundError): + con.verify_table_existence("not_exist_table") + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.verify_table_existence(TEST_TABLE_NAME) + + +class Test_SimpleSQLite_verify_attribute_existence: + + @pytest.mark.parametrize(["table", "attr", "expected"], [ + [TEST_TABLE_NAME, "not_exist_attr", AttributeNotFoundError], + ["not_exist_table", "attr_a", TableNotFoundError], + [None, "attr_a", ValueError], + ["", "attr_a", ValueError], + ]) + def test_normal(self, con, table, attr, expected): + with pytest.raises(expected): + con.verify_attribute_existence(table, attr) + + +class Test_SimpleSQLite_drop_table: + + def test_normal(self, con): + attr_description_list = [ + "'%s' %s" % ("attr_name", "TEXT") + ] + + table_name = "new_table" + + assert not con.has_table(table_name) + + con.create_table(table_name, attr_description_list) + assert con.has_table(table_name) + + con.drop_table(table_name) + assert not con.has_table(table_name) + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.drop_table(TEST_TABLE_NAME) + + +class Test_SimpleSQLite_create_table_with_data: + + @pytest.mark.parametrize(["data_matrix"], [ + [ + [ + [1, 4, "a"], + [2, 2.1, "bb"], + [3, 120.9, "ccc"], + ], + ], + [ + [ + {"attr_a": 1, "attr_b": 4, "attr_c": "a"}, + {"attr_a": 2, "attr_b": 2.1, "attr_c": "bb"}, + {"attr_a": 3, "attr_b": 120.9, "attr_c": "ccc"}, + ], + ], + ]) + def test_normal(self, tmpdir, data_matrix): + p = tmpdir.join("tmp.db") + con = SimpleSQLite(str(p), "w") + table_name = TEST_TABLE_NAME + attribute_name_list = ["attr_a", "attr_b", "attr_c"] + index_attribute_list = ["attr_a"] + + con.create_table_with_data( + table_name, attribute_name_list, data_matrix, index_attribute_list) + con.commit() + + # check data --- + result = con.select(select="*", table=table_name) + result_matrix = result.fetchall() + assert len(result_matrix) == 3 + + # check table config --- + expected = [ + (table_name, 'attr_a', 'INTEGER', 1), + (table_name, 'attr_b', 'REAL', 0), + (table_name, 'attr_c', 'TEXT', 0), + ] + + result = con.select( + select="*", table=SimpleSQLite.TableConfiguration.TABLE_NAME) + result_matrix = result.fetchall() + print result_matrix + assert result_matrix == expected + + def test_null(self, con_null): + with pytest.raises(NullDatabaseConnectionError): + con_null.create_table_with_data( + TEST_TABLE_NAME, [], []) + + +class Test_SimpleSQLite_rollback: + + def test_normal(self, con): + con.rollback() + + def test_null(self, con_null): + con_null.rollback() + + +class Test_SimpleSQLite_commit: + + def test_normal(self, con): + con.commit() + + def test_null(self, con_null): + con_null.commit() + + +class Test_SimpleSQLite_close: + + def test_close(self, con): + con.close() + + def test_null(self, con_null): + con_null.close() + + +class Test_connect_sqlite_db_mem: + + def test_normal(self): + assert connect_sqlite_db_mem() is not None diff --git a/test_requirements.txt b/test_requirements.txt new file mode 100644 index 0000000..9955dec --- /dev/null +++ b/test_requirements.txt @@ -0,0 +1,2 @@ +pytest +pytest-cov diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..b9038f4 --- /dev/null +++ b/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist = py{25,26,27,33,34} + +[testenv] +deps = + pytest +commands = + python setup.py test + \ No newline at end of file