faster events (#81)
* faster events

* adding _patch_copy_events

* upgrade es

* fix timeout and user event display

* improve patch script

* fixing patch scripts

* upgrade es in travis
samuelcolvin committed Oct 3, 2017
1 parent d344fde commit 24bc06d
Showing 11 changed files with 224 additions and 69 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -14,8 +14,8 @@ python:
- '3.6'

install:
- curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.0.deb
- sudo dpkg -i --force-confnew elasticsearch-5.4.0.deb
- curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.2.deb
- sudo dpkg -i --force-confnew elasticsearch-5.6.2.deb
- "echo 'path.repo: [\"/tmp\"]' | sudo tee --append /etc/elasticsearch/elasticsearch.yml"
- sudo service elasticsearch start
- make install
10 changes: 10 additions & 0 deletions README.md
@@ -120,3 +120,13 @@ Set up your environment. If you have ElasticSearch installed and running you're
then

make

### to monitor

backup in progress

docker run -it --rm --network morpheus_default python:3.6 curl elastic:9200/_cat/recovery?v

indices

docker run -it --rm --network morpheus_default python:3.6 curl elastic:9200/_cat/indices/?v
2 changes: 1 addition & 1 deletion elasticsearch/Dockerfile
@@ -1,2 +1,2 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:5.4.2
FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.2
RUN /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3
90 changes: 87 additions & 3 deletions morpheus/app/es.py
@@ -4,8 +4,10 @@
from datetime import datetime
from time import time

from arq import create_pool_lenient

from .settings import Settings
from .utils import THIS_DIR, ApiSession
from .utils import THIS_DIR, ApiError, ApiSession

main_logger = logging.getLogger('morpheus.elastic')

@@ -94,7 +96,8 @@ async def create_snapshot(self):
main_logger.info('creating elastic search snapshot...')
r = await self.put(
f'/_snapshot/{self.settings.snapshot_repo_name}/'
f'snapshot-{datetime.now():%Y-%m-%d_%H-%M-%S}?wait_for_completion=true'
f'snapshot-{datetime.now():%Y-%m-%d_%H-%M-%S}?wait_for_completion=true',
timeout_=1000,
)
main_logger.info('snapshot created: %s', json.dumps(await r.json(), indent=2))

@@ -110,7 +113,7 @@ async def restore_snapshot(self, snapshot_name):
start = time()
r = await self.post(
f'/_snapshot/{self.settings.snapshot_repo_name}/{snapshot_name}/_restore?wait_for_completion=true',
timeout=None,
timeout_=None,
)
main_logger.info(json.dumps(await r.json(), indent=2))

@@ -133,6 +136,81 @@ async def _patch_update_mappings(self):
main_logger.info('%d types updated for %s, re-opening index', len(types), index_name)
await self.post(f'{index_name}/_open')

async def _patch_copy_events(self):
r = await self.get(f'messages/_mapping')
all_mappings = await r.json()
redis_pool = await create_pool_lenient(self.settings.redis_settings, loop=None)
async with redis_pool.get() as redis:
for t in all_mappings['messages']['mappings']:
r = await self.get(f'/messages/{t}/_search?scroll=2m', sort=['_doc'], query={'match_all': {}},
size=1000)
data = await r.json()
scroll_id = data['_scroll_id']
main_logger.info(f'messages/{t} {data["hits"]["total"]} messages to move events for')
added, skipped = 0, 0
set_key = f'event-set-{t}'
events = set(await redis.smembers(set_key, encoding='utf8'))
for i in range(int(1e9)):
rows = set()
for hit in data['hits']['hits']:
msg_id = hit['_id']
for event in hit['_source'].get('events', []):
row = json.dumps({'message': msg_id, **event}, sort_keys=True)
if row in events:
skipped += 1
else:
added += 1
rows.add(row)
r = await self.get('_search/scroll', scroll='2m', scroll_id=scroll_id, timeout_=10)
data = await r.json()
main_logger.info(' %d: %d events added, %d skipped, adding %d rows', i, added, skipped, len(rows))
if rows:
await asyncio.gather(
redis.sadd(set_key, *rows),
redis.expire(set_key, 86400),
)
if not data['hits']['hits']:
break
main_logger.info(f'messages/{t} {added} events added, {skipped} skipped')

redis_pool.close()
await redis_pool.wait_closed()
await redis_pool.clear()

async def _patch_create_events(self):
r = await self.get(f'messages/_mapping')
all_mappings = await r.json()
redis_pool = await create_pool_lenient(self.settings.redis_settings, loop=None)
async with redis_pool.get() as redis:
async def bulk_insert(rows):
post_data = '\n'.join(rows)
start = time()
async with self.session.post(self.root + '_bulk', data=post_data, timeout=3600,
headers={'Content-Type': 'application/x-ndjson'}) as r:
if r.status != 200:
raise ApiError('post', '_bulk', rows, r, await r.text())
return time() - start

for t in all_mappings['messages']['mappings']:
set_key = f'event-set-{t}'
for i in range(int(1e9)):
insert_rows = []
rows = await redis.srandmember(set_key, count=200, encoding='utf8')
if not rows:
break
for r in rows:
insert_rows.append(json.dumps({'index': {'_index': 'events', '_type': t}}))
insert_rows.append(r)
dur = await bulk_insert(insert_rows)
await redis.srem(set_key, *rows)
if i % 50 == 0:
remaining = await redis.scard(set_key)
main_logger.info('events/%s inserted %d rows in %0.2fs, remaining %d',
t, len(insert_rows), dur, remaining)
redis_pool.close()
await redis_pool.wait_closed()
await redis_pool.clear()


KEYWORD = {'type': 'keyword'}
DATE = {'type': 'date'}
@@ -176,5 +254,11 @@ async def _patch_update_mappings(self):
'send_method': KEYWORD,
'send_message_id': KEYWORD,
'expires_ts': DATE,
},
'events': {
'message': KEYWORD,
'ts': DATE,
'status': KEYWORD,
'extra': DYNAMIC,
}
}
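
The two patch methods above split the migration in half: _patch_copy_events scrolls through every message type and deduplicates each embedded event into a per-type Redis set, then _patch_create_events drains those sets into the new standalone events index through the _bulk endpoint. The bulk body is newline-delimited JSON, alternating one action line with one document line. A minimal sketch of the payload the second script assembles — the type name and event rows here are hypothetical, not taken from the commit:

import json

# hedged sketch of an NDJSON /_bulk body: one action line, then one
# source line, for each event row pulled out of the Redis set
rows = [
    '{"message": "msg-1", "status": "open", "ts": 1507000000000}',
    '{"message": "msg-2", "status": "click", "ts": 1507000060000}',
]
lines = []
for row in rows:
    # route each document to the events index, typed like its parent message
    lines.append(json.dumps({'index': {'_index': 'events', '_type': 'email-mandrill'}}))
    lines.append(row)
post_data = '\n'.join(lines) + '\n'  # the bulk API expects a newline-terminated body
print(post_data)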
3 changes: 2 additions & 1 deletion morpheus/app/utils.py
@@ -282,7 +283,8 @@ async def put(self, uri, *, allowed_statuses=(200, 201), **data):
async def _request(self, method, uri, allowed_statuses=(200, 201), **data) -> Response:
method, url, data = self._modify_request(method, self.root + str(uri).lstrip('/'), data)
headers = data.pop('headers_', {})
async with self.session.request(method, url, json=data, headers=headers) as r:
timeout = data.pop('timeout_', 300)
async with self.session.request(method, url, json=data, headers=headers, timeout=timeout) as r:
# always read entire response before closing the connection
response_text = await r.text()

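
The underscore suffix matters here: any plain keyword argument to an ApiSession helper is serialised into the JSON request body, so the per-request timeout travels as timeout_ and is popped before the request is sent, mirroring the existing headers_ convention. A hypothetical caller, assuming an ApiSession-style client named es (the function and its arguments are invented for illustration):

# timeout_ never reaches the JSON body; it becomes the aiohttp request
# timeout, overriding the new 300-second default
async def bump_status(es, es_type: str, message_id: str, ts, status: str):
    await es.post(
        f'messages/{es_type}/{message_id}/_update?retry_on_conflict=5',
        doc={'update_ts': ts, 'status': status},
        timeout_=20,
    )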
45 changes: 32 additions & 13 deletions morpheus/app/views.py
@@ -283,25 +283,45 @@ async def query(self, *, message_id=None, tags=None, query=None):
{'send_ts': 'desc'},
]

return await self.app['es'].get(
r = await self.app['es'].get(
f'messages/{self.request.match_info["method"]}/_search?filter_path=hits', **es_query
)
assert r.status == 200, r.status
return await r.json()

@staticmethod
def _event_data(e):
data = e['_source']
data.pop('message')
return data

async def insert_events(self, data):
t = self.request.match_info['method']
for hit in data['hits']['hits']:
r = await self.app['es'].get(
f'events/{t}/_search?filter_path=hits',
query={
'term': {'message': hit['_id']}
},
size=100,
)
assert r.status == 200, r.status
event_data = await r.json()
hit['_source']['events'] = [self._event_data(e) for e in event_data['hits']['hits']]


class UserMessagesJsonView(_UserMessagesView):
async def call(self, request):
r = await self.query(
data = await self.query(
message_id=request.query.get('message_id'),
tags=request.query.getall('tags', None),
query=request.query.get('q')
)
if 'sms' in request.match_info['method'] and self.session.company != '__all__':
body = await r.json()
body['spend'] = await self.sender.check_sms_limit(self.session.company)
body = json.dumps(body)
else:
body = await r.text()
return Response(body=body, content_type='application/json')
data['spend'] = await self.sender.check_sms_limit(self.session.company)

await self.insert_events(data)
return self.json_response(**data)


class UserMessageDetailView(TemplateView, _UserMessagesView):
@@ -310,8 +330,8 @@ class UserMessageDetailView(TemplateView, _UserMessagesView):

async def call(self, request):
msg_id = self.request.match_info['id']
r = await self.query(message_id=msg_id)
data = await r.json()
data = await self.query(message_id=msg_id)
await self.insert_events(data)
if len(data['hits']['hits']) == 0:
raise HTTPNotFound(text='message not found')
data = data['hits']['hits'][0]
@@ -378,11 +398,10 @@ class UserMessageListView(TemplateView, _UserMessagesView):
template = 'user/list.jinja'

async def call(self, request):
r = await self.query(
data = await self.query(
tags=request.query.getall('tags', None),
query=request.query.get('q', None)
)
data = await r.json()
total_sms_spend = None
if 'sms' in request.match_info['method'] and self.session.company != '__all__':
total_sms_spend = '{:,.3f}'.format(await self.sender.check_sms_limit(self.session.company))
@@ -769,7 +788,7 @@ async def call(self, request):
'age': {
'avg': {
'script': {
'inline': 'doc.update_ts.value - doc.send_ts.value'
'source': 'doc.update_ts.value - doc.send_ts.value'
}
},
},
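
Because events now live in their own index, insert_events re-attaches them to each hit with one extra term search per displayed message, capped at 100 events each; message is mapped as a keyword, so every lookup is an exact, cheap filter. The request body each lookup sends is equivalent to this sketch (the message id is illustrative):

# hedged sketch of the per-message lookup performed by insert_events
events_query = {
    'query': {'term': {'message': 'msg-1'}},  # events are keyed by message id
    'size': 100,  # at most 100 events are re-attached to a hit
}

Fetching events lazily keeps the hot messages index small and write-cheap, at the cost of a bounded fan-out of small queries when a user actually views a message list or detail page.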
50 changes: 22 additions & 28 deletions morpheus/app/worker.py
@@ -1,5 +1,6 @@
import asyncio
import base64
import hashlib
import json
import logging
import re
@@ -317,7 +318,6 @@ async def _store_email(self, uid, send_ts, j: EmailJob, email_info: EmailInfo):
subject=email_info.subject,
body=email_info.html_body,
attachments=[f'{a["id"] or ""}::{a["name"]}' for a in j.pdf_attachments],
events=[]
)
for url, token in email_info.shortened_link:
await self.es.post(
@@ -588,7 +588,6 @@ async def _store_sms(self, uid, send_ts, j: SmsJob, sms_data: SmsData, cost: flo
body=sms_data.message,
cost=cost,
extra=sms_data.length._asdict(),
events=[],
)
for url, token in sms_data.shortened_link:
await self.es.post(
@@ -656,51 +655,46 @@ async def store_click(self, *, target, ip, ts, user_agent, send_method, send_mes
await self.update_message_status(send_method, m)

async def update_message_status(self, es_type, m: BaseWebhook, log_each=True):
h = hashlib.md5(f'{to_unix_ms(m.ts)}-{m.status}-{json.dumps(m.extra(), sort_keys=True)}'.encode())
ref = f'event-{h.hexdigest()}'
async with await self.get_redis_conn() as redis:
v = await redis.incr(ref)
if v > 1:
main_logger.info('event already exists %s, ts: %s, status: %s. skipped', m.message_id, m.ts, m.status)
return
await redis.expire(ref, 7200)

r = await self.es.get(f'messages/{es_type}/{m.message_id}', allowed_statuses=(200, 404))
if r.status == 404:
return
data = await r.json()

events = data['_source']['events']
if events:
last_event = data['_source']['events'][-1]
if (to_unix_ms(m.ts) == last_event['ts'] and
m.status == last_event['status'] and
json.dumps(m.extra(), sort_keys=True) == json.dumps(last_event['extra'], sort_keys=True)):
main_logger.info('event already exists %s, ts: %s, status: %s. skipped', m.message_id, m.ts, m.status)
return

old_update_ts = from_unix_ms(data['_source']['update_ts'])
if m.ts.tzinfo:
old_update_ts = old_update_ts.replace(tzinfo=timezone.utc)

update_uri = f'messages/{es_type}/{m.message_id}/_update?retry_on_conflict=10'
update_uri = f'messages/{es_type}/{m.message_id}/_update?retry_on_conflict=5'
try:
# give 1 second "leeway" for new event to have happened just before the old event
if m.ts >= (old_update_ts - timedelta(seconds=1)):
await self.es.post(update_uri, doc={'update_ts': m.ts, 'status': m.status})
log_each and main_logger.info('updating message %s, ts: %s, status: %s', m.message_id, m.ts, m.status)
await self.es.post(
update_uri,
script={
'inline': 'ctx._source.events.add(params.event)',
'params': {
'event': {
'ts': m.ts,
'status': m.status,
'extra': m.extra(),
}
}
}
)
await self.es.post(update_uri, doc={'update_ts': m.ts, 'status': m.status}, timeout_=20)
except ApiError as e: # pragma: no cover
# no error here if we know the problem
if e.status == 409:
main_logger.info('ElasticSearch conflict for %s, ts: %s, status: %s', m.message_id, m.ts, m.status)
return
else:
raise

log_each and main_logger.info('updating message %s, ts: %s, status: %s', m.message_id, m.ts, m.status)
await self.es.post(
f'events/{es_type}/',
message=m.message_id,
ts=m.ts,
status=m.status,
extra=m.extra(),
timeout_=20,
)


class AuxActor(Actor): # pragma: no cover
def __init__(self, settings: Settings = None, **kwargs):
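
On the webhook path, the old read-and-compare against the embedded events array is replaced by a Redis guard: each incoming event is hashed from its timestamp, status, and extra payload, and an atomic INCR on that key decides whether the same event was already processed in the last two hours. A standalone sketch of the same idea (the helper names are invented for illustration):

import hashlib
import json

def event_ref(ts_ms: int, status: str, extra: dict) -> str:
    # stable digest of the event fields, matching the key built in
    # update_message_status above
    h = hashlib.md5(f'{ts_ms}-{status}-{json.dumps(extra, sort_keys=True)}'.encode())
    return f'event-{h.hexdigest()}'

async def is_duplicate(redis, ref: str, ttl: int = 7200) -> bool:
    # INCR is atomic: the first writer sees 1, every racing duplicate sees more
    if await redis.incr(ref) > 1:
        return True
    await redis.expire(ref, ttl)  # let the guard key lapse after two hours
    return False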
2 changes: 1 addition & 1 deletion run-es.sh
@@ -7,4 +7,4 @@ docker run --rm \
-e "transport.host=127.0.0.1" \
-e "xpack.security.enabled=false" \
-e "path.repo=[\"/snapshots\"]" \
docker.elastic.co/elasticsearch/elasticsearch:5.4.2
docker.elastic.co/elasticsearch/elasticsearch:5.6.2