In [2]:
import ray
import torch
import os
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import langchain_community
from ray.data import ActorPoolStrategy
from tqdm import tqdm
import pandas as pd
from ray.data import from_pandas
from functools import partial
from langchain.text_splitter import RecursiveCharacterTextSplitter
import torch
from functools import partial

In [3]:
# Prefer the GPU when CUDA is usable from this process; otherwise use the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device}.")

Using cuda.


### Initializing Ray

In [4]:
# pip packages requested for the Ray runtime environment of this session.
runtime_env = {
    "pip": [
        "langchain-text-splitters",
        "langchain_community",
        "sentence_transformers"
    ],
}

# Re-running this cell must always end with a live Ray session that carries
# `runtime_env`. Previously an already-initialized session was only shut down,
# which left the notebook without any Ray connection until the cell was run a
# second time.
if ray.is_initialized():
    ray.shutdown()
ray.init(runtime_env=runtime_env)

2024-04-02 07:31:36,964	INFO worker.py:1567 -- Connecting to existing Ray cluster at address: 10.10.2.206:6379...
2024-04-02 07:31:36,977	INFO worker.py:1743 -- Connected to Ray cluster. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
[36m(process_chunk pid=665906, ip=10.10.2.65)[0m Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
[36m(process_chunk pid=665906, ip=10.10.2.65)[0m Traceback (most recent call last):
[36m(process_chunk pid=665906, ip=10.10.2.65)[0m   File "/home/ubuntu/miniconda3/envs/ray/lib/python3.9/site-packages/ray/_private/serialization.py", line 404, in deserialize_objects
[36m(process_chunk pid=665906, ip=10.10.2.65)[0m     obj = self._deserialize_object(data, metadata, object_ref)
[36m(process_chunk pid=665906, ip=10.10.2.65)[0m   File "/home/ubuntu/miniconda3/envs/ray/lib

In [6]:
# Ask Ray which resources it currently reports as available and print the
# resulting mapping (the printed label is German for "Available resources").
available_resources = ray.available_resources()
print("Verfügbare Ressourcen:", available_resources)

Verfügbare Ressourcen: {'CPU': 32.0, 'object_store_memory': 17882033356.0, 'memory': 40631812916.0, 'GPU': 4.0, 'accelerator_type:T4': 4.0, 'node:10.10.3.5': 1.0, 'node:10.10.3.72': 1.0, 'node:10.10.2.65': 1.0, 'node:10.10.2.206': 1.0, 'node:__internal_head__': 1.0}


In [4]:
# Directory holding the chunked PubMed files.
directory_path = "data/pubmed/chunk/"

# Every directory entry, first as a bare name and then joined with the directory.
file_names = os.listdir(directory_path)
file_paths = list(map(partial(os.path.join, directory_path), file_names))

# Keep only the JSON Lines files.
jsonl_file_paths = [path for path in file_paths if path.endswith('.jsonl')]

# Preview the first few matches as the cell output.
jsonl_file_paths[:5]

['data/pubmed/chunk/pubmed23n0046.jsonl',
 'data/pubmed/chunk/pubmed23n0050.jsonl',
 'data/pubmed/chunk/pubmed23n0003.jsonl',
 'data/pubmed/chunk/pubmed23n0068.jsonl',
 'data/pubmed/chunk/pubmed23n0010.jsonl']

### Using only the head node for embedding

Initializing BioBERT Embedding Model

In [8]:
from sentence_transformers import SentenceTransformer, models
import torch

class EmbedChunks:
    """Callable that adds a BioBERT sentence embedding to each text chunk.

    The model is a `dmis-lab/biobert-v1.1` transformer followed by a
    mean-pooling layer, assembled as a `SentenceTransformer`.
    """

    def __init__(self, max_length=512):
        """Build the embedding model on the GPU if available, else on the CPU.

        Args:
            max_length: Value passed to the transformer module as
                `max_seq_length`.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.max_length = max_length

        # Load the pretrained BioBERT model and add a MEAN pooling layer:
        # the token embeddings are averaged to form the sentence embedding.
        word_embedding_model = models.Transformer('dmis-lab/biobert-v1.1', max_seq_length=self.max_length)
        pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(),
                                       pooling_mode_mean_tokens=True,
                                       pooling_mode_cls_token=False,
                                       pooling_mode_max_tokens=False)

        self.model = SentenceTransformer(modules=[word_embedding_model, pooling_model], device=self.device)

    def __call__(self, batch):
        """Embed the `content` field of every item in `batch`.

        Args:
            batch: Iterable of dict-like items providing "id", "title" and
                "content" keys; "PMID" is optional and read via `.get`.

        Returns:
            A list with one dict per input item, in input order, holding
            "id", "title", "content", "PMID" (None when absent) and
            "embeddings" (the embedding converted with `.tolist()`).

        Raises:
            KeyError: If an item lacks "id", "title" or "content".
        """
        # Materialize once: the batch is traversed twice below, so a one-shot
        # iterable (e.g. a generator) would otherwise yield no results.
        batch = list(batch)

        # An empty batch would request batch_size=0 from encode(); there is
        # nothing to embed, so return early instead.
        if not batch:
            return []

        contents = [item["content"] for item in batch]
        embeddings = self.model.encode(contents, batch_size=len(contents), show_progress_bar=False)

        results = []
        for item, embedding in zip(batch, embeddings):
            results.append({
                "id": item["id"],
                "title": item["title"],
                "content": item["content"],
                "PMID": item.get("PMID", None),
                "embeddings": embedding.tolist(),
            })
        return results


Iterate over every JSONL file and add an "embeddings" attribute to each record.

In [None]:
import os
import json
from pathlib import Path

embedder = EmbedChunks()

# Source and target directories.
source_directory = Path('data/pubmed/chunk')
target_directory = Path('data/pubmed/embedded')
target_directory.mkdir(parents=True, exist_ok=True)

# Number of records handed to the embedder per call. The embedder encodes a
# whole batch at once, so feeding it single records wastes most of the device.
# NOTE(review): batched encoding may differ from per-record encoding in the
# last floating-point digits — confirm this is acceptable downstream.
EMBED_BATCH_SIZE = 64


def _write_embedded(batch, target):
    """Embed `batch` and append one JSON line per embedded record to `target`."""
    for embedded_item in embedder(batch):
        target.write(json.dumps(embedded_item) + '\n')


# Process every JSONL file in the source directory (sorted for a stable order).
for file_name in sorted(os.listdir(source_directory)):
    if not file_name.endswith('.jsonl'):
        continue

    source_file = source_directory / file_name
    target_file = target_directory / file_name
    # Write to a temporary name first so an interrupted run never leaves a
    # truncated file under the final name.
    partial_file = target_directory / (file_name + '.tmp')

    # Explicit UTF-8 so the result does not depend on the platform's locale.
    with open(source_file, 'r', encoding='utf-8') as source, \
            open(partial_file, 'w', encoding='utf-8') as target:
        batch = []
        for line in source:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            # Each non-blank line is one JSON object.
            batch.append(json.loads(line))
            if len(batch) >= EMBED_BATCH_SIZE:
                _write_embedded(batch, target)
                batch = []
        # Flush the remainder; never call the embedder with an empty batch.
        if batch:
            _write_embedded(batch, target)

    # Publish the finished file under its final name.
    os.replace(partial_file, target_file)
    print(f"{target_file} has been successfully written to data/pubmed/embedded")


print("Alle Dateien wurden verarbeitet und gespeichert.")

data/pubmed/embedded/pubmed23n0117.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0118.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0182.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0143.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0167.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0170.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0152.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0101.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0115.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0114.jsonl has been successfully written to data/pubmed/embedded
data/pubmed/embedded/pubmed23n0166.jsonl has been 

To improve performance we'll try to distribute the embedding process on the Ray cluster using 4 nodes with GPUs. With one node the embedding of 200 JSONL files took 