In [1]:
import pyaida
from IPython.display import Markdown
from pyaida import Runner, AbstractModel
from pyaida import pg
from pyaida.core.parsing.email import HtmlEmail
from pyaida.core.data.AbstractModel import MetaModel
repo = pg.repository(MetaModel)
#repo.register()
#Markdown(HtmlEmail._get_system_prompt_markdown())

In [3]:
"""test case ensure we can also transfer - this is important because its our schema extension"""
meta =HtmlEmail.to_meta_model()
M = AbstractModel.create_from_meta_model(meta)
#meta.model_dump()
M.model_fields



{'id': FieldInfo(annotation=str, required=True),
 'description': FieldInfo(annotation=str, required=True, description='The summary or abstract of the resource', json_schema_extra={'embedding_provider': 'openai.text-embedding-ada-002'}),
 'content': FieldInfo(annotation=Any, required=False, default=None),
 'sender': FieldInfo(annotation=str, required=True),
 'receiver': FieldInfo(annotation=str, required=True),
 'subject': FieldInfo(annotation=str, required=True),
 'date': FieldInfo(annotation=str, required=True)}

In [28]:
from pydantic import BaseModel, create_model, Field
from typing import Any, Dict, Type, Union

def resolve_ref(schema: Dict[str, Any], ref: str) -> Dict[str, Any]:
    """Resolve $ref to its definition in the schema."""
    ref_path = ref.lstrip("#/").split("/")
    resolved = schema
    for part in ref_path:
        resolved = resolved.get(part, {})
    return resolved

def build_pydantic_model(schema: Dict[str, Any], 
                         definitions: Dict[str, Any] = None, 
                         model_functions:dict=None, 
                         model_namespace:str=None) -> Type[BaseModel]:
    """
    Recursively generate Pydantic models from a JSON Schema.
    """
    if definitions is None:
        definitions = schema.get("definitions", {})

    name = schema['title']
    properties = schema.get("properties", {})
    required_fields = set(schema.get("required", []))
    model_fields = {}

    # Iterate over properties and resolve fields
    for field_name, field_info in properties.items():
        if "$ref" in field_info:
            # Resolve the $ref and recursively build a model
            ref_schema = resolve_ref({"definitions": definitions}, field_info["$ref"])
            sub_model = build_pydantic_model(ref_schema, definitions, name=field_name.capitalize())
            field_type = sub_model
        elif field_info.get("type") == "object":
            # Handle nested object
            field_type = build_pydantic_model(field_info, definitions, name=field_name.capitalize())
        elif field_info.get("type") == "array":
            # Handle arrays (assumes single-type arrays)
            items = field_info.get("items", {})
            if "$ref" in items:
                ref_schema = resolve_ref({"definitions": definitions}, items["$ref"])
                field_type = list[build_pydantic_model(ref_schema, definitions, name=field_name.capitalize())]
            else:
                field_type = list
        else:
            # Map simple types
            field_type = {
                "string": str,
                "integer": int,
                "boolean": bool,
                "number": float,
                "array": list,
                "object": dict,
            }.get(field_info.get("type"), Any)

        # Add field with description
        description = field_info.get("description", None)
        if field_name in required_fields:
            model_fields[field_name] = (field_type, Field(..., description=description))
        else:
            model_fields[field_name] = (field_type, Field(None, description=description))

    # Create the model dynamically
    model =  create_model(name, **model_fields)
    
    model.__doc__ = schema.get('description')
    class Config:
        namespace: str = model_namespace
        functions: dict= model_functions
            
    model.Config = Config
    return model

# Example JSON schema with complex types
json_schema = MetaModel.model_json_schema()

# Build the Pydantic model
Model = build_pydantic_model(json_schema)

Model.__doc__


'the meta model is a persisted version of a concrete model\nthis can be saved and reloaded from the database and is used for agents'

In [27]:
Model.Config.namespace

In [18]:
HtmlEmail.model_json_schema()

