Optimize the python driver's serialization code. #4569
Thank you for the script and data @sontek !

One avenue might be to write a custom JSONDecoder that does pseudotype conversion at parse time. If we did that, it would open the door to #3506.

Yeah, I think having a JSONDecoder would be a great approach (I've monkeypatched it to use simplejson to get a 5% perf boost), but until we convert that function to use a while loop instead of recursion, JSON isn't actually the slow part yet.

I was thinking in lieu of having the recursive function at all. It would do the work of that recursive function during the initial parse. Right now it does everything in two passes, the second one being slow because of function call overhead in the recursive pseudotype conversion.
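As a rough sketch of the parse-time idea (not the driver's actual code; it assumes the `$reql_type$`/`epoch_time` wire representation RethinkDB uses for TIME pseudotypes, and for brevity ignores the `timezone` field and other pseudotypes):

```python
import json
from datetime import datetime, timezone

def convert_pseudotypes(dct):
    # Runs on every JSON object as it is parsed, so TIME pseudotypes
    # are converted inline instead of in a second recursive pass.
    if dct.get('$reql_type$') == 'TIME':
        return datetime.fromtimestamp(dct['epoch_time'], tz=timezone.utc)
    return dct

decoder = json.JSONDecoder(object_hook=convert_pseudotypes)
doc = decoder.decode(
    '{"id": 1, "now": {"$reql_type$": "TIME", "epoch_time": 0.0, "timezone": "+00:00"}}'
)
# doc["now"] is a datetime rather than a raw pseudotype dict
```

This removes the per-document Python-level recursion entirely; the C-level parser drives the traversal instead.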
For anyone watching this thread, we're doing an on-the-wire performance test (without the Python driver) to figure out baseline server performance for this workload. Once we have this information, it will be relatively easy to figure out what we need to do to fix this.
The raw server performance for a single client reading the data, measured without any parsing on the client side, is approximately 0.8 seconds per 100,000 documents. This is using the documents generated by the script above, just slightly modified to store a more compact unix timestamp rather than a ReQL date/time object. In this test, all data gets transferred to the client, but the client doesn't parse the response (except for a small header). @sontek would that be fast enough, or do you need higher single-client throughput?
@danielmewes Does that scale out linearly? What's it at for 50,000 and 500,000? I think 0.8 seconds would be acceptable as long as the python driver overhead wasn't adding much on top of that. Another thing we could look at is building out an async query model like:

```python
import rethinkdb as r

conn = r.connect()
futures = []
for i in range(0, 2):
    future = r.table('boom').filter(
        lambda row: row['id'] % 2 == i
    ).run_async(conn)
    futures.append(future)

for f in futures:
    f.join()
```

So if we know the queries scale linearly, we can asynchronously spawn the queries off to get the desired query speed. So if there is a 200k result set, in that example I would still expect to wait no more than 0.8 seconds to get the data.
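A hedged sketch of the same fan-out using only the stdlib (the `run_async` above is a proposal and doesn't exist in the driver today; `run_query` below is a hypothetical stand-in for a per-worker ReQL query):

```python
from concurrent.futures import ThreadPoolExecutor

def run_query(i):
    # Stand-in for r.table('boom').filter(lambda row: row['id'] % 2 == i).run(conn);
    # each worker should open its own connection rather than sharing one.
    return [{'id': n} for n in range(i, 10, 2)]

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_query, i) for i in range(0, 2)]
    data = [doc for f in futures for doc in f.result()]
```

The pattern only helps if the queries themselves scale out; otherwise the workers just contend for the same server resources.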
@sontek -- talked to @danielmewes in depth, and here are some more details:

Note that one reason why you were getting very slow speeds is that the [...]

We're doing a product planning meeting tomorrow where we'll discuss 2.2 scheduling (including Python driver optimizations). I'll be able to post an ETA then. Thanks for giving us the workload info -- this makes investigating all this dramatically easier!
To help you with more data, I produced the same script using postgres to compare against. You can see that on average it takes 283ms to query out 100k records from postgres:

```python
import random
import pytz
import time
import yappi
import psycopg2
from datetime import datetime

conn = psycopg2.connect("dbname=sontek user=sontek")
UTC = pytz.utc
YAPPI = False
DO_CREATE = False
DO_INSERTS = False
cur = conn.cursor()

if DO_CREATE:
    cur.execute("""
        CREATE TABLE test (respondent_id bigint, row_id bigint, column_id bigint, value_id bigint,
                           rid1 bigint, rid2 bigint, rid3 bigint, now timestamp);
    """)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999

def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now

def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)

def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)

if DO_INSERTS:
    for i in range(0, 100000):
        cur.execute("""
            INSERT INTO test(respondent_id, row_id, column_id, value_id, rid1, rid2, rid3, now)
            VALUES(%s, %s, %s, %s, %s, %s, %s, %s)
        """, (get_bigint(), get_bigint(), get_bigint(), get_bigint(), get_bigint(),
              get_bigint(), get_bigint(), utc_now()))

start = time.time()
if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)

cur.execute("""
    SELECT * FROM test;
""")
result = cur.fetchall()
data = []
for row in result:
    data.append(row)
count = len(data)
end = time.time()

conn.commit()
cur.close()
conn.close()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)
```

@coffeemug Is there any more information on what might be done on the rethinkdb side?
Using postgres as a document store (since it supports `jsonb`), here is the code for that one:

```python
import random
import pytz
import time
import yappi
import psycopg2
import ujson
from functools import partial
from datetime import datetime
from psycopg2.extras import register_default_jsonb
from psycopg2.extras import Json

json = partial(Json, dumps=ujson.dumps)
conn = psycopg2.connect("dbname=sontek user=sontek")
UTC = pytz.utc
YAPPI = False
DO_CREATE = False
DO_INSERTS = False
cur = conn.cursor()
register_default_jsonb(cur, loads=ujson.loads)

if DO_CREATE:
    cur.execute("""
        CREATE TABLE test2 (body jsonb)
    """)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999

def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now

def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)

def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)

if DO_INSERTS:
    for i in range(0, 100000):
        data = {
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': utc_now()
        }
        cur.execute("""
            INSERT INTO test2(body)
            VALUES(%s)
        """, [json(data)])

start = time.time()
if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)

cur.execute("""
    SELECT * FROM test2;
""")
result = cur.fetchall()
data = []
for row in result:
    data.append(row)
count = len(data)
end = time.time()

conn.commit()
cur.close()
conn.close()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)
```
Running a few more tests today; should post more details by EOD.
We're currently preparing for the next RethinkDB point release (2.1.3), which will come with an optimized Python driver (#4585, #4782, #4795). Using the RethinkDB 2.1.3 code base, and switching the datetime object generated by the script for a more compact timestamp (as in the earlier test), retrieval comes out at about 550ms per 100,000 rows.
👍
In preparation for this we sat down and had one of our devs write a C wrapper around https://github.com/kenrobbins/python-rapidjson. You can see the performance differences here:

and it supports object_hook:

```python
import rapidjson

DATA = """
{
    "attr1": "foo",
    "attr2": "bar",
    "_class": "Foo",
    "attr3": {
        "attr3_1": "spam",
        "attr3_2": "egg",
        "_class": "Bar"
    }
}
"""

class Base(object):
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __repr__(self):
        s = "<%s: %s>" % (
            self.__class__.__name__,
            ", ".join("%s=%s" % (key, value)
                      for key, value in self.__dict__.items()))
        return s

class Foo(Base):
    pass

class Bar(Base):
    pass

def hook(dct):
    class_name = dct.get('_class')
    if class_name == 'Foo':
        return Foo(**dct)
    elif class_name == 'Bar':
        return Bar(**dct)
    return dct

def main():
    """
    >>> main()
    <Foo: _class=Foo, attr2=bar, attr3=<Bar: attr3_2=egg, attr3_1=spam, _class=Bar>, attr1=foo>
    """
    result = rapidjson.loads(DATA, object_hook=hook)
    print(result)

if __name__ == '__main__':
    main()
```

The one caveat is that it only supports Python 3.
I updated our json benchmark to be clearer:
@sontek That is awesome!

@danielmewes Yeah, it's significantly faster when dumping the data, it's on par when loading the data, and it supports both [...]
@danielmewes @coffeemug Do you have a code example of how you were able to multiprocess and get down to 550ms per 100k? I set up a 4-node cluster and distributed 1 million rows evenly across them, and I'm not seeing that performance. Here is the code I wrote to test:

```python
import rethinkdb as r
import random
import rapidjson
import pytz
import time
import yappi
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

UTC = pytz.utc
YAPPI = False
DO_CREATE = False
DO_INSERTS = False

class RapidJsonDecoder(object):
    def __init__(self, reql_format_opts):
        pass

    def decode(self, s):
        return rapidjson.loads(s)

def create_decoder(format_opts):
    return RapidJsonDecoder(format_opts)

conn = r.connect("mt1-rethinkd1c1")
conn._get_json_decoder = create_decoder

if DO_CREATE:
    r.table_create('test').run(conn)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999

def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now

def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)

def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)

if DO_INSERTS:
    objects_to_insert = []
    for i in range(0, 1000000):
        objects_to_insert.append({
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': utc_now()
        })
        if i % 4000 == 0:
            r.table('test').insert(objects_to_insert).run(conn)
            objects_to_insert = []
    r.table('test').insert(objects_to_insert).run(conn)

start = time.time()
if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)

def query(start, end):
    data = []
    tconn = r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    print('starting...', start, end)
    results = r.table('test').between(start, end, index='rid3').run(tconn)
    for result in results:
        data.append(result)
    print('finished...', start, end)
    return data

def get_multi_proc():
    start_count = 1000000
    end_count = 2000000
    max_numbers = 9999999
    incrementer = 1000000
    futures = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        while end_count < max_numbers:
            future = executor.submit(query, start_count, end_count)
            futures.append(future)
            start_count = end_count
            end_count = start_count + incrementer
        future = executor.submit(query, start_count, end_count)
        futures.append(future)
    data = []
    for future in futures:
        data += future.result()
    return data

def get_single():
    # select all
    data = []
    result = r.table('test').run(conn)
    for row in result:
        data.append(row)
    return data

# data = get_single()
data = get_multi_proc()
count = len(data)
end = time.time()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)
```

I also created a secondary index on `rid3`.
This is the run of the code from that:
@sontek The 550ms were actually with a single client, without splitting the query. The only difference (apart from hardware) that I see is the [...]

You can speed up the insert part by using the following code for `DO_INSERTS`:

```python
if DO_INSERTS:
    for i in range(0, 300000):
        r.table('test_numeric').insert({
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': get_rint()
        }).run(conn, noreply=True, durability="soft")
    conn.noreply_wait()
```
Also note that this use case (returning a large resultset) isn't yet well parallelizable across shards. We're working on this now, and it will go into 2.2 (due out in about a month). So right now you won't see the benefit of sharding on this use case, but you will see a large benefit after 2.2 is out.
After talking with @danielmewes a bit yesterday, I tried to do a few benchmarks:

These are my results; you can see it pretty much always slows down going from 1 node to 4 nodes. Here is the actual data:

This is the code:

```python
import rethinkdb as r
import random
import rapidjson
import pytz
import time
import yappi
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

UTC = pytz.utc
YAPPI = False
DO_CREATE = False
DO_INSERTS = False
SERVER = "mt1-rethinkd1c1"
TABLE = "test2"

class RapidJsonDecoder(object):
    def __init__(self, reql_format_opts):
        pass

    def decode(self, s):
        return rapidjson.loads(s)

def create_decoder(format_opts):
    return RapidJsonDecoder(format_opts)

conn = r.connect(SERVER)
conn._get_json_decoder = create_decoder

if DO_CREATE:
    r.table_create(TABLE).run(conn)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999

def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now.timestamp()

def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)

def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)

if DO_INSERTS:
    objects_to_insert = []
    for i in range(0, 1000000):
        objects_to_insert.append({
            'survey_id': get_rint(start=1, end=4),
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': utc_now()
        })
        if i % 4000 == 0:
            r.table(TABLE).insert(objects_to_insert).run(conn)
            objects_to_insert = []
    r.table(TABLE).insert(objects_to_insert).run(conn, noreply=True, durability="soft")

start = time.time()
if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)

def query(survey_id):
    data = []
    tconn = r.connect(SERVER)
    tconn._get_json_decoder = create_decoder
    results = r.table(TABLE).filter({'survey_id': survey_id}).run(tconn)
    for result in results:
        data.append(result)
    return data

def get_multi_proc(use_threads=True):
    futures = []
    if use_threads:
        klass = ThreadPoolExecutor
    else:
        klass = ProcessPoolExecutor
    with klass(max_workers=4) as executor:
        for i in range(1, 5):
            future = executor.submit(query, i)
            futures.append(future)
    data = []
    for future in futures:
        data += future.result()
    return data

def get_single():
    # select all
    data = []
    # result = r.table(TABLE).run(conn)
    result = r.table(TABLE).between(1, 5, index='survey_id').run(conn)
    # result = r.table(TABLE).filter(
    #     (r.row['survey_id'] >= 1) & (r.row['survey_id'] <= 4)
    # ).run(conn)
    for row in result:
        data.append(row)
    return data

# data = get_single()
# data = get_multi_proc()
data = get_multi_proc(use_threads=False)
count = len(data)
end = time.time()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)
```

I have a secondary index on `survey_id`. Here is a Google Drive spreadsheet with the numbers in it: https://docs.google.com/spreadsheets/d/1kfyt9DjYrZwXowCVx4eSA7kLUrgK6oYvgbmtoOnXsTE/edit?usp=sharing
I'm not able to reproduce the 500ms-per-100k result sets you were able to get. The fastest I can get is with the threaded worker, which gets me down to 11 seconds for 1 million records.
The performance gap could be due to hardware differences. We'll re-test this with a configuration closer to yours.
I'm marking this closed. The prominent inefficiencies in the Python driver have been addressed in the past few releases. @sontek I'll contact you directly in the next few days regarding any additional testing and/or bottlenecks.
Currently if you want to work with 100,000 records it is extremely slow: 4 seconds to query 164,000 rows! I dug into this a little bit using the python library yappi and the tool kcachegrind to see where it was spending the time. Here is the python code I used [...], and this showed that most of the time is spent in `recursively_convert_pseudo_types`; here is a screenshot of the kcachegrind output:
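The profiling code itself didn't survive the formatting above, but the approach can be sketched with the stdlib's `cProfile` (yappi works similarly; `recursively_convert` below is a hypothetical stand-in for the driver's `recursively_convert_pseudo_types`):

```python
import cProfile
import io
import pstats

def recursively_convert(obj):
    # Hypothetical stand-in for the driver's recursive pseudotype
    # conversion; the per-call overhead is what dominates the profile.
    if isinstance(obj, dict):
        return {k: recursively_convert(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [recursively_convert(v) for v in obj]
    return obj

docs = [{'id': i, 'nested': {'values': [1, 2, 3]}} for i in range(10000)]

profiler = cProfile.Profile()
profiler.enable()
converted = [recursively_convert(d) for d in docs]
profiler.disable()

out = io.StringIO()
pstats.Stats(profiler, stream=out).sort_stats('cumulative').print_stats(10)
report = out.getvalue()
```

With yappi the same stats can be exported in callgrind format (`stats.save('callgrind.out', type='callgrind')`, as the scripts above do) and browsed in kcachegrind.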