diff --git a/config.py.sample b/config.py.sample index b3d98f93b1..6ae6263fb5 100644 --- a/config.py.sample +++ b/config.py.sample @@ -10,7 +10,8 @@ SQLALCHEMY_DATABASE_URI = "postgresql://listenbrainz:listenbrainz@db:5432/listen MESSYBRAINZ_SQLALCHEMY_DATABASE_URI = "postgresql://messybrainz:messybrainz@db:5432/messybrainz" # Database for testing -TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test@/lb_test" +TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test:lb_test@db:5432/lb_test" + # Other postgres configuration options # Oldest listens which can be stored in the database, in days. diff --git a/db/__init__.py b/db/__init__.py index cb076d5ee9..043172797f 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -1,21 +1,24 @@ -from flask_sqlalchemy import SQLAlchemy +from sqlalchemy import create_engine +from sqlalchemy.pool import NullPool # This value must be incremented after schema changes on replicated tables! SCHEMA_VERSION = 1 -db = SQLAlchemy() +engine = None -def init_db_connection(app): +def init_db_connection(connect_str): """Initializes database connection using the specified Flask app. Configuration file must contain `SQLALCHEMY_DATABASE_URI` key. See https://pythonhosted.org/Flask-SQLAlchemy/config.html#configuration-keys for more info. """ - db.init_app(app) + global engine + engine = create_engine(connect_str, poolclass=NullPool) def run_sql_script(sql_file_path): with open(sql_file_path) as sql: - db.session.connection().execute(sql.read()) + with engine.connect() as connection: + connection.execute(sql.read()) diff --git a/db/test/__init__.py b/db/test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/db/test/test_user.py b/db/test/test_user.py new file mode 100644 index 0000000000..45faafe00d --- /dev/null +++ b/db/test/test_user.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +from db.testing import DatabaseTestCase +import db.user + + +class UserTestCase(DatabaseTestCase): + + def test_create(self): + user_id = db.user.create("izzy_cheezy") + self.assertIsNotNone(db.user.get(user_id)) diff --git a/db/testing.py b/db/testing.py new file mode 100644 index 0000000000..b003c0821c --- /dev/null +++ b/db/testing.py @@ -0,0 +1,43 @@ +import db +import unittest +import json +import os + +# Configuration +import sys +sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) +import config + +ADMIN_SQL_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'admin', 'sql') +TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data') + + +class DatabaseTestCase(unittest.TestCase): + + def setUp(self): + self.config = config + db.init_db_connection(config.TEST_SQLALCHEMY_DATABASE_URI) + self.reset_db() + + def tearDown(self): + pass + + def reset_db(self): + self.drop_tables() + self.init_db() + + def init_db(self): + db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_tables.sql')) + db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_primary_keys.sql')) + db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_foreign_keys.sql')) + db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_indexes.sql')) + + def drop_tables(self): + with db.engine.connect() as connection: + connection.execute('DROP TABLE IF EXISTS "user" CASCADE') + connection.execute('DROP TABLE IF EXISTS listen CASCADE') + + def load_data_files(self): + """ Get the data files from the disk """ + # return os.path.join(TEST_DATA_PATH, file_name) + return diff --git a/db/user.py b/db/user.py index e694140018..da20e846c0 100644 --- a/db/user.py +++ b/db/user.py @@ -1,44 +1,47 @@ -from db import db +import db import uuid - +import sqlalchemy def create(musicbrainz_id): - result = db.session.execute("""INSERT INTO "user" (musicbrainz_id, auth_token) - VALUES (:mb_id, :token) - RETURNING id""", - {"mb_id": musicbrainz_id, "token": str(uuid.uuid4())}) - db.session.commit() - return result.fetchone()["id"] + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text("""INSERT INTO "user" (musicbrainz_id, auth_token) + VALUES (:mb_id, :token) + RETURNING id""" ), + {"mb_id": musicbrainz_id, "token": str(uuid.uuid4())}) + return result.fetchone()["id"] def get(id): """Get user with a specified ID (integer).""" - result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token - FROM "user" - WHERE id = :id""", - {"id": id}) - row = result.fetchone() - return dict(row) if row else None + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token + FROM "user" + WHERE id = :id"""), + {"id": id}) + row = result.fetchone() + return dict(row) if row else None def get_by_mb_id(musicbrainz_id): """Get user with a specified MusicBrainz ID.""" - result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token - FROM "user" - WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)""", - {"mb_id": musicbrainz_id}) - row = result.fetchone() - return dict(row) if row else None + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token + FROM "user" + WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)"""), + {"mb_id": musicbrainz_id}) + row = result.fetchone() + return dict(row) if row else None def get_by_token(token): """Get user from an auth token""" - result = db.session.execute("""SELECT id, created, musicbrainz_id - FROM "user" - WHERE auth_token = :auth_token""", - {"auth_token": token}) - row = result.fetchone() - return dict(row) if row else None + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id + FROM "user" + WHERE auth_token = :auth_token"""), + {"auth_token": token}) + row = result.fetchone() + return dict(row) if row else None def get_or_create(musicbrainz_id): diff --git a/listenstore/listenstore/cli.py b/listenstore/listenstore/cli.py index 262b94e889..590202dd01 100644 --- a/listenstore/listenstore/cli.py +++ b/listenstore/listenstore/cli.py @@ -4,7 +4,6 @@ from .utils import argparse_factory, parse_args_and_config from .listenstore import PostgresListenStore -from .kafkaconsumer import KafkaConsumer class Command(object): @@ -19,7 +18,6 @@ def __init__(self): You have no logger, databases, or config in __init__. """ self.opt_parser = argparse_factory(self.desc) self._listenStore = None - self._kafkaConsumer = None # NB: only sets level after our Command starts running @@ -53,6 +51,7 @@ def start(self): finally: self.cleanup() + def cleanup(self): return @@ -67,11 +66,5 @@ def listen_store(self): """ pass - @property - def kafka_consumer(self): - if self._kafkaConsumer is None: - self._kafkaConsumer = KafkaConsumer(self.config) - return self._kafkaConsumer - def renice(self, increment): os.nice(increment) diff --git a/listenstore/listenstore/listenstore.py b/listenstore/listenstore/listenstore.py index 37ad45d50e..da33661136 100644 --- a/listenstore/listenstore/listenstore.py +++ b/listenstore/listenstore/listenstore.py @@ -96,9 +96,10 @@ def insert_postgresql(self, listens): except Exception, e: # Log errors self.log.error(e) - def execute(self, connection, query, params=None): - res = connection.execute(query, params) - return res.fetchall() + def execute(self, query, params={}): + with self.engine.connect() as connection: + res = connection.execute(query, params) + return res.fetchall() def fetch_listens_from_storage(self, user_id, from_id, to_id, limit, order, precision): query = """ SELECT id, user_id, extract(epoch from ts), artist_msid, album_msid, recording_msid, raw_data """ + \ @@ -106,12 +107,12 @@ def fetch_listens_from_storage(self, user_id, from_id, to_id, limit, order, prec """ AND extract(epoch from ts) > %(from_id)s AND extract(epoch from ts) < %(to_id)s """ + \ """ ORDER BY extract(epoch from ts) """ + ORDER_TEXT[order] + """ LIMIT %(limit)s""" params = { - 'user_id' : user_id, - 'from_id' : from_id, - 'to_id' : to_id, - 'limit' : limit + 'user_id': user_id, + 'from_id': from_id, + 'to_id': to_id, + 'limit': limit } - with self.engine.connect() as connection: - results = self.execute(connection, query, params) + + results = self.execute(query, params) for row in results: yield self.convert_row(row) diff --git a/listenstore/listenstore/tests/test_listenstore.py b/listenstore/listenstore/tests/test_listenstore.py deleted file mode 100644 index 2cc8bf8ff5..0000000000 --- a/listenstore/listenstore/tests/test_listenstore.py +++ /dev/null @@ -1,25 +0,0 @@ -# coding=utf-8 -from __future__ import division, absolute_import, print_function, unicode_literals -import unittest -from datetime import date -from .. import listenstore - - -class RangeTestCase(unittest.TestCase): - - def testDateRange(self): - self.assertEqual(listenstore.daterange(date(2015, 8, 12), 'day'), (2015, 8, 12)) - self.assertEqual(listenstore.daterange(date(2015, 8, 12), 'month'), (2015, 8)) - self.assertEqual(listenstore.daterange(date(2015, 8, 12), 'year'), (2015,)) - - def testDateRanges(self): - max_date = date(2015, 9, 6) - min_date = date(2014, 6, 1) - - expected = [(2015, 9), (2015, 8), (2015, 7), (2015, 6), (2015, 5), (2015, 4), - (2015, 3), (2015, 2), (2015, 1), (2014, 12), (2014, 11), (2014, 10), - (2014, 9), (2014, 8), (2014, 7), (2014, 6)] - - self.assertEqual(list(listenstore.dateranges(listenstore.date_to_id(min_date), - listenstore.date_to_id(max_date), 'month', 'asc')), - expected) diff --git a/listenstore/tests/test_listenstore.py b/listenstore/tests/test_listenstore.py index fbe4cc4860..f0e2b7d2a5 100644 --- a/listenstore/tests/test_listenstore.py +++ b/listenstore/tests/test_listenstore.py @@ -1,38 +1,32 @@ # coding=utf-8 from __future__ import division, absolute_import, print_function, unicode_literals -import unittest2 +from db.testing import DatabaseTestCase import logging -from datetime import date, datetime -from .util import generate_data -from listenstore.listenstore import ListenStore +from datetime import datetime +from .util import generate_data, to_epoch +from webserver.postgres_connection import init_postgres_connection -# TODO: update for postgres -class TestListenStore(unittest2.TestCase): +class TestListenStore(DatabaseTestCase): - @classmethod - @unittest2.skip("We don't have Cassandra on Jenkins server") - def setUpClass(self): + def setUp(self): + super(TestListenStore, self).setUp() self.log = logging.getLogger(__name__) - conf = {"replication_factor": 1, - "cassandra_keyspace": "listenbrainz_integration_test", - "cassandra_server": "localhost:9092"} - self.logstore = ListenStore(conf) + self.logstore = init_postgres_connection(self.config.TEST_SQLALCHEMY_DATABASE_URI) self._create_test_data() - @classmethod + def tearDown(self): + # self.logstore.drop_schema() + self.logstore = None + def _create_test_data(self): + date = datetime(2015, 9, 3, 0, 0, 0) self.log.info("Inserting test data...") - test_data = generate_data(datetime(2015, 9, 3, 0, 0, 0), 1000) - self.logstore.insert_batch(test_data) + test_data = generate_data(date, 100) + self.logstore.insert_postgresql(test_data) self.log.info("Test data inserted") - @classmethod - def tearDownClass(self): - #self.logstore.drop_schema() - self.logstore = None - - @unittest2.skip("We don't have Cassandra on Jenkins server") def test_fetch_listens(self): - listens = self.logstore.fetch_listens(uid="test", limit=10) - self.assertEqual(len(list(listens)), 10) + date = datetime(2015, 9, 3, 0, 0, 0) + listens = self.logstore.fetch_listens(user_id="test", from_id=to_epoch(date), limit=10) + self.assertEquals(len(list(listens)), 10) diff --git a/listenstore/tests/util.py b/listenstore/tests/util.py index 3bbf23f9db..4f02ca8fb5 100644 --- a/listenstore/tests/util.py +++ b/listenstore/tests/util.py @@ -1,19 +1,22 @@ # coding=utf-8 from __future__ import division, absolute_import, print_function, unicode_literals -from datetime import timedelta +from datetime import datetime, timedelta from listenstore.listen import Listen import uuid def generate_data(from_date, num_records): test_data = [] - - current_date = from_date - + current_date = to_epoch(from_date) artist_msid = str(uuid.uuid4()) - for i in range(1, num_records): - item = Listen(uid="test", timestamp=current_date, artist_msid=artist_msid, - track_msid=str(uuid.uuid4())) + + for i in range(num_records): + current_date += 1 # Add one second + item = Listen(user_id="test", timestamp=current_date, artist_msid=artist_msid, + recording_msid=str(uuid.uuid4())) test_data.append(item) - current_date += timedelta(seconds=1) return test_data + + +def to_epoch(date): + return (date - datetime.utcfromtimestamp(0)).total_seconds() diff --git a/manage.py b/manage.py index e339203b87..cf6fa6a46f 100644 --- a/manage.py +++ b/manage.py @@ -23,11 +23,6 @@ def runserver(host, port, debug): schedule_jobs(app) app.run(host=host, port=port, debug=debug) -@cli.command() -def init_kafka(archive, force): - """Initializes kafka""" - - print("Done!") @cli.command() @click.option("--force", "-f", is_flag=True, help="Drop existing database and user.") @@ -43,7 +38,7 @@ def init_db(force, skip_create): uri = urlsplit(create_app().config['SQLALCHEMY_DATABASE_URI']) if force: - exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + + exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' -h ' + uri.hostname + ' -p ' + str(uri.port) + ' < ' + os.path.join(ADMIN_SQL_DIR, 'drop_db.sql'), shell=True) @@ -89,29 +84,33 @@ def init_test_db(force=False): `PG_CONNECT_TEST` variable must be defined in the config file. """ + + uri = urlsplit(create_app().config['TEST_SQLALCHEMY_DATABASE_URI']) if force: - exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' < ' + + exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + + ' -h ' + uri.hostname + ' -p ' + str(uri.port) + ' < ' + os.path.join(ADMIN_SQL_DIR, 'drop_test_db.sql'), shell=True) if exit_code != 0: raise Exception('Failed to drop existing database and user! Exit code: %i' % exit_code) print('Creating database and user for testing...') - exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' < ' + + exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + + ' -h ' + uri.hostname + ' -p ' + str(uri.port) + ' < ' + os.path.join(ADMIN_SQL_DIR, 'create_test_db.sql'), shell=True) if exit_code != 0: raise Exception('Failed to create new database and user! Exit code: %i' % exit_code) - exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' -d ab_test < ' + + exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' -d lb_test ' + + ' -h ' + uri.hostname + ' -p ' + str(uri.port) + ' < ' + os.path.join(ADMIN_SQL_DIR, 'create_extensions.sql'), shell=True) if exit_code != 0: raise Exception('Failed to create database extensions! Exit code: %i' % exit_code) - db.init_db_connection(config.PG_CONNECT_TEST) + db.init_db_connection(config.TEST_SQLALCHEMY_DATABASE_URI) - db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_types.sql')) db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_tables.sql')) db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_primary_keys.sql')) db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_foreign_keys.sql')) @@ -139,7 +138,7 @@ def init_msb_db(force, skip_create): uri = urlsplit(create_app().config['MESSYBRAINZ_SQLALCHEMY_DATABASE_URI']) if force: - exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + + exit_code = subprocess.call('psql -U ' + config.PG_SUPER_USER + ' -h ' + uri.hostname + ' -p ' + str(uri.port) + ' < ' + os.path.join(MSB_ADMIN_SQL_DIR, 'drop_db.sql'), shell=True) diff --git a/webserver/__init__.py b/webserver/__init__.py index 8fc041536e..477fe7483c 100644 --- a/webserver/__init__.py +++ b/webserver/__init__.py @@ -33,7 +33,7 @@ def create_app(): # Database connection import db - db.init_db_connection(app) + db.init_db_connection(app.config['SQLALCHEMY_DATABASE_URI']) from webserver.external import messybrainz messybrainz.init_db_connection(app.config['MESSYBRAINZ_SQLALCHEMY_DATABASE_URI']) diff --git a/webserver/templates/index/contribute.html b/webserver/templates/index/contribute.html index d4326d176c..214bcf5c0b 100644 --- a/webserver/templates/index/contribute.html +++ b/webserver/templates/index/contribute.html @@ -23,7 +23,7 @@

Developers

ListenBrainz is in its infancy and we need a lot of help to implement more features and to debug the existing - features. If you feel like helping out and have experience with Python, Postgres, Kafka and/or Cassandra, + features. If you feel like helping out and have experience with Python, Postgres and Redis, we'd love some help.