{'description': 'You are an email and newsletter agent. If asked about emails or newsletters you can run a search to answer the users question.\n    \n    ',
 'properties': {'id': {'format': 'uuid', 'title': 'Id', 'type': 'string'},
  'description': {'description': 'The summary or abstract of the resource',
   'embedding_provider': 'openai.text-embedding-ada-002',
   'title': 'Description',
   'type': 'string'},
  'content': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
   'default': None,
   'title': 'Content'},
  'sender': {'title': 'Sender', 'type': 'string'},
  'receiver': {'title': 'Receiver', 'type': 'string'},
  'subject': {'title': 'Subject', 'type': 'string'},
  'date': {'title': 'Date', 'type': 'string'}},
 'required': ['id', 'description', 'sender', 'receiver', 'subject', 'date'],
 'title': 'HtmlEmail',
 'type': 'object'}

In [2]:
Markdown(MetaModel._get_system_prompt_markdown())


# Model details
name: MetaModel

## System prompt
The Meta Model is the abstract representation of models or agents as can be saved or shared declaratively
    
## Structured Response Objects (if required)
### 1. _MetaField

#### Required fields - name (str):  
 - type (str):  
 - description (str):  
 - embedding_provider (str):  
 - default (str):  


            
### 2. MetaModel

#### Required fields - id (str):  
 - description (str): System prompt or other overview description 
 - name (str):  
 - namespace (str): An optional namespace 
 - functions (dict): A mapping of functions to use. simply map the function id to a description in a flat dict 
 - key_field (str): The primary key field - convention is to simply use id 
 - fields (_MetaField): The fields and their properties 


            

In [3]:
# import typing
# from pyaida.core.data.sql.helper import SqlHelper
# from pyaida.core.utils import inspection
# for k,v in typing.get_type_hints(MetaModel).items():
#     #v = inspection.get_innermost_args(v)
#     print(k, v, SqlHelper.pydantic_to_postgres_type(v))

In [12]:
MetaModel.model_json_schema()

