Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup kafka bits & Fix tests setup #87

Merged
merged 10 commits into from
Jul 4, 2016
Merged
3 changes: 2 additions & 1 deletion config.py.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ SQLALCHEMY_DATABASE_URI = "postgresql://listenbrainz:listenbrainz@db:5432/listen
MESSYBRAINZ_SQLALCHEMY_DATABASE_URI = "postgresql://messybrainz:messybrainz@db:5432/messybrainz"

# Database for testing
TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test@/lb_test"
TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test:lb_test@db:5432/lb_test"


# Other postgres configuration options
# Oldest listens which can be stored in the database, in days.
Expand Down
13 changes: 8 additions & 5 deletions db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# This value must be incremented after schema changes on replicated tables!
SCHEMA_VERSION = 1

db = SQLAlchemy()
engine = None


def init_db_connection(app):
def init_db_connection(connect_str):
"""Initializes database connection using the specified Flask app.

Configuration file must contain `SQLALCHEMY_DATABASE_URI` key. See
https://pythonhosted.org/Flask-SQLAlchemy/config.html#configuration-keys
for more info.
"""
db.init_app(app)
global engine
engine = create_engine(connect_str, poolclass=NullPool)


def run_sql_script(sql_file_path):
with open(sql_file_path) as sql:
db.session.connection().execute(sql.read())
with engine.connect() as connection:
connection.execute(sql.read())
Empty file added db/test/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions db/test/test_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from db.testing import DatabaseTestCase
import db.user


class UserTestCase(DatabaseTestCase):

def test_create(self):
user_id = db.user.create("izzy_cheezy")
self.assertIsNotNone(db.user.get(user_id))
43 changes: 43 additions & 0 deletions db/testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import db
import unittest
import json
import os

# Configuration
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
import config

ADMIN_SQL_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'admin', 'sql')
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data')


class DatabaseTestCase(unittest.TestCase):

def setUp(self):
self.config = config
db.init_db_connection(config.TEST_SQLALCHEMY_DATABASE_URI)
self.reset_db()

def tearDown(self):
pass

def reset_db(self):
self.drop_tables()
self.init_db()

def init_db(self):
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_tables.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_primary_keys.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_foreign_keys.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_indexes.sql'))

def drop_tables(self):
with db.engine.connect() as connection:
connection.execute('DROP TABLE IF EXISTS "user" CASCADE')
connection.execute('DROP TABLE IF EXISTS listen CASCADE')

def load_data_files(self):
""" Get the data files from the disk """
# return os.path.join(TEST_DATA_PATH, file_name)
return
55 changes: 29 additions & 26 deletions db/user.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,47 @@
from db import db
import db
import uuid

import sqlalchemy

def create(musicbrainz_id):
result = db.session.execute("""INSERT INTO "user" (musicbrainz_id, auth_token)
VALUES (:mb_id, :token)
RETURNING id""",
{"mb_id": musicbrainz_id, "token": str(uuid.uuid4())})
db.session.commit()
return result.fetchone()["id"]
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""INSERT INTO "user" (musicbrainz_id, auth_token)
VALUES (:mb_id, :token)
RETURNING id""" ),
{"mb_id": musicbrainz_id, "token": str(uuid.uuid4())})
return result.fetchone()["id"]


def get(id):
"""Get user with a specified ID (integer)."""
result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE id = :id""",
{"id": id})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE id = :id"""),
{"id": id})
row = result.fetchone()
return dict(row) if row else None


def get_by_mb_id(musicbrainz_id):
"""Get user with a specified MusicBrainz ID."""
result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)""",
{"mb_id": musicbrainz_id})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)"""),
{"mb_id": musicbrainz_id})
row = result.fetchone()
return dict(row) if row else None


def get_by_token(token):
"""Get user from an auth token"""
result = db.session.execute("""SELECT id, created, musicbrainz_id
FROM "user"
WHERE auth_token = :auth_token""",
{"auth_token": token})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id
FROM "user"
WHERE auth_token = :auth_token"""),
{"auth_token": token})
row = result.fetchone()
return dict(row) if row else None


def get_or_create(musicbrainz_id):
Expand Down
9 changes: 1 addition & 8 deletions listenstore/listenstore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from .utils import argparse_factory, parse_args_and_config
from .listenstore import PostgresListenStore
from .kafkaconsumer import KafkaConsumer


class Command(object):
Expand All @@ -19,7 +18,6 @@ def __init__(self):
You have no logger, databases, or config in __init__. """
self.opt_parser = argparse_factory(self.desc)
self._listenStore = None
self._kafkaConsumer = None


# NB: only sets level after our Command starts running
Expand Down Expand Up @@ -53,6 +51,7 @@ def start(self):
finally:
self.cleanup()


def cleanup(self):
return

Expand All @@ -67,11 +66,5 @@ def listen_store(self):
"""
pass

