# Vector databases in Azure

### Databases used in this notebook
- [Azure Cosmos DB for MongoDB vCore](#azure-cosmos-db-for-mongodb-vcore)
- [Azure Database for PostgreSQL](#azure-database-for-postgresql)
- [Azure Data Explorer (Kusto)](#azure-data-explorer-kusto)
- [Azure Cognitive Search](#azure-cognitive-search)
- [Redis](#redis)

## Deploy infrastructure with Terraform

In [None]:
!terraform init -upgrade
!terraform apply -auto-approve

## Install and import Python packages

In [None]:
# We'll need to install the clients for all vector databases
%pip install wget --user
%pip install pymongo --user
%pip install psycopg2 --user
%pip install azure.kusto.data azure.kusto.ingest --user
%pip install redis --user

In [5]:
import wget
import zipfile
import pandas as pd
import os
import subprocess
import numpy as np

## Load data

In [8]:
# Download embeddings
embeddings_url = 'https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip'
wget.download(embeddings_url)

'vector_database_wikipedia_articles_embedded.zip'

In [9]:
# Unzip embeddings
with zipfile.ZipFile("vector_database_wikipedia_articles_embedded.zip","r") as zip_ref:
    zip_ref.extractall("./data")

In [6]:
# Load file to Pandas dataframe
article_df = pd.read_csv('./data/vector_database_wikipedia_articles_embedded.csv')

In [7]:
from ast import literal_eval

# Read vectors from strings back into a list
article_df['title_vector'] = article_df.title_vector.apply(literal_eval)
article_df['content_vector'] = article_df.content_vector.apply(literal_eval)

# Set vector_id to be a string
article_df['vector_id'] = article_df['vector_id'].apply(str)

article_df.info(show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25000 entries, 0 to 24999
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   id              25000 non-null  int64 
 1   url             25000 non-null  object
 2   title           25000 non-null  object
 3   text            25000 non-null  object
 4   title_vector    25000 non-null  object
 5   content_vector  25000 non-null  object
 6   vector_id       25000 non-null  object
dtypes: int64(1), object(6)
memory usage: 1.3+ MB


## Configurations and testing data

In [11]:
# Login information
user_name = "tomas"
password = "Azure12345678"
random_name = subprocess.check_output(["terraform", "output", "-raw", "random_name"]).decode("utf-8")
cs_api_key = subprocess.check_output(["terraform", "output", "-raw", "cs_api_key"]).decode("utf-8")

In [12]:
# Testing parameters
test_size = 100        # Number of vectors to test
top_n = 3              # Number of results to return

In [13]:
test_vectors = np.random.rand(test_size, 1536)

## Azure Cosmos DB for MongoDB vCore

In [32]:
from pymongo import MongoClient

In [33]:
# Get client
CONNECTION_STRING = f"mongodb+srv://{user_name}:{password}@{random_name}.mongocluster.cosmos.azure.com/mydb?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
client = MongoClient(CONNECTION_STRING)

# Create database
mydb = client["mydb"]

# Create collection
mycol = mydb["mycol"]

In [34]:
# Create indexes
indexes = [
      {
      "name": "vectorSearchIndex",
      "key": {
        "content_vector": "cosmosSearch"
      },
      "cosmosSearchOptions": {
        "kind": "vector-ivf",
        "numLists": 25,
        "similarity": "COS",
        "dimensions": 1536
      }
    }
]
mydb.command(
    {
        "createIndexes": "mycol",
        "indexes": indexes,
    }
)
print("Indexes are: {}\n".format(sorted(mycol.index_information())))

Indexes are: ['_id_', 'vectorSearchIndex']



In [35]:
# Insert records
dict_records = article_df.to_dict("records")
mycol.insert_many(dict_records)

# time taken: 01:43

<pymongo.results.InsertManyResult at 0x7f1dd13836d0>

In [38]:
# Query
results = []
for i in range(test_size):
  query = {
    "$search": {
      "cosmosSearch": {
        "vector": test_vectors[i].tolist(),
        "path": "content_vector",
        "k": top_n
      }
    }
  }
  results.append(mycol.aggregate([query]))

print(f"Got {len(results)} results")

# time taken: 03:51

Got 100 results


In [108]:
# Optional commands to drop collection or database
# mydb.drop_collection("mycol")
# client.drop_database("mydb")

## Azure Database for PostgreSQL

In [39]:
import psycopg2

In [40]:
# Get client
CONNECTION_STRING_PSQL = f"host={random_name}.postgres.database.azure.com port=5432 dbname=mydb user={user_name} password={password} sslmode=require"
conn = psycopg2.connect(CONNECTION_STRING_PSQL)
cur = conn.cursor()

In [41]:
# Enable pgvector extension
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.commit()

In [42]:
# Create table for article_df
cur.execute(
    """
CREATE TABLE IF NOT EXISTS mytable (
    id INT PRIMARY KEY,
    url TEXT,
    title TEXT,
    text TEXT,
    title_vector vector(1536),
    content_vector vector(1536),
    vector_id TEXT
    );
    """
)
conn.commit()

In [43]:
# Insert records
insert_sql = "INSERT INTO mytable (id, url, title, text, title_vector, content_vector, vector_id) VALUES (%s, %s, %s, %s, %s, %s, %s)"
data = [
    tuple(x)
    for x in article_df[
        ["id", "url", "title", "text", "title_vector", "content_vector", "vector_id"]
    ].to_numpy()
]
cur.executemany(insert_sql, data)
conn.commit()

# time taken: 04:17

In [45]:
# Query (accurate search)
results = []
for i in range(test_size):
    query = f"""
    SELECT * FROM mytable
    ORDER BY content_vector <=> '{test_vectors[i].tolist()}'
    LIMIT {top_n};
    """
    results.append(cur.execute(query))

print(f"Got {len(results)} results")

# time taken: 00:16

Got 100 results


In [46]:
# Enable vector cosine index for approximate search
cur.execute(
    "CREATE INDEX IF NOT EXISTS content_vector_cosine_idx ON mytable USING ivfflat(content_vector) WITH (lists = 25);"
)
conn.commit()

In [48]:
# Query (approximate search -> using 2 probes, more probes, better recall)
results = []
for i in range(test_size):
    query = f"""
    SET ivfflat.probes = 2;
    SELECT * FROM mytable
    ORDER BY content_vector <=> '{test_vectors[i].tolist()}'
    LIMIT {top_n};
    """
    results.append(cur.execute(query))

print(f"Got {len(results)} results")

# time taken: 00:16

Got 100 results


In [202]:
# Drop index
cur.execute("DROP INDEX IF EXISTS content_vector_cosine_idx;")
conn.commit()

In [49]:
cur.close()
conn.close()

## Azure Data Explorer (Kusto)

In [50]:
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor
from azure.kusto.data.data_format import DataFormat
from azure.kusto.data.helpers import dataframe_from_result_table
import io

In [51]:
# Get client
AAD_TENANT_ID = "d6af5f85-2a50-4370-b4b5-9b9a55bcb0dc"
KUSTO_URI = f"https://{random_name}.westeurope.kusto.windows.net/"
KUSTO_INGEST_URI = f"https://ingest-{random_name}.westeurope.kusto.windows.net/"
KUSTO_DATABASE = "mydb"
INGEST = KustoConnectionStringBuilder.with_interactive_login(KUSTO_INGEST_URI)
DATA = KustoConnectionStringBuilder.with_interactive_login(KUSTO_URI)

In [None]:
# Create table
client = KustoClient(DATA)
query = ".create table mytable (id:int, url:string, title:string, text:string, title_vector:dynamic, content_vector:dynamic, vector_id:string)"
client.execute_mgmt(KUSTO_DATABASE, query)

In [53]:
# Ingestion client
ingestion_client = QueuedIngestClient(INGEST)
ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table="mytable", data_format=DataFormat.CSV)

In [54]:
# Create file without header
with open("./data/vector_database_wikipedia_articles_embedded.csv", "r", encoding="utf8") as input_file, open("./data/kusto.csv", "w", encoding="utf8") as output_file:
    lines = input_file.readlines()
    output_file.writelines(lines[1:])

In [None]:
%%capture
# Ingest file
ingestion_client.ingest_from_file("./data/kusto.csv", ingestion_properties=ingestion_props)

# time taken: submitting 05:00, but than background ingestion process tool another 10 minutes

In [62]:
# Wait for ingestion to finish
# We should see 25000 rows
query = """
mytable
 | count
 """

result = client.execute_query(KUSTO_DATABASE, query)
dataframe_from_result_table(result.primary_results[0]).Count.values[0]

25000

In [63]:
# Query
results = []
for i in range(test_size):
    query = f"""
    let series_cosine_similarity_fl=(vec1:dynamic, vec2:dynamic, vec1_size:real=double(null), vec2_size:real=double(null))
    {{
        let dp = series_dot_product(vec1, vec2);
        let v1l = iff(isnull(vec1_size), sqrt(series_dot_product(vec1, vec1)), vec1_size);
        let v2l = iff(isnull(vec2_size), sqrt(series_dot_product(vec2, vec2)), vec2_size);
        dp/(v1l*v2l)
    }};
    mytable
    | extend similarity = series_cosine_similarity_fl(dynamic({list(test_vectors[i])}),content_vector,1536,1536) 
    | top {top_n} by similarity desc
    """

    results.append(client.execute(KUSTO_DATABASE, query))

print(f"Got {len(results)} results")

# time taken: 07:04

Got 100 results


## Azure Cognitive Search

In [None]:
!pip install --index-url=https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-python/pypi/simple/ azure-search-documents==11.4.0a20230509004

In [9]:
from azure.core.credentials import AzureKeyCredential 
from azure.search.documents import SearchClient  
from azure.search.documents.indexes import SearchIndexClient  
from azure.search.documents.models import Vector  
from azure.search.documents.indexes.models import (  
    SearchIndex,  
    SearchField,  
    SearchFieldDataType,  
    SimpleField,  
    SearchableField,  
    SearchIndex,  
    SemanticConfiguration,  
    PrioritizedFields,  
    SemanticField,  
    SearchField,  
    SemanticSettings,  
    VectorSearch,  
    VectorSearchAlgorithmConfiguration,  
)  

In [14]:
# Configurations
CS_ENDPOINT = f"https://{random_name}.search.windows.net"
CS_API_KEY = cs_api_key
INDEX_NAME = "myindex"

credential = AzureKeyCredential(CS_API_KEY)

In [57]:
# Create a search index
index_client = SearchIndexClient(endpoint=CS_ENDPOINT, credential=credential)
fields = [
    SimpleField(name="vector_id", type=SearchFieldDataType.String, key=True),
    SimpleField(name="id", type=SearchFieldDataType.Int64, key=False),
    SearchableField(
        name="title", type=SearchFieldDataType.String, searchable=True, retrievable=True
    ),
    SearchableField(
        name="url", type=SearchFieldDataType.String, searchable=True, retrievable=True
    ),
    SearchableField(
        name="text", type=SearchFieldDataType.String, searchable=True, retrievable=True
    ),
    SearchField(
        name="title_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        dimensions=1536,
        vector_search_configuration="my-vector-config",
    ),
    SearchField(
        name="content_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        dimensions=1536,
        vector_search_configuration="my-vector-config",
    ),
]

vector_search = VectorSearch(
    algorithm_configurations=[
        VectorSearchAlgorithmConfiguration(
            name="my-vector-config",
            kind="hnsw",
            hnsw_parameters={
                "m": 4,
                "efConstruction": 400,
                "efSearch": 1000,
                "metric": "cosine",
            },
        )
    ]
)

semantic_config = SemanticConfiguration(
    name="my-semantic-config",
    prioritized_fields=PrioritizedFields(
        title_field=SemanticField(field_name="title"),
        prioritized_content_fields=[SemanticField(field_name="text")],
    ),
)

# Create the semantic settings with the configuration
semantic_settings = SemanticSettings(configurations=[semantic_config])

# Create the search index with the semantic settings
index = SearchIndex(
    name=INDEX_NAME,
    fields=fields,
    vector_search=vector_search,
    semantic_settings=semantic_settings,
)
result = index_client.create_or_update_index(index)
print(f" {result.name} created")

 myindex created


In [None]:
# Upload some documents to the index
search_client = SearchClient(endpoint=CS_ENDPOINT, index_name=INDEX_NAME, credential=credential)

for index, row in article_df.iterrows():
    search_client.upload_documents(row)

print(f"Uploaded {len(article_df)} documents")

# time taken: 36:50

In [85]:
# Query
results = []
for i in range(test_size):
    results.append(
        search_client.search(
            search_text="",
            vector=Vector(
                value=test_vectors[i], k=3, fields="content_vector"
            ),
            select=["title", "text", "url"],
        )
    )

print(f"Got {len(results)} results")

# time taken: 00:00 (instant)

Got 100 results


In [None]:
# Print results
sindex = 0
for result in results:
    sindex += 1
    print(f"----- Search {sindex}")
    for row in result:
        print(row)

## Redis

In [95]:
import redis
from redis.commands.search.indexDefinition import (
    IndexDefinition,
    IndexType
)
from redis.commands.search.query import Query
from redis.commands.search.field import (
    TextField,
    VectorField
)

In [96]:
REDIS_HOST = f"redis{random_name}.westeurope.azurecontainer.io"
REDIS_PORT = 6379
REDIS_PASSWORD = password

# Connect to Redis
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
redis_client.ping()

True

In [111]:
# Constants
VECTOR_DIM = 1536
VECTOR_NUMBER = 25000
INDEX_NAME = "embeddings-index"
PREFIX = "doc"
DISTANCE_METRIC = "COSINE"

# Define RediSearch fields for each of the columns in the dataset
title = TextField(name="title")
url = TextField(name="url")
text = TextField(name="text")
title_embedding = VectorField(
    "title_vector",
    "FLAT",
    {
        "TYPE": "FLOAT32",
        "DIM": VECTOR_DIM,
        "DISTANCE_METRIC": DISTANCE_METRIC,
        "INITIAL_CAP": VECTOR_NUMBER,
    },
)
text_embedding = VectorField(
    "content_vector",
    "FLAT",
    {
        "TYPE": "FLOAT32",
        "DIM": VECTOR_DIM,
        "DISTANCE_METRIC": DISTANCE_METRIC,
        "INITIAL_CAP": VECTOR_NUMBER,
    },
)
fields = [title, url, text, title_embedding, text_embedding]

In [112]:
# Check if index exists
try:
    redis_client.ft(INDEX_NAME).info()
    print("Index already exists")
except:
    # Create RediSearch Index
    redis_client.ft(INDEX_NAME).create_index(
        fields = fields,
        definition = IndexDefinition(prefix=[PREFIX], index_type=IndexType.HASH)
    )

Index already exists


In [101]:
# Ingest data
for i in range(len(article_df)):
    key = f"{PREFIX}:{str(article_df['id'][i])}"
    doc = {}
    doc["url"] = article_df['url'][i]
    doc["title"] = article_df['title'][i]
    doc["text"] = article_df['text'][i]
    doc["title_vector"] = np.array(article_df["title_vector"][i], dtype=np.float32).tobytes()
    doc["content_vector"] = np.array(article_df["content_vector"][i], dtype=np.float32).tobytes()
    redis_client.hset(key, mapping = doc)

# time taken: 02:27

In [99]:
# Check number of documents in index
print(f"Loaded {redis_client.info()['db0']['keys']} documents in Redis search index with name: {INDEX_NAME}")

Loaded 25000 documents in Redis search index with name: embeddings-index


In [110]:
results = []
for i in range(test_size):
    base_query = f"*=>[KNN {top_n} @content_vector $vector AS vector_score]"
    query = Query(base_query).sort_by("vector_score").paging(0, top_n).dialect(2)
    params_dict = {
        "vector": np.array(test_vectors[i]).astype(dtype=np.float32).tobytes()
    }
    results.append(redis_client.ft(INDEX_NAME).search(query, params_dict))

print(f"Got {len(results)} results")

# time taken: 00:05

Got 100 results


# Destroy infrastructure

In [None]:
!terraform destroy -auto-approve