Skip to content

Commit

Permalink
Merge pull request #87 from pinkeshbadjatiya/fix_tests_cleanup
Browse files Browse the repository at this point in the history
Cleanup kafka bits & Fix tests setup
  • Loading branch information
mayhem committed Jul 4, 2016
2 parents ac7a72b + 00f4914 commit aac58f7
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 120 deletions.
3 changes: 2 additions & 1 deletion config.py.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ SQLALCHEMY_DATABASE_URI = "postgresql://listenbrainz:listenbrainz@db:5432/listen
MESSYBRAINZ_SQLALCHEMY_DATABASE_URI = "postgresql://messybrainz:messybrainz@db:5432/messybrainz"

# Database for testing
TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test@/lb_test"
TEST_SQLALCHEMY_DATABASE_URI = "postgresql://lb_test:lb_test@db:5432/lb_test"


# Other postgres configuration options
# Oldest listens which can be stored in the database, in days.
Expand Down
13 changes: 8 additions & 5 deletions db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# This value must be incremented after schema changes on replicated tables!
SCHEMA_VERSION = 1

db = SQLAlchemy()
engine = None


def init_db_connection(app):
def init_db_connection(connect_str):
"""Initializes database connection using the specified Flask app.
Configuration file must contain `SQLALCHEMY_DATABASE_URI` key. See
https://pythonhosted.org/Flask-SQLAlchemy/config.html#configuration-keys
for more info.
"""
db.init_app(app)
global engine
engine = create_engine(connect_str, poolclass=NullPool)


def run_sql_script(sql_file_path):
with open(sql_file_path) as sql:
db.session.connection().execute(sql.read())
with engine.connect() as connection:
connection.execute(sql.read())
Empty file added db/test/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions db/test/test_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from db.testing import DatabaseTestCase
import db.user


class UserTestCase(DatabaseTestCase):

def test_create(self):
user_id = db.user.create("izzy_cheezy")
self.assertIsNotNone(db.user.get(user_id))
43 changes: 43 additions & 0 deletions db/testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import db
import unittest
import json
import os

# Configuration
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
import config

ADMIN_SQL_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'admin', 'sql')
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data')


class DatabaseTestCase(unittest.TestCase):

def setUp(self):
self.config = config
db.init_db_connection(config.TEST_SQLALCHEMY_DATABASE_URI)
self.reset_db()

def tearDown(self):
pass

def reset_db(self):
self.drop_tables()
self.init_db()

def init_db(self):
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_tables.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_primary_keys.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_foreign_keys.sql'))
db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_indexes.sql'))

def drop_tables(self):
with db.engine.connect() as connection:
connection.execute('DROP TABLE IF EXISTS "user" CASCADE')
connection.execute('DROP TABLE IF EXISTS listen CASCADE')

def load_data_files(self):
""" Get the data files from the disk """
# return os.path.join(TEST_DATA_PATH, file_name)
return
55 changes: 29 additions & 26 deletions db/user.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,47 @@
from db import db
import db
import uuid

import sqlalchemy

def create(musicbrainz_id):
result = db.session.execute("""INSERT INTO "user" (musicbrainz_id, auth_token)
VALUES (:mb_id, :token)
RETURNING id""",
{"mb_id": musicbrainz_id, "token": str(uuid.uuid4())})
db.session.commit()
return result.fetchone()["id"]
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""INSERT INTO "user" (musicbrainz_id, auth_token)
VALUES (:mb_id, :token)
RETURNING id""" ),
{"mb_id": musicbrainz_id, "token": str(uuid.uuid4())})
return result.fetchone()["id"]


def get(id):
"""Get user with a specified ID (integer)."""
result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE id = :id""",
{"id": id})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE id = :id"""),
{"id": id})
row = result.fetchone()
return dict(row) if row else None


def get_by_mb_id(musicbrainz_id):
"""Get user with a specified MusicBrainz ID."""
result = db.session.execute("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)""",
{"mb_id": musicbrainz_id})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id, auth_token
FROM "user"
WHERE LOWER(musicbrainz_id) = LOWER(:mb_id)"""),
{"mb_id": musicbrainz_id})
row = result.fetchone()
return dict(row) if row else None


def get_by_token(token):
"""Get user from an auth token"""
result = db.session.execute("""SELECT id, created, musicbrainz_id
FROM "user"
WHERE auth_token = :auth_token""",
{"auth_token": token})
row = result.fetchone()
return dict(row) if row else None
with db.engine.connect() as connection:
result = connection.execute(sqlalchemy.text("""SELECT id, created, musicbrainz_id
FROM "user"
WHERE auth_token = :auth_token"""),
{"auth_token": token})
row = result.fetchone()
return dict(row) if row else None


def get_or_create(musicbrainz_id):
Expand Down
9 changes: 1 addition & 8 deletions listenstore/listenstore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from .utils import argparse_factory, parse_args_and_config
from .listenstore import PostgresListenStore
from .kafkaconsumer import KafkaConsumer


class Command(object):
Expand All @@ -19,7 +18,6 @@ def __init__(self):
You have no logger, databases, or config in __init__. """
self.opt_parser = argparse_factory(self.desc)
self._listenStore = None
self._kafkaConsumer = None


