# Change data capture with MongoDB

In [1]:
import pymongo
import torch
import sys

sys.path.append('../')

from pinnacledb.encoders.numpy.array import array
from pinnacledb.models.sentence_transformers.wrapper import SentenceTransformer
from pinnacledb.datalayer.mongodb.query import Collection
from pinnacledb.misc.pinnacle import pinnacle
from pinnacledb.core.watcher import Watcher
from pinnacledb.core.vector_index import VectorIndex
from pinnacledb.datalayer.base.cdc import DatabaseWatcher

INFO:numexpr.utils:NumExpr defaulting to 8 threads.


Connect to mongodb
Make it a pinnacle instance

In [2]:
db_mongo = pymongo.MongoClient('mongodb://testmongodbuser:testmongodbpassword@localhost:27018/admin')
db_mongo.drop_database('test_db')
db = pinnacle(db_mongo.test_db)

Populate data
Insert the data into `documents` collection

In [3]:
data = [
  {
    "title": "Politics of Armenia",
    "abstract": "The politics of Armenia take place in the framework of the parliamentary representative democratic republic of Armenia, whereby the President of Armenia is the head of state and the Prime Minister of Armenia the head of government, and of a multi-party system. Executive power is exercised by the President and the Government."
  },
  {
    "title": "Foreign relations of Armenia",
    "abstract": "Since its independence, Armenia has maintained a policy of complementarism by trying to have positive and friendly relations with Iran, Russia, and the West, including the United States and the European Union.– \"Armenian Foreign Policy Between Russia, Iran And U."
  },
  {
    "title": "Demographics of American Samoa",
    "abstract": "This article is about the demographics of American Samoa, including population density, ethnicity, education level, health of the populace, economic status, religious affiliations and other aspects of the population. American Samoa is an unincorporated territory of the United States located in the South Pacific Ocean."
  },
  {
    "title": "Analysis",
    "abstract": "Analysis is the process of breaking a complex topic or substance into smaller parts in order to gain a better understanding of it. The technique has been applied in the study of mathematics and logic since before Aristotle (384–322 B."
  }
]

from pinnacledb.encoders.pillow.image import pil_image as i
from pinnacledb.core.documents import Document as D
from sentence_transformers import SentenceTransformer as _SentenceTransformer
data = [D(d) for d in data]

db.execute(Collection(name='documents').insert_many(data, encoders= [i]))

INFO:root:found 0 uris


(<pymongo.results.InsertManyResult at 0x7ff01f4d8d60>,
 TaskWorkflow(database=<pinnacledb.datalayer.base.database.BaseDatabase object at 0x7ff029927070>, G=<networkx.classes.digraph.DiGraph object at 0x7ff01f4d8be0>))

# Create a vector index watcher.
### This consist a indexing watcher (SentenceTransformer) model to vectorize a key.

In [4]:
def configure_text_search(model):
    return VectorIndex(
        identifier='my-index',
        indexing_watcher=Watcher(
            model=model,
            key='abstract',
            select=Collection(name='documents').find()
        ),
        compatible_watcher=
            Watcher(
                model=model,
                key='title',
                select=Collection(name='documents').find(),
                active=False,
            )
        
    )




job  = configure_text_search(SentenceTransformer(identifier="test-st", object=_SentenceTransformer('all-MiniLM-L6-v2'), encoder= array('float32', shape=(384,))
))


INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:sentence_transformers.SentenceTransformer:Use pytorch device: cpu


Add the vector index job to pinnacledb.

In [5]:
db.add(job)



Batches:   0%|          | 0/1 [00:00<?, ?it/s]

INFO:root:loading hashes: 'my-index'


[]

Create instance of `DatabaseWatcher`
Start watching `documents` collection.

In [26]:
database_watcher = DatabaseWatcher(db=db, identifier='basic-cdc-watcher', on=Collection(name='documents'))
database_watcher.watch()

INFO:root:Database watch service started at 2023-07-21 19:21:35.649003
INFO:root:Started listening database with identity basic-cdc-watcher/documents...


# Check the watcher status

In [27]:
database_watcher.is_available()

True

# You can check info of the watcher.

In [28]:
database_watcher.info()

{
  "inserts": 0,
  "updates": 0
}


{'inserts': 0, 'updates': 0}

Add 2 documents and check the info again

In [29]:
data = [{
  "title": "Politics of India",
  "abstract": "Some despriction 1",
}, {
  "title": "Politics of Asia",
  "abstract": "some description 2",
}]
doc = db_mongo.test_db.documents.insert_many(data)

