Commit

analysis/k_specific: write in chunks, multiprocessed
Types are changed to uint64 row, uint16 k_size, float64 values.
The output differs, but for the better.
A neat `("value", np.float64 if self.clock_frequency else str)` hack
has been considered, but, turns out, opaque strings are slower than doubles.
~41MB/s on my machine
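
For context, a rough sense of what the dtype choice costs can be had with a small benchmark along these lines. This is a hypothetical sketch, not part of the commit; the in-memory CSV and its size are made up, only the row,k_size,value layout matches the measurements file:

# Hypothetical micro-benchmark (not part of this commit): compare reading the
# measurement CSV with the "value" column parsed as float64 vs. kept as str.
import io
import time

import numpy as np
import pandas as pd

# Generate a small in-memory CSV in the row,k_size,value layout; the values
# and the number of rows are arbitrary.
csv_data = "".join(
    "{0},{1},{2}\n".format(i, 256, i * 1e-6) for i in range(1000000)
)

for value_dtype in (np.float64, str):
    start = time.perf_counter()
    chunks = pd.read_csv(
        io.StringIO(csv_data), iterator=True, chunksize=100000,
        dtype={"row": np.uint64, "k_size": np.uint16, "value": value_dtype},
        names=["row", "k_size", "value"])
    for chunk in chunks:
        pass  # only measure parsing, do nothing with the data
    print(value_dtype, time.perf_counter() - start)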
t184256 committed Apr 5, 2024
1 parent 9cc8852 commit 37650be
Showing 1 changed file with 48 additions and 41 deletions.
89 changes: 48 additions & 41 deletions tlsfuzzer/analysis.py
@@ -1860,18 +1860,15 @@ def _read_bit_size_measurement_file(self, status=None):
            first_line = in_fp.readline().split(',')
            previous_row = int(first_line[0])
            max_k_size = int(first_line[1])
            previous_max_k_value = pd.to_numeric(float(first_line[2]),
                                                 downcast='float')
            previous_max_k_value = float(first_line[2])

            if self.clock_frequency:
                previous_max_k_value /= self.clock_frequency

            yield (previous_max_k_value, previous_max_k_value, max_k_size)

            chunks = pd.read_csv(
                in_fp, iterator=True, chunksize=100000,
                dtype=[("row", np.int16), ("k_size", np.int16),
                       ("value", np.float32)],
                dtype=[("row", np.uint64), ("k_size", np.uint16),
                       ("value", np.float64)],
                names=["row", "k_size", "value"])

            for chunk in chunks:
@@ -1914,19 +1911,37 @@ def _read_bit_size_measurement_file(self, status=None):

                out = chunk.drop(columns="row")
                out = out.assign(curr_maxk_val=curr_maxk_vals)[mask]
                for vs in zip(out['curr_maxk_val'], out['value'],
                              out['k_size']):
                    yield vs
                yield max_k_size, out

                previous_row = rows.iat[-1]
                previous_max_k_value = curr_maxk_vals.iat[-1]

    @staticmethod
    def _k_specific_writing_worker(k_folder_path, pipe, k_size, max_k_size):
        os.makedirs(k_folder_path)

        with open(join(k_folder_path, "timing.csv"), 'wb') as f:
            header = (
                "{0},{1}\n".format(max_k_size, k_size)
                if k_size != max_k_size else
                "{0},{0}-sanity\n".format(max_k_size)
            )
            f.write(header.encode('ascii'))

            while True:
                subchunk = pipe.recv()
                if subchunk is None:
                    break
                subchunk = subchunk[['curr_maxk_val', 'value']]
                subchunk.to_csv(f, header=False, index=False)
        pipe.close()

    def create_k_specific_dirs(self):
        """
        Creates a folder with timing.csv for each K bit-size so it can be
        analyzed one at a time.
        """
        k_size_files = {}
        k_size_process_pipe = {}

        if self.verbose:
            print("Creating a dir for each bit size...")
@@ -1945,45 +1960,37 @@ def create_k_specific_dirs(self):
        except FileNotFoundError:
            pass

        data_iter = self._read_bit_size_measurement_file(status=status)

        data = next(data_iter)
        max_k_size = data[2]
        measurement_iter = self._read_bit_size_measurement_file(status=status)

        try:
            for data in data_iter:
                k_size = data[2]

                if k_size not in k_size_files:
                    k_folder_path = join(
                        self.output,
                        "analysis_results/k-by-size/{0}".format(k_size)
                    )

                    os.makedirs(k_folder_path)
                    k_size_files[k_size] = open(
                        join(k_folder_path, "timing.csv"), 'w',
                        encoding="utf-8"
                    )
                    if k_size == max_k_size:
                        header = "{0},{0}-sanity\n".format(max_k_size)
                    else:
                        header = "{0},{1}\n".format(max_k_size, k_size)
                    k_size_files[k_size].write(header)

                k_size_files[k_size].write(
                    "{0},{1}\n".format(data[0], data[1])
                )
            for max_k_size, chunk in measurement_iter:
                for k_size, subchunk in chunk.groupby("k_size"):
                    if k_size not in k_size_process_pipe:
                        pipe_recv, pipe_send = mp.Pipe(duplex=False)
                        k_folder_path = join(
                            self.output,
                            "analysis_results/k-by-size/{0}".format(k_size)
                        )
                        p = mp.Process(target=self._k_specific_writing_worker,
                                       args=(k_folder_path, pipe_recv,
                                             k_size, max_k_size))
                        p.start()
                        k_size_process_pipe[k_size] = (p, pipe_send)

                    _, pipe = k_size_process_pipe[k_size]
                    pipe.send(subchunk)
        finally:
            for process, pipe in k_size_process_pipe.values():
                pipe.send(None)
                pipe.close()
                process.join()

        if status:
            status[2].set()
            progress.join()
            print()

        for file in k_size_files.values():
            file.close()

        k_sizes = list(k_size_files.keys())
        k_sizes = list(k_size_process_pipe.keys())
        k_sizes = sorted(k_sizes, reverse=True)

        if self.skip_sanity and max_k_size in k_sizes:
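As an aside, the writer-process pattern introduced above (one process per k-size, fed DataFrame chunks over a multiprocessing.Pipe and stopped with a None sentinel) boils down to roughly the following. This is a self-contained sketch with made-up file name and data, not the commit's code:

# Illustrative sketch only: a writer process consuming DataFrame chunks from a
# Pipe and appending them to one CSV file, shut down by a None sentinel.
import multiprocessing as mp

import pandas as pd


def csv_writer(path, conn):
    # Receive DataFrames until the parent sends None; append each to the file.
    with open(path, "w", newline="", encoding="ascii") as f:
        while True:
            chunk = conn.recv()
            if chunk is None:
                break
            chunk.to_csv(f, header=False, index=False)
    conn.close()


if __name__ == "__main__":
    recv_end, send_end = mp.Pipe(duplex=False)
    proc = mp.Process(target=csv_writer, args=("example.csv", recv_end))
    proc.start()
    send_end.send(pd.DataFrame({"a": [1, 2], "b": [0.5, 0.25]}))
    send_end.send(None)  # sentinel: tell the writer to finish
    send_end.close()
    proc.join()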