@property
def kafka_consumer(self):
if self._kafkaConsumer is None:
self._kafkaConsumer = KafkaConsumer(self.config)
return self._kafkaConsumer

def renice(self, increment):
os.nice(increment)
19 changes: 10 additions & 9 deletions listenstore/listenstore/listenstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,23 @@ def insert_postgresql(self, listens):
except Exception, e: # Log errors
self.log.error(e)

def execute(self, connection, query, params=None):
res = connection.execute(query, params)
return res.fetchall()
def execute(self, query, params={}):
with self.engine.connect() as connection:
res = connection.execute(query, params)
return res.fetchall()

def fetch_listens_from_storage(self, user_id, from_id, to_id, limit, order, precision):
query = """ SELECT id, user_id, extract(epoch from ts), artist_msid, album_msid, recording_msid, raw_data """ + \
""" FROM listen WHERE user_id = %(user_id)s """ + \
""" AND extract(epoch from ts) > %(from_id)s AND extract(epoch from ts) < %(to_id)s """ + \
""" ORDER BY extract(epoch from ts) """ + ORDER_TEXT[order] + """ LIMIT %(limit)s"""
params = {
'user_id' : user_id,
'from_id' : from_id,
'to_id' : to_id,
'limit' : limit
'user_id': user_id,
'from_id': from_id,
'to_id': to_id,
'limit': limit
}
with self.engine.connect() as connection:
results = self.execute(connection, query, params)

results = self.execute(query, params)
for row in results:
yield self.convert_row(row)
25 changes: 0 additions & 25 deletions listenstore/listenstore/tests/test_listenstore.py

This file was deleted.

42 changes: 18 additions & 24 deletions listenstore/tests/test_listenstore.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
# coding=utf-8
from __future__ import division, absolute_import, print_function, unicode_literals
import unittest2
from db.testing import DatabaseTestCase
import logging
from datetime import date, datetime
from .util import generate_data
from listenstore.listenstore import ListenStore
from datetime import datetime
from .util import generate_data, to_epoch
from webserver.postgres_connection import init_postgres_connection

# TODO: update for postgres

class TestListenStore(unittest2.TestCase):
class TestListenStore(DatabaseTestCase):

@classmethod
@unittest2.skip("We don't have Cassandra on Jenkins server")
def setUpClass(self):
def setUp(self):
super(TestListenStore, self).setUp()
self.log = logging.getLogger(__name__)
conf = {"replication_factor": 1,
"cassandra_keyspace": "listenbrainz_integration_test",
"cassandra_server": "localhost:9092"}
self.logstore = ListenStore(conf)
self.logstore = init_postgres_connection(self.config.TEST_SQLALCHEMY_DATABASE_URI)
self._create_test_data()

@classmethod
def tearDown(self):
# self.logstore.drop_schema()
self.logstore = None

def _create_test_data(self):
date = datetime(2015, 9, 3, 0, 0, 0)
self.log.info("Inserting test data...")
test_data = generate_data(datetime(2015, 9, 3, 0, 0, 0), 1000)
self.logstore.insert_batch(test_data)
test_data = generate_data(date, 100)
self.logstore.insert_postgresql(test_data)
self.log.info("Test data inserted")

@classmethod
def tearDownClass(self):
#self.logstore.drop_schema()
self.logstore = None

@unittest2.skip("We don't have Cassandra on Jenkins server")
def test_fetch_listens(self):
listens = self.logstore.fetch_listens(uid="test", limit=10)
self.assertEqual(len(list(listens)), 10)
date = datetime(2015, 9, 3, 0, 0, 0)
listens = self.logstore.fetch_listens(user_id="test", from_id=to_epoch(date), limit=10)
self.assertEquals(len(list(listens)), 10)
19 changes: 11 additions & 8 deletions listenstore/tests/util.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
# coding=utf-8
from __future__ import division, absolute_import, print_function, unicode_literals
from datetime import timedelta
from datetime import datetime, timedelta
from listenstore.listen import Listen
import uuid


def generate_data(from_date, num_records):
test_data = []

current_date = from_date

current_date = to_epoch(from_date)
artist_msid = str(uuid.uuid4())
for i in range(1, num_records):
item = Listen(uid="test", timestamp=current_date, artist_msid=artist_msid,
track_msid=str(uuid.uuid4()))

for i in range(num_records):
current_date += 1 # Add one second
item = Listen(user_id="test", timestamp=current_date, artist_msid=artist_msid,
recording_msid=str(uuid.uuid4()))
test_data.append(item)
current_date += timedelta(seconds=1)
return test_data


def to_epoch(date):
return (date - datetime.utcfromtimestamp(0)).total_seconds()
Loading