{'$defs': {'_MetaField': {'properties': {'name': {'title': 'Name',
     'type': 'string'},
    'type': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': 'str',
     'title': 'Type'},
    'description': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': None,
     'title': 'Description'},
    'embedding_provider': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': None,
     'title': 'Embedding Provider'},
    'default': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': None,
     'title': 'Default'}},
   'required': ['name'],
   'title': '_MetaField',
   'type': 'object'}},
 'description': 'the meta model is a persisted version of a concrete model\nthis can be saved and reloaded from the database and is used for agents',
 'properties': {'id': {'title': 'Id', 'type': 'string'},
  'description': {'default': '',
   'description': 'System prompt or other overview description',
   'title': 'Description',
   'type': 'string'},
  '

In [4]:
d = {'id': 'website_summary_agent', 'description': 'An agent to check websites and summarize their content using the external function endpoint_content_get.', 'name': 'Website Summary Agent', 'namespace': 'web_tools', 'functions': {'endpoint_content_get': 'Fetches and summarizes content from a given URL.'}, 'key_field': 'id', 'fields': [{'name': 'name', 'description': 'The name of the website or content.', 'embedding_provider': 'default', 'default': ''}, {'name': 'url', 'description': 'The URL of the website to be summarized.', 'embedding_provider': 'default', 'default': ''}, {'name': 'description', 'description': 'A brief summary of the website content.', 'embedding_provider': 'default', 'default': ''}, {'name': 'authors', 'description': 'The authors of the website content.', 'embedding_provider': 'default', 'default': ''}]}
m = MetaModel(**d)
m.model_dump()
repo.update_records(m)

[{'id': 'website_summary_agent',
  'description': 'An agent to check websites and summarize their content using the external function endpoint_content_get.',
  'name': 'Website Summary Agent',
  'namespace': 'web_tools',
  'functions': {'endpoint_content_get': 'Fetches and summarizes content from a given URL.'},
  'key_field': 'id',
  'fields': [{'name': 'name',
    'type': 'str',
    'description': 'The name of the website or content.',
    'embedding_provider': 'default',
    'default': ''},
   {'name': 'url',
    'type': 'str',
    'description': 'The URL of the website to be summarized.',
    'embedding_provider': 'default',
    'default': ''},
   {'name': 'description',
    'type': 'str',
    'description': 'A brief summary of the website content.',
    'embedding_provider': 'default',
    'default': ''},
   {'name': 'authors',
    'type': 'str',
    'description': 'The authors of the website content.',
    'embedding_provider': 'default',
    'default': ''}],
  'created_at': date

In [5]:
MetaModel.model_fields

{'id': FieldInfo(annotation=str, required=True),
 'description': FieldInfo(annotation=str, required=False, default='', description='System prompt or other overview description'),
 'name': FieldInfo(annotation=str, required=True),
 'namespace': FieldInfo(annotation=str, required=False, default='public', description='An optional namespace'),
 'functions': FieldInfo(annotation=Union[dict, NoneType], required=False, default={}, description='A mapping of functions to use. simply map the function id to a description in a flat dict'),
 'key_field': FieldInfo(annotation=Union[str, NoneType], required=False, default='id', description='The primary key field - convention is to simply use id'),
 'fields': FieldInfo(annotation=Union[List[_MetaField], NoneType], required=True, description='The fields and their properties')}

In [6]:
from pydantic.fields import FieldInfo
FieldInfo (annotation=str, default=None, descrtion='test', json_schema_extra={'embedding_provider':'default'})

FieldInfo(annotation=str, required=False, default=None, json_schema_extra={'embedding_provider': 'default'})

In [9]:
M = AbstractModel.create_from_meta_model(m)
M.model_json_schema()

{'properties': {'name': {'default': '',
   'description': 'The name of the website or content.',
   'title': 'Name',
   'type': 'string'},
  'url': {'default': '',
   'description': 'The URL of the website to be summarized.',
   'title': 'Url',
   'type': 'string'},
  'description': {'default': '',
   'description': 'A brief summary of the website content.',
   'title': 'Description',
   'type': 'string'},
  'authors': {'default': '',
   'description': 'The authors of the website content.',
   'title': 'Authors',
   'type': 'string'}},
 'title': 'Website_Summary_Agent',
 'type': 'object'}

In [10]:
r = Runner(M)
r("What is your prime objective")



'My prime objective is to check websites and summarize their content using the external function endpoint_content_get.'

In [5]:
#add in plan over multiple database agents and log and test more crud for aidb / add user ids on all crud for multi tenency 
#then we need to start thinking about background tasks and query planners-the trick will be to enable a sort of smart index that covers all bases -> plan sproc 
from pyaida.core.data.AbstractModel import MetaModel

r = Runner(MetaModel)
a = r.run("Create and save an agent to check websites and summarize their content using the external function endpoint_content_get. You should return an object that has the name, url, description, authors ")
Markdown(a)
#Markdown(pyaida.ask("Create an agent to check websites and summarize their content using the external function endpoint_content_get. You should return an object that has the name, url, description, authors ", model=MetaModel))

[32m2024-12-15 17:26:42.668[0m | [34m[1mDEBUG   [0m | [36mpyaida.core.lang.Runner[0m:[36minvoke[0m:[36m155[0m - [34m[1mfunction_call=FunctionCall(name='f73fc_MetaModel_save', arguments={'data': {'id': 'website_summary_agent', 'description': 'An agent to check websites and summarize their content using the external function endpoint_content_get.', 'name': 'Website Summary Agent', 'namespace': 'web_tools', 'functions': {'endpoint_content_get': 'Fetches the content of a given URL.'}, 'key_field': 'id', 'fields': [{'name': 'name', 'description': 'The name of the website or content.', 'embedding_provider': 'default', 'default': ''}, {'name': 'url', 'description': 'The URL of the website to be checked.', 'embedding_provider': 'default', 'default': ''}, {'name': 'description', 'description': 'A summary of the website content.', 'embedding_provider': 'default', 'default': ''}, {'name': 'authors', 'description': 'The authors of the website content.', 'embedding_provider': 'default'

The agent has been successfully created and saved. It is designed to check websites and summarize their content using the external function `endpoint_content_get`. The object includes fields for name, url, description, and authors.

In [6]:
data = pg.repository(MetaModel).select_to_model()
data[0].model_dump()

{'id': 'website_summary_agent',
 'description': 'An agent to check websites and summarize their content using the external function endpoint_content_get.',
 'name': 'Website Summary Agent',
 'namespace': 'web_tools',
 'functions': {'endpoint_content_get': 'Fetches the content of a given URL.'},
 'key_field': 'id',
 'fields': [{'name': 'name',
   'description': 'The name of the website or content.',
   'embedding_provider': 'default',
   'default': ''},
  {'name': 'url',
   'description': 'The URL of the website to be checked.',
   'embedding_provider': 'default',
   'default': ''},
  {'name': 'description',
   'description': 'A summary of the website content.',
   'embedding_provider': 'default',
   'default': ''},
  {'name': 'authors',
   'description': 'The authors of the website content.',
   'embedding_provider': 'default',
   'default': ''}]}

In [8]:
import json
from ast import literal_eval
literal_eval(data[1]['fields'])

{'{"name": "authors", "description": "The authors of the website content.", "embedding_provider": "default", "default": ""}',
 '{"name": "description", "description": "A brief summary of the website content.", "embedding_provider": "default", "default": ""}',
 '{"name": "name", "description": "The name of the website or content.", "embedding_provider": "default", "default": ""}',
 '{"name": "url", "description": "The URL of the website to be summarized.", "embedding_provider": "default", "default": ""}'}

In [None]:

Markdown(pyaida.ask("What emails do we have related to culture, history and human cultivation or cosmographia", model=HtmlEmail))

In [None]:
Markdown(pyaida.ask("What emails do we have related to AI", model=HtmlEmail))

In [None]:
#!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
#!pip uninstall -y pycrypto &  pip install pycryptodome
import os.path
import base64
import google.auth
import tqdm
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta

import pyaida



# If modifying these SCOPES, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def fetch_gmail_since(limit=50, domain_filters=None, start_date=None, sender_domain=None):
    """Shows basic usage of the Gmail API.
    Lists the user's Gmail labels.
    """
    creds = None
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                '/Users/sirsh/Documents/client_secret_628072079999-9jn030r6d13vfk8mjsp3kpsv2fkijqvm.apps.googleusercontent.com.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    try:
        service = build('gmail', 'v1', credentials=creds)
        today = datetime.now().date()
        start_date = datetime.strptime(start_date, '%Y-%m-%d').date() if start_date else today
        all_messages = [] 
        current_date = start_date

        while current_date < today:
            next_date = current_date + timedelta(days=30)
            if next_date > today:
                next_date = today  # Ensure we don't go beyond today

            from_date = current_date.strftime('%Y/%m/%d')
            to_date = next_date.strftime('%Y/%m/%d')
            query = f'after:{from_date} before:{to_date}'
            if sender_domain:
                query += f' from:@{sender_domain}'

            print(f"Fetching messages for: {from_date} to {to_date}")

            next_page_token = None
        
            while True:  # Pagination loop
                results = service.users().messages().list(userId='me', q=query, maxResults=limit,pageToken = next_page_token).execute()
                messages = results.get('messages', [])
                print(f'chunking... {next_page_token}')
                for message in tqdm.tqdm(messages):
                    
                    message = service.users().messages().get(userId='me', id=message['id'], format='raw').execute()
                    m = HtmlEmail.parse_raw_to_html(message['raw'])
                    print(m.sender)
                    if domain_filters:
                        for d in domain_filters:
                            if d in m.sender:
                                all_messages.append(m)
                    else:
                        all_messages.append(m)
                        
                next_page_token = results.get('nextPageToken')
                print([m.sender for m in all_messages[-2:]])
                repo.update_records(all_messages)
                all_messages = []
                if not next_page_token:
                    break  # No more pages for this query
                    
                
            # Move to the next 30-day window
            current_date = next_date

        return data
    except HttpError as error:
        print(f'An error occurred: {error}')
        
m = fetch_gmail_since(start_date='2024-09-01', sender_domain='substack.com')
m[0]

In [None]:
from IPython.display import Markdown
import pyaida
from pyaida import Runner, AbstractModel
from pyaida.core.data.AbstractModel import MetaModel
from pyaida.core.lang.functions import FunctionModel, Function
from pyaida.core.lang import FunctionManager
from pyaida import pg

In [None]:
#pg.repository(MetaModel).register()

# The most basic thing 
- you should be able to do is create an object and run
- the object has a system prompt and some functions including externs
- later we can load these objects from the database

In [None]:
class MyAgent(AbstractModel):
    """You are an agent that calls the functions the user suggests to get the answer"""
    
    class Config:
        functions: dict = {
            "describe_url_get": "A function to get the details at some url"
        }
    name: str
    description: str
        
    @classmethod
    def do_the_thing(cls, the_input:str):
        """
        This function does the thing - if the user supplies no value you can pass in any input you like as this is for testing
        
        Args:
            the_input: pass in any input to do the thing - choose any value if none supplied by user
        """
        if the_input is None:
            return "You called the function without any parameters - please call it again and supply ANY value without asking the user for help"
        return f"You have called the function with the input [{the_input}]"

from IPython.display import Markdown

Markdown(MyAgent._get_system_prompt_markdown())

In [None]:
MyAgent.to_meta_model()

In [None]:
pg.repository(MetaModel).update_records(MyAgent.to_meta_model())

## Get an instance of the runner and check its default functions
- it has runner functions and also any that are defined on the agent
- check the spec (naming must be global and openai friendly)
- and check the callable

In [None]:
r=Runner(MyAgent)
r

In [None]:
r.functions

In [None]:
r.functions['90cc5_MyAgent_do_the_thing'].to_json_spec()

In [None]:
r.functions['90cc5_MyAgent_do_the_thing'](the_input='test it')

## Next test that the agent runs and calls the function

In [None]:
r("what parameters are specified for the function defintion of `do_the_thing`")

In [None]:
r("please do the thing")

In [None]:
r.messages.model_dump()

## Now we want to understand how external functions are loaded
- discover on other object
- discover on API

In [None]:
uri = 'http://127.0.0.1:8002/openapi.json'
from pyaida.core.parsing.openapi import OpenApiSpec

s = OpenApiSpec(uri)

s._endpoint_methods

In [None]:
"""the function manager can now use the proxy to call functions"""

from pyaida.core.lang import FunctionManager

f = FunctionManager()
"""function managers can load API proxies - by default the pyaida api is used"""
f.proxy.endpoints

In [None]:
f.proxy.invoke_function('get_resources__get')[0]

In [None]:
"""test that the spec matches the correct format"""
f.proxy.get_resources__get.spec

In [None]:
"""we can access the operation as an attribute so that the proxy is more object like"""
f.proxy.get_resources__get()[0]

In [None]:
"""we now need to check activation
1. by default the pyaida should be used
2. we should be able to qualify an API and there should be a lookup in the database to gets its url and token
"""

added_functions = f.add_functions_by_key('get_resources__get')
#cjec call , no params -> added_functions[0]()
added_functions

In [None]:
added_functions[0].to_json_spec(), added_functions[0].hashed_qualified_name

In [None]:
"""now check that we can also load libray functions e.g. any agent in the code base has functions"""

added_functions = f.add_functions_by_key('system.Plan.test_plan')
#cjec call , no params -> added_functions[0](context='test')
added_functions

In [None]:
added_functions[0].to_json_spec(), added_functions[0].hashed_qualified_name

In [None]:
"""now test it in the runner - if the function manager can add functions and they are callable
the agent should be able to also
"""
from IPython.display import Markdown

Markdown(r("Please call the function get_resources__get and tell me what you find"))

# Part 2 - reading models from the database
- we just check the basic binding and the ability to create new objects via the API
- then we swtich to a planning mode which will flex the postgres modalities 

In [None]:
from IPython.display import Markdown
import pyaida
from pyaida import Runner, AbstractModel
from pyaida.core.lang.functions import FunctionModel, Function
from pyaida.core.lang import FunctionManager
from pyaida import pg

In [None]:
#pg.repository(FunctionModel).register(plan=False)

In [None]:
pg.execute("SELECT * FROM FunctionModel")

In [None]:
import uuid

def test_function(arg:str):
    """some test function
    Args:
        arg: description
    """
    return arg
test_function.__name__
fn = FunctionModel.from_function(test_function)
fn
#pg.update_records(fns)

In [None]:
pg.repository(FunctionModel).update_records(fn)

In [None]:
pg.repository(FunctionModel).load_model_from_key('test_function')

In [None]:
op_uid = None
model = pyaida.pg.repository(FunctionModel).select(op_uid) 
print(model.Config.functions)
Markdown(model._get_system_prompt_markdown())

In [None]:
uuid.uuid1()