/
join_merge_benchmark.py
64 lines (51 loc) · 2.54 KB
/
join_merge_benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import argparse
import ray
import os
import modin.pandas as pd
from utils import time_logger
# Benchmark script: loads two CSV files into modin DataFrames and logs the
# wall-clock time of index-aligned inner/outer `join` and `merge` calls.

# Shared log-label layout; the first field is the operation name.
_LABEL_TEMPLATE = "{}: {} & {}; Left Size: {} bytes; Right Size: {} bytes"


def build_parser():
    """Return the command-line parser for the join/merge benchmark.

    ``--left`` and ``--right`` are required because their sizes are read
    with ``os.path.getsize`` before anything else happens; ``--logfile`` is
    optional (``logging.basicConfig`` falls back to a stream handler when
    the filename is ``None``).
    """
    parser = argparse.ArgumentParser(description='join/merge benchmark')
    parser.add_argument('--left', dest='left', required=True,
                        help='path to the left csv data file')
    parser.add_argument('--right', dest='right', required=True,
                        help='path to the right csv data file')
    parser.add_argument('--logfile', dest='logfile',
                        help='path to the log file')
    return parser


def ensure_log_dir(logfile):
    """Create the directory that will hold ``logfile`` if it is missing.

    Returns the directory component of ``logfile`` (``''`` when the path
    has none, or when ``logfile`` is ``None``/empty).  Nothing is created in
    that case: ``os.makedirs('')`` would raise.
    """
    log_dir = os.path.dirname(logfile) if logfile else ''
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir)
    return log_dir


def benchmark_label(operation, file_left, file_right, size_left, size_right):
    """Format the message handed to ``time_logger`` for one operation."""
    return _LABEL_TEMPLATE.format(operation, file_left, file_right,
                                  size_left, size_right)


def wait_for_all(df):
    """Block until every partition of ``df`` has been computed.

    ``ray.wait`` returns as soon as ``num_returns`` objects are ready and
    that count defaults to 1, so the number of blocks is passed explicitly;
    otherwise only the first finished partition would be awaited.
    """
    # NOTE(review): `_block_partitions` is a private modin attribute,
    # presumably an array of Ray object ids -- confirm against the modin
    # version in use.
    blocks = df._block_partitions.flatten().tolist()
    if blocks:  # ray.wait cannot be asked to wait for zero objects
        ray.wait(blocks, len(blocks))


def main(argv=None):
    """Run the four join/merge benchmarks.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    args = build_parser().parse_args(argv)
    file_left = args.left
    file_size_left = os.path.getsize(file_left)
    file_right = args.right
    file_size_right = os.path.getsize(file_right)

    ensure_log_dir(args.logfile)
    logging.basicConfig(filename=args.logfile, level=logging.INFO)

    df_left = pd.read_csv(file_left)
    df_right = pd.read_csv(file_right)
    # Wait for both inputs before timing starts, so the time spent producing
    # the input partitions stays outside the measured sections.
    wait_for_all(df_left)
    wait_for_all(df_right)

    operations = [
        ("Inner Join",
         lambda: df_left.join(df_right, how="inner", lsuffix='left_')),
        ("Outer Join",
         lambda: df_left.join(df_right, how="outer", lsuffix='left_')),
        ("Inner Merge",
         lambda: df_left.merge(df_right, how="inner",
                               left_index=True, right_index=True)),
        ("Outer Merge",
         lambda: df_left.merge(df_right, how="outer",
                               left_index=True, right_index=True)),
    ]
    for operation, run in operations:
        label = benchmark_label(operation, file_left, file_right,
                                file_size_left, file_size_right)
        with time_logger(label):
            # The wait happens inside the timed section so the measurement
            # covers the complete result, not just the first ready block.
            wait_for_all(run())


if __name__ == "__main__":
    main()