From 40eb3332c926d8436a10db63ad552acf90dc66ea Mon Sep 17 00:00:00 2001
From: Romain Beaumont
Date: Fri, 18 Mar 2022 00:50:44 +0100
Subject: [PATCH 1/4] inference example computing safety predictions on top of
 clip embeddings

---
 examples/inference_example.py | 62 +++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)
 create mode 100644 examples/inference_example.py

diff --git a/examples/inference_example.py b/examples/inference_example.py
new file mode 100644
index 0000000..45c8782
--- /dev/null
+++ b/examples/inference_example.py
@@ -0,0 +1,62 @@
+from embedding_reader import EmbeddingReader
+import fire
+import os
+os.environ["CUDA_VISIBLE_DEVICES"] = ""
+import numpy as np
+import fsspec
+import math
+
+def load_safety_model():
+    """load the safety model"""
+    import autokeras as ak  # pylint: disable=import-outside-toplevel
+    from tensorflow.keras.models import load_model  # pylint: disable=import-outside-toplevel
+    from os.path import expanduser  # pylint: disable=import-outside-toplevel
+
+    home = expanduser("~")
+
+    cache_folder = home + "/.cache/clip_retrieval"
+    model_dir = cache_folder + "/clip_autokeras_binary_nsfw"
+    if not os.path.exists(model_dir):
+        os.makedirs(cache_folder, exist_ok=True)
+
+        from urllib.request import urlretrieve  # pylint: disable=import-outside-toplevel
+
+        path_to_zip_file = cache_folder + "/clip_autokeras_binary_nsfw.zip"
+        url_model = (
+            "https://raw.githubusercontent.com/LAION-AI/CLIP-based-NSFW-Detector/main/clip_autokeras_binary_nsfw.zip"
+        )
+        urlretrieve(url_model, path_to_zip_file)
+        import zipfile  # pylint: disable=import-outside-toplevel
+
+        with zipfile.ZipFile(path_to_zip_file, "r") as zip_ref:
+            zip_ref.extractall(cache_folder)
+
+    loaded_model = load_model(model_dir, custom_objects=ak.CUSTOM_OBJECTS)
+    loaded_model.predict(np.random.rand(10 ** 3, 768).astype("float32"), batch_size=10 ** 3)
+
+    return loaded_model
+
+
+def main(input_folder, output_folder, batch_size=10**6, end=None):
+    """main function"""
+    reader = EmbeddingReader(input_folder)
+    fs, relative_output_path = fsspec.core.url_to_fs(output_folder)
+    fs.mkdirs(relative_output_path, exist_ok=True)
+
+    model = load_safety_model()
+
+    total = reader.count
+    batch_count = math.ceil(total // batch_size)
+    padding = int(math.log10(batch_count)) + 1
+
+    for i, (embeddings, ids) in enumerate(reader(batch_size=batch_size, start=0, end=end)):
+        predictions = model.predict(embeddings, batch_size=embeddings.shape[0])
+        batch = np.hstack(predictions)
+        padded_id = str(i).zfill(padding)
+        output_file_path = os.path.join(relative_output_path, padded_id + ".npy")
+        with fs.open(output_file_path, "wb") as f:
+            np.save(f, batch)
+
+
+if __name__ == '__main__':
+    fire.Fire(main)

From 915c612f7a26cf8523e9f50796f53d6ee7e2d6ce Mon Sep 17 00:00:00 2001
From: Romain Beaumont
Date: Thu, 24 Mar 2022 00:38:33 +0100
Subject: [PATCH 2/4] better inference

---
 examples/inference_example.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/examples/inference_example.py b/examples/inference_example.py
index 45c8782..e43f7c4 100644
--- a/examples/inference_example.py
+++ b/examples/inference_example.py
@@ -5,6 +5,7 @@
 import numpy as np
 import fsspec
 import math
+import pandas as pd
 
 def load_safety_model():
     """load the safety model"""
@@ -36,10 +37,20 @@ def load_safety_model():
 
     return loaded_model
 
+import mmh3
+def compute_hash(url, text):
+    if url is None:
+        url = ''
 
-def main(input_folder, output_folder, batch_size=10**6, end=None):
+    if text is None:
+        text = ''
+
+    total = (url + text).encode("utf-8")
+    return mmh3.hash64(total)[0]
+
+def main(embedding_folder, metadata_folder, output_folder, batch_size=10**6, end=None):
     """main function"""
-    reader = EmbeddingReader(input_folder)
+    reader = EmbeddingReader(embedding_folder, metadata_folder=metadata_folder, file_format="parquet_npy", meta_columns=["url", "caption"])
     fs, relative_output_path = fsspec.core.url_to_fs(output_folder)
     fs.mkdirs(relative_output_path, exist_ok=True)
 
@@ -49,13 +60,16 @@ def main(input_folder, output_folder, batch_size=10**6, end=None):
     total = reader.count
     batch_count = math.ceil(total // batch_size)
     padding = int(math.log10(batch_count)) + 1
 
-    for i, (embeddings, ids) in enumerate(reader(batch_size=batch_size, start=0, end=end)):
+    for i, (embeddings, ids) in enumerate(reader(batch_size=batch_size, start=0, end=end, parallel_pieces=10, max_piece_size=10**4)):
         predictions = model.predict(embeddings, batch_size=embeddings.shape[0])
         batch = np.hstack(predictions)
         padded_id = str(i).zfill(padding)
-        output_file_path = os.path.join(relative_output_path, padded_id + ".npy")
+        output_file_path = os.path.join(relative_output_path, padded_id + ".parquet")
+        df = pd.DataFrame(batch, columns=["prediction"])
+        df["hash"] = [compute_hash(x, y) for x, y in zip(ids['url'], ids['caption'])]
+        df["url"] = ids['url']
         with fs.open(output_file_path, "wb") as f:
-            np.save(f, batch)
+            df.to_parquet(f)
 
 
 if __name__ == '__main__':

From 698346689b3fa70e7975b16d54b22f1e36e8ec55 Mon Sep 17 00:00:00 2001
From: Romain Beaumont
Date: Wed, 20 Apr 2022 00:55:27 +0200
Subject: [PATCH 3/4] faster numpy parquet

---
 embedding_reader/parquet_numpy_reader.py | 33 +++++++++++++++++++------
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/embedding_reader/parquet_numpy_reader.py b/embedding_reader/parquet_numpy_reader.py
index b02dc30..a0f2f5d 100644
--- a/embedding_reader/parquet_numpy_reader.py
+++ b/embedding_reader/parquet_numpy_reader.py
@@ -2,6 +2,7 @@
 
 Assumptions: the numpy and parquet files should be completely aligned
 """
+from functools import lru_cache
 import pandas as pd
 from multiprocessing.pool import ThreadPool
 from tqdm import tqdm
@@ -14,6 +15,26 @@
 from threading import Semaphore
 import pyarrow.parquet as pq
 
+r = Semaphore(1)
+
+d = {}
+def open_parquet_file(fs, filename):
+    global d
+    r.acquire()
+    if filename in d:
+        r.release()
+        return d[filename]["t"]
+    # new file, new dict
+    if len(d) > 0:
+        for v in d.values():
+            v["f"].close()
+        d = {}
+    print(filename)
+    f = fs.open(filename, "rb")
+    t = pq.read_table(f, use_threads=False)
+    d[filename] = {"f": f, "t": t}
+    r.release()
+    return t
 
 class ParquetNumpyReader:
     """Parquet numpy reader class, implements init to read the files headers and call to procuce embeddings batches"""
@@ -102,12 +123,12 @@ def read_piece(piece):
                 metadata_path = piece.metadata_path
                 header_offset = piece.header_offset
 
-                with self.metadata_fs.open(metadata_path, "rb") as f:
-                    length = end - start
-                    table = pq.read_table(f, use_threads=False)
-                    id_columns = self.metadata_column_names
-                    table_slice = table.slice(start, length)
-                    ids = table_slice.select(id_columns).to_pandas()
+
+                length = end - start
+                id_columns = self.metadata_column_names
+                table = open_parquet_file(self.metadata_fs, metadata_path)
+                table_slice = table.slice(start, length)
+                ids = table_slice.select(id_columns).to_pandas()
 
                 with self.fs.open(path, "rb") as f:
                     length = end - start

From 913f259b9a2859711717839c4c04c323f4fbc825 Mon Sep 17 00:00:00 2001
From: Romain Beaumont
Date: Mon, 16 May 2022 01:19:55 +0200
Subject: [PATCH 4/4] better parquet numpy reader

---
 embedding_reader/parquet_numpy_reader.py | 43 +++++++++---------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/embedding_reader/parquet_numpy_reader.py b/embedding_reader/parquet_numpy_reader.py
index a0f2f5d..90008e2 100644
--- a/embedding_reader/parquet_numpy_reader.py
+++ b/embedding_reader/parquet_numpy_reader.py
@@ -2,7 +2,6 @@
 
 Assumptions: the numpy and parquet files should be completely aligned
 """
-from functools import lru_cache
 import pandas as pd
 from multiprocessing.pool import ThreadPool
 from tqdm import tqdm
@@ -15,26 +14,6 @@
 from threading import Semaphore
 import pyarrow.parquet as pq
 
-r = Semaphore(1)
-
-d = {}
-def open_parquet_file(fs, filename):
-    global d
-    r.acquire()
-    if filename in d:
-        r.release()
-        return d[filename]["t"]
-    # new file, new dict
-    if len(d) > 0:
-        for v in d.values():
-            v["f"].close()
-        d = {}
-    print(filename)
-    f = fs.open(filename, "rb")
-    t = pq.read_table(f, use_threads=False)
-    d[filename] = {"f": f, "t": t}
-    r.release()
-    return t
 
 class ParquetNumpyReader:
     """Parquet numpy reader class, implements init to read the files headers and call to procuce embeddings batches"""
@@ -115,7 +94,8 @@ def __call__(self, batch_size, start=0, end=None, max_piece_size=None, parallel_
         cols = PIECES_BASE_COLUMNS + metadata_columns
         Piece = namedtuple("Count", cols)
 
-        def read_piece(piece):
+        def read_piece(t):
+            (piece, table) = t
             try:
                 start = piece.piece_start
                 end = piece.piece_end
@@ -123,10 +103,8 @@ def read_piece(piece):
                 metadata_path = piece.metadata_path
                 header_offset = piece.header_offset
 
-
                 length = end - start
                 id_columns = self.metadata_column_names
-                table = open_parquet_file(self.metadata_fs, metadata_path)
                 table_slice = table.slice(start, length)
                 ids = table_slice.select(id_columns).to_pandas()
 
@@ -149,13 +127,22 @@ def read_piece(piece):
 
         semaphore = Semaphore(parallel_pieces)
         stopped = False
+        # from path to table and file
+        open_parquet_files = {}
 
-        def piece_generator(pieces):
+        def piece_generator(pieces, open_parquet_files):
+            current_parquet_file = None
             for piece in (Piece(*parts) for parts in zip(*[pieces[col] for col in cols])):
                 if stopped:
                     break
                 semaphore.acquire()
-                yield piece
+                if piece.metadata_path not in open_parquet_files:
+                    file = self.metadata_fs.open(piece.metadata_path, "rb")
+                    table = pq.read_table(file, use_threads=True)
+                    open_parquet_files[piece.metadata_path] = {"file": file, "table": table}
+                if current_parquet_file != piece.metadata_path:
+                    current_parquet_file = piece.metadata_path
+                yield (piece, open_parquet_files[piece.metadata_path]["table"])
 
         batch = None
         batch_meta = None
@@ -164,7 +151,7 @@ def piece_generator(pieces):
         if show_progress:
             pbar = tqdm(total=len(pieces))
         with ThreadPool(parallel_pieces) as p:
-            for err, (data, meta, piece) in p.imap(read_piece, piece_generator(pieces)):
+            for err, (data, meta, piece) in p.imap(read_piece, piece_generator(pieces, open_parquet_files)):
                 if err is not None:
                     stopped = True
                     semaphore.release()
@@ -187,6 +174,8 @@ def piece_generator(pieces):
                     batch = None
                     batch_meta = None
                     batch_offset = 0
+                    open_parquet_files[piece.metadata_path]["file"].close()
+                    del open_parquet_files[piece.metadata_path]
 
                 if show_progress:
                     pbar.update(1)
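
The example above exposes main() through fire.Fire, so once the patches are applied it can be run as, for instance, python examples/inference_example.py --embedding_folder embeddings/ --metadata_folder metadata/ --output_folder safety_predictions/ (the folder paths are placeholders). Each output parquet file then carries a prediction, the mmh3 hash of url + caption, and the url. Below is a minimal sketch, not part of the patches, of joining those predictions back onto a metadata parquet file through that hash; the input paths and the metadata layout are assumptions for illustration, while the column names and the hashing scheme come from the code above.

# Sketch: join the safety predictions written by examples/inference_example.py
# back onto a metadata parquet file via the mmh3 hash of url + caption.
# The two input paths are hypothetical; "prediction", "hash", "url" and
# "caption" are the column names used in the patch.
import mmh3
import pandas as pd


def compute_hash(url, text):
    # Same scheme as the patch: first 64-bit half of murmurhash3 over url + caption.
    if url is None:
        url = ""
    if text is None:
        text = ""
    return mmh3.hash64((url + text).encode("utf-8"))[0]


predictions = pd.read_parquet("safety_predictions/0.parquet")  # columns: prediction, hash, url
metadata = pd.read_parquet("metadata/metadata_0000.parquet")   # columns: url, caption, ...

metadata["hash"] = [compute_hash(u, c) for u, c in zip(metadata["url"], metadata["caption"])]
joined = metadata.merge(predictions[["hash", "prediction"]], on="hash", how="left")
print(joined[["url", "prediction"]].head())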
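
The last patch replaces the global open_parquet_file cache with per-generator bookkeeping: each metadata parquet file is opened once in piece_generator, the resulting pyarrow table travels with its piece to the worker, and the file is closed after its pieces have been consumed. A simplified, standalone sketch of that caching idea follows; the (path, start, end) piece tuples and the local-filesystem usage are illustrative assumptions, not the reader's real Piece namedtuple or filesystem handling.

# Sketch of the caching idea behind the last patch, reduced to a standalone helper:
# consecutive pieces that point at the same parquet file reuse one pyarrow Table
# instead of re-opening and re-reading the file for every piece.
import fsspec
import pyarrow.parquet as pq


def iter_metadata_slices(pieces, columns):
    """Yield one pandas DataFrame per (path, start, end) piece,
    opening each parquet file only once while consecutive pieces stay on it."""
    fs = fsspec.filesystem("file")
    current_path, current_file, current_table = None, None, None
    for path, start, end in pieces:
        if path != current_path:
            # Moving to a new file: close the previous one and read the new table once.
            if current_file is not None:
                current_file.close()
            current_file = fs.open(path, "rb")
            current_table = pq.read_table(current_file, use_threads=True)
            current_path = path
        yield current_table.slice(start, end - start).select(columns).to_pandas()
    if current_file is not None:
        current_file.close()


# Hypothetical usage: two pieces from one file, then one piece from another.
# for ids in iter_metadata_slices(
#     [("meta/0000.parquet", 0, 1000), ("meta/0000.parquet", 1000, 2000), ("meta/0001.parquet", 0, 500)],
#     ["url", "caption"],
# ):
#     print(len(ids))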