Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 159 lines (140 sloc) 5.84 KB
#!/usr/bin/env python3
# Processes updates to postgres Full Text Search for new/edited messages.
#
# Zulip manages its postgres full-text search as follows. When the
# content of a message is modified, a postgres trigger logs the
# message ID to the `fts_update_log` table. In the background, this
# program processes `fts_update_log`, updating the postgres full-text
# search column search_tsvector in the main zerver_message table.
import sys
# We want to use a virtualenv in production, which will be in /home/zulip/deployments/current.
# So we should add that path to sys.path and then import scripts.lib.setup_path_on_import.
# But this file is also used in development, where the above path will not exist.
# So `import scripts.lib.setup_path_on_import` will raise an ImportError.
# In development, we just want to skip this step since we know that virtualenv will already be in use.
# So catch the ImportError and do nothing.
sys.path.append('/home/zulip/deployments/current')
try:
import scripts.lib.setup_path_on_import
except ImportError:
pass
import psycopg2
import psycopg2.extensions
import select
import time
import logging
import configparser
import sys
import os
BATCH_SIZE = 1000
def update_fts_columns(cursor):
    # type: (psycopg2.extensions.cursor) -> int
    """Reindex one batch of messages listed in `fts_update_log`.

    Reads up to BATCH_SIZE (id, message_id) rows from `fts_update_log`,
    recomputes `search_tsvector` for each referenced message (and
    `search_pgroonga` as well when USING_PGROONGA is set), then deletes
    the log rows that were read.

    Returns the number of log rows processed.  A return value equal to
    BATCH_SIZE means the batch was full, so more rows may be pending.
    """
    # Bind the limit as a query parameter instead of formatting it into
    # the SQL text, matching how every other query here passes values.
    cursor.execute("SELECT id, message_id FROM fts_update_log LIMIT %s;",
                   (BATCH_SIZE,))
    log_ids = []
    # `log_id` rather than `id`, to avoid shadowing the builtin.
    for (log_id, message_id) in cursor.fetchall():
        if USING_PGROONGA:
            cursor.execute("UPDATE zerver_message SET "
                           "search_pgroonga = "
                           "escape_html(subject) || ' ' || rendered_content "
                           "WHERE id = %s", (message_id,))
        # The tsvector column is refreshed regardless of pgroonga.
        cursor.execute("UPDATE zerver_message SET "
                       "search_tsvector = to_tsvector('zulip.english_us_search', "
                       "subject || rendered_content) "
                       "WHERE id = %s", (message_id,))
        log_ids.append(log_id)
    # Nothing was read, so there is nothing to delete; skip the round trip.
    if log_ids:
        cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (log_ids,))
    return len(log_ids)
def am_master(cursor):
    # type: (psycopg2.extensions.cursor) -> bool
    """Return True when the connected server reports it is not in recovery.

    Runs `SELECT pg_is_in_recovery()` on the given cursor and returns the
    negation of the single value in the first result row.
    """
    cursor.execute("SELECT pg_is_in_recovery()")
    result_rows = cursor.fetchall()
    first_row = result_rows[0]
    in_recovery = first_row[0]
    return not in_recovery
# Logger used by this script; its level is DEBUG so that the info-level
# progress messages below are emitted.
logger = logging.getLogger("process_fts_updates")
logger.setLevel(logging.DEBUG)

# Render %(asctime)s using time.gmtime, then install the default
# handler with the log line format.
logging.Formatter.converter = time.gmtime
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")

logger.info("process_fts_updates starting")
# Keyword arguments for psycopg2.connect(), filled in below.
pg_args = {}

# Make the Zulip codebase importable.  Both entries are inserted at the
# front of sys.path, so the development root (inserted last) ends up
# ahead of the production root.
# Path to the root of the Zulip codebase in production
sys.path.insert(0, '/home/zulip/deployments/current')
# Path to the root of the Zulip codebase in development
sys.path.insert(
    0,
    os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..")),
)

try:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.settings'
    from django.conf import settings

    remote_host = settings.REMOTE_POSTGRES_HOST
    if remote_host != '':
        pg_args['host'] = remote_host
    remote_port = settings.REMOTE_POSTGRES_PORT
    if remote_port != '':
        pg_args['port'] = remote_port
    USING_PGROONGA = settings.USING_PGROONGA
except ImportError:
    # process_fts_updates also supports running locally on a remote
    # postgres server; in that case, one can just connect to localhost
    USING_PGROONGA = False

# Since we don't want a hard dependency on being able to access the
# Zulip settings (as we may not be running on a server that has that
# data), we determine whether we're using pgroonga using
# /etc/zulip/zulip.conf.
#
# However, we still also check the `USING_PGROONGA` variable, since
# that's all we have in development.
config_file = configparser.RawConfigParser()
config_file.read("/etc/zulip/zulip.conf")
if config_file.has_option('machine', 'pgroonga'):
    USING_PGROONGA = True

if 'host' not in pg_args:
    # No remote host configured: connect locally as the zulip user.
    pg_args['user'] = 'zulip'
else:
    # Remote database: take credentials and database name from the
    # Django settings for the default database.
    default_db = settings.DATABASES['default']
    db_password = default_db['PASSWORD']
    pg_args['password'] = db_password if db_password is not None else ''
    pg_args['user'] = default_db['USER']
    pg_args['dbname'] = default_db['NAME']
    # An empty REMOTE_POSTGRES_SSLMODE falls back to 'verify-full'.
    configured_sslmode = settings.REMOTE_POSTGRES_SSLMODE
    pg_args['sslmode'] = configured_sslmode if configured_sslmode != '' else 'verify-full'
    pg_args['connect_timeout'] = '600'
# Main loop: connect, wait until the server is out of recovery, catch up
# on the backlog in fts_update_log, then wait for notifications on the
# fts_update_log channel and process new rows as they are announced.
conn = None

# With a budget of 1, an OperationalError before the first successful
# connection is re-raised immediately.  The budget is raised to 30 each
# time a connection is established.
retries = 1

while True:
    try:
        if conn is None:
            conn = psycopg2.connect(**pg_args)
            cursor = conn.cursor()

            retries = 30

            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

            # Poll every 5 seconds until the server leaves recovery,
            # logging only on the first check.
            first_check = True
            while not am_master(cursor):
                if first_check:
                    first_check = False
                    logger.info("In recovery; sleeping")
                time.sleep(5)

            logger.info("Not in recovery; listening for FTS updates")

            cursor.execute("LISTEN fts_update_log;")
            # Catch up on any rows already logged in fts_update_log.
            while True:
                rows_updated = update_fts_columns(cursor)
                logger.info("Processed %s rows catching up", rows_updated)

                if rows_updated != BATCH_SIZE:
                    # We're caught up, so proceed to the listening for updates phase.
                    break

        # TODO: If we go back into recovery, we should stop processing updates
        # Wait up to 30 seconds for the connection to become readable.
        if select.select([conn], [], [], 30) != ([], [], []):
            conn.poll()
            while conn.notifies:
                # A notification does not say how many rows are pending,
                # and one update_fts_columns() call handles at most
                # BATCH_SIZE of them.  So discard everything queued and
                # keep processing until a short batch shows the log table
                # is drained.  The outer loop re-checks for notifications
                # that arrived while the queries above were running.
                conn.notifies.clear()
                while update_fts_columns(cursor) == BATCH_SIZE:
                    pass
    except psycopg2.OperationalError as e:
        retries -= 1
        if retries <= 0:
            raise
        logger.info(e)
        logger.info("Sleeping and reconnecting")
        time.sleep(5)
        # Drop the connection so the next iteration reconnects and
        # repeats the recovery check and catch-up phase.
        if conn is not None:
            conn.close()
            conn = None
    except KeyboardInterrupt:
        print(sys.argv[0], "exited after receiving KeyboardInterrupt")
        break
You can’t perform that action at this time.