Skip to content

Commit

Permalink
Merge remote-tracking branch 'ar/migrate-to-postgres'
Browse files Browse the repository at this point in the history
  • Loading branch information
mayhem committed Jun 21, 2016
2 parents bb20882 + cfa7491 commit 39f266b
Show file tree
Hide file tree
Showing 21 changed files with 394 additions and 104 deletions.
1 change: 1 addition & 0 deletions admin/sql/create_primary_keys.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
BEGIN;

-- Natural key for users is the surrogate id column.
ALTER TABLE "user" ADD CONSTRAINT user_pkey PRIMARY KEY (id);
-- A user cannot have two listens at the same timestamp, so (user_id, ts)
-- uniquely identifies a listen and doubles as the fetch-by-user index.
ALTER TABLE listens ADD CONSTRAINT listens_pkey PRIMARY KEY (user_id, ts);

COMMIT;
10 changes: 10 additions & 0 deletions admin/sql/create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,14 @@ CREATE TABLE "user" (
);
ALTER TABLE "user" ADD CONSTRAINT user_musicbrainz_id_key UNIQUE (musicbrainz_id);

-- One row per listen event; primary key (user_id, ts) is added separately
-- in create_primary_keys.sql.
CREATE TABLE listens (
    id SERIAL,                               -- surrogate id (not the primary key)
    user_id VARCHAR NOT NULL,                -- MusicBrainz user the listen belongs to
    ts TIMESTAMP WITH TIME ZONE NOT NULL,    -- when the listen happened
    artist_msid UUID NOT NULL,               -- MessyBrainz artist id
    album_msid UUID,                         -- MessyBrainz release id (optional)
    recording_msid UUID NOT NULL,            -- MessyBrainz recording id
    raw_data JSONB                           -- full submitted listen payload
);

COMMIT;
10 changes: 10 additions & 0 deletions config.py.sample
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ SECRET_KEY = "CHANGE_ME"

# Primary database
SQLALCHEMY_DATABASE_URI = "postgresql://listenbrainz@/listenbrainz"
# Oldest listens which can be stored in the database, in days.
MAX_POSTGRES_LISTEN_HISTORY = "90"
# Log Postgres queries if they exceed this time, in milliseconds.
PG_QUERY_TIMEOUT = "3000"
# Set synchronous_commit to OFF for Postgres. Default: False
PG_ASYNC_LISTEN_COMMIT = "False"

MESSYBRAINZ_SQLALCHEMY_DATABASE_URI = "postgresql://messybrainz@/messybrainz"

Expand All @@ -26,12 +32,16 @@ KAFKA_RUN_CLASS_BINARY = "/opt/kafka/bin/kafka-run-class.sh"
# CASSANDRA
CASSANDRA_SERVER = "localhost"
CASSANDRA_KEYSPACE = "listenbrainz"
CASSANDRA_REPLICATION_FACTOR = "1"


# MusicBrainz OAuth
MUSICBRAINZ_CLIENT_ID = "CLIENT_ID"
MUSICBRAINZ_CLIENT_SECRET = "CLIENT_SECRET"

# Last.fm
LASTFM_API_KEY = "USE_LASTFM_API_KEY"


# LOGGING

Expand Down
1 change: 1 addition & 0 deletions listenstore/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
build
listenbrainz_store.egg-info
dist
listenstore.conf
9 changes: 5 additions & 4 deletions listenstore/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@

## Development

$ PYTHONPATH=. bin/listenstore-test.py

## Pip, eggs, etc
Expand All @@ -13,12 +12,14 @@ then install the egg:

bin scripts get installed to `/usr/local/bin`.

## Configuration
Listenstore uses the same configuration file as ListenBrainz. If the
location of `config.py` changes, update the settings to appropriate values.

## Schema

Cassandra table called "listens" stores everything. It is keyed on `(uid,
idkey)`, where idkey is the first 3 numbers in the unixtimestamp of the
listen.
Postgres table called "listens" stores everything. It is keyed on `(user_id,
ts)`, where ts is the timestamp (with timezone) of the listen.

This effectively shards listens by a user over multiple row keys, since
very wide rows are inefficient.
Expand Down
10 changes: 9 additions & 1 deletion listenstore/bin/listenstore-kafka-to-cassandra-writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python
import logging
from listenstore.cli import Command
from listenstore.listenstore import CassandraListenStore



class KafkaToCassandra(Command):
Expand All @@ -11,7 +13,13 @@ def __init__(self):
self.log = logging.getLogger(__name__)

def run(self):
self.kafkaConsumer.start_listens(self.listenStore)
self.kafka_consumer.start_listens(self.listen_store)

@property
def listen_store(self):
self._listenStore = CassandraListenStore(self.config)
return self._listenStore



