In [None]:
# !pip install -U sentence-transformers

In [1]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')

In [2]:
sentences = ['This framework generates embeddings for each input sentence',
    'Sentences are passed as a list of string.', 
    'The quick brown fox jumps over the lazy dog.']
sentences_1 = ['This framework generates embeddings for each input sentence',]

In [3]:
sentence_embeddings = model.encode(sentences)

In [4]:
sentence_embeddings.shape

(3, 384)

In [5]:
type(sentence_embeddings)

numpy.ndarray

In [6]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [7]:
from apache_beam.options.pipeline_options import PipelineOptions

In [8]:
p = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

In [9]:
output = p | beam.Create(sentences) | beam.Map(lambda x: [model.encode(x)])



In [10]:
type(output)

apache_beam.pvalue.PCollection

In [11]:
output1 = p | beam.Create(sentences_1) | beam.Map(lambda x: [model.encode(x)])

In [12]:
ib.show_graph(p)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
/usr/local/bin/dot
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [13]:
res = ib.collect(output)



In [14]:
res.shape

(3, 1)

In [15]:
res

Unnamed: 0,0
0,"[-0.013717369, -0.042851534, -0.015628567, 0.0..."
1,"[0.0564525, 0.055002406, 0.03137959, 0.0339485..."
2,"[0.04393355, 0.058934387, 0.048178356, 0.07754..."


In [16]:
res1 = ib.collect(output1)



In [17]:
res1

Unnamed: 0,0
0,"[-0.013717369, -0.042851534, -0.015628567, 0.0..."


In [18]:
from sentence_transformers import util

# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
    for x in rights:
        yield (left, x)
    
comb_result = (
    output
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(output1))
    | "Cosine" >> beam.Map(lambda x: float(util.cos_sim(x[0], x[1])[0][0])))

In [19]:
sim_res = ib.collect(comb_result)

  a = torch.tensor(a)


In [20]:
sim_res[0]

0    1.000000
1    0.538079
2    0.118056
Name: 0, dtype: float64

In [21]:
type(comb_result)

apache_beam.pvalue.PCollection

In [121]:
from pydantic import BaseModel, Field
from typing import List, Any

class Block(BaseModel):
    source: List["Block"] = []
    target: List["Block"] = []
    operation: beam.ParDo
    o: beam.pvalue.PCollection = None
    
    class Config:
        arbitrary_types_allowed = True

In [122]:
class SentenceEmbeddingBlock(Block):
    operation: beam.ParDo = Field(default=beam.Map(lambda x: [model.encode(x)]))

In [123]:
# block for beam.Create
from pydantic import BaseModel, ValidationError, root_validator

class CreateBlock(Block):
    operation: beam.Create
    values: List[Any]

    @root_validator(pre=True)
    def _set_fields(cls, values: dict) -> dict:
        values["operation"] = beam.Create(values["values"])
        return values

In [124]:
block_p = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

In [125]:
embed = SentenceEmbeddingBlock()

In [126]:
embed.o

In [127]:
create = CreateBlock(values = sentences)

In [128]:
create.operation

<Create(PTransform) label=[Create] at 0x16040f6a0>

In [129]:
create.o

In [130]:
class BlockAssembler:
    def __init__(self, blocks: List[Block], p: beam.pipeline.Pipeline):
        self.blocks = blocks
        self.p = p
    @classmethod
    def Sequential(cls, blocks: List[Block], p: beam.pipeline.Pipeline):
        # connect all the blocks using the list order
        for i, block in enumerate(blocks):
            if i > 0:
                block.source = [blocks[i-1]]
            if i < (len(blocks)-1):
                block.target = [blocks[i+1]]
        return cls(blocks, p)

    def compile(self):
        # sequential
        o = self.p
        for block in self.blocks:
            block.o = o | block.operation
            o = block.o
        return 
    
    def show_graph(self):
        ib.show_graph(self.p)

In [131]:
blocks = BlockAssembler.Sequential([create, embed], p=block_p)

In [132]:
blocks.compile()



In [133]:
blocks.blocks[0].target

[SentenceEmbeddingBlock(source=[CreateBlock(source=[], target=[SentenceEmbeddingBlock(source=[...], target=[], operation=<ParDo(PTransform) label=[[132]: Map(<lambda at 2263565550.py:2>)] at 0x16032f940>, o=<PCollection[[132]: Map(<lambda at 2263565550.py:2>).None] at 0x1601f5f70>)], operation=<Create(PTransform) label=[[132]: Create] at 0x16040f6a0>, o=<PCollection[[132]: Create/Map(decode).None] at 0x160307df0>, values=['This framework generates embeddings for each input sentence', 'Sentences are passed as a list of string.', 'The quick brown fox jumps over the lazy dog.'])], target=[], operation=<ParDo(PTransform) label=[[132]: Map(<lambda at 2263565550.py:2>)] at 0x16032f940>, o=<PCollection[[132]: Map(<lambda at 2263565550.py:2>).None] at 0x1601f5f70>)]

In [134]:
blocks.show_graph()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
/usr/local/bin/dot
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
