In [1]:
# Event registry to store event-function mappings
event_registry = {}

In [2]:
def on_event(event_name):
    """
    A decorator that registers a function to an event.
    """
    def decorator(func):
        # Register the function in the event registry
        event_registry[event_name] = func
        return func
    return decorator

In [3]:
@on_event('start')
def start_event():
    print("Start event triggered!")


In [4]:
@on_event('stop')
def stop_event():
    print("Stop event triggered!")


In [5]:
@on_event('custom')
def custom_event(message):
    print(f"Custom event triggered with message: {message}")


In [6]:
def trigger_event(event_name, *args, **kwargs):
    """
    Function to trigger an event and run the corresponding function.
    """
    if event_name in event_registry:
        # Call the registered function with the provided arguments
        return event_registry[event_name](*args, **kwargs)
    else:
        print(f"No function registered for event '{event_name}'")
        return None

In [7]:
trigger_event('start')
trigger_event('stop')
trigger_event('custom', message="Hello, World!")
trigger_event('undefined_event')  # This will not trigger any function


Start event triggered!
Stop event triggered!
Custom event triggered with message: Hello, World!
No function registered for event 'undefined_event'


In [8]:
import re

# Event registry to store regex patterns and corresponding functions
event_registry = []

def on_event(event_pattern):
    """
    A decorator that registers a function to a regex event pattern.
    """
    def decorator(func):
        # Compile the regex pattern and store it with the function
        event_registry.append((re.compile(event_pattern), func))
        return func
    return decorator

def trigger_event(event_name, *args, **kwargs):
    """
    Function to trigger an event by matching the event_name against registered regex patterns.
    """
    for pattern, func in event_registry:
        match = pattern.match(event_name) 
        if match:
            # Call the matched function with the provided arguments
            return func(*args, **kwargs, **match.groupdict())
    print(f"No function registered for event '{event_name}'")
    return None

# Example usage
@on_event(r'^start$')
def start_event():
    print("Start event triggered!")

@on_event(r'^stop$')
def stop_event():
    print("Stop event triggered!")

@on_event(r'^custom/(?P<message>.+)$')
def custom_event(message):
    print(f"Custom event triggered with message: {message}")

# Trigger events
trigger_event('start')
trigger_event('stop')
trigger_event('custom/Hello, World!')
trigger_event('custom/Another message')
trigger_event('undefined_event')  # This will not trigger any function


Start event triggered!
Stop event triggered!
Custom event triggered with message: Hello, World!
Custom event triggered with message: Another message
No function registered for event 'undefined_event'


In [21]:
import re

# Event registry to store regex patterns and corresponding functions
event_registry = {}

def on_event(event_pattern):
    """
    A decorator that registers a function to a regex event pattern.
    """
    def decorator(func):
        event_registry.update({re.compile(event_pattern): func})
        return func
    return decorator

def trigger_event(event_name, *args, **kwargs):
    """
    Function to trigger an event by matching the event_name against registered regex patterns.
    """
    for pattern, func in event_registry.items():
        match = pattern.match(event_name)
        if match:
            # Call the matched function with the provided arguments and regex group dict
            return func(*args, **kwargs, **match.groupdict())
    print(f"No function registered for event '{event_name}'")
    return None

# Example function to handle the event
@on_event(r'^message/(?P<uuid>[0-9a-fA-F/-/]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$')
def handle_message(uuid, message=None):
    print(f"Message event triggered with UUID: {uuid}")
    if message:
        print(f"Message: {message}")

# Trigger events with an optional 'message' argument
trigger_event('message/123e4567-e89b-12d3-a456-426614174000', message="This is a test message")
trigger_event('message/123e4567-e89b-12d3-a456-426614174000')  # Without the message argument


Message event triggered with UUID: 123e4567-e89b-12d3-a456-426614174000
Message: This is a test message
Message event triggered with UUID: 123e4567-e89b-12d3-a456-426614174000


In [19]:
len("message/123e4567-e89b-12d3-a456-426614174000")

44

In [10]:
@on_event(r'^message2/(?P<uuid>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$')
def handle_message(uuid, *args, **kwargs):
    print(f"Message event triggered with UUID: {uuid}")
    print(f"{kwargs}", "hi")

In [11]:
trigger_event('message2/123e4567-e89b-12d3-a456-426614174000', message="This is a test message")

Message event triggered with UUID: 123e4567-e89b-12d3-a456-426614174000
{'message': 'This is a test message'} hi


In [12]:
import functools

def log_class_creation(cls):
    """A decorator to log when an instance of the class is created."""
    original_init = cls.__init__

    @functools.wraps(original_init)
    def new_init(self, *args, **kwargs):
        print(f"Creating instance of {cls.__name__} with arguments: {args} and kwargs: {kwargs}")
        original_init(self, *args, **kwargs)

    cls.__init__ = new_init
    return cls