if __name__ == '__main__':
Expand Down
26 changes: 26 additions & 0 deletions listenstore/bin/listenstore-kafka-to-postgres-writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python
import logging
from listenstore.cli import Command
from listenstore.listenstore import PostgresListenStore



class KafkaToPostgres(Command):
    """Command that consumes listens from Kafka and writes them to Postgres."""

    # Fixed: previous text ("Print listens fetched from kafka") was copied
    # from the printer script and misdescribed this writer.
    desc = "Write listens fetched from Kafka to Postgres"

    def __init__(self):
        super(KafkaToPostgres, self).__init__()
        self.log = logging.getLogger(__name__)

    def run(self):
        # Stream listens from the Kafka consumer into the Postgres store.
        self.kafka_consumer.start_listens(self.listen_store)

    @property
    def listen_store(self):
        """Lazily create and cache the PostgresListenStore.

        The previous version built a new store on every property access;
        memoize it so repeated accesses share one store instance.
        ``getattr`` guards against ``_listenStore`` not being initialized
        by the base Command's __init__.
        """
        if getattr(self, '_listenStore', None) is None:
            self._listenStore = PostgresListenStore(self.config)
        return self._listenStore


if __name__ == '__main__':
    KafkaToPostgres().start()
4 changes: 2 additions & 2 deletions listenstore/bin/listenstore-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def run(self):
self.log.info("ListenPrinter starting..")
#unixtime = int(calendar.timegm(time.gmtime()))
#item = {'user_id':'rj','listened_at':unixtime,'body':{'foo':'bar','now':time.strftime("%c")}}
#self.listenStore.insert(item)
#self.listen_store.insert(item)

res = self.listenStore.fetch_listens(uid='rj')
res = self.listen_store.fetch_listens(uid='rj')
for r in res:
print repr(r)

Expand Down
2 changes: 1 addition & 1 deletion listenstore/bin/listenstore-tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self):
def run(self):
uid = self.config['userid']
self.log.info("ListenTool starting for uid %s.." % (uid))
res = self.listenStore.fetch_listens(uid=uid, limit=self.config['limit'])
res = self.listen_store.fetch_listens(uid=uid, limit=self.config['limit'])
for r in res:
print repr(r)

Expand Down
15 changes: 8 additions & 7 deletions listenstore/listenstore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging

from .utils import argparse_factory, parse_args_and_config
from .listenstore import ListenStore
from .listenstore import PostgresListenStore, CassandraListenStore
from .kafkaconsumer import KafkaConsumer


Expand All @@ -24,7 +24,7 @@ def __init__(self):

# NB: only sets level after our Command starts running
def set_log_level(self):
l = self.config['loglevel']
l = self.config['LOGLEVEL']
lev = logging.INFO
if l == "DEBUG":
lev = logging.DEBUG
Expand Down Expand Up @@ -61,13 +61,14 @@ def run():
raise NotImplementedError()

@property
def listenStore(self):
if self._listenStore is None:
self._listenStore = ListenStore(self.config)
return self._listenStore
def listen_store(self):
""" Override this property in bin scripts to support writing
to both Cassandra and Postgres
"""
pass

@property
def kafkaConsumer(self):
def kafka_consumer(self):
if self._kafkaConsumer is None:
self._kafkaConsumer = KafkaConsumer(self.config)
return self._kafkaConsumer
Expand Down
12 changes: 10 additions & 2 deletions listenstore/listenstore/kafkaconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .listen import Listen
from time import time, sleep
from cassandra.cluster import NoHostAvailable
from .utils import wrapper, time_it

KAFKA_READ_TIMEOUT = 5
CASSANDRA_BATCH_SIZE = 1000
Expand All @@ -12,10 +13,11 @@
class KafkaConsumer(object):
def __init__(self, conf):
self.log = logging.getLogger(__name__)
self.client = KafkaClient(conf["kafka_server"])
self.client = KafkaClient(conf["KAFKA_CONNECT"])
self.total_inserts = 0
self.inserts = 0
self.listenstore = None
self.conf = conf


def start_listens(self, listenstore):
Expand Down Expand Up @@ -51,7 +53,13 @@ def start(self, group_name, topic_name):
broken = True
while broken:
try:
self.listenstore.insert_batch(listens)
threshold = 10000 # Default threshold in ms
if 'PG_QUERY_TIMEOUT' in self.conf:
threshold = int(self.conf['PG_QUERY_TIMEOUT'])

time_it(wrapper(self.listenstore.insert_postgresql, listens), \
threshold, self.log, 'Threshold Postgres_Query_Time exceeded')

broken = False
except ValueError as e:
self.log.error("Cannot insert listens: %s" % unicode(e))
Expand Down

0 comments on commit 39f266b

Please sign in to comment.