# Get Started with AI Agents on Databricks

## Overview

- Start Serverless Warehouse

- Set up the current catalog and schema

- Load a dataset in Databricks
  - https://archive.ics.uci.edu/dataset/410/paper+reviews

- Prepare data

- Generate the embeddings and store them in a Databricks table

- Configure a vector search index on Unity Catalog (UC)
  - Note: The version of the course we created doesn't cover this part. To test the concepts seen in the course, we performed the embedding, chunking, and vector search index steps ourselves, configuring all the services needed to handle the dataset we chose as the basis for the experiments.

- Chunk the data of interest and store it in the vector search index

- Create a tool function for our AI agent

- Test all implementations

- Generate a `driver` notebook (ready-to-deploy) from the playground

- Run the driver notebook, integrating the agent, tools, and MLflow features with our model

- Deploy the code as a service running at an endpoint





## Initial setups



In [0]:
%sql
-- set the catalog and schema used for these studies
use catalog `studies`;
use schema `databricks-studies`;

select current_catalog() as actual_catalog,  current_schema() as actual_schema;

actual_catalog,actual_schema
studies,`databricks-studies`


In [0]:
import warnings
warnings.filterwarnings('ignore')

import json
import re
from tqdm import tqdm

from sklearn.decomposition import PCA
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType

from databricks.sdk import WorkspaceClient


# create a Spark session if it doesn't exist
spark = SparkSession.builder.getOrCreate()

# instantiate the Databricks workspace client
client = WorkspaceClient()


# define path of input data available in the Databricks volume
DATASET_PATH = '/Volumes/studies/databricks-studies/ai-agent-volume/reviews.json'

# set to True to export the tables to Databricks
EXPORT_TABLES_TO_DATABRICKS = False


# load json dataset
with open(DATASET_PATH, 'r', encoding='utf-8') as f:
    dataset = json.load(f)

# flatten the nested JSON into a DataFrame (one row per paper)
df = pd.json_normalize(dataset['paper'])
df.head()

# look at one sample
df['review'][0]

[{'confidence': '4',
  'evaluation': '1',
  'id': 1,
  'lan': 'es',
  'orientation': '0',
  'remarks': '',
  'text': '- El artículo aborda un problema contingente y muy relevante, e incluye tanto un diagnóstico nacional de uso de buenas prácticas como una solución (buenas prácticas concretas). - El lenguaje es adecuado.  - El artículo se siente como la concatenación de tres artículos diferentes: (1) resultados de una encuesta, (2) buenas prácticas de seguridad, (3) incorporación de buenas prácticas. - El orden de las secciones sería mejor si refleja este orden (la versión revisada es #2, #1, #3). - El artículo no tiene validación de ningún tipo, ni siquiera por evaluación de expertos.',
  'timespan': '2010-07-05'},
 {'confidence': '4',
  'evaluation': '1',
  'id': 2,
  'lan': 'es',
  'orientation': '1',
  'remarks': '',
  'text': 'El artículo presenta recomendaciones prácticas para el desarrollo de software seguro. Se describen las mejores prácticas recomendadas para desarrollar softwa


## Prepare data

### Normalize the input dataset

In [0]:
df = pd.json_normalize(
    dataset,
    record_path=['paper', 'review'],  # take columns paper and review; each item in review is mapped to a row.
    meta=[['paper', 'id']],  # add paper.id as column
)

df.rename(columns={'paper.id': 'paper_id'}, inplace=True)

print(df.shape)
df.head()

(405, 9)


Unnamed: 0,confidence,evaluation,id,lan,orientation,remarks,text,timespan,paper_id
0,4,1,1,es,0,,- El artículo aborda un problema contingente y...,2010-07-05,1
1,4,1,2,es,1,,El artículo presenta recomendaciones prácticas...,2010-07-05,1
2,5,1,3,es,1,,- El tema es muy interesante y puede ser de mu...,2010-07-05,1
3,4,2,1,es,1,,Se explica en forma ordenada y didáctica una e...,2010-07-05,2
4,4,2,2,es,0,,,2010-07-05,2


In [0]:
# handle missing values in a simple way: treat empty review text as NA and drop those rows
df.text = df.text.apply(lambda x: pd.NA if len(x)==0 else x)
df.dropna(inplace=True)
df.shape

(397, 9)

### Utils

In [0]:
def pandas_to_databricks_table(table_name: str, pandas_df: pd.DataFrame) -> None:
    spark_df = spark.createDataFrame(pandas_df)
    spark_df.write.format('delta').mode('overwrite').saveAsTable(table_name)
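    print(spark_df.count())  # call count() to print the number of rows, not the bound method
    spark_df.show(5)  # show() prints the rows itself and returns None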

### Create tables

In [0]:
df_reviews = df[['id', 'paper_id', 'text', 'timespan']]
print(df_reviews.shape)
df_reviews.head()

(397, 4)


Unnamed: 0,id,paper_id,text,timespan
0,1,1,- El artículo aborda un problema contingente y...,2010-07-05
1,2,1,El artículo presenta recomendaciones prácticas...,2010-07-05
2,3,1,- El tema es muy interesante y puede ser de mu...,2010-07-05
3,1,2,Se explica en forma ordenada y didáctica una e...,2010-07-05
5,3,2,Los autores describen una metodología para des...,2010-07-05


In [0]:
if EXPORT_TABLES_TO_DATABRICKS:
    pandas_to_databricks_table(table_name='db_paper_reviews', pandas_df=df)

In [0]:
if EXPORT_TABLES_TO_DATABRICKS:
    pandas_to_databricks_table(table_name='paper_reviews', pandas_df=df_reviews)

In [0]:
# Text chunking

def chunk_text(text, chunk_size=200, overlap=50):
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap

    return chunks

rows = []
CHUNK_SIZE = 100
OVERLAP = 10

for _, row in df_reviews.iterrows():
    chunks = chunk_text(row['text'], chunk_size=CHUNK_SIZE, overlap=OVERLAP)
    for i, chunk in enumerate(chunks):
        rows.append({
            'review_uri': f"paper_{row['paper_id']}_review_{row['id']}_chunk_{i}",
            'paper_id': row['paper_id'],
            'review_id': row['id'],
            'chunked_text': chunk,
            'chunk_size': len(chunk),
            'timespan': row['timespan']
        })

df_chunks = pd.DataFrame(rows)
print(df_chunks.shape)
df_chunks.head()

(4699, 6)


Unnamed: 0,review_uri,paper_id,review_id,chunked_text,chunk_size,timespan
0,paper_1_review_1_chunk_0,1,1,- El artículo aborda un problema contingente y...,100,2010-07-05
1,paper_1_review_1_chunk_1,1,1,co nacional de uso de buenas prácticas como un...,100,2010-07-05
2,paper_1_review_1_chunk_2,1,1,l lenguaje es adecuado. - El artículo se sien...,100,2010-07-05
3,paper_1_review_1_chunk_3,1,1,"iferentes: (1) resultados de una encuesta, (2)...",100,2010-07-05
4,paper_1_review_1_chunk_4,1,1,ación de buenas prácticas. - El orden de las s...,100,2010-07-05
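
The sliding-window behaviour of `chunk_text` can be checked in isolation. The sketch below is a minimal, equivalent reformulation (the name `chunk_with_overlap` and the input validation are additions for illustration, not part of the notebook). It shows why the last chunk of a review is usually shorter than `chunk_size`, and why an `overlap` greater than or equal to `chunk_size` has to be rejected: the window would never advance.

```python
def chunk_with_overlap(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into character windows of chunk_size that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        # With overlap >= chunk_size the start position would never move forward.
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap  # distance between consecutive window starts
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


# A 250-character text yields two full windows and one shorter tail.
print([len(c) for c in chunk_with_overlap("x" * 250, chunk_size=100, overlap=10)])
# [100, 100, 70]

# A text of exactly chunk_size characters still yields a tail made only of the overlap.
print([len(c) for c in chunk_with_overlap("x" * 100, chunk_size=100, overlap=10)])
# [100, 10]
```

Splitting on raw character offsets also cuts words in half, which is visible in the `chunked_text` column above.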


In [0]:
df_chunks.chunk_size.value_counts(dropna=False)

chunk_size
100    4240
9        14
99       14
64       10
2         9
       ... 
82        1
68        1
84        1
76        1
26        1
Name: count, Length: 97, dtype: int64

In [0]:
# keep only full-size chunks (drop rows whose chunk_size differs from CHUNK_SIZE)
CHUNK_SIZE = 100
df_chunks = df_chunks[df_chunks['chunk_size'] == CHUNK_SIZE]
print(df_chunks.shape)
df_chunks.head()


(4240, 6)


Unnamed: 0,review_uri,paper_id,review_id,chunked_text,chunk_size,timespan
0,paper_1_review_1_chunk_0,1,1,- El artículo aborda un problema contingente y...,100,2010-07-05
1,paper_1_review_1_chunk_1,1,1,co nacional de uso de buenas prácticas como un...,100,2010-07-05
2,paper_1_review_1_chunk_2,1,1,l lenguaje es adecuado. - El artículo se sien...,100,2010-07-05
3,paper_1_review_1_chunk_3,1,1,"iferentes: (1) resultados de una encuesta, (2)...",100,2010-07-05
4,paper_1_review_1_chunk_4,1,1,ación de buenas prácticas. - El orden de las s...,100,2010-07-05
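
Keeping only full-size chunks has a side effect worth checking: every review shorter than `CHUNK_SIZE` characters disappears from the index entirely. The sketch below uses hypothetical chunk records (the keys and lengths are made up for illustration) to list the reviews that lose all of their chunks under a given minimum length. Because no chunk exceeds `CHUNK_SIZE`, a threshold of `CHUNK_SIZE` matches the equality filter above.

```python
from collections import Counter

CHUNK_SIZE = 100

# Hypothetical (review_key, chunk_length) records, for illustration only.
chunks = [
    ("paper_1_review_1", 100),
    ("paper_1_review_1", 100),
    ("paper_1_review_1", 42),  # short tail chunk
    ("paper_2_review_2", 64),  # review shorter than one full chunk
]


def reviews_lost(records, min_length):
    """Return the review keys that keep no chunk when short chunks are dropped."""
    before = Counter(key for key, _ in records)
    after = Counter(key for key, length in records if length >= min_length)
    return sorted(key for key in before if after[key] == 0)


print(reviews_lost(chunks, min_length=CHUNK_SIZE))  # ['paper_2_review_2']
print(reviews_lost(chunks, min_length=20))          # []
```

A lower threshold is one possible trade-off between discarding noise-sized tails and keeping short reviews searchable.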


In [0]:
# set maximum rows for test
MAX_ROWS = 30
if MAX_ROWS:
    df_chunks = df_chunks[:MAX_ROWS]

df_chunks.shape

(30, 6)

In [0]:
def normalize_texts(texts):
    normalized = []
    for t in texts:
        if t is None:
            continue
        
        t = str(t).strip()
        t = re.sub(r'\s+', ' ', t) 
        t = t.replace('\n', ' ').replace('\r', ' ')
        
        if t:
            normalized.append(t)
            
    return normalized
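
Note that `normalize_texts` silently skips `None` and empty entries, so its output can be shorter than its input. When the embeddings are later assigned back to DataFrame rows, that can shift them out of alignment. A minimal sketch of one way to guard against this is shown below, assuming it is acceptable to carry the original positions along (the helper name `normalize_keep_positions` is hypothetical).

```python
import re
from typing import Optional


def normalize_keep_positions(texts: list[Optional[str]]) -> tuple[list[int], list[str]]:
    """Clean each text and return (positions, cleaned) for the entries that survive."""
    positions: list[int] = []
    cleaned: list[str] = []
    for pos, raw in enumerate(texts):
        if raw is None:
            continue
        # Collapse every whitespace run (including newlines) into a single space.
        text = re.sub(r"\s+", " ", str(raw)).strip()
        if text:
            positions.append(pos)
            cleaned.append(text)
    return positions, cleaned


positions, cleaned = normalize_keep_positions(["  hola\n mundo ", None, "   ", "ok"])
print(positions)  # [0, 3]
print(cleaned)    # ['hola mundo', 'ok']
```

With the positions available, each returned embedding can be written back to the row it came from, and skipped rows can be handled explicitly.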

In [0]:
def get_embeddings(texts):
    texts = normalize_texts(texts)
    if not texts:
        return []

    res = client.serving_endpoints.query(
        name='databricks-bge-large-en',  # embedding model
        input=texts
    )

    # res.data is a list of EmbeddingsV1ResponseEmbeddingElement objects
    data = getattr(res, 'data', None) or getattr(res, 'outputs', None)

    # extract embedding vector
    embeddings = [item.embedding for item in data]

    return embeddings

In [0]:
pca = PCA(
    n_components=10,
    random_state=42
)

In [0]:
batch_size = 64
all_embeddings = []

texts = df_chunks['chunked_text'].tolist()

for i in tqdm(range(0, len(texts), batch_size), desc="Processing batches"):
    batch = texts[i:i + batch_size]
    emb_high_dim = get_embeddings(batch)
    all_embeddings.extend(emb_high_dim)



Processing batches: 100%|██████████| 1/1 [00:00<00:00,  1.59it/s]
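
With `MAX_ROWS = 30` and `batch_size = 64` the loop runs a single batch, so the per-batch result and the accumulated list hold the same vectors. On a larger table they differ, and any later step should read the accumulated list. The sketch below illustrates the batching pattern with a stand-in embedder (`fake_embed` is hypothetical and replaces the real endpoint call) and checks that each batch returns one vector per input.

```python
from typing import Callable, Iterator


def batched(items: list[str], batch_size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_all(
    texts: list[str],
    embed_batch: Callable[[list[str]], list[list[float]]],
    batch_size: int = 64,
) -> list[list[float]]:
    """Embed texts batch by batch and return one vector per input text."""
    vectors: list[list[float]] = []
    for batch in batched(texts, batch_size):
        batch_vectors = embed_batch(batch)
        if len(batch_vectors) != len(batch):
            raise ValueError("embedding count does not match batch size")
        vectors.extend(batch_vectors)
    return vectors


def fake_embed(batch: list[str]) -> list[list[float]]:
    """Stand-in for the serving endpoint: returns a toy 2-dimensional vector."""
    return [[float(len(text)), 1.0] for text in batch]


texts = [f"chunk {i}" for i in range(150)]
vectors = embed_all(texts, fake_embed, batch_size=64)
print(len(vectors))  # 150
```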


In [0]:
# fit PCA on the embeddings accumulated from every batch, not only the last one
all_embeddings_low_dim = pca.fit_transform(np.array(all_embeddings))
df_chunks['embedding'] = all_embeddings_low_dim.tolist()
print(df_chunks.shape)
df_chunks.head()

(30, 7)


Unnamed: 0,review_uri,paper_id,review_id,chunked_text,chunk_size,timespan,embedding
0,paper_1_review_1_chunk_0,1,1,- El artículo aborda un problema contingente y...,100,2010-07-05,"[0.26134172656782684, -0.011528883982242182, -..."
1,paper_1_review_1_chunk_1,1,1,co nacional de uso de buenas prácticas como un...,100,2010-07-05,"[-0.014583218901199907, 0.05389815806990671, 0..."
2,paper_1_review_1_chunk_2,1,1,l lenguaje es adecuado. - El artículo se sien...,100,2010-07-05,"[0.26491707201075443, -0.14089873106671316, 0...."
3,paper_1_review_1_chunk_3,1,1,"iferentes: (1) resultados de una encuesta, (2)...",100,2010-07-05,"[-0.016062538892716374, 0.2297889699240106, -0..."
4,paper_1_review_1_chunk_4,1,1,ación de buenas prácticas. - El orden de las s...,100,2010-07-05,"[-0.08497948844165329, 0.13448040350317578, 0...."


In [0]:
len(df_chunks.embedding[0])

10

In [0]:
spark_df_chunks = spark.createDataFrame(df_chunks)
spark_df_chunks = spark_df_chunks.withColumn(
    'embedding', 
    spark_df_chunks['embedding'].cast(ArrayType(FloatType()))
)

spark_df_chunks.show(10)  # show() prints the rows itself and returns None

+--------------------+--------+---------+--------------------+----------+----------+--------------------+
|          review_uri|paper_id|review_id|        chunked_text|chunk_size|  timespan|           embedding|
+--------------------+--------+---------+--------------------+----------+----------+--------------------+
|paper_1_review_1_...|       1|        1|- El artículo abo...|       100|2010-07-05|[0.26134172, -0.0...|
|paper_1_review_1_...|       1|        1|co nacional de us...|       100|2010-07-05|[-0.014583219, 0....|
|paper_1_review_1_...|       1|        1|l lenguaje es ade...|       100|2010-07-05|[0.26491708, -0.1...|
|paper_1_review_1_...|       1|        1|iferentes: (1) re...|       100|2010-07-05|[-0.01606254, 0.2...|
|paper_1_review_1_...|       1|        1|ación de buenas p...|       100|2010-07-05|[-0.08497949, 0.1...|
|paper_1_review_1_...|       1|        1|la versión revisa...|       100|2010-07-05|[0.24280892, -0.0...|
|paper_1_review_2_...|       1|        2|El ar

In [0]:
# export as delta table
if EXPORT_TABLES_TO_DATABRICKS:
    pandas_to_databricks_table(table_name='paper_reviews_embeddings', pandas_df=spark_df_chunks.toPandas())

## Create vector search index

### Check if the vector search is available

In [2]:
#%sql
#show functions like 'vector_search%';

In [0]:
%sql
describe function vector_search;

function_desc
Function: vector_search
Class: com.databricks.sql.catalyst.plans.logical.ai.VectorSearchPlanBuilder
"Usage: vector_search() - Performs a KNN vector search against the specified vector index.  Required parameters are `index`, which is the fully-qualified vector index  name, and either `query_text` for the query text or `query_vector` for the  query embedding.  Optionally, `num_results` specifies the maximum number of output rows,  which defaults to 10. Currently, only the DELTA_SYNC index  type is supported.  Also optionally, `query_type` specifies the type of query to perform,  It supports two types of queries: `ANN` and `HYBRID`. It defaults to `ANN`.  As a legacy, the `query` parameter name (equivalent to `query_text`)  remains supported for backward compatibility."


In [0]:
%sql
select current_catalog(), current_schema();


current_catalog(),current_schema()
studies,`databricks-studies`


In [0]:
# %sql
# -- check if the table `paper_reviews_embeddings` is in the catalog
# describe table extended paper_reviews_embeddings;

col_name,data_type,comment
review_uri,string,
paper_id,bigint,
review_id,bigint,
chunked_text,string,
chunk_size,bigint,
timespan,string,
embedding,array,
,,
# Delta Statistics Columns,,
Column Names,"review_id, chunk_size, embedding, paper_id, timespan, review_uri, chunked_text",


**Note:** If the code below raises an error, create the vector search index (`paper_reviews_index`) 
          via the UI from `paper_reviews_embeddings`.

In [0]:
# %sql
# CREATE INDEX paper_reviews_index
# ON  studies.`databricks-studies`.paper_reviews_embeddings
# (embedding);


![image_1769798726513.png](./imgs/image_1769798726513.png "image_1769798726513.png")

*Create the vector search index via the user interface*

In [0]:
# Remember to create the vector search index (paper_reviews_index) via the UI from paper_reviews_embeddings



#### Semantic search on the vector search index

In [0]:
# %pip install databricks-vectorsearch
# dbutils.library.restartPython()

In [0]:
# https://docs.databricks.com/gcp/en/vector-search/query-vector-search?language=Python%C2%A0SDK%C2%A0standard%C2%A0endpoint

from databricks.vector_search.client import VectorSearchClient

client_vector_search = VectorSearchClient(
    disable_notice=True,
)

index = client_vector_search.get_index(index_name='studies.databricks-studies.paper_review_index')


In [0]:
def semantic_search(query_text: str, top_k: int) -> pd.DataFrame:
    text_embedding_high_dim = get_embeddings([query_text])
    print(f'dim embedding: {len(text_embedding_high_dim[0])}')
    text_embedding_low_dim = pca.transform(text_embedding_high_dim)
    print(f'dim embedding-pca: {len(text_embedding_low_dim[0])}')

    cols = ['review_uri','paper_id', 'review_id', 'chunked_text', 'timespan', 'embedding']
    results = index.similarity_search(
        query_vector=text_embedding_low_dim.flatten().tolist(),
        columns=cols,
        num_results=top_k,
        disable_notice=True
    )
    res = pd.DataFrame(results['result']['data_array'], columns=cols+['score'])
    display(res)
    return res

In [0]:
input_text = 'recomendaciones prácticas para el desarrollo de software seguro'
df_res = semantic_search(query_text=input_text, top_k=5)

dim embedding: 1024
dim embedding-pca: 10


review_uri,paper_id,review_id,chunked_text,timespan,embedding,score
paper_1_review_2_chunk_0,1.0,2.0,El artículo presenta recomendaciones prácticas para el desarrollo de software seguro. Se describen l,2010-07-05,"List(-0.4820484, -0.07462992, -0.061202534, 0.09068968, 0.08700089, 0.12164772, -0.012149201, 0.04295415, 0.029270861, 0.026877701)",0.9961932
paper_1_review_3_chunk_2,1.0,3.0,arrollo de software seguro. - El “estado real del desarrollo de software en Chile” (como lo indica,2010-07-05,"List(-0.32006934, -0.11861887, -0.16354212, 0.032957304, 0.10105367, 0.17232092, 0.025759658, 0.0022093921, -0.09554255, 0.081382155)",0.9446371
paper_1_review_3_chunk_4,1.0,3.0,"Presenta nueve tablas que corresponden a las prácticas para el desarrollo de software seguro, pero l",2010-07-05,"List(-0.3623627, -0.017432267, 0.060682572, 0.21123499, -5.8355083E-4, 0.049791727, -0.040910758, -0.026283322, 0.06390793, -0.0031450056)",0.9347983
paper_1_review_2_chunk_1,1.0,2.0,escriben las mejores prácticas recomendadas para desarrollar software que sea proactivo ante los ata,2010-07-05,"List(-0.36486065, -0.0050330977, 0.070533745, 0.10436703, -0.02715171, -0.18610764, -0.03701538, -0.06801516, -0.08843145, -0.021011177)",0.8626408
paper_1_review_2_chunk_2,1.0,2.0,"te los ataques, y se realiza un análisis de costos de estas prácticas en desarrollo de software. Tod",2010-07-05,"List(-0.24807511, -0.06967893, -0.04766032, 0.075543016, 0.028206872, -0.05817712, -0.005347007, -0.20737253, -0.15868239, -0.031968225)",0.84880817
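
Conceptually, `similarity_search` ranks the stored vectors by how close they are to the query vector. The sketch below illustrates the idea with an exact, brute-force cosine ranking over a toy in-memory index. It is an illustration only: the toy data is made up, and the managed index may use a different metric, a different score formula, and approximate search.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either one is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: list[float], index: dict[str, list[float]], k: int) -> list[tuple[str, float]]:
    """Return the k entries most similar to the query, best match first."""
    scored = [(uri, cosine_similarity(query, vector)) for uri, vector in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2-dimensional index, for illustration only.
toy_index = {
    "chunk_a": [1.0, 0.0],
    "chunk_b": [0.0, 1.0],
    "chunk_c": [1.0, 1.0],
}

for uri, score in top_k([1.0, 0.1], toy_index, k=2):
    print(uri, round(score, 3))
```

The query vector has to live in the same space as the stored vectors, which is why `semantic_search` reuses the fitted `pca` object instead of fitting a new one.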





![res_vector_search.png](./imgs/res_vector_search.png "res_vector_search.png")

### Create SQL functions

In [0]:
%sql
create or replace function search_papers_reviews(
	search_term string comment 'Search paper review of your interest'
)
returns table
comment 'Search using vector search to retrieve relevant excerpts of reviews of papers of your interest.'
return(
	select chunked_text, timespan,
    review_uri as idx	
	from
		vector_search(
			index => 'studies.databricks-studies.paper_review_index',
			query_vector => ARRAY(0.12, 0.34, 0.56, 0.12, 0.33,  0.50, 0.88, 0.74, 0.33, 0.73), -- placeholder (fake) embedding; search_term is not used here
			num_results => 1
		)
);
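
Because the function body passes a hard-coded `query_vector`, `search_term` has no effect on the result and every call returns the same nearest neighbour. One possible workaround, assuming the query embedding is computed in Python first (for example with `get_embeddings` and the fitted `pca`), is to build the `vector_search` call from the real vector. The helper names below are hypothetical, and whether the array's element type matches what the index expects is an assumption to verify.

```python
def to_sql_array(vector: list[float]) -> str:
    """Render a Python vector as a SQL ARRAY(...) literal."""
    return "ARRAY(" + ", ".join(repr(float(value)) for value in vector) + ")"


def build_vector_search_query(index_name: str, query_vector: list[float], num_results: int) -> str:
    """Build the SQL text for a vector_search call with an explicit query vector."""
    if "'" in index_name:
        # The index name is embedded in a quoted literal; reject quotes to keep it intact.
        raise ValueError("index_name must not contain single quotes")
    return (
        "SELECT chunked_text, timespan, review_uri AS idx\n"
        "FROM vector_search(\n"
        f"  index => '{index_name}',\n"
        f"  query_vector => {to_sql_array(query_vector)},\n"
        f"  num_results => {int(num_results)}\n"
        ")"
    )


query = build_vector_search_query(
    index_name="studies.databricks-studies.paper_review_index",
    query_vector=[0.5, 0.25],  # toy vector; a real one would have 10 PCA components
    num_results=3,
)
print(query)
```

In the notebook, the resulting string could then be passed to `spark.sql(query)`.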
	
	

![image_1769806856730.png](./imgs/image_1769806856730.png "image_1769806856730.png")

*Check the expected results*

In [0]:
%sql
-- test the search_papers_reviews function

select chunked_text, timespan
from search_papers_reviews('software seguro')

chunked_text,timespan
"la versión revisada es #2, #1, #3). - El artículo no tiene validación de ningún tipo, ni siquiera po",2010-07-05


In [0]:
# chunked_text: 
# "la versión revisada es #2, #1, #3). - El artículo no tiene 
#  validación de ningún tipo, ni siquiera po"

# (English translation)
# The revised version is #2, #1, #3). - The article has no
# validation of any kind, not even by...

### Adding a tool to our AI agent


![image_1769807485841.png](./imgs/image_1769807485841.png "image_1769807485841.png")

### Interacting with the AI agent equipped with the tool



![interaction_with_tool_to_ai_agent_sql_search_function.png](./imgs/interaction_with_tool_to_ai_agent_sql_search_function.png "interaction_with_tool_to_ai_agent_sql_search_function.png")

![answer_of_the_ai_agent_with_sql_tool_function.png](./imgs/answer_of_the_ai_agent_with_sql_tool_function.png "answer_of_the_ai_agent_with_sql_tool_function.png")

## Serving the model with MLflow



![prepare_to_deploy.png](./imgs/prepare_to_deploy.png "prepare_to_deploy.png")


![databricks_mlflow_deploy.png](./imgs/databricks_mlflow_deploy.png "databricks_mlflow_deploy.png")


![deploy_agent_with_tool.png](./imgs/deploy_agent_with_tool.png "deploy_agent_with_tool.png")