# NB: only sets level after our Command starts running
Expand Down Expand Up @@ -53,6 +51,7 @@ def start(self):
finally:
self.cleanup()


def cleanup(self):
return

Expand All @@ -67,11 +66,5 @@ def listen_store(self):
"""
pass

@property
def kafka_consumer(self):
if self._kafkaConsumer is None:
self._kafkaConsumer = KafkaConsumer(self.config)
return self._kafkaConsumer

def renice(self, increment):
os.nice(increment)
19 changes: 10 additions & 9 deletions listenstore/listenstore/listenstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,23 @@ def insert_postgresql(self, listens):
except Exception, e: # Log errors
self.log.error(e)

def execute(self, connection, query, params=None):
res = connection.execute(query, params)
return res.fetchall()
def execute(self, query, params={}):
with self.engine.connect() as connection:
res = connection.execute(query, params)
return res.fetchall()

def fetch_listens_from_storage(self, user_id, from_id, to_id, limit, order, precision):
query = """ SELECT id, user_id, extract(epoch from ts), artist_msid, album_msid, recording_msid, raw_data """ + \
""" FROM listen WHERE user_id = %(user_id)s """ + \
""" AND extract(epoch from ts) > %(from_id)s AND extract(epoch from ts) < %(to_id)s """ + \
""" ORDER BY extract(epoch from ts) """ + ORDER_TEXT[order] + """ LIMIT %(limit)s"""
params = {
'user_id' : user_id,
'from_id' : from_id,
'to_id' : to_id,
'limit' : limit
'user_id': user_id,
'from_id': from_id,
'to_id': to_id,
'limit': limit
}
with self.engine.connect() as connection:
results = self.execute(connection, query, params)

results = self.execute(query, params)
for row in results:
yield self.convert_row(row)
25 changes: 0 additions & 25 deletions listenstore/listenstore/tests/test_listenstore.py

This file was deleted.

42 changes: 18 additions & 24 deletions listenstore/tests/test_listenstore.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
# coding=utf-8
from __future__ import division, absolute_import, print_function, unicode_literals
import unittest2
from db.testing import DatabaseTestCase
import logging
from datetime import date, datetime
from .util import generate_data
from listenstore.listenstore import ListenStore
from datetime import datetime
from .util import generate_data, to_epoch
from webserver.postgres_connection import init_postgres_connection

# TODO: update for postgres

class TestListenStore(unittest2.TestCase):
class TestListenStore(DatabaseTestCase):

@classmethod
@unittest2.skip("We don't have Cassandra on Jenkins server")
def setUpClass(self):
def setUp(self):
super(TestListenStore, self).setUp()
self.log = logging.getLogger(__name__)
conf = {"replication_factor": 1,
"cassandra_keyspace": "listenbrainz_integration_test",
"cassandra_server": "localhost:9092"}
self.logstore = ListenStore(conf)
self.logstore = init_postgres_connection(self.config.TEST_SQLALCHEMY_DATABASE_URI)
self._create_test_data()

@classmethod
def tearDown(self):
# self.logstore.drop_schema()
self.logstore = None

def _create_test_data(self):
date = datetime(2015, 9, 3, 0, 0, 0)
self.log.info("Inserting test data...")
test_data = generate_data(datetime(2015, 9, 3, 0, 0, 0), 1000)
self.logstore.insert_batch(test_data)
test_data = generate_data(date, 100)
self.logstore.insert_postgresql(test_data)
self.log.info("Test data inserted")

@classmethod
def tearDownClass(self):
#self.logstore.drop_schema()
self.logstore = None

@unittest2.skip("We don't have Cassandra on Jenkins server")
def test_fetch_listens(self):
listens = self.logstore.fetch_listens(uid="test", limit=10)
self.assertEqual(len(list(listens)), 10)
date = datetime(2015, 9, 3, 0, 0, 0)
listens = self.logstore.fetch_listens(user_id="test", from_id=to_epoch(date), limit=10)
self.assertEquals(len(list(listens)), 10)
19 changes: 11 additions & 8 deletions listenstore/tests/util.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
# coding=utf-8
from __future__ import division, absolute_import, print_function, unicode_literals
from datetime import timedelta
from datetime import datetime, timedelta
from listenstore.listen import Listen
import uuid


def generate_data(from_date, num_records):
test_data = []

current_date = from_date

current_date = to_epoch(from_date)
artist_msid = str(uuid.uuid4())
for i in range(1, num_records):
item = Listen(uid="test", timestamp=current_date, artist_msid=artist_msid,
track_msid=str(uuid.uuid4()))

for i in range(num_records):
current_date += 1 # Add one second
item = Listen(user_id="test", timestamp=current_date, artist_msid=artist_msid,
recording_msid=str(uuid.uuid4()))
test_data.append(item)
current_date += timedelta(seconds=1)
return test_data


def to_epoch(date):
return (date - datetime.utcfromtimestamp(0)).total_seconds()

0 comments on commit aac58f7

Please sign in to comment.