In [1]:
%load_ext beam_setup

Setting up the Beam environment for interactive use
Standard modules will be automatically imported so you can use them without explicit import
Done importing packages. It took:  4.4 seconds
Beam library is loaded from path: /home/elad/docker/beamds/beam
The Beam version is: 2.5.12


# Beam Resources

Beam features a unified and simplified *resource* API. A resource is defined as an **external entity** that operates outside the immediate memory space or execution context of the current program. Common examples include files, remote storage services, webpages, and remote algorithms. Conveniently, each resource can be uniquely identified by a [URI](https://en.wikipedia.org/wiki/Uniform_Resource_Identifier) (Uniform Resource Identifier). This URI is a string that specifies the resource type (scheme), its address, and additional metadata required to interact with it effectively.
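To make the parts of a URI concrete, here is a minimal sketch that splits one with the Python standard library only. It does not show how Beam parses URIs internally; `describe_uri` is a hypothetical helper name introduced for this illustration.

```python
from urllib.parse import parse_qsl, urlsplit


def describe_uri(uri):
    """Split a resource URI into scheme, address and extra arguments."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,      # resource type, e.g. 's3' or 'openai'
        "hostname": parts.hostname,  # None when the URI has no host part
        "port": parts.port,          # None when the URI has no port
        "path": parts.path,
        "query": dict(parse_qsl(parts.query)),  # additional metadata
    }


storage = describe_uri("s3://172.17.0.1:9000/sandbox/a.pt?tls=false")
model = describe_uri("openai:///gpt-4")
print(storage)
print(model)
```

A URI with three slashes, such as `openai:///gpt-4`, has an empty host part, so only the scheme and the path carry information.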

As a programmer, when developing software, your primary concern often isn't the internal mechanics of a resource but rather the capabilities it offers through its API. For instance, in the context of storage, you might be interested in how you can read from and write to files efficiently, regardless of their physical location. Indeed, it would be beneficial if you could switch seamlessly between resources as needed. For example, if you initially used NFS for file storage and later decided to switch to S3, you’d prefer to make this transition with minimal changes to your program.

Beam simplifies this process beautifully. It allows you to transition between different resources merely by updating a URI string in your configuration files. This means you can switch resources without any code changes, significantly easing development and maintenance.
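The principle of switching backends by editing only a URI can be sketched without Beam. In the toy example below, `LocalStore`, `MemoryStore`, `open_store` and the `mem` scheme are all hypothetical names made up for this illustration; they are not part of Beam and do not reflect its implementation. The sketch also assumes POSIX-style paths.

```python
import tempfile
from pathlib import Path
from urllib.parse import urlsplit


class LocalStore:
    """Hypothetical backend that keeps blobs as files under a root folder."""

    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def write(self, name, data):
        (self.root / name).write_bytes(data)

    def read(self, name):
        return (self.root / name).read_bytes()


class MemoryStore:
    """Hypothetical backend that keeps blobs in a plain dictionary."""

    def __init__(self, root):
        self.blobs = {}

    def write(self, name, data):
        self.blobs[name] = data

    def read(self, name):
        return self.blobs[name]


# Scheme -> backend class; a URI without a scheme falls back to 'file'.
BACKENDS = {"file": LocalStore, "mem": MemoryStore}


def open_store(uri):
    parts = urlsplit(uri)
    return BACKENDS[parts.scheme or "file"](parts.path)


def roundtrip(store):
    """Application code: identical for every backend."""
    store.write("greeting.bin", b"hello")
    return store.read("greeting.bin")


with tempfile.TemporaryDirectory() as tmp:
    uris = (tmp, f"file://{tmp}", "mem:///scratch")
    results = [roundtrip(open_store(uri)) for uri in uris]
print(results)
```

Only the strings in `uris` differ between the three runs; `roundtrip` never needs to know which backend it is talking to.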

Here are the resources which Beam currently supports:

In [2]:
from beam.resources import resource_names
from beam.utils import pprint, pretty_print_dict
print(pretty_print_dict(resource_names, dense=False))

path=['file', 's3', 's3-pa', 'hdfs', 'hdfs-pa', 'sftp', 'comet', 'io', 'dict', 'redis', 'smb', 'nt', 'mlflow'],
serve=['beam-http', 'beam-https', 'beam-grpc', 'triton', 'triton-http', 'triton-grpc', 'triton-https', 'triton-grpcs'],
distributed=['async-http', 'async-https'],
llm=['openai', 'vllm', 'tgi', 'fastchat', 'huggingface', 'samurai', 'samur-openai', 'fastapi-dp'],
triton=['triton', 'triton-http', 'triton-grpc', 'triton-https', 'triton-grpcs'],
ray=['ray']


As you can see, Beam supports many different storage endpoints. Importantly, they all share the same API, i.e. an augmented [pathlib](https://docs.python.org/3/library/pathlib.html) API that facilitates interaction with files and folders and makes it easy to switch between different endpoints.

## Path Like Resources

All resources are accessed through a single function named *resource*, which takes the resource's URI.

In [3]:
from beam import resource

Let's start with the simplest resource, a file. In this case we do not need to provide the scheme or the location (file is the default scheme).

In [19]:
path = resource('/tmp/path/to/folder')

The path resource follows the pathlib API, so methods such as mkdir, joinpath, and exists are implemented:

In [20]:
path.mkdir()

In [21]:
path.exists()

True

In [22]:
file_path = path.joinpath('a.pt')

In [24]:
file_path.exists()

False

It also supports read and write operations, which determine the file type from the file extension:

In [25]:
file_path.write(torch.randn(4))

file:///tmp/path/to/folder/a.pt

In [26]:
print(file_path.read())

tensor([ 1.1139,  0.1157, -1.0144, -0.8021])


It supports many file types, including torch (.pt), pickle (.pkl), feather (.fea), parquet (.parquet), and many more (see the read and write operations of beam.path.core.PureBeamPath).

We can also specify explicitly how the file should be stored:

In [27]:
path.joinpath('some_name_with_no_extention').write(np.random.randn(4), ext='.pkl')

file:///tmp/path/to/folder/some_name_with_no_extention

In [28]:
path.joinpath('some_name_with_no_extention').read(ext='.pkl')

array([ 0.06095397, -0.21831003,  1.29672265,  0.57001603])
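Choosing a serializer from the file extension, with an optional `ext` override, can be sketched with the standard library alone. This is an illustration of the idea and not Beam's implementation: the `HANDLERS` table and the module-level `write` and `read` functions are hypothetical, and only JSON and pickle are covered.

```python
import json
import pickle
import tempfile
from pathlib import Path

# Hypothetical handler table: suffix -> (writer, reader).
HANDLERS = {
    ".json": (
        lambda p, obj: p.write_text(json.dumps(obj)),
        lambda p: json.loads(p.read_text()),
    ),
    ".pkl": (
        lambda p, obj: p.write_bytes(pickle.dumps(obj)),
        lambda p: pickle.loads(p.read_bytes()),
    ),
}


def write(path, obj, ext=None):
    """Serialize obj using the handler for `ext`, or for the path suffix."""
    writer, _ = HANDLERS[ext or path.suffix]
    writer(path, obj)
    return path


def read(path, ext=None):
    """Deserialize using the handler for `ext`, or for the path suffix."""
    _, reader = HANDLERS[ext or path.suffix]
    return reader(path)


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    # The format is inferred from the '.json' suffix.
    from_suffix = read(write(folder / "a.json", {"x": [1, 2, 3]}))
    # No suffix: the format must be passed explicitly on both calls.
    no_suffix = write(folder / "no_suffix", (1, 2.5), ext=".pkl")
    from_override = read(no_suffix, ext=".pkl")
print(from_suffix, from_override)
```

As in the cells above, a file written with an explicit `ext` has to be read back with the same `ext`, because the name itself no longer says which format was used.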

We can also iterate over and list folders:

In [29]:
list(path.iterdir())
list(path)

[file:///tmp/path/to/folder/some_name_with_no_extention,
 file:///tmp/path/to/folder/a.pt]
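For comparison, the same folder operations look like this with the standard library's `pathlib.Path`, which the path resource mirrors. The sketch runs in a temporary directory and uses a text file, since the standard library has no typed `read`/`write`. Note that a plain `pathlib.Path` is not iterable, so `list(path)` as used above is a convenience of the Beam path object.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp).joinpath("path", "to", "folder")
    # parents=True creates the intermediate folders as well.
    folder.mkdir(parents=True, exist_ok=True)

    file_path = folder.joinpath("a.txt")
    existed_before = file_path.exists()
    file_path.write_text("hello")
    existed_after = file_path.exists()

    # iterdir() yields the entries of the folder in arbitrary order.
    names = sorted(p.name for p in folder.iterdir())

print(existed_before, existed_after, names)
```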

To access paths on different storage platforms use the standard URI conventions:

path = resource('scheme://\<hostname\>:\<port\>/path/to/my/file?arg1=val1&arg2=val2')

Some examples:
* s3 on AWS: s3:///\<bucket name\>/\<object\>?access_key=\<my aws access key\>&secret_key=\<my secret access key\>
* s3 on MinIO: s3://\<hostname\>:\<port\>/\<bucket name\>/\<object\>?access_key=\<my aws access key\>&secret_key=\<my secret access key\>&tls=false
* HDFS: hdfs://\<hostname\>:\<http connection port usually 9870\>/path/to/my/file?access_key=\<my hdfs access key\>&tls=\<whether connecting via https\>


Note that you can replace the scheme s3 with s3-pa, and hdfs with hdfs-pa, to access the storage via pyarrow instead of a native implementation such as boto3, which can improve performance.
For hdfs-pa you may need to replace the port with the data node communication port, usually 50010 (consult the hdfs-site.xml and core-site.xml files for details).  
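When a URI carries several arguments, it is easy to misplace a `?` or `&`. One way to avoid that is to assemble the string programmatically; the sketch below does so with the standard library. `build_uri` is a hypothetical helper, not a Beam function, and the host, bucket and keys are placeholder values.

```python
from urllib.parse import urlencode, urlunsplit


def build_uri(scheme, hostname, port, path, params):
    """Assemble scheme://hostname:port/path?key=value&... from its parts."""
    netloc = f"{hostname}:{port}"
    # urlencode joins the arguments with '&' and escapes unsafe characters.
    return urlunsplit((scheme, netloc, path, urlencode(params), ""))


uri = build_uri(
    "s3",
    "172.17.0.1",
    9000,
    "/sandbox/",
    {"access-key": "myaccesskey", "secret-key": "mysecretkey", "tls": "false"},
)
print(uri)
```

The first argument is introduced by `?` and every later one by `&`, which is what `urlencode` together with `urlunsplit` produces.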

For example, here we connect to a MinIO S3-compatible storage and repeat the same API commands we used with local files:

In [4]:
path = resource('s3://172.17.0.1:9000/sandbox/?access-key=myaccesskey&secret-key=mysecretkey&tls=false')

In [6]:
file_path = path.joinpath('a.pt')

In [7]:
file_path.write(torch.randn(4))

s3://172.17.0.1:9000/sandbox/a.pt

In [8]:
file_path.read()

tensor([-0.3245,  1.0341,  0.9378, -2.1612])

## Use resource to access Large Language Models (LLMs)

You can also use resource to access LLMs on various platforms: openai, fastchat, tgi, internal fastapi, and local huggingface.

Before accessing the LLM, note that access keys can be stored both in the environment and in a local file, so that they persist on your system.

In [10]:
# this would print the stored key in your system
# beam_key['OPENAI_API_KEY']

In [11]:
# this will assign a new key to your system
# beam_key['OPENAI_API_KEY'] = 'my_key_here'

In [12]:
llm = resource('openai:///gpt-4')

You can give any model instructions or chat with it:

In [13]:
llm.ask('when israel was founded? give me exact date').text

'Israel was founded on May 14, 1948.'

You can also hold a multi-turn chat with the model:

In [14]:
llm.chat('Hi my name is elad').text

'Hello Elad! How can I assist you today?'

In [15]:
llm.chat('Hi again, do you remember my name?').text

'Yes, your name is Elad. How can I assist you further?'
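The behavior seen here, where the model recalls an earlier message, is typical of a wrapper that resends the accumulated conversation on every call. The toy class below sketches that pattern, including a reset flag named after the `reset_chat` argument used in this notebook. `ChatSession` and `counting_backend` are hypothetical; this is not Beam's LLM class and it does not call any real model.

```python
class ChatSession:
    """Toy chat wrapper; `backend` maps a list of messages to a reply string."""

    def __init__(self, backend):
        self.backend = backend
        self.history = []

    def chat(self, text, reset_chat=False):
        if reset_chat:
            # Forget everything said so far before handling this message.
            self.history = []
        self.history.append({"role": "user", "content": text})
        reply = self.backend(self.history)
        self.history.append({"role": "assistant", "content": reply})
        return reply


def counting_backend(messages):
    """Stand-in for a model: reports how many user messages it can see."""
    seen = sum(1 for m in messages if m["role"] == "user")
    return f"I can see {seen} user message(s)"


session = ChatSession(counting_backend)
replies = [
    session.chat("Hi my name is elad"),
    session.chat("Hi again, do you remember my name?"),
    session.chat("Hi again, do you remember my name?", reset_chat=True),
]
print(replies)
```

After the reset, the backend only sees the latest message, which is why a real model would no longer know the name.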

In [16]:
llm.chat('Hi again, do you remember my name?', reset_chat=True).text

"As an AI, I don't have the ability to remember personal data unless it's shared with me in the course of our conversation. I am designed to respect user privacy and confidentiality."

You can also parse the response in other formats

In [17]:
llm.ask('Hi how are you today? answer in a JSON format').json

{'status': 'success',
 'message': "I'm an AI, I don't have feelings, but I'm functioning as expected."}

In [18]:
llm.ask('Hi how are you today? answer in a YAML format').yaml

{'status': {'response': "I'm an AI, I don't have feelings, but I'm functioning as expected. Thank you for asking."}}
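Turning a reply into a dictionary amounts to parsing the reply text. As a rough sketch of what such parsing involves for JSON, the helper below uses only the standard library and tolerates a reply wrapped in a Markdown code fence, which models often emit. `parse_json_reply` is a hypothetical function; how Beam implements `.json` and `.yaml` is not shown here.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the source readable
FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_reply(text):
    """Parse JSON from a reply, with or without a surrounding code fence."""
    match = FENCED.search(text)
    payload = match.group(1) if match else text
    return json.loads(payload)


plain = parse_json_reply('{"status": "success"}')
fenced = parse_json_reply(FENCE + 'json\n{"status": "success"}\n' + FENCE)
print(plain, fenced)
```

`json.loads` raises `json.JSONDecodeError` when the reply is not valid JSON, so production code would usually catch that and retry or fall back to the raw text.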

You can also use the LLM directly with langchain, without any further wrapper:

In [19]:
from langchain.schema import HumanMessage

text = "What would be a good company name for a company that makes colorful socks?"
messages = [HumanMessage(content=text)]

In [20]:
llm.invoke(text)

'1. "Rainbow Steps"\n2. "Colorful Cozies"\n3. "Spectrum Socks"\n4. "Vibrant Footprints"\n5. "Kaleidoscope Kicks"\n6. "Technicolor Toes"\n7. "Prismatic Peds"\n8. "ColorSocks"\n9. "Vivid Steps"\n10. "Hue Crew Socks"\n11. "ColorStride"\n12. "BrightSox"\n13. "Spectrum Strides"\n14. "Colorful Comfort"\n15. "Rainbow Wraps"\n16. "Vivid Veils"\n17. "ChromaSox"\n18. "Colorful Cushions"\n19. "Kaleidosocks"\n20. "Rainbow Rugs".'

In [21]:
llm.invoke(messages)

'"Rainbow Steps"'

With our URIs, you can also use the openai syntax with any model (not just openai): simply import our simulator instead of openai.

In [22]:
from beam.llm import openai_simulator as openai

In [23]:
llm = resource('openai:///gpt-4')

In [24]:
openai.Completion.create(prompt='2**10=?', model='openai:///text-davinci-003')

<OpenAIObject text_completion id=cmpl-8V35kGo6b7yXlv2Vsmgo6yaNLqEos at 0x7f6c5118fe70> JSON: {
  "id": "cmpl-8V35kGo6b7yXlv2Vsmgo6yaNLqEos",
  "object": "text_completion",
  "created": 1702410744,
  "model": "text-davinci-003",
  "choices": [
    {
      "text": "\n\n1024",
      "index": 0,
      "logprobs": null,
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 5,
    "completion_tokens": 3,
    "total_tokens": 8
  }
}

To access other LLM resource types follow the URI convention:

llm = resource('scheme://\<hostname\>:\<port\>/path/to/my/file?arg1=val1&arg2=val2')

Possible schemes: openai, fastchat, tgi, fastapi, huggingface

## Use resource to access BeamServer algorithms

You can also use Beam to quickly deploy an algorithm via ssh. You can then access this algorithm through a resource object from any machine that can reach the server.

For example, here we build a sparse similarity server (like faiss, but for sparse vectors, e.g. TF-IDF vectors):

In [26]:
from beam.sparse import SparseSimilarity
from beam.serve import beam_server
M = 40000

In [30]:
sparse_sim = SparseSimilarity(metric='cosine', format='coo', vec_size=M, device='cpu', k=10)
server = beam_server(sparse_sim, backend='waitress', non_blocking=True)

2023-12-12 19:56:59 | BeamLog | INFO | Opening a flask inference serve on port: 27450


In [31]:
def gen_coo_vectors(k, M, nel):

    r = []
    c = []
    v = []

    for i in range(k):
        r.append(i * torch.ones(nel, dtype=torch.int64))
        c.append(torch.randint(M, size=(nel,)))
        v.append(torch.randn(nel))

    return torch.sparse_coo_tensor(torch.stack([torch.cat(r), torch.cat(c)]), torch.cat(v), size=(k, M))

In [32]:
s1 = gen_coo_vectors(20000, M, 100)
s2 = gen_coo_vectors(20, M, 100)

In [35]:
sparse_sim = resource('beam-http://localhost:27450')

In [36]:
sparse_sim.add(s1)

In [37]:
%%time
dist, ind = sparse_sim.search(s2, k=10)



CPU times: user 2.44 s, sys: 1.59 s, total: 4.03 s
Wall time: 532 ms
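To clarify what a cosine search over sparse vectors computes, here is a small pure-Python sketch. It only illustrates the metric and the top-k selection; it is not how `SparseSimilarity` is implemented. Vectors are represented as `{column: value}` dictionaries, and `cosine_top_k` is a hypothetical function name.

```python
import math


def cosine_top_k(index, query, k):
    """Return the k best (similarity, row_id) pairs for one sparse query.

    index: list of {column: value} dicts, one per stored vector.
    query: a single {column: value} dict.
    """
    q_norm = math.sqrt(sum(v * v for v in query.values()))
    scores = []
    for row_id, row in enumerate(index):
        # Only columns present in both vectors contribute to the dot product.
        dot = sum(v * row.get(col, 0.0) for col, v in query.items())
        norm = q_norm * math.sqrt(sum(v * v for v in row.values()))
        scores.append((dot / norm if norm else 0.0, row_id))
    # Highest similarity first; ties are broken by the lower row id.
    scores.sort(key=lambda s: (-s[0], s[1]))
    return scores[:k]


index = [{0: 1.0, 3: 1.0}, {1: 2.0}, {0: 1.0}]
top = cosine_top_k(index, {0: 1.0}, k=2)
print(top)
```

This brute-force loop visits every stored vector for each query, so it is only meant for reading, not for 20,000 vectors of dimension 40,000 as in the cells above.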


The beam server can wrap any function or class for quick and easy deployment

In [38]:
def llm_executor_and_store(prompt, llm='openai:///text-davinci-003?max_tokens=2048'):
    llm = resource(llm)
    code = llm.ask(f"Return executable python code that performs the following task. The final result should assigned to a variable name 'res':\n{prompt}\n\n\n").text
    try:
        exec(code)
        return res, code
    except:
        return 'ExecutionError', code

In [39]:
beam_server(llm_executor_and_store, backend='waitress', non_blocking=True)

<beam.serve.http_server.HTTPServer at 0x7f6c506df5e0>

2023-12-12 19:58:09 | BeamLog | INFO | Opening a flask inference serve on port: 27451


In [41]:
remote_executor = resource('beam-http://localhost:27451')

In [42]:
r, c = remote_executor('what is the 18th number in the fibonacci series?')

print(c)
print(r)

def fibonacci(n): 
    a = 0
    b = 1
    if n < 0: 
        print("Incorrect input") 
    elif n == 0: 
        return a 
    elif n == 1: 
        return b 
    else: 
        for i in range(2,n): 
            c = a + b 
            a = b 
            b = c 
        return b 
  
res = fibonacci(18)
ExecutionError
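The call returns `ExecutionError` even though the generated code looks valid. The most likely reason is a Python scoping rule: inside a function, `exec(code)` cannot create a new local variable, so the name `res` in `return res` is looked up as a global, is not found, and raises a `NameError` that the bare `except` swallows. A common workaround is to pass `exec` an explicit namespace dictionary and read the result from it. The sketch below contrasts the two; `run_generated` and `run_generated_naive` are hypothetical helpers, and executing model-generated code should only be done in an environment you trust.

```python
def run_generated(code):
    """Execute source text in its own namespace and return the value of `res`."""
    namespace = {}
    try:
        exec(code, namespace)
        return namespace["res"]
    except Exception as err:
        # Keep the reason instead of hiding it behind a bare except.
        return f"ExecutionError: {err!r}"


def run_generated_naive(code):
    """Same pattern as the cell above: `res` is never visible to this function."""
    try:
        exec(code)
        return res  # resolved as a global name, which does not exist
    except NameError:
        return "ExecutionError"


source = "def double(x):\n    return 2 * x\n\nres = double(21)"
safe_result = run_generated(source)
naive_result = run_generated_naive(source)
print(safe_result, naive_result)
```

Returning the exception text also makes it easier to tell a scoping problem from a genuine bug in the generated code.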
