diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..dc792cb7 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +exclude = venv/, .ve/, data/, src/ +max-line-length = 160 diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..10a99945 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +*.sqlite3 +*.swp +*.mo +*~ +.ve +*.pyc +__pycache__ +media +.coverage +htmlcov +docs/_build +.cache/* +.hypothesis/* +.pytest_cache +venv/ +data/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..4f804b71 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +sudo: false +addons: + chrome: stable + postgresql: "10" + apt: + packages: + - postgresql-10 + - postgresql-client-10 +env: + global: + - PGPORT=5433 + - KINGFISHER_PROCESS_DB_URI='postgres:///travis' +services: + - postgresql +language: python +python: + - "3.5" + +install: + - "pip install -r requirements.txt" + - "pip install flake8" +script: + - "flake8 ocdskingfisherprocess/ ocdskingfisher-process-cli tests" + - "py.test" diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..a9ee6de0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2018, Open Contracting Data Standard +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md index 1b4c064d..295b342e 100644 --- a/README.md +++ b/README.md @@ -1 +1 @@ -# kingfisher-process \ No newline at end of file +# OCDS Kingfisher diff --git a/docs/cli-check-collection.rst b/docs/cli-check-collection.rst new file mode 100644 index 00000000..ef7ac955 --- /dev/null +++ b/docs/cli-check-collection.rst @@ -0,0 +1,15 @@ +Command line tool - check-collection option +=========================================== + +This command checks all data so far in a collection. + +It can be run multiple times on a collection, and data already checked will not be rechecked. + +Pass the ID of the collection you want checked. Use :doc:`cli-list-collections` to look up the ID you want. + +.. 
code-block:: shell-session + + python ocdskingfisher-process-cli check-collection 17 + + +TODO write about checking different schema versions here - but how that works is about to change, so no point documenting it now. \ No newline at end of file diff --git a/docs/cli-list-collections.rst b/docs/cli-list-collections.rst new file mode 100644 index 00000000..ad9c0fe2 --- /dev/null +++ b/docs/cli-list-collections.rst @@ -0,0 +1,8 @@ +Command line tool - list-collections option +=========================================== + +This command lists all the collections that this install of the app knows about. + +.. code-block:: shell-session + + python ocdskingfisher-process-cli list-collections diff --git a/docs/cli-upgrade-database.rst b/docs/cli-upgrade-database.rst new file mode 100644 index 00000000..ceefe996 --- /dev/null +++ b/docs/cli-upgrade-database.rst @@ -0,0 +1,14 @@ +Command line tool - upgrade-database option +=========================================== + +This tool will set up from scratch, or update to the latest version, the tables and structure in the PostgreSQL database. + +.. code-block:: shell-session + + python ocdskingfisher-process-cli upgrade-database + +If you want to delete all the existing tables before setting up empty tables, pass the `--deletefirst` flag. + +.. code-block:: shell-session + + python ocdskingfisher-process-cli upgrade-database --deletefirst diff --git a/docs/cli.rst b/docs/cli.rst new file mode 100644 index 00000000..0b0ce1ce --- /dev/null +++ b/docs/cli.rst @@ -0,0 +1,19 @@ +Command line tool +================= + + +You can use the tool with the provided CLI script. There are various subcommands. + +You can pass the `--verbose` flag to any subcommand, to get more output printed to the terminal. + +.. code-block:: shell-session + + python ocdskingfisher-process-cli --verbose run ... + +.. toctree:: + + + cli-upgrade-database.rst + cli-list-collections.rst + cli-check-collection.rst + diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 00000000..ed021e0d --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,5 @@ +master_doc = 'index' + +project = 'OCDS Kingfisher Process Tool' +copyright = '2018, Open Contracting Data Standard' + diff --git a/docs/config.rst b/docs/config.rst new file mode 100644 index 00000000..59c911f6 --- /dev/null +++ b/docs/config.rst @@ -0,0 +1,45 @@ +Configuration +============= + +Database Configuration +---------------------- + +PostgreSQL database settings can be set using a `~/.config/ocdskingfisher-process/config.ini` file. A sample one is included in the +main directory. + + +.. code-block:: ini + + [DBHOST] + HOSTNAME = localhost + PORT = 5432 + USERNAME = ocdsdata + PASSWORD = FIXME + DBNAME = ocdsdata + + +It will also attempt to load the password from a `~/.pgpass` file, if one is present. + +You can also set the `KINGFISHER_PROCESS_DB_URI` environment variable to use a custom PostgreSQL server, for example +`postgresql://user:password@localhost:5432/dbname`. + +The order of precedence is (from least important to most important): + + - config file + - password from `~/.pgpass` + - environment variable + +Web Configuration +----------------- + +TODO write up the API Key - notes: KINGFISHER_PROCESS_WEB_API_KEYS env var or [WEB] API_KEYS= in ini. Comma separated. + +Logging Configuration +--------------------- + +This tool provides additional logging information using the standard Python logging module, with loggers in the "ocdskingfisher" +namespace. + +When using the command line tool, it can be configured by setting a `~/.config/ocdskingfisher-process/logging.json` file. +A sample one is included in the main directory.
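 + +For example, a minimal `logging.json` could look like the following - this is an illustrative sketch only, and any standard Python `dictConfig` structure will work: + +.. code-block:: json + + { + "version": 1, + "disable_existing_loggers": false, + "formatters": { + "simple": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"} + }, + "handlers": { + "console": {"class": "logging.StreamHandler", "formatter": "simple"} + }, + "loggers": { + "ocdskingfisher": {"handlers": ["console"], "level": "INFO"} + } + } +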
diff --git a/docs/data-model.rst b/docs/data-model.rst new file mode 100644 index 00000000..ccd020fe --- /dev/null +++ b/docs/data-model.rst @@ -0,0 +1,54 @@ +Data Model +========== + +Collections +----------- + +A collection is a set of data that is handled separately. + +A collection is defined uniquely by a combination of all the variables listed below. + +* Name. A String. Can be anything you want. +* Date. The date the collection started. +* Sample. A Boolean flag. + +A collection is also given a numeric ID. + +Files +----- + +Each collection contains one or more files. + +Each file is uniquely identified in a collection by its file name. + +Data Types for Files +-------------------- + +When giving a file to this software to load, you must specify a data type. This can be: + +* record - the file is a record. +* release - the file is a release. +* record_package - the file is a record package. +* release_package - the file is a release package. +* record_package_json_lines - the file is JSON lines, and every line is a record package. +* release_package_json_lines - see last entry, but release packages. +* record_package_list - the file is a list of record packages. e.g. [ { record-package-1 } , { record-package-2 } ] +* release_package_list - see last entry, but release packages. +* record_package_list_in_results - the file is a list of record packages in the results attribute. e.g. { 'results': [ { record-package-1 } , { record-package-2 } ] } +* release_package_list_in_results - see last entry, but release packages. + +Items +----- + +Each file contains one or more items, where an item is a piece of OCDS data - a release, a record, a release package or a record package. + +Some files contain only one item. + +Some files contain many items. For example: + +* JSON Lines files +* A file downloaded from an API where the file is a JSON object that contains a list of records. e.g. http://www.contratosabiertos.cdmx.gob.mx/api/contratos/array + +Each item has an integer number, which indicates the order in which the items appear. + +Each item is uniquely identified in a file by its number.
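 + +For example, a hypothetical `record_package_json_lines` file with two items (numbers 0 and 1) could look like this - the URIs and OCIDs are purely illustrative: + +.. code-block:: json + + {"uri": "http://example.com/packages/1", "records": [{"ocid": "ocds-xxxxxx-000-00001"}]} + {"uri": "http://example.com/packages/2", "records": [{"ocid": "ocds-xxxxxx-000-00002"}]} +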
diff --git a/docs/development.rst b/docs/development.rst new file mode 100644 index 00000000..0fcade17 --- /dev/null +++ b/docs/development.rst @@ -0,0 +1,22 @@ +Development +=========== + +Run tests +--------- + +Run `py.test` from root directory. + +The tests will drop and create the database, so you probably want to specify a special testing database with an environment variable - see :doc:`config`. + + +Main Database - PostgreSQL +-------------------------- + +Create DB Migrations with Alembic - http://alembic.zzzcomputing.com/en/latest/ + +.. code-block:: shell-session + + alembic --config=mainalembic.ini revision -m "message" + +Add changes to the new migration, and make sure you update the database.py table structures and delete_tables too. + diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 00000000..ee4bf23d --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,15 @@ +OCDS Kingfisher Process tool +============================ + +OCDS Kingfisher Process is a tool for storing and analysing data from publishers of the Open Contracting Data Standard. + +(It does not download data - for that, see the Scrape part of Kingfisher.) + +.. toctree:: + + data-model.rst + requirements-install.rst + config.rst + cli.rst + web.rst + development.rst diff --git a/docs/requirements-install.rst b/docs/requirements-install.rst new file mode 100644 index 00000000..135f1a75 --- /dev/null +++ b/docs/requirements-install.rst @@ -0,0 +1,52 @@ +Requirements and Install +======================== + +Requirements +------------ + +Requirements: + +- Python v3.5 or higher +- PostgreSQL v10 or higher + +Requirements for website +------------------------ + +Requirements: + +- A web server capable of running a WSGI Python app + +Installation +------------ + +Set up a venv and install requirements: + +.. code-block:: shell-session + + virtualenv -p python3 .ve + source .ve/bin/activate + pip install -r requirements.txt + pip install -e . + +Database +-------- + +You need to create a UTF8 PostgreSQL database and create a user with write access. + +Once you have created the database, you need to configure the tool to connect to the database. + +You can see one way of doing that in the example below, but for other options see :doc:`config`. + +You also have to run a command to create the tables in the database. + +You can see the command in the example below, but for more on that see :doc:`cli-upgrade-database`. + +Example of creating a database user and database, and setting up the schema: + +.. code-block:: shell-session + + + sudo -u postgres createuser ocdskingfisher --pwprompt + sudo -u postgres createdb ocdskingfisher -O ocdskingfisher --encoding UTF8 --template template0 --lc-collate en_US.UTF-8 --lc-ctype en_US.UTF-8 + export KINGFISHER_PROCESS_DB_URI='postgres://ocdskingfisher:PASSWORD YOU CHOSE@localhost/ocdskingfisher' + python ocdskingfisher-process-cli upgrade-database diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 00000000..a6463f9b --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1 @@ +## This file is a hack to make Read The Docs work \ No newline at end of file diff --git a/docs/web-api-v1.rst b/docs/web-api-v1.rst new file mode 100644 index 00000000..1c3a2463 --- /dev/null +++ b/docs/web-api-v1.rst @@ -0,0 +1,62 @@ +Web API Version 1 +================= + +Authentication +-------------- + +All calls to this API must pass an API_KEY, as a GET parameter only. + +eg http://....../?API_KEY=KEY_GOES_HERE + +To set the key, see :doc:`config`. + + +Creating a collection +--------------------- + +There is no special API call for this. Just call one of the store methods. If the collection does not exist, it will be created automatically. + +Store File +---------- + +The endpoint is /api/v1/submit/file/ + +You must pass details as POST parameters. + +Firstly, you must pass details of the collection. + +* collection_source - String. +* collection_data_version - String. In format YYYY-MM-DD HH:MM:SS +* collection_sample - Boolean. Pass 1 for true and 0 for false. + +Secondly, you must pass details of the file. + +* file_name - String. +* file_url - String. +* file_data_type - String. See section on file data types in :doc:`data-model`. +* file_encoding - String. + +Finally, pass the actual file as a file upload named "file".
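 + +A hypothetical example using curl - the host, key and field values here are placeholders, and the real host depends on where the web app is deployed: + +.. code-block:: shell-session + + curl 'http://localhost:5000/api/v1/submit/file/?API_KEY=KEY_GOES_HERE' \ + -F 'collection_source=example-publisher' \ + -F 'collection_data_version=2018-01-01 00:00:00' \ + -F 'collection_sample=0' \ + -F 'file_name=data.json' \ + -F 'file_url=http://example.com/data.json' \ + -F 'file_data_type=release_package' \ + -F 'file_encoding=utf-8' \ + -F 'file=@data.json'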
 + +Store Item +---------- + +The endpoint is /api/v1/submit/item/ + +You must pass details as POST parameters. + +Firstly, you must pass details of the collection. + +* collection_source - String. +* collection_data_version - String. In format YYYY-MM-DD HH:MM:SS +* collection_sample - Boolean. Pass 1 for true and 0 for false. + +Secondly, you must pass details of the file. + +* file_name - String. +* file_url - String. +* file_data_type - String. See section on file data types in :doc:`data-model`. But when passing an item, only some data types can be used. +* file_encoding - String. +* number - Integer. + +Finally, pass the actual item as a file upload named "file". diff --git a/docs/web.rst b/docs/web.rst new file mode 100644 index 00000000..9f98f762 --- /dev/null +++ b/docs/web.rst @@ -0,0 +1,9 @@ +Web Interface +============= + + +.. toctree:: + + + web-api-v1.rst + diff --git a/mainalembic.ini b/mainalembic.ini new file mode 100644 index 00000000..c7e180de --- /dev/null +++ b/mainalembic.ini @@ -0,0 +1,36 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = ocdskingfisherprocess/maindatabase/migrations + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# timezone to use when rendering the date +# within the migration file as well as the filename. +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +#truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; this defaults +# to database/migrations/versions. When using multiple version +# directories, initial revisions must be specified with --version-path +# version_locations = %(here)s/bar %(here)s/bat database/migrations/versions + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 diff --git a/ocdskingfisher-process-cli b/ocdskingfisher-process-cli new file mode 100644 index 00000000..91988659 --- /dev/null +++ b/ocdskingfisher-process-cli @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import argparse +import os +import logging.config +import json +import ocdskingfisherprocess.cli.util +import ocdskingfisherprocess.config + + +def main(): + + config = ocdskingfisherprocess.config.Config() + config.load_user_config() + + logging_config_file_full_path = os.path.expanduser('~/.config/ocdskingfisher-process/logging.json') + if os.path.isfile(logging_config_file_full_path): + with open(logging_config_file_full_path) as f: + logging.config.dictConfig(json.load(f)) + + parser = argparse.ArgumentParser() + parser.add_argument("--verbose", help="increase output verbosity", + action="store_true") + + subparsers = parser.add_subparsers(dest='subcommand') + + commands = ocdskingfisherprocess.cli.util.gather_cli_commands_instances(config=config) + + for command in commands.values(): + command.configure_subparser(subparsers.add_parser(command.command)) + + args = parser.parse_args() + + if args.subcommand and args.subcommand in commands.keys(): + commands[args.subcommand].run_command(args) + else: + print("Please select a subcommand (try --help)") + + +if __name__ == '__main__': + main() diff --git a/ocdskingfisherprocess/__init__.py b/ocdskingfisherprocess/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/checks.py b/ocdskingfisherprocess/checks.py new file mode 100644 index 00000000..bca32b55 --- /dev/null +++ b/ocdskingfisherprocess/checks.py @@ -0,0 +1,111 @@ +from libcoveocds.api import 
ocds_json_output, APIException +import sqlalchemy as sa +import tempfile +import shutil + + +class Checks: + + def __init__(self, database, collection, override_schema_version=None): + self.database = database + self.collection = collection + self.override_schema_version = override_schema_version + + def process_all_files(self): + for file_model in self.database.get_all_files_in_collection(self.collection.database_id): + self.process_file(file_model=file_model) + + def process_file(self, file_model): + for file_item_model in self.database.get_all_files_items_in_file(file_model): + self.process_file_item(file_item_model=file_item_model) + + def process_file_item(self, file_item_model): + with self.database.get_engine().begin() as connection: + release_rows = connection.execute( + self.database.release_table.select().where(self.database.release_table.c.collection_file_item_id == file_item_model.database_id) + ) + + for release_row in release_rows: + if not self.database.is_release_check_done(release_row['id'], override_schema_version=self.override_schema_version): + self.check_release_row(release_row, override_schema_version=self.override_schema_version) + + del release_rows + + with self.database.get_engine().begin() as connection: + record_rows = connection.execute( + self.database.record_table.select().where(self.database.record_table.c.collection_file_item_id == file_item_model.database_id) + ) + + for record_row in record_rows: + if not self.database.is_record_check_done(record_row['id'], override_schema_version=self.override_schema_version): + self.check_record_row(record_row, override_schema_version=self.override_schema_version) + + def handle_package(self, package): + cove_temp_folder = tempfile.mkdtemp(prefix='ocdskingfisher-cove-', dir=tempfile.gettempdir()) + try: + return ocds_json_output(cove_temp_folder, None, None, convert=False, cache_schema=True, file_type='json', json_data=package) + finally: + shutil.rmtree(cove_temp_folder) + + def get_package_data(self, package_data_id): + with self.database.get_engine().begin() as connection: + s = sa.sql.select([self.database.package_data_table]) \ + .where(self.database.package_data_table.c.id == package_data_id) + result = connection.execute(s) + data_row = result.fetchone() + return data_row['data'] + + def get_data(self, data_id): + with self.database.get_engine().begin() as connection: + s = sa.sql.select([self.database.data_table]) \ + .where(self.database.data_table.c.id == data_id) + result = connection.execute(s) + data_row = result.fetchone() + return data_row['data'] + + def check_release_row(self, release_row, override_schema_version=None): + package = self.get_package_data(release_row.package_data_id) + package['releases'] = [self.get_data(release_row.data_id)] + if override_schema_version: + package['version'] = override_schema_version + try: + cove_output = self.handle_package(package) + checks = [{ + 'release_id': release_row.id, + 'cove_output': cove_output, + 'override_schema_version': override_schema_version + }] + with self.database.get_engine().begin() as connection: + connection.execute(self.database.release_check_table.insert(), checks) + except APIException as err: + checks = [{ + 'release_id': release_row.id, + 'error': str(err), + 'override_schema_version': override_schema_version + }] + with self.database.get_engine().begin() as connection: + connection.execute(self.database.release_check_error_table.insert(), checks) + + def check_record_row(self, record_row, override_schema_version=None): + package = 
self.get_package_data(record_row.package_data_id) + package['records'] = [self.get_data(record_row.data_id)] + if override_schema_version: + package['version'] = override_schema_version + try: + cove_output = self.handle_package(package) + checks = [{ + 'record_id': record_row.id, + 'cove_output': cove_output, + 'override_schema_version': override_schema_version + }] + with self.database.get_engine().begin() as connection: + connection.execute(self.database.record_check_table.insert(), checks) + except APIException as err: + checks = [{ + 'record_id': record_row.id, + 'error': str(err), + 'override_schema_version': override_schema_version + }] + with self.database.get_engine().begin() as connection: + connection.execute(self.database.record_check_error_table.insert(), checks) diff --git a/ocdskingfisherprocess/cli/__init__.py b/ocdskingfisherprocess/cli/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/cli/commands/__init__.py b/ocdskingfisherprocess/cli/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/cli/commands/base.py b/ocdskingfisherprocess/cli/commands/base.py new file mode 100644 index 00000000..3e67cbbe --- /dev/null +++ b/ocdskingfisherprocess/cli/commands/base.py @@ -0,0 +1,26 @@ +from ocdskingfisherprocess.database import DataBase + + +class CLICommand: + command = '' + + def __init__(self, config=None): + self.collection = None + self.config = config + self.database = DataBase(config=config) + + def configure_subparser(self, subparser): + pass + + def run_command(self, args): + pass + + def configure_subparser_for_selecting_existing_collection(self, subparser): + subparser.add_argument("collection", help="Collection ID (Use list-collections command to find the ID)") + + def run_command_for_selecting_existing_collection(self, args): + + self.collection = self.database.get_collection(args.collection) + if not self.collection: + print("We cannot find the collection that you requested!") + quit(-1) diff --git a/ocdskingfisherprocess/cli/commands/check_collection.py b/ocdskingfisherprocess/cli/commands/check_collection.py new file mode 100644 index 00000000..277683c0 --- /dev/null +++ b/ocdskingfisherprocess/cli/commands/check_collection.py @@ -0,0 +1,25 @@ +import ocdskingfisherprocess.database +import ocdskingfisherprocess.cli.commands.base +from ocdskingfisherprocess.checks import Checks + + +class CheckCLICommand(ocdskingfisherprocess.cli.commands.base.CLICommand): + command = 'check-collection' + + def configure_subparser(self, subparser): + self.configure_subparser_for_selecting_existing_collection(subparser) + subparser.add_argument("--schemaversion", help="Set Schema Version - defaults to 1.1") + + def run_command(self, args): + + self.run_command_for_selecting_existing_collection(args) + + override_schema_version = args.schemaversion + + schema_versions = ["1.0", "1.1"] + if override_schema_version and override_schema_version not in schema_versions: + print("We do not recognise that schema version! 
Options are: %s" % ", ".join(schema_versions)) + quit(-1) + + checks = Checks(self.database, self.collection, override_schema_version=override_schema_version) + checks.process_all_files() diff --git a/ocdskingfisherprocess/cli/commands/list_collections.py b/ocdskingfisherprocess/cli/commands/list_collections.py new file mode 100644 index 00000000..ec89ed99 --- /dev/null +++ b/ocdskingfisherprocess/cli/commands/list_collections.py @@ -0,0 +1,24 @@ +import ocdskingfisherprocess.cli.commands.base + + +class ListCollections(ocdskingfisherprocess.cli.commands.base.CLICommand): + command = 'list-collections' + + def configure_subparser(self, subparser): + pass + + def run_command(self, args): + + collections = self.database.get_all_collections() + + print("{:5} {:40} {:20} {:5}".format( + "DB-ID", "SOURCE-ID", "DATA-VERSION", "SAMPLE" + )) + + for collection in collections: + print("{:5} {:40} {:20} {:5}".format( + collection.database_id, + collection.source_id[:40], + collection.data_version, + ("Sample" if collection.sample else "Full") + )) diff --git a/ocdskingfisherprocess/cli/commands/upgrade_database.py b/ocdskingfisherprocess/cli/commands/upgrade_database.py new file mode 100644 index 00000000..3867eba0 --- /dev/null +++ b/ocdskingfisherprocess/cli/commands/upgrade_database.py @@ -0,0 +1,19 @@ +import ocdskingfisherprocess.cli.commands.base + + +class UpgradeDataBaseCLICommand(ocdskingfisherprocess.cli.commands.base.CLICommand): + command = 'upgrade-database' + + def configure_subparser(self, subparser): + subparser.add_argument("--deletefirst", help="Delete Database First", action="store_true") + + def run_command(self, args): + + if args.deletefirst: + if args.verbose: + print("Dropping Database") + self.database.delete_tables() + + if args.verbose: + print("Upgrading/Creating Database") + self.database.create_tables() diff --git a/ocdskingfisherprocess/cli/util.py b/ocdskingfisherprocess/cli/util.py new file mode 100644 index 00000000..6de933ca --- /dev/null +++ b/ocdskingfisherprocess/cli/util.py @@ -0,0 +1,20 @@ +import os +import glob +import inspect +import importlib + +import ocdskingfisherprocess.cli.commands.base + + +def gather_cli_commands_instances(config=None): + commands = {} + dir_path = os.path.dirname(os.path.realpath(__file__)) + commands_dir = os.path.join(dir_path, 'commands') + for file in glob.glob(commands_dir + '/*.py'): + module = importlib.import_module('ocdskingfisherprocess.cli.commands.' + file.split('/')[-1].split('.')[0]) + for item in dir(module): + value = getattr(module, item) + if inspect.isclass(value) and issubclass(value, ocdskingfisherprocess.cli.commands.base.CLICommand) \ + and value is not ocdskingfisherprocess.cli.commands.base.CLICommand: + commands[getattr(value, 'command')] = value(config=config) + return commands diff --git a/ocdskingfisherprocess/config.py b/ocdskingfisherprocess/config.py new file mode 100644 index 00000000..410cf82f --- /dev/null +++ b/ocdskingfisherprocess/config.py @@ -0,0 +1,92 @@ +import os +import configparser +import pgpasslib +import sys + + +"""This holds configuration information for Kingfisher. +Whatever tool is calling it - CLI or other code - should create one of these, set it up as required and pass it around. 
+""" + + +class Config: + + def __init__(self): + self.web_api_keys = [] + self.database_uri = '' + self._database_host = '' + self._database_port = 5432 + self._database_user = '' + self._database_name = '' + self._database_password = '' + + def load_user_config(self): + # First, try and load any config in the ini files + self._load_user_config_ini() + # Second, look for the password in .pgpass + self._load_user_config_pgpass() + # Third, try and load any config in the env (so env overwrites ini) + self._load_user_config_env() + + def _load_user_config_pgpass(self): + if not self._database_name or not self._database_user: + return + + try: + password = pgpasslib.getpass( + self._database_host, + self._database_port, + self._database_name, + self._database_user + ) + if password: + self._database_password = password + self.database_uri = 'postgresql://{}:{}@{}:{}/{}'.format( + self._database_user, + self._database_password, + self._database_host, + self._database_port, + self._database_name + ) + + except pgpasslib.FileNotFound: + # Fail silently when no files found. + return + except pgpasslib.InvalidPermissions: + print( + "Your pgpass file has the wrong permissions; for your safety this file will be ignored. Please fix the permissions and try again.") + return + except pgpasslib.PgPassException: + print("Unexpected error:", sys.exc_info()[0]) + return + + def _load_user_config_env(self): + if os.environ.get('KINGFISHER_PROCESS_WEB_API_KEYS'): + self.web_api_keys = [key.strip() for key in os.environ.get('KINGFISHER_PROCESS_WEB_API_KEYS').split(',')] + + if os.environ.get('KINGFISHER_PROCESS_DB_URI'): + self.database_uri = os.environ.get('KINGFISHER_PROCESS_DB_URI') + + def _load_user_config_ini(self): + config = configparser.ConfigParser() + + if os.path.isfile(os.path.expanduser('~/.config/ocdskingfisher-process/config.ini')): + config.read(os.path.expanduser('~/.config/ocdskingfisher-process/config.ini')) + else: + return + + self.web_api_keys = [key.strip() for key in config.get('WEB', 'API_KEYS', fallback='').split(',')] + + self._database_host = config.get('DBHOST', 'HOSTNAME') + self._database_port = config.get('DBHOST', 'PORT') + self._database_user = config.get('DBHOST', 'USERNAME') + self._database_name = config.get('DBHOST', 'DBNAME') + self._database_password = config.get('DBHOST', 'PASSWORD', fallback='') + + self.database_uri = 'postgresql://{}:{}@{}:{}/{}'.format( + self._database_user, + self._database_password, + self._database_host, + self._database_port, + self._database_name + ) diff --git a/ocdskingfisherprocess/database.py b/ocdskingfisherprocess/database.py new file mode 100644 index 00000000..a5514826 --- /dev/null +++ b/ocdskingfisherprocess/database.py @@ -0,0 +1,412 @@ +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB +import datetime +import json +import os +from ocdskingfisherprocess.models import CollectionModel, FileModel, FileItemModel +import alembic.config +from ocdskingfisherprocess.util import get_hash_md5_for_data + + +class SetEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + + +class DataBase: + + def __init__(self, config): + self.config = config + self._engine = None + + self.metadata = sa.MetaData() + + self.collection_table = sa.Table('collection', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('source_id', sa.Text, nullable=False), + sa.Column('data_version', sa.Text, nullable=False), + 
sa.Column('store_start_at', sa.DateTime(timezone=False), nullable=False), + sa.Column('store_end_at', sa.DateTime(timezone=False), nullable=True), + sa.Column('sample', sa.Boolean, nullable=False, default=False), + sa.UniqueConstraint('source_id', 'data_version', 'sample'), + ) + + # Should be named collection_file but left for backwards compatibility! + self.collection_file_table = sa.Table('collection_file', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_id', sa.Integer, + sa.ForeignKey("collection.id"), nullable=False), + sa.Column('filename', sa.Text, nullable=True), + sa.Column('store_start_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('store_end_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('warnings', JSONB, nullable=True), + sa.UniqueConstraint('collection_id', 'filename'), + ) + + self.collection_file_item_table = sa.Table('collection_file_item', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_id', sa.Integer, + sa.ForeignKey("collection_file.id"), + nullable=False), + sa.Column('store_start_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('store_end_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('number', sa.Integer), + sa.UniqueConstraint('collection_file_id', 'number'), + ) + + self.data_table = sa.Table('data', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('hash_md5', sa.Text, nullable=False, unique=True), + sa.Column('data', JSONB, nullable=False), + ) + + self.package_data_table = sa.Table('package_data', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('hash_md5', sa.Text, nullable=False, unique=True), + sa.Column('data', JSONB, nullable=False), + ) + + self.release_table = sa.Table('release', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_item_id', sa.Integer, + sa.ForeignKey("collection_file_item.id"), nullable=False), + sa.Column('release_id', sa.Text, nullable=True), + sa.Column('ocid', sa.Text, nullable=True), + sa.Column('data_id', sa.Integer, sa.ForeignKey("data.id"), nullable=False), + sa.Column('package_data_id', sa.Integer, sa.ForeignKey("package_data.id"), + nullable=False), + ) + + self.record_table = sa.Table('record', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_item_id', sa.Integer, + sa.ForeignKey("collection_file_item.id"), nullable=False), + sa.Column('ocid', sa.Text, nullable=True), + sa.Column('data_id', sa.Integer, sa.ForeignKey("data.id"), nullable=False), + sa.Column('package_data_id', sa.Integer, sa.ForeignKey("package_data.id"), + nullable=False), + ) + + self.release_check_table = sa.Table('release_check', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('release_id', sa.Integer, sa.ForeignKey("release.id"), index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('cove_output', JSONB, nullable=False), + sa.UniqueConstraint('release_id', 'override_schema_version', + name='ix_release_check_release_id_and_more') + ) + + self.record_check_table = sa.Table('record_check', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('record_id', sa.Integer, sa.ForeignKey("record.id"), index=True, + unique=False, + nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('cove_output', JSONB, nullable=False), + 
sa.UniqueConstraint('record_id', 'override_schema_version', + name='ix_record_check_record_id_and_more') + ) + + self.release_check_error_table = sa.Table('release_check_error', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('release_id', sa.Integer, sa.ForeignKey("release.id"), + index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('error', sa.Text, nullable=False), + sa.UniqueConstraint('release_id', 'override_schema_version', + name='ix_release_check_error_release_id_and_more') + ) + + self.record_check_error_table = sa.Table('record_check_error', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('record_id', sa.Integer, sa.ForeignKey("record.id"), + index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('error', sa.Text, nullable=False), + sa.UniqueConstraint('record_id', 'override_schema_version', + name='ix_record_check_error_record_id_and_more') + ) + + def get_engine(self): + # We only create a connection if actually needed; sometimes people do operations that don't need a database + # and in that case no need to connect. + # But this side of kingfisher now always requires a DB, so there should not be a problem opening a connection! + if not self._engine: + self._engine = sa.create_engine(self.config.database_uri, json_serializer=SetEncoder().encode) + return self._engine + + def delete_tables(self): + engine = self.get_engine() + engine.execute("drop table if exists record_check cascade") + engine.execute("drop table if exists record_check_error cascade") + engine.execute("drop table if exists release_check cascade") + engine.execute("drop table if exists release_check_error cascade") + engine.execute("drop table if exists record cascade") + engine.execute("drop table if exists release cascade") + engine.execute("drop table if exists package_data cascade") + engine.execute("drop table if exists data cascade") + engine.execute("drop table if exists collection_file_item") + engine.execute("drop table if exists collection_file_status cascade") # This is the old table name + engine.execute("drop table if exists collection_file cascade") + engine.execute("drop table if exists source_session_file_status cascade") # This is the old table name + engine.execute("drop table if exists collection cascade") + engine.execute("drop table if exists source_session cascade") # This is the old table name + engine.execute("drop table if exists alembic_version cascade") + + def create_tables(self): + # Note this DOES NOT work with self.config! 
+ # It works with a brand new config instance that is created in ocdskingfisher/maindatabase/migrations/env.py + # Not sure how to solve that + alembicargs = [ + '--config', os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'mainalembic.ini')), + '--raiseerr', + 'upgrade', 'head', + ] + alembic.config.main(argv=alembicargs) + + def get_or_create_collection_id(self, source_id, data_version, sample): + + with self.get_engine().begin() as connection: + s = sa.sql.select([self.collection_table]) \ + .where((self.collection_table.c.source_id == source_id) & + (self.collection_table.c.data_version == data_version) & + (self.collection_table.c.sample == sample)) + result = connection.execute(s) + collection = result.fetchone() + if collection: + return collection['id'] + + value = connection.execute(self.collection_table.insert(), { + 'source_id': source_id, + 'data_version': data_version, + 'sample': sample, + 'store_start_at': datetime.datetime.utcnow(), + }) + return value.inserted_primary_key[0] + + def get_all_collections(self): + out = [] + with self.get_engine().begin() as connection: + s = sa.sql.select([self.collection_table]) + for result in connection.execute(s): + out.append(CollectionModel( + database_id=result['id'], + source_id=result['source_id'], + data_version=result['data_version'], + sample=result['sample'], + )) + return out + + def get_collection(self, collection_id): + with self.get_engine().begin() as connection: + s = sa.sql.select([self.collection_table]) \ + .where(self.collection_table.c.id == collection_id) + result = connection.execute(s) + collection = result.fetchone() + if collection: + return CollectionModel( + database_id=collection['id'], + source_id=collection['source_id'], + data_version=collection['data_version'], + sample=collection['sample'], + ) + + def get_all_files_in_collection(self, collection_id): + out = [] + with self.get_engine().begin() as connection: + s = sa.sql.select([self.collection_file_table]) \ + .where(self.collection_file_table.c.collection_id == collection_id) + for result in connection.execute(s): + out.append(FileModel( + database_id=result['id'], + filename=result['filename'], + )) + return out + + def get_all_files_items_in_file(self, file): + out = [] + with self.get_engine().begin() as connection: + s = sa.sql.select([self.collection_file_item_table]) \ + .where(self.collection_file_item_table.c.collection_file_id == file.database_id) + for result in connection.execute(s): + out.append(FileItemModel( + database_id=result['id'], + )) + return out + + def is_release_check_done(self, release_id, override_schema_version=None): + with self.get_engine().begin() as connection: + s = sa.sql.select([self.release_check_table]) \ + .where((self.release_check_table.c.release_id == release_id) & + (self.release_check_table.c.override_schema_version == override_schema_version)) + result = connection.execute(s) + if result.fetchone(): + return True + + s = sa.sql.select([self.release_check_error_table]) \ + .where((self.release_check_error_table.c.release_id == release_id) & + (self.release_check_error_table.c.override_schema_version == override_schema_version)) + result = connection.execute(s) + if result.fetchone(): + return True + + return False + + def is_record_check_done(self, record_id, override_schema_version=None): + with self.get_engine().begin() as connection: + s = sa.sql.select([self.record_check_table]) \ + .where((self.record_check_table.c.record_id == record_id) & + (self.record_check_table.c.override_schema_version 
== override_schema_version)) + result = connection.execute(s) + if result.fetchone(): + return True + + s = sa.sql.select([self.record_check_error_table]) \ + .where((self.record_check_error_table.c.record_id == record_id) & + (self.record_check_error_table.c.override_schema_version == override_schema_version)) + result = connection.execute(s) + if result.fetchone(): + return True + + return False + + def mark_collection_file_store_done(self, collection_id, filename): + with self.get_engine().begin() as connection: + connection.execute( + self.collection_file_table.update() + .where((self.collection_file_table.c.collection_id == collection_id) & + (self.collection_file_table.c.filename == filename)) + .values(store_end_at=datetime.datetime.utcnow()) + ) + + +class DatabaseStore: + + def __init__(self, database, collection_id, file_name, number): + self.database = database + self.collection_id = collection_id + self.file_name = file_name + self.number = number + self.connection = None + self.transaction = None + self.collection_file_id = None + self.collection_file_item_id = None + + def __enter__(self): + self.connection = self.database.get_engine().connect() + self.transaction = self.connection.begin() + + # Collection File! + s = sa.sql.select([self.database.collection_file_table]) \ + .where((self.database.collection_file_table.c.collection_id == self.collection_id) & + (self.database.collection_file_table.c.filename == self.file_name)) + result = self.connection.execute(s) + + collection_file_table_row = result.fetchone() + + if collection_file_table_row: + self.collection_file_id = collection_file_table_row['id'] + else: + value = self.connection.execute(self.database.collection_file_table.insert(), { + 'collection_id': self.collection_id, + 'filename': self.file_name, + 'store_start_at': datetime.datetime.utcnow(), + # TODO store warning? + }) + # TODO look for unique key clashes, error appropriately! + self.collection_file_id = value.inserted_primary_key[0] + + # Collection File Item! + + value = self.connection.execute(self.database.collection_file_item_table.insert(), { + 'collection_file_id': self.collection_file_id, + 'number': self.number, + 'store_start_at': datetime.datetime.utcnow(), + }) + # TODO look for unique key clashes, error appropriately! 
+ self.collection_file_item_id = value.inserted_primary_key[0] + + return self + + def __exit__(self, type, value, traceback): + + if type: + + self.transaction.rollback() + + self.connection.close() + + else: + + self.connection.execute( + self.database.collection_file_item_table.update() + .where(self.database.collection_file_item_table.c.id == self.collection_file_item_id) + .values(store_end_at=datetime.datetime.utcnow()) + ) + + self.transaction.commit() + + self.connection.close() + + def insert_record(self, row, package_data): + ocid = row.get('ocid') + package_data_id = self.get_id_for_package_data(package_data) + data_id = self.get_id_for_data(row) + self.connection.execute(self.database.record_table.insert(), { + 'collection_file_item_id': self.collection_file_item_id, + 'ocid': ocid, + 'data_id': data_id, + 'package_data_id': package_data_id, + }) + + def insert_release(self, row, package_data): + ocid = row.get('ocid') + release_id = row.get('id') + package_data_id = self.get_id_for_package_data(package_data) + data_id = self.get_id_for_data(row) + self.connection.execute(self.database.release_table.insert(), { + 'collection_file_item_id': self.collection_file_item_id, + 'release_id': release_id, + 'ocid': ocid, + 'data_id': data_id, + 'package_data_id': package_data_id, + }) + + def get_id_for_package_data(self, package_data): + + hash_md5 = get_hash_md5_for_data(package_data) + + s = sa.sql.select([self.database.package_data_table]).where(self.database.package_data_table.c.hash_md5 == hash_md5) + result = self.connection.execute(s) + existing_table_row = result.fetchone() + if existing_table_row: + return existing_table_row.id + else: + return self.connection.execute(self.database.package_data_table.insert(), { + 'hash_md5': hash_md5, + 'data': package_data, + }).inserted_primary_key[0] + + def get_id_for_data(self, data): + + hash_md5 = get_hash_md5_for_data(data) + + s = sa.sql.select([self.database.data_table]).where(self.database.data_table.c.hash_md5 == hash_md5) + result = self.connection.execute(s) + existing_table_row = result.fetchone() + if existing_table_row: + return existing_table_row.id + else: + return self.connection.execute(self.database.data_table.insert(), { + 'hash_md5': hash_md5, + 'data': data, + }).inserted_primary_key[0] diff --git a/ocdskingfisherprocess/maindatabase/__init__.py b/ocdskingfisherprocess/maindatabase/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/maindatabase/migrations/README b/ocdskingfisherprocess/maindatabase/migrations/README new file mode 100755 index 00000000..98e4f9c4 --- /dev/null +++ b/ocdskingfisherprocess/maindatabase/migrations/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/ocdskingfisherprocess/maindatabase/migrations/__init__.py b/ocdskingfisherprocess/maindatabase/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/maindatabase/migrations/env.py b/ocdskingfisherprocess/maindatabase/migrations/env.py new file mode 100755 index 00000000..0a762155 --- /dev/null +++ b/ocdskingfisherprocess/maindatabase/migrations/env.py @@ -0,0 +1,75 @@ +from __future__ import with_statement +from alembic import context +from sqlalchemy import engine_from_config, pool +from ocdskingfisherprocess.config import Config + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. 
+config = context.config + +kingfisher_config = Config() +kingfisher_config.load_user_config() + +config.set_main_option("sqlalchemy.url", kingfisher_config.database_uri) + +# We do NOT set up logging here; our command line tool already does that. +# Also if this code is being used as a library, it's up to the calling code to configure logging, not us. + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, target_metadata=target_metadata, literal_binds=True) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix='sqlalchemy.', + poolclass=pool.NullPool) + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/ocdskingfisherprocess/maindatabase/migrations/script.py.mako b/ocdskingfisherprocess/maindatabase/migrations/script.py.mako new file mode 100755 index 00000000..2c015630 --- /dev/null +++ b/ocdskingfisherprocess/maindatabase/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/ocdskingfisherprocess/maindatabase/migrations/versions/0177d90da78e_start.py b/ocdskingfisherprocess/maindatabase/migrations/versions/0177d90da78e_start.py new file mode 100644 index 00000000..0187081d --- /dev/null +++ b/ocdskingfisherprocess/maindatabase/migrations/versions/0177d90da78e_start.py @@ -0,0 +1,138 @@ +"""start + +Revision ID: 0177d90da78e +Revises: +Create Date: 2018-12-14 08:58:07.868680 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + + +# revision identifiers, used by Alembic. 
+revision = '0177d90da78e' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('collection', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('source_id', sa.Text, nullable=False), + sa.Column('data_version', sa.Text, nullable=False), + sa.Column('store_start_at', sa.DateTime(timezone=False), nullable=False), + sa.Column('store_end_at', sa.DateTime(timezone=False), nullable=True), + sa.Column('sample', sa.Boolean, nullable=False, default=False), + sa.UniqueConstraint('source_id', 'data_version', 'sample'), + ) + + op.create_table('collection_file', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_id', sa.Integer, + sa.ForeignKey("collection.id"), nullable=False), + sa.Column('filename', sa.Text, nullable=True), + sa.Column('store_start_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('store_end_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('warnings', JSONB, nullable=True), + sa.UniqueConstraint('collection_id', 'filename'), + ) + + op.create_table('collection_file_item', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_id', sa.Integer, + sa.ForeignKey("collection_file.id"), + nullable=False), + sa.Column('store_start_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('store_end_at', sa.DateTime(timezone=False), + nullable=True), + sa.Column('number', sa.Integer), + sa.UniqueConstraint('collection_file_id', 'number'), + ) + + op.create_table('data', + + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('hash_md5', sa.Text, nullable=False, unique=True), + sa.Column('data', JSONB, nullable=False), + ) + + op.create_table('package_data', + + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('hash_md5', sa.Text, nullable=False, unique=True), + sa.Column('data', JSONB, nullable=False), + ) + + op.create_table('release', + + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_item_id', sa.Integer, + sa.ForeignKey("collection_file_item.id"), nullable=False), + sa.Column('release_id', sa.Text, nullable=True), + sa.Column('ocid', sa.Text, nullable=True), + sa.Column('data_id', sa.Integer, sa.ForeignKey("data.id"), nullable=False), + sa.Column('package_data_id', sa.Integer, sa.ForeignKey("package_data.id"), + nullable=False), + ) + + op.create_table('record', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('collection_file_item_id', sa.Integer, + sa.ForeignKey("collection_file_item.id"), nullable=False), + sa.Column('ocid', sa.Text, nullable=True), + sa.Column('data_id', sa.Integer, sa.ForeignKey("data.id"), nullable=False), + sa.Column('package_data_id', sa.Integer, sa.ForeignKey("package_data.id"), + nullable=False), + ) + + op.create_table('release_check', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('release_id', sa.Integer, sa.ForeignKey("release.id"), index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('cove_output', JSONB, nullable=False), + sa.UniqueConstraint('release_id', 'override_schema_version', + name='ix_release_check_release_id_and_more') + ) + + op.create_table('record_check', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('record_id', sa.Integer, sa.ForeignKey("record.id"), index=True, + unique=False, + nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('cove_output', JSONB, nullable=False), + sa.UniqueConstraint('record_id', 
'override_schema_version', + name='ix_record_check_record_id_and_more') + ) + + op.create_table('release_check_error', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('release_id', sa.Integer, sa.ForeignKey("release.id"), + index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('error', sa.Text, nullable=False), + sa.UniqueConstraint('release_id', 'override_schema_version', + name='ix_release_check_error_release_id_and_more') + ) + + op.create_table('record_check_error', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('record_id', sa.Integer, sa.ForeignKey("record.id"), + index=True, + unique=False, nullable=False), + sa.Column('override_schema_version', sa.Text, nullable=True), + sa.Column('error', sa.Text, nullable=False), + sa.UniqueConstraint('record_id', 'override_schema_version', + name='ix_record_check_error_record_id_and_more') + ) + + +def downgrade(): + pass diff --git a/ocdskingfisherprocess/maindatabase/migrations/versions/__init__.py b/ocdskingfisherprocess/maindatabase/migrations/versions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ocdskingfisherprocess/models.py b/ocdskingfisherprocess/models.py new file mode 100644 index 00000000..93388c9c --- /dev/null +++ b/ocdskingfisherprocess/models.py @@ -0,0 +1,22 @@ + + +class CollectionModel: + + def __init__(self, database_id=None, source_id=None, data_version=None, sample=None): + self.database_id = database_id + self.source_id = source_id + self.data_version = data_version + self.sample = sample + + +class FileModel: + + def __init__(self, database_id=None, filename=None): + self.database_id = database_id + self.filename = filename + + +class FileItemModel: + + def __init__(self, database_id=None): + self.database_id = database_id diff --git a/ocdskingfisherprocess/store.py b/ocdskingfisherprocess/store.py new file mode 100644 index 00000000..e8a4ca7a --- /dev/null +++ b/ocdskingfisherprocess/store.py @@ -0,0 +1,129 @@ +import json +from ocdskingfisherprocess.database import DatabaseStore + + +class Store: + + def __init__(self, config, database): + self.config = config + self.collection_id = None + self.database = database + # If True, JSON lines release packages that are missing a 'releases' list are skipped silently. + self.ignore_release_package_json_lines_missing_releases_error = False + + def load_collection(self, collection_source, collection_data_version, collection_sample): + self.collection_id = self.database.get_or_create_collection_id(collection_source, collection_data_version, collection_sample) + + def store_file_from_local(self, filename, url, data_type, encoding, local_filename): + + if data_type == 'release_package_json_lines' or data_type == 'record_package_json_lines': + try: + with open(local_filename, encoding=encoding) as f: + number = 0 + raw_data = f.readline() + while raw_data: + self.store_file_item(filename, url, data_type, json.loads(raw_data), number) + raw_data = f.readline() + number += 1 + except Exception as e: + raise e + # TODO Store error in database and make nice HTTP response! + + else: + try: + with open(local_filename, encoding=encoding) as f: + data = json.load(f) + + except Exception as e: + raise e + # TODO Store error in database and make nice HTTP response! 
 + + objects_list = [] + if data_type == 'record_package_list_in_results': + objects_list.extend(data['results']) + elif data_type == 'release_package_list_in_results': + objects_list.extend(data['results']) + elif data_type == 'record_package_list' or data_type == 'release_package_list': + objects_list.extend(data) + else: + objects_list.append(data) + + number = 0 + for item_data in objects_list: + + try: + self.store_file_item(filename, url, data_type, item_data, number) + number += 1 + + except Exception as e: + raise e + # TODO Store error in database and make nice HTTP response! + + self.database.mark_collection_file_store_done(self.collection_id, filename) + + def store_file_item_from_local(self, filename, url, data_type, encoding, number, local_filename): + + try: + with open(local_filename, encoding=encoding) as f: + data = json.load(f) + + except Exception as e: + raise e + # TODO Store error in database and make nice HTTP response! + + try: + self.store_file_item(filename, url, data_type, data, number) + + except Exception as e: + raise e + # TODO Store error in database and make nice HTTP response! + + def store_file_item(self, filename, url, data_type, json_data, number): + + if not isinstance(json_data, dict): + raise Exception("Cannot process data as JSON is not an object") + + with DatabaseStore(database=self.database, collection_id=self.collection_id, file_name=filename, number=number) as store: + + if data_type == 'release' or data_type == 'record': + data_list = [json_data] + elif data_type == 'release_package' or \ + data_type == 'release_package_json_lines' or \ + data_type == 'release_package_list_in_results' or \ + data_type == 'release_package_list': + if 'releases' not in json_data: + if data_type == 'release_package_json_lines' and \ + self.ignore_release_package_json_lines_missing_releases_error: + return + raise Exception("Release list not found") + elif not isinstance(json_data['releases'], list): + raise Exception("Release list found which is not a list") + data_list = json_data['releases'] + elif data_type == 'record_package' or \ + data_type == 'record_package_json_lines' or \ + data_type == 'record_package_list_in_results' or \ + data_type == 'record_package_list': + if 'records' not in json_data: + raise Exception("Record list not found") + elif not isinstance(json_data['records'], list): + raise Exception("Record list found which is not a list") + data_list = json_data['records'] + else: + raise Exception("data_type is not a known type") + + package_data = {} + if not data_type == 'release': + for key, value in json_data.items(): + if key not in ('releases', 'records'): + package_data[key] = value + + for row in data_list: + if not isinstance(row, dict): + raise Exception("Row in data is not an object") + + if data_type == 'record' or \ + data_type == 'record_package' or \ + data_type == 'record_package_json_lines' or \ + data_type == 'record_package_list_in_results' or \ + data_type == 'record_package_list': + store.insert_record(row, package_data) + else: + store.insert_release(row, package_data) diff --git a/ocdskingfisherprocess/util.py b/ocdskingfisherprocess/util.py new file mode 100644 index 00000000..18ff12a8 --- /dev/null +++ b/ocdskingfisherprocess/util.py @@ -0,0 +1,131 @@ +import time +import requests +import json +import hashlib + +RETRY_TIME = 10 + + +def get_hash_md5_for_data(data): + data_str = json.dumps(data, sort_keys=True) + return hashlib.md5(data_str.encode('utf-8')).hexdigest() + + +def get_url_request(url, headers=None, stream=False, 
diff --git a/ocdskingfisherprocess/util.py b/ocdskingfisherprocess/util.py
new file mode 100644
index 00000000..18ff12a8
--- /dev/null
+++ b/ocdskingfisherprocess/util.py
@@ -0,0 +1,131 @@
+import hashlib
+import json
+import time
+
+import requests
+
+RETRY_TIME = 10
+
+
+def get_hash_md5_for_data(data):
+    data_str = json.dumps(data, sort_keys=True)
+    return hashlib.md5(data_str.encode('utf-8')).hexdigest()
+
+
+def get_url_request(url, headers=None, stream=False, tries=1, errors=None, verify_ssl=True):
+    '''Fetch a URL, retrying up to three times to handle transient network
+    errors and URLs with intermittent timeouts.'''
+    if not errors:
+        errors = []
+
+    error_msg = None
+
+    if tries > 3:
+        return (None, errors)
+    try:
+        r = requests.get(url, headers=headers, stream=stream, verify=verify_ssl)
+        r.raise_for_status()
+    except requests.exceptions.Timeout:
+        error_msg = 'Request timeout'
+    except requests.ConnectionError:
+        error_msg = 'Connection error'
+    except requests.exceptions.TooManyRedirects:
+        error_msg = 'Too many redirects'
+    except requests.exceptions.HTTPError as e:
+        # raise_for_status() raised, so a response object exists here.
+        error_msg = 'Request exception (Code %s): %s' % (e.response.status_code, e)
+    except requests.exceptions.RequestException as e:
+        # No response object is guaranteed to exist for other request errors.
+        error_msg = 'Request exception: %s' % e
+
+    if not error_msg:
+        return r, []
+
+    # Only add to the errors list if it differs from the last error.
+    if not errors or errors[-1] != error_msg:
+        errors.append(error_msg)
+
+    time.sleep(RETRY_TIME)
+    # Pass verify_ssl through, so retries use the same SSL settings.
+    return get_url_request(url, headers, stream, tries + 1, errors, verify_ssl)
+
+
+# Control codes that PostgreSQL cannot store in its JSON columns. The first
+# entry is the six-character JSON escape sequence for the null character, as
+# it appears literally in JSON text; the rest are raw control bytes.
+control_codes_to_filter_out = [
+    b'\\u0000',
+    b'\x02',
+    b'\x03',
+    b'\x04',
+    b'\x05',
+    b'\x06',
+    b'\x07',
+    b'\x08',
+    b'\x09',
+    b'\x0A',
+    b'\x0B',
+    b'\x0C',
+    b'\x0D',
+    b'\x0E',
+    b'\x0F',
+    b'\x10',
+    b'\x11',
+    b'\x12',
+    b'\x13',
+    b'\x14',
+    b'\x15',
+    b'\x16',
+    b'\x17',
+    b'\x18',
+    b'\x19',
+    b'\x1A',
+    b'\x1B',
+    b'\x1C',
+    b'\x1D',
+    b'\x1E',
+    b'\x1F',
+]
+
+
+def control_code_to_filter_out_to_human_readable(control_code_to_filter_out):
+    if len(control_code_to_filter_out) == 1:
+        return 'chr(' + str(ord(control_code_to_filter_out)) + ')'
+    else:
+        # Decode to str, so callers can concatenate this with message strings.
+        return control_code_to_filter_out.decode('ascii')
+
+
+def save_content(url, filepath, headers=None, verify_ssl=True, replace_control_codes=True):
+    request, errors = get_url_request(url, stream=True, headers=headers, verify_ssl=verify_ssl)
+
+    if not request:
+        return SaveContentResult(errors=errors)
+
+    warnings = []
+    try:
+        with open(filepath, 'wb') as f:
+            # Read in one-megabyte chunks (1024 * 1024; note ^ is XOR, not power).
+            for chunk in request.iter_content(1024 * 1024):
+                for control_code_to_filter_out in control_codes_to_filter_out:
+                    # Note: a sequence split across chunk boundaries is not caught.
+                    if replace_control_codes and control_code_to_filter_out in chunk:
+                        chunk = chunk.replace(control_code_to_filter_out, b'')
+                        warning = 'We had to replace control codes: ' \
+                            + control_code_to_filter_out_to_human_readable(control_code_to_filter_out)
+                        if warning not in warnings:
+                            warnings.append(warning)
+                f.write(chunk)
+        return SaveContentResult(warnings=warnings)
+    except Exception as e:
+        return SaveContentResult(errors=[str(e)], warnings=warnings)
+
+
+class SaveContentResult:
+    def __init__(self, warnings=None, errors=None):
+        # None defaults avoid the shared-mutable-default-argument pitfall.
+        self.errors = errors if errors is not None else []
+        self.warnings = warnings if warnings is not None else []
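A short usage sketch for these helpers, with a hypothetical URL and target path; save_content reports failures through SaveContentResult rather than raising:

from ocdskingfisherprocess.util import save_content

result = save_content('http://example.com/releases.json', '/tmp/releases.json')
if result.errors:
    # After three failed attempts, errors holds the de-duplicated messages.
    print('Download failed:', result.errors)
else:
    for warning in result.warnings:
        print('Warning:', warning)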
diff --git a/ocdskingfisherprocess/web/__init__.py b/ocdskingfisherprocess/web/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/ocdskingfisherprocess/web/app.py b/ocdskingfisherprocess/web/app.py
new file mode 100644
index 00000000..904f4273
--- /dev/null
+++ b/ocdskingfisherprocess/web/app.py
@@ -0,0 +1,104 @@
+import os
+import tempfile
+
+from flask import Flask, request
+
+from ocdskingfisherprocess.config import Config
+from ocdskingfisherprocess.database import DataBase
+from ocdskingfisherprocess.store import Store
+
+config = Config()
+config.load_user_config()
+
+app = Flask(__name__)
+
+
+def api_key_is_valid():
+    # TODO this allows GET API_KEY values only; allow POST and a header too!
+    api_key = request.args.get('API_KEY')
+    return bool(api_key and api_key in config.web_api_keys)
+
+
+@app.route("/")
+def hello():
+    return "OCDS Kingfisher"
+
+
+@app.route("/robots.txt")
+def robots_txt():
+    return "User-agent: *\nDisallow: /"
+
+
+@app.route("/api/")
+def api():
+    return "OCDS Kingfisher APIs"
+
+
+@app.route("/api/v1/")
+def api_v1():
+    return "OCDS Kingfisher APIs V1"
+
+
+@app.route("/api/v1/submit/file/", methods=['POST'])
+def api_v1_submit_file():
+    if not api_key_is_valid():
+        return "ACCESS DENIED"  # TODO proper error response with a status code
+
+    # TODO check all required fields are there!
+
+    database = DataBase(config=config)
+    store = Store(config=config, database=database)
+
+    store.load_collection(
+        request.form.get('collection_source'),
+        request.form.get('collection_data_version'),
+        request.form.get('collection_sample', '0') == '1',
+    )
+
+    (tmp_file, tmp_filename) = tempfile.mkstemp(prefix="ocdskf-")
+    os.close(tmp_file)
+
+    try:
+        request.files['file'].save(tmp_filename)
+
+        store.store_file_from_local(
+            request.form.get('file_name'),
+            request.form.get('file_url'),
+            request.form.get('file_data_type'),
+            request.form.get('file_encoding'),
+            tmp_filename
+        )
+    finally:
+        # Remove the temporary file even if storing fails.
+        os.remove(tmp_filename)
+
+    return "OCDS Kingfisher APIs V1 Submit"
+
+
+@app.route("/api/v1/submit/item/", methods=['POST'])
+def api_v1_submit_item():
+    if not api_key_is_valid():
+        return "ACCESS DENIED"  # TODO proper error response with a status code
+
+    # TODO check all required fields are there!
+
+    database = DataBase(config=config)
+    store = Store(config=config, database=database)
+
+    store.load_collection(
+        request.form.get('collection_source'),
+        request.form.get('collection_data_version'),
+        request.form.get('collection_sample', '0') == '1',
+    )
+
+    (tmp_file, tmp_filename) = tempfile.mkstemp(prefix="ocdskf-")
+    os.close(tmp_file)
+
+    try:
+        request.files['file'].save(tmp_filename)
+
+        store.store_file_item_from_local(
+            request.form.get('file_name'),
+            request.form.get('file_url'),
+            request.form.get('file_data_type'),
+            request.form.get('file_encoding'),
+            int(request.form.get('number')),
+            tmp_filename
+        )
+    finally:
+        os.remove(tmp_filename)
+
+    return "OCDS Kingfisher APIs V1 Submit"
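For reference, a sketch of submitting a release package to the file endpoint with the requests library; the host, API key and values are hypothetical, while the field names match what api_v1_submit_file() reads:

import requests

with open('/tmp/releases.json', 'rb') as f:
    response = requests.post(
        'http://localhost:5000/api/v1/submit/file/',
        params={'API_KEY': 'my-secret-key'},  # hypothetical key
        data={
            'collection_source': 'example-source',
            'collection_data_version': '2018-01-01 10:00:00',
            'collection_sample': '0',
            'file_name': 'releases.json',
            'file_url': 'http://example.com/releases.json',
            'file_data_type': 'release_package',
            'file_encoding': 'utf-8',
        },
        files={'file': f},
    )
print(response.status_code, response.text)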
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 00000000..6ecfe2fa
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+python_files=test*.py
diff --git a/requirements.in b/requirements.in
new file mode 100644
index 00000000..bc24cbe0
--- /dev/null
+++ b/requirements.in
@@ -0,0 +1,6 @@
+-e git+https://github.com/OpenDataServices/flatten-tool.git@v0.5.0#egg=flattentool
+-e git+https://github.com/open-contracting/lib-cove-ocds.git@v0.1.0#egg=libcoveocds
+Flask
+alembic
+pgpasslib
+psycopg2
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 00000000..f83b3afc
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,38 @@
+-e git+https://github.com/OpenDataServices/flatten-tool.git@4c13ef0b32a59e810919a3de09bc8f64ce8f9392#egg=flattentool
+-e git+https://github.com/open-contracting/lib-cove-ocds.git@2487c7160bb102559d34039a14bdeea38e25d5b6#egg=libcoveocds
+Flask==1.0.2
+alembic==1.0.5
+pgpasslib==1.1.0
+psycopg2==2.7.6.1
+## The following requirements were added by pip freeze:
+bleach==3.0.2
+cached-property==1.5.1
+certifi==2018.11.29
+chardet==3.0.4
+Click==7.0
+commonmark==0.8.1
+Django==2.1.4
+et-xmlfile==1.0.1
+future==0.17.1
+idna==2.8
+itsdangerous==1.1.0
+jdcal==1.4
+Jinja2==2.10
+json-merge-patch==0.2
+jsonref==0.2
+jsonschema==2.6.0
+lxml==4.2.5
+Mako==1.0.7
+MarkupSafe==1.1.0
+openpyxl==2.5.12
+python-dateutil==2.7.5
+python-editor==1.0.3
+pytz==2018.7
+requests==2.21.0
+schema==0.6.8
+six==1.12.0
+SQLAlchemy==1.2.15
+urllib3==1.24.1
+webencodings==0.5.1
+Werkzeug==0.14.1
+xmltodict==0.11.0
diff --git a/sample_ocdsdata_config.ini b/sample_ocdsdata_config.ini
new file mode 100644
index 00000000..e3e1707e
--- /dev/null
+++ b/sample_ocdsdata_config.ini
@@ -0,0 +1,10 @@
+; sample_ocdsdata_config.ini
+[DBHOST]
+HOSTNAME = localhost
+PORT = 5432
+USERNAME = ocdsdata
+PASSWORD = FIXME
+DBNAME = ocdsdata
+
+[WEB]
+;API_KEYS = FIXME
diff --git a/sample_ocdsdata_logging_config.json b/sample_ocdsdata_logging_config.json
new file mode 100644
index 00000000..6a8732dd
--- /dev/null
+++ b/sample_ocdsdata_logging_config.json
@@ -0,0 +1,61 @@
+{
+    "version": 1,
+    "disable_existing_loggers": false,
+    "formatters": {
+        "simple": {
+            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        }
+    },
+
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler",
+            "level": "WARN",
+            "formatter": "simple",
+            "stream": "ext://sys.stdout"
+        },
+
+        "info_file_handler": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "level": "INFO",
+            "formatter": "simple",
+            "filename": "info.log",
+            "maxBytes": 10485760,
+            "backupCount": 20,
+            "encoding": "utf8"
+        },
+
+        "error_file_handler": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "level": "ERROR",
+            "formatter": "simple",
+            "filename": "errors.log",
+            "maxBytes": 10485760,
+            "backupCount": 20,
+            "encoding": "utf8"
+        }
+    },
+
+    "loggers": {
+        "ocdskingfisher": {
+            "level": "DEBUG",
+            "handlers": ["console", "info_file_handler", "error_file_handler"],
+            "propagate": false
+        },
+        "sqlalchemy": {
+            "level": "WARN",
+            "handlers": ["console", "info_file_handler", "error_file_handler"],
+            "propagate": false
+        },
+        "alembic": {
+            "level": "INFO",
+            "handlers": ["console", "info_file_handler", "error_file_handler"],
+            "propagate": false
+        }
+    },
+
+    "root": {
+        "level": "WARN",
+        "handlers": ["console", "info_file_handler", "error_file_handler"]
+    }
+}
\ No newline at end of file
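A small sketch of wiring the sample logging configuration into Python's standard logging module, assuming the JSON file above has been copied next to the script; dictConfig is the standard-library entry point for this format:

import json
import logging
import logging.config

with open('sample_ocdsdata_logging_config.json') as f:
    logging.config.dictConfig(json.load(f))

# Loggers in the "ocdskingfisher" namespace now log to the console and to
# the rotating info.log/errors.log files defined above.
logging.getLogger('ocdskingfisher').info('Logging configured')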
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..0b99afff
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import io
+import os
+
+from setuptools import setup
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+# Import the README and use it as the long description.
+with io.open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
+    long_description = '\n' + f.read()
+
+setup(name='ocdskingfisher',
+      version='0.0.1',
+      description='Get, extract and process data in the Open Contracting Data Standard format',
+      long_description=long_description,
+      long_description_content_type='text/markdown',
+      author='Open Contracting Partnership, Open Data Services, Iniciativa Latinoamericana para los Datos Abiertos',
+      author_email='data@open-contracting.org',
+      url='https://open-contracting.org',
+      license='BSD',
+      packages=[
+          'ocdskingfisherprocess',
+          'ocdskingfisherprocess.maindatabase',
+          'ocdskingfisherprocess.maindatabase.migrations',
+          'ocdskingfisherprocess.maindatabase.migrations.versions',
+          'ocdskingfisherprocess.cli',
+          'ocdskingfisherprocess.cli.commands',
+          'ocdskingfisherprocess.web'
+      ],
+      scripts=['ocdskingfisher-process-cli'],
+      package_data={'ocdskingfisherprocess': [
+          'maindatabase/migrations/script.py.mako'
+      ]},
+      include_package_data=True
+      )
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/base.py b/tests/base.py
new file mode 100644
index 00000000..26e34711
--- /dev/null
+++ b/tests/base.py
@@ -0,0 +1,14 @@
+from ocdskingfisherprocess.config import Config
+from ocdskingfisherprocess.database import DataBase
+
+
+class BaseTest:
+
+    def setup_method(self, test_method):
+        self.config = Config()
+        self.config.load_user_config()
+        self.database = DataBase(config=self.config)
+
+    def setup_main_database(self):
+        self.database.delete_tables()
+        self.database.create_tables()
diff --git a/tests/tests.py b/tests/tests.py
new file mode 100644
index 00000000..b7c03584
--- /dev/null
+++ b/tests/tests.py
@@ -0,0 +1,26 @@
+import ocdskingfisherprocess.util
+from tests.base import BaseTest
+
+
+class TestDataBase(BaseTest):
+
+    def test_create_tables(self):
+        self.setup_main_database()
+
+
+class TestUtil(BaseTest):
+
+    def test_database_get_hash_md5_for_data(self):
+        assert ocdskingfisherprocess.util.get_hash_md5_for_data({'cats': 'many'}) == '538dd075f4a37d77be84c683b711d644'
+
+    def test_database_get_hash_md5_for_data2(self):
+        assert ocdskingfisherprocess.util.get_hash_md5_for_data({'cats': 'none'}) == '562c5f4221c75c8f08da103cc10c4e4c'
+
+
+class TestControlCodes(BaseTest):
+
+    def test_control_code_to_filter_out_to_human_readable(self):
+        for control_code_to_filter_out in ocdskingfisherprocess.util.control_codes_to_filter_out:
+            # This test just calls the function and makes sure it runs without
+            # crashing, for all current and future values of control_codes_to_filter_out.
+            print(ocdskingfisherprocess.util.control_code_to_filter_out_to_human_readable(control_code_to_filter_out))
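To show where the expected digests in TestUtil come from, a quick sketch of the canonicalization that get_hash_md5_for_data performs; sort_keys makes the digest independent of key order:

import hashlib
import json

canonical = json.dumps({'cats': 'many'}, sort_keys=True)  # '{"cats": "many"}'
digest = hashlib.md5(canonical.encode('utf-8')).hexdigest()
# Per the test above, this digest is '538dd075f4a37d77be84c683b711d644',
# and reordering keys in the input dict would not change it.
print(digest)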