# Example usage of the decorator
@log_class_creation
class ExampleClass:
    def __init__(self, value):
        self.value = value

    def display(self):
        print(f"Value is {self.value}")

# Create an instance to see the decorator in action
example = ExampleClass(42)
example.display()


Creating instance of ExampleClass with arguments: (42,) and kwargs: {}
Value is 42


In [68]:
import re
import logging
from ulid import ulid
from loguru import logger
class IEvent:
    logger = logger

logger = logging.getLogger(__name__)

event_registry = {}

class WebSocketMessageTextEvent(IEvent):

    @on_event(r'^message/text/image_generation$')
    async def execute(cls, data: dict):
        cls.logger.debug("Running Text to Image service")
        cls.logger.debug("Completed Text to Image service")

    @on_event(r'^message/text/text_generation$')
    async def execute(cls, data: dict):
        cls.logger.debug("Running Text to Speech service")
        cls.logger.debug("Completed Text to Speech service")

def on_event(event_pattern):
    """
    A decorator that registers a function to a regex event pattern.
    """
    def decorator(func):
        logger.debug(f"Registering event: {event_pattern}")
        event_registry[re.compile(event_pattern)] = func
        return func
    return decorator



async def trigger_event(event_name, *args, **kwargs):
    """
    Function to trigger an event by matching the event_name against registered regex patterns.
    """
    logger.debug(f"Triggering event: {event_name}")
    for pattern, func in event_registry.items():
        match = pattern.match(event_name)
        if match:
            logger.debug(f"Event matched: {event_name}")
            # Determine if func is a class method and get the class if needed
            cls = func.__dict__
            await method(cls, *args, **kwargs, **match.groupdict())
    logger.debug(f"No function registered for event '{event_name}'")
    return None




In [69]:
event_registry

{re.compile(r'^message/text/image_generation$',
            re.UNICODE): <function __main__.WebSocketMessageTextEvent.execute(cls, data: dict)>,
 re.compile(r'^message/text/text_generation$',
            re.UNICODE): <function __main__.WebSocketMessageTextEvent.execute(cls, data: dict)>}

In [70]:
func = event_registry[re.compile("^message/text/image_generation$")]
map_ = {
    WebSocketMessageTextEvent.__name__: WebSocketMessageTextEvent
}
obj = map_[func.__qualname__.split(".")[0]]

In [71]:
await func(obj, {'session_id': '12345', 'text': 'Generate an image of a sunset'})

