In [0]:
df = spark.table('workspace.default.fomc_presconf_10years').filter('value is not null and value != "" and value != " "')

display(df.show(5, truncate=False))

+------------------------------------------------------------------------------------------+
|value                                                                                     |
+------------------------------------------------------------------------------------------+
|January 30, 2019  Chairman Powell’s Press C onference  FINAL                              |
|Page 1 of 24  Transcript of Chair man  Powell’s Press Conference                          |
|January 30, 2019                                                                          |
|CHAIRMAN POWELL .  Good afternoon, everyone, and welcome.  I will start with a            |
|recap of our discussions, including our assessment of the outlook for the economy and the |
+------------------------------------------------------------------------------------------+
only showing top 5 rows



In [0]:
pdf = df.toPandas()

display(len(pdf))
display(pdf.head())

536

value
"January 30, 2019 Chairman Powell’s Press C onference FINAL"
Page 1 of 24 Transcript of Chair man Powell’s Press Conference
"January 30, 2019"
"CHAIRMAN POWELL . Good afternoon, everyone, and welcome. I will start with a"
"recap of our discussions, including our assessment of the outlook for the economy and the"


In [0]:
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=50
)

all_texts = pdf['value'].tolist()
split_texts = text_splitter.split_text(" ".join(all_texts))
embeddings = SentenceTransformer('all-MiniLM-L6-v2').encode(split_texts)



In [0]:
print(type(all_texts), type(split_texts), type(embeddings))
print(len(all_texts), len(split_texts), embeddings.shape)

<class 'list'> <class 'list'> <class 'numpy.ndarray'>
536 6561 (6561, 384)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

vector_data = pd.DataFrame({
    'text': split_texts,
    'vector': embeddings.tolist()
})

schema = StructType([
    StructField("text", StringType(), True),
    StructField("vector", ArrayType(FloatType()), True)
])

vector_df = SparkSession.builder.getOrCreate().createDataFrame(vector_data, schema)

table_path = 'workspace.default.fomc_presconf_10years_vector'
vector_df.write.mode('overwrite').saveAsTable(table_path)


In [0]:
vector_df.show(5)

+--------------------+--------------------+
|                text|              vector|
+--------------------+--------------------+
|much on that.  Th...|[0.06885268, 0.01...|
|on that.  Those a...|[0.04153522, 0.02...|
|that.  Those are ...|[0.051951826, 0.0...|
|are  great questi...|[0.041080438, 0.0...|
|great questions ,...|[0.03174615, 0.04...|
+--------------------+--------------------+
only showing top 5 rows



In [0]:
VECTOR_SEARCH_ENDPOINT_NAME = 'fomc_transcript_endpoint'

from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()

if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type='STANDARD')

wait_for_vsc_endpoint_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f'Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready')

In [0]:
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

source_table_path = 'workspace.default.fomc_presconf_10years_vector'
vs_index_path = 'workspace.default.fomc_presconf_10years_vector_index'

if not index_exsits(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_path):
    print(f'Creating index {vs_index_path} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME} ... ')
    vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=vs_index_path,
        source_table_name=source_table_path,
        pipeline_type='TRIGGERED',
        primary_key='id',
        embedding_source_column='vector',
        embedding_model_endpoint_name='bge_small_en_v1_5-3'
    )
    wait_for_vsc_endpoint_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_path)
else:
    wait_for_vsc_endpoint_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_path)
    vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_path).sync()


prunt(f'index {vs_index_fullname} on table {source_table_fullname} is ready')

In [0]:
import mlflow.deployments

deploy_client = mlflow.deployments.get_deploy_client('databricks')

question = 'Give me an economic outlook for January next year.'

results = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, source_table_path).similarity_search(
    query_text=question,
    columns=['vector'],
    num_results=1
)

docs = results.get('result', {}).get('data_array', [])
docs