INFO:root:found 0 uris


Batches:   0%|          | 0/1 [00:00<?, ?it/s]



Batches:   0%|          | 0/1 [00:00<?, ?it/s]

# Check the inserts info again

In [10]:
database_watcher.info()

{
  "inserts": 1,
  "updates": 0
}


{'inserts': 1, 'updates': 0}

# Check vectors if they are synced with the vector database.

In [11]:
from pinnacledb.vector_search.lancedb_client import LanceDBClient
from pinnacledb import CFG

In [12]:
lance_client = LanceDBClient(CFG.vector_search.type)

# Use the identifier which is `model/key`

In [13]:
table = lance_client.get_table('test-st/abstract')

In [14]:
table.table.to_pandas()

Unnamed: 0,vector,id
0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",__SEEDKEY__
1,"[-0.10046107, -0.004194115, 0.050496113, -0.01...",64b9970f87282bb1ebd74a82
2,"[-0.057723213, 0.09273246, -0.028482832, 0.014...",64b9970f87282bb1ebd74a83
3,"[-0.10046107, -0.004194115, 0.050496113, -0.01...",64b9989b6f03202cfcf0f784
4,"[-0.057723213, 0.09273246, -0.028482832, 0.014...",64b9989b6f03202cfcf0f785
5,"[-0.10046107, -0.004194115, 0.050496113, -0.01...",64b998fc9740cbc5d6c06555
6,"[-0.057723213, 0.09273246, -0.028482832, 0.014...",64b998fc9740cbc5d6c06556
7,"[-0.10046107, -0.004194115, 0.050496113, -0.01...",64b99d8b10919213a90f7b50
8,"[-0.057723213, 0.09273246, -0.028482832, 0.014...",64b99d8b10919213a90f7b51
9,"[-0.10046107, -0.004194115, 0.050496113, -0.01...",64b99d8c10919213a90f7b52


# Advance Section

# Resume Tokens

In [30]:
from pinnacledb.datalayer.mongodb.cdc import ResumeToken

In [16]:
data = [{
  "title": "Politics of India",
  "abstract": "Some despriction 1",
}, {
  "title": "Politics of Asia",
  "abstract": "some description 2",
},
{
  "title": "Politics of Abc",
  "abstract": "some description 3",
}
]
doc = db_mongo.test_db.documents.insert_many(data)

In [31]:
resume_tokens = database_watcher.resume_tokens()

## You can use the resume token to resume watching.

### Lets restart the change stream from the begining.

In [32]:
first_change_stream_token = resume_tokens[0]

In [33]:
database_watcher.stop()

In [34]:
database_watcher.resume(first_change_stream_token)

INFO:root:Database watch service started at 2023-07-21 19:22:23.076590
INFO:root:Started listening database with identity basic-cdc-watcher/documents...
INFO:root:found 0 uris


Batches:   0%|          | 0/1 [00:00<?, ?it/s]



Batches:   0%|          | 0/1 [00:00<?, ?it/s]

### As you can see above the change stream has started again from the initial point of time using the first resume token.

# Change pipeline

In [None]:
database_watcher.stop()

### your program has been listening to all operations. In a real application this would be overwhelming and often unnecessary as each part of your application will generally want to listen only to specific operations. To limit the amount of operations, you can use certain aggregation stages when setting up the stream

### Lets create a specific change pipeline which only listens to insert operations.

In [35]:
from pinnacledb.datalayer.mongodb.cdc import MongoChangePipeline
mongo_change_pipeline = MongoChangePipeline(['insert'])

In [36]:
change_pipeline = mongo_change_pipeline.build_matching()

In [37]:
database_watcher = DatabaseWatcher(db=db, identifier='basic-cdc-watcher', on=Collection(name='documents'))
database_watcher.watch(change_pipeline)

INFO:root:Database watch service started at 2023-07-21 19:24:53.890748
INFO:root:Started listening database with identity basic-cdc-watcher/documents...


In [38]:
data = [{
  "title": "Politics of India",
  "abstract": "Some despriction 1",
}, {
  "title": "Politics of Asia",
  "abstract": "some description 2",
},
{
  "title": "Politics of Abc",
  "abstract": "some description 3",
}
]
doc = db_mongo.test_db.documents.insert_many(data)

INFO:root:found 0 uris
INFO:root:found 0 uris


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]