[32m2024-09-12 10:13:45.765[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mexecute[0m:[36m16[0m - [34m[1mRunning Text to Image service[0m
[32m2024-09-12 10:13:45.766[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mexecute[0m:[36m17[0m - [34m[1mCompleted Text to Image service[0m


In [58]:
func.__qualname__.split(".")[0]

'WebSocketMessageTextEvent'

<__main__.WebSocketMessageTextEvent at 0x178c02620>

In [19]:
await trigger_event('message/text/image_generation', data={'session_id': '12345', 'text': 'Generate an image of a sunset'})

TypeError: WebSocketMessageTextEvent.execute() missing 1 required positional argument: 'cls'

In [29]:
from ulid import ulid

class IEvent:
    # Base class for events
    pass

class WebSocketMessageTextEvent(IEvent):
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
    
    @on_event(r'^message/text/image_generation$')
    async def execute_image_generation(self, data: dict):
        self.logger.debug("Running Text to Image service")
        # Your service call here
        self.logger.debug("Completed Text to Image service")

    @on_event(r'^message/text/text_generation$')
    async def execute_text_generation(self, data: dict):
        self.logger.debug("Running Text to Speech service")
        # Your service call here
        self.logger.debug("Completed Text to Speech service")


In [30]:
instance_registry = {}

def register_instance(cls):
    instance = cls()
    for method_name, method in cls.__dict__.items():
        if callable(method) and hasattr(method, '__event__'):
            instance_registry[method] = instance
    return instance

# Register an instance of WebSocketMessageTextEvent
instance = register_instance(WebSocketMessageTextEvent)


In [31]:
async def trigger_event(event_name, *args, **kwargs):
    """
    Function to trigger an event by matching the event_name against registered regex patterns.
    """
    logger.debug(f"Triggering event: {event_name}")
    for pattern, func in event_registry.items():
        match = pattern.match(event_name)
        if match:
            logger.debug(f"Event matched: {event_name}")
            # Check if func is a class method and retrieve the instance
            if func in instance_registry:
                instance = instance_registry[func]
                return await func(instance, *args, **kwargs, **match.groupdict())
            else:
                # Handle cases where func is not associated with an instance
                return await func(*args, **kwargs, **match.groupdict())
    logger.debug(f"No function registered for event '{event_name}'")
    return None

# Example call to trigger an event
await trigger_event('message/text/image_generation', data={'session_id': '12345', 'text': 'Generate an image of a sunset'})


TypeError: WebSocketMessageTextEvent.execute_image_generation() missing 1 required positional argument: 'self'

In [81]:
from start_utils import logger

[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket connection store[0m | [35mstart_utils:<module>:24[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket connection store[0m | [35mstart_utils:<module>:24[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket connection store[0m | [35mstart_utils:<module>:26[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket connection store[0m | [35mstart_utils:<module>:26[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket event registry[0m | [35mstart_utils:<module>:28[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:32[0m | [34m[1mDEBUG[0m | [36mInitialising websocket event registry[0m | [35mstart_utils:<module>:28[0m | [33m{}

Loaded as API: https://black-forest-labs-flux-1-schnell.hf.space ✔


[32mSeptember-12-2024[0m | [30m12:30:35[0m | [34m[1mDEBUG[0m | [36mInitialised gradio flux client[0m | [35mstart_utils:<module>:165[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:35[0m | [34m[1mDEBUG[0m | [36mInitialised gradio flux client[0m | [35mstart_utils:<module>:165[0m | [33m{}[0m
[32mSeptember-12-2024[0m | [30m12:30:35[0m | [34m[1mDEBUG[0m | [36mInitialised gradio flux client[0m | [35mstart_utils:<module>:165[0m | [33m{}[0m


In [84]:
from start_utils import logger

router = {}
from services.apis.chat.text_to_speech import TextToSpeechChatService

In [88]:
await TextToSpeechChatService("123").run(**{'data': {'message': 'tell me about Bruce Wayne', 'chat_urn': '4888c131-887b-4ea7-a3a8-31ed84ac019b'}})

[32mSeptember-12-2024[0m | [30m12:33:09[0m | [34m[1mDEBUG[0m | [36mInitializing Initiate Chat API service[0m | [35mservices.apis.chat.abstraction:__init__:22[0m | [33m{'urn': '123'}[0m
[32mSeptember-12-2024[0m | [30m12:33:09[0m | [34m[1mDEBUG[0m | [36mInitializing Initiate Chat API service[0m | [35mservices.apis.chat.abstraction:__init__:22[0m | [33m{'urn': '123'}[0m
[32mSeptember-12-2024[0m | [30m12:33:09[0m | [34m[1mDEBUG[0m | [36mInitializing Initiate Chat API service[0m | [35mservices.apis.chat.abstraction:__init__:22[0m | [33m{'urn': '123'}[0m
[32mSeptember-12-2024[0m | [30m12:33:09[0m | [34m[1mDEBUG[0m | [36mInitializing Initiate Chat API service[0m | [35mservices.apis.chat.text_to_speech:__init__:26[0m | [33m{'urn': '123'}[0m
[32mSeptember-12-2024[0m | [30m12:33:09[0m | [34m[1mDEBUG[0m | [36mInitializing Initiate Chat API service[0m | [35mservices.apis.chat.text_to_speech:__init__:26[0m | [33m{'urn': '123'}[0m
[32

ConnectionError: Error 8 connecting to redis:6379. nodename nor servname provided, or not known.

In [15]:
from gtts import gTTS

In [16]:
tts = gTTS(text="hi i am eva what you doing how can i help you. If you want to process the audio in chunks, you can handle it from the audio_bytes. However, gTTS doesn't provide direct streaming like gen.next(). You would need a different library for real-time TTS streaming (e.g., Google Cloud Text-to-Speech API).", lang="en", slow=False)

In [17]:
gen = tts.stream()

In [1]:
!pip3 install langchain_google_genai


Collecting langchain_google_genai
  Using cached langchain_google_genai-1.0.10-py3-none-any.whl.metadata (3.8 kB)
Collecting google-generativeai<0.8.0,>=0.7.0 (from langchain_google_genai)
  Using cached google_generativeai-0.7.2-py3-none-any.whl.metadata (4.0 kB)
Collecting google-ai-generativelanguage==0.6.6 (from google-generativeai<0.8.0,>=0.7.0->langchain_google_genai)
  Using cached google_ai_generativelanguage-0.6.6-py3-none-any.whl.metadata (5.6 kB)
Collecting google-api-python-client (from google-generativeai<0.8.0,>=0.7.0->langchain_google_genai)
  Using cached google_api_python_client-2.145.0-py2.py3-none-any.whl.metadata (6.7 kB)
Collecting protobuf (from google-generativeai<0.8.0,>=0.7.0->langchain_google_genai)
  Using cached protobuf-4.25.4-cp37-abi3-macosx_10_9_universal2.whl.metadata (541 bytes)
Collecting httplib2<1.dev0,>=0.19.0 (from google-api-python-client->google-generativeai<0.8.0,>=0.7.0->langchain_google_genai)
  Using cached httplib2-0.22.0-py3-none-any.whl.m