In [34]:
!pip3 install --upgrade  -q gdown

## 👽 Goal of this Notebook

In this notebook we run a ghostly (ghastly?) 👻 ingest of Google Form answers, saved to a Google Sheet, into the "ghosts" collection of the open-source Milvus vector database.


### 👻 Simple Unstructured Analytics:

### 🔍 Summary
By the end of this notebook you will see how to take the form entries that Google saves to a Google Sheet and export them as CSV. Each row is then parsed, its photo and description are embedded, and the result is inserted into Milvus.

We could also have done this with heavier-duty production ETL tools such as Datavolo Apache NiFi, Ray, Apache Spark, Apache Kafka, Apache SeaTunnel, Java, .NET, Golang, NodeJS, Airbyte, or Fivetran.


#### 🐍 AIM Stack - Easy Local Free Open Source RAG

* Ollama
* Python
* Jupyter Lab Notebook
* Milvus Vector Database
* Pymilvus
* Sentence Transformers
* Hugging Face

#### 🌐 Models

* BGE-M3 - BAAI/bge-m3
* LLaVA - llava:7b
* GTE - Alibaba-NLP/gte-base-en-v1.5
* SPLADE - naver/splade-cocondenser-selfdistil
* CLIP - clip-ViT-B-32

![milvuslogo](https://milvus.io/images/milvus_logo.svg)

#### 🎃 Resources

* https://github.com/tspannhw/AIM-BecomingAnAIEngineer
* https://github.com/tspannhw/FLiPStackWeekly
* https://medium.com/@tspann/ghosts-are-unstructured-data-i-e31b34c0d9e4
* https://github.com/tspannhw/AIM-Ghosts
* https://zilliz.com/community/unstructured-data-meetup
* https://www.meetup.com/unstructured-data-meetup-new-york/
* https://lu.ma/naqu6xrd?tk=kJaUfp
* https://zilliz.com/learn/an-ultimate-guide-to-vectorizing-structured-data
* https://zilliz.com/blog/metadata-filtering-hybrid-search-or-agent-in-rag-applications
  
Open Source Milvus

In [4]:
import warnings
# Keep cell output readable: a blanket "ignore" already covers the
# DeprecationWarning / FutureWarning noise emitted by the ML libraries below.
warnings.filterwarnings('ignore')

import os
import requests
import base64
from tqdm.notebook import tqdm
from PIL import Image
from sentence_transformers import SentenceTransformer
from pymilvus import MilvusClient
from pymilvus import (
   utility,
   FieldSchema, CollectionSchema, DataType,
   Collection, AnnSearchRequest, RRFRanker, connections,
)
from pymilvus.model.dense import SentenceTransformerEmbeddingFunction
from pymilvus.model.sparse import SpladeEmbeddingFunction
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
from dotenv import load_dotenv

# Load settings (service URLs, credentials, CSV URL) from a local .env file;
# override=True lets .env values replace variables already set in the environment.
load_dotenv(override=True)


# BGE-M3 (BAAI/bge-m3): hybrid model that can emit dense and sparse text embeddings.
# Runs on CPU, so half precision is disabled.
bge_m3 = BGEM3EmbeddingFunction(
    model_name = 'BAAI/bge-m3',
    device = 'cpu',
    use_fp16 = False
)

# Width of BGE-M3's *dense* output; the schema cell uses it to size "text_vector3".
dense_dim = bge_m3.dim["dense"]

# SPLADE: learned sparse text embeddings (stored in the "text_vector" sparse field).
splade_ef = SpladeEmbeddingFunction(
    model_name="naver/splade-cocondenser-selfdistil",
    device="cpu"
)

# CLIP: image embeddings, stored in the 512-d "vector" field.
model = SentenceTransformer('clip-ViT-B-32')

# GTE: dense text embeddings for longer descriptions ("text_vector2").
# NOTE: trust_remote_code=True executes model code downloaded from the
# Hugging Face Hub; keep it only for repositories you trust.
textmodel = SentenceTransformer("Alibaba-NLP/gte-base-en-v1.5", trust_remote_code=True)

Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

In [5]:
# Connect to Milvus Standalone on Docker and make sure the "ghosts" collection exists.
import os

from pymilvus import connections
from pymilvus import utility
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
from pymilvus import MilvusClient

### Constants
# Endpoints and credentials are read from the environment (a .env file works via
# python-dotenv). The literals are only local-dev fallbacks ("minioadmin" is MinIO's
# stock login), so real credentials never have to be committed with the notebook.
S3_URL = os.getenv("S3_URL", "http://192.168.1.166:9000")
MINIO_USER = os.getenv("MINIO_USER", "minioadmin")
MINIO_PASSWORD = os.getenv("MINIO_PASSWORD", "minioadmin")
MINIO_REGION = os.getenv("MINIO_REGION", "us-east-1")

MILVUS_URL = os.getenv("MILVUS_URL", "http://192.168.1.166:19530")

milvus_client = MilvusClient(uri=MILVUS_URL)
DIMENSION = 512        # width of the "vector" field (CLIP image embeddings)
TEXTDIMENSION = 768    # width of the "text_vector2" field (GTE text embeddings)
COLLECTION = "ghosts"
NUM_PARTITIONS = 8     # partitions backing the "ghostclass" partition key

# Known ghost classes; informational only, the schema does not enforce them.
ghost_classes = ["Fake", "Art", "TV", "Unclassified", "Class I", "Class II", "Class III", "Class IV", "Class V", "Class VI", "Class VII"]

# ORM-style connection used by the Collection handle created at the end of this cell.
connections.connect(uri=MILVUS_URL)

fields = [
    FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name='ghostclass', dtype=DataType.VARCHAR, max_length=20),
    FieldSchema(name='filename', dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name='s3path', dtype=DataType.VARCHAR, max_length=1024),
    FieldSchema(name='description', dtype=DataType.VARCHAR, max_length=65000),
    FieldSchema(name='category', dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name='identification', dtype=DataType.VARCHAR, max_length=50),
    FieldSchema(name='location', dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name='country', dtype=DataType.VARCHAR, max_length=4),
    FieldSchema(name='latitude', dtype=DataType.VARCHAR, max_length=20),
    FieldSchema(name='longitude', dtype=DataType.VARCHAR, max_length=20),
    FieldSchema(name='zipcode', dtype=DataType.VARCHAR, max_length=20),
    FieldSchema(name='timestamp', dtype=DataType.VARCHAR, max_length=128),
    FieldSchema(name='s3timestamp', dtype=DataType.VARCHAR, max_length=128),
    FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
    FieldSchema(name="text_vector", dtype=DataType.SPARSE_FLOAT_VECTOR),
    FieldSchema(name="text_vector2", dtype=DataType.FLOAT_VECTOR, dim=TEXTDIMENSION),
    # dense_dim is the BGE-M3 dense width computed in the model-loading cell.
    FieldSchema(name="text_vector3", dtype=DataType.FLOAT_VECTOR, dim=dense_dim)
]

# Partition by ghost class. num_partitions is a create-time collection option,
# so it is passed to create_collection below (CollectionSchema does not forward it).
schema = CollectionSchema(fields=fields, partition_key_field="ghostclass")

if milvus_client.has_collection(collection_name=COLLECTION):
    # An existing collection keeps whatever schema, partitions and indexes it was created with.
    print(COLLECTION + " exists")
else:
    # auto_id is already declared on the "id" field, so it is not repeated here.
    milvus_client.create_collection(COLLECTION, schema=schema, num_partitions=NUM_PARTITIONS)

    index_params = milvus_client.prepare_index_params()

    # Scalar indexes for filtering.
    index_params.add_index(field_name = "id", index_type="STL_SORT")
    index_params.add_index(field_name = "ghostclass", index_type="TRIE")
    index_params.add_index(field_name = "zipcode",  index_type="INVERTED", index_name="inverted_index")
    # Vector indexes; where no index_type is given, the server default is used.
    index_params.add_index(field_name = "vector", metric_type="COSINE")
    index_params.add_index(field_name = "text_vector", index_type="SPARSE_INVERTED_INDEX", metric_type="IP")
    index_params.add_index(field_name = "text_vector2", metric_type="COSINE")
    index_params.add_index(field_name = "text_vector3", metric_type= "L2", index_type="IVF_FLAT", params = {"nlist": 128} )

    milvus_client.create_index( collection_name=COLLECTION, index_params=index_params)

    res = milvus_client.get_load_state( collection_name = COLLECTION)

    print(res)

milvus_client.load_collection(COLLECTION)
res = milvus_client.get_load_state( collection_name = COLLECTION )
print(res)

### for hybrid search (ORM Collection handle bound to the connection above)
collection = Collection(name=COLLECTION, schema=schema)

ghosts exists
{'state': <LoadState: Loaded>}


In [43]:
# Ingest Google Form answers (exported from the backing Google Sheet as CSV) into Milvus:
# fetch each submitted photo, embed photo + description, and insert one row per answer.
# Uses milvus_client, COLLECTION and the embedding models created in earlier cells.
import csv
import io
import os
import pprint
import shutil
import string
import time
import uuid
from datetime import datetime
from io import BytesIO
from random import random, seed
from time import gmtime, strftime
from urllib.parse import parse_qs, urlparse

import gdown
import requests
from PIL import Image, ImageFile

# Let PIL decode images whose data ends early instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True

GHOST_BUCKET = "images"

### URL download
# CSV export URL of the Google Sheet behind the form. Read from the environment
# so the (often publicly readable) link is not committed with the notebook.
CSV_URL = os.getenv("GHOST_FORM_CSV_URL", "")
# Timestamp	Email Address	Ghost Class	Description	Category	Location	Photos
EXPECTED_COLUMNS = 7
HTTP_TIMEOUT = 60                  # seconds, for the CSV and each photo download
TEMP_IMAGE_FILE = "tempimage.png"  # scratch file reused for every Drive download


def _drive_file_id(link: str) -> str:
    """Return the Google Drive file id referenced by a form's Photos answer.

    Handles ``https://drive.google.com/open?id=<id>`` links and
    ``.../file/d/<id>/...`` share links. If the answer holds several links
    (presumably comma-separated when multiple photos were uploaded), only the
    first one is used.

    Raises:
        ValueError: if no file id can be found in ``link``.
    """
    first_link = link.split(",")[0].strip()
    parsed = urlparse(first_link)
    ids = parse_qs(parsed.query).get("id")
    if ids:
        return ids[0]
    path_parts = [part for part in parsed.path.split("/") if part]
    if "d" in path_parts:
        d_index = path_parts.index("d")
        if d_index + 1 < len(path_parts):
            return path_parts[d_index + 1]
    raise ValueError(f"No Google Drive file id found in {link!r}")


def _load_form_image(session, rawtext: str):
    """Fetch and fully decode the photo for one form answer.

    Args:
        session: requests.Session used for non-Drive image URLs.
        rawtext: the Photos column value (a Drive link or a direct image URL).

    Returns:
        ``(image, imageurl)``: the loaded PIL image and the URL it came from
        (stored in the ``s3path`` field).

    Raises:
        Any download/decode error; the caller reports it and skips the row.
    """
    if "drive.google.com" in rawtext:
        imageurl = "https://drive.google.com/uc?export=view&id=" + _drive_file_id(rawtext)
        # Delete the previous row's file so a failed download can never reuse it.
        if os.path.exists(TEMP_IMAGE_FILE):
            os.remove(TEMP_IMAGE_FILE)
        saved = gdown.download(imageurl, TEMP_IMAGE_FILE, quiet=True)
        if saved is None or not os.path.exists(TEMP_IMAGE_FILE):
            raise RuntimeError(f"gdown could not download {imageurl}")
        print('Image successfully Downloaded: ', TEMP_IMAGE_FILE)
        image = Image.open(TEMP_IMAGE_FILE)
    else:
        imageurl = rawtext
        response = session.get(rawtext, timeout=HTTP_TIMEOUT)
        # Fail on 4xx/5xx instead of trying to decode an HTML error page.
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
    # Image.open is lazy; decode now because TEMP_IMAGE_FILE is overwritten next row.
    image.load()
    return image, imageurl


if not CSV_URL:
    raise ValueError("Set GHOST_FORM_CSV_URL (e.g. in .env) to the Google Sheet's CSV export URL.")

### full load
loadt0 = time.time()

### random
# Seeded so the random "identification" values are reproducible across runs.
seed(1)

### Row Count
rowCount = 0       # data rows processed; also numbers the generated filenames
insertedCount = 0  # rows actually written to Milvus

with requests.Session() as s:
    download = s.get(CSV_URL, timeout=HTTP_TIMEOUT)
    download.raise_for_status()

    # 'utf-8-sig' drops a leading BOM that would otherwise break the header check.
    decoded_content = download.content.decode('utf-8-sig')
    # Parse from a file object (newline="") rather than splitlines(): quoted
    # multi-line answers such as long descriptions keep their embedded newlines.
    cr = csv.reader(io.StringIO(decoded_content, newline=""), delimiter=',')
    my_list = list(cr)
    for row in my_list:
        if len(row) < EXPECTED_COLUMNS:
            # Blank lines parse as [] and would crash on row[0]; the row itself is
            # not echoed because it may contain an email address.
            print(f"Skipping row with {len(row)} columns (expected {EXPECTED_COLUMNS})")
            continue
        if row[0] == "Timestamp":
            print("Skipping First Row of CSV - Header")
            continue

        rowCount = rowCount + 1
        # row[0] - timestamp
        # row[1] - email   <--- PII: deliberately neither stored nor printed (TODO: hash/encrypt if ever needed)
        # row[2] - ghost class
        # row[3] - description
        # row[4] - category
        # row[5] - location
        # row[6] - photo
        print(str(row[2]) + " " + str(row[6]))
        image = None

        try:
            image, imageurl = _load_form_image(s, str(row[6]))

            description = str(row[3])
            image_embedding = model.encode([image])
            text_embedding = textmodel.encode(description)
            sparse_embedding = splade_ef.encode_documents([description])
            bgem3_embedding = bge_m3.encode_documents([description])
            filename = "fieldformghost" + str(rowCount)
            milvus_client.insert(COLLECTION, {"ghostclass": str(row[2]),
                                              "filename": str(filename),
                                              "s3path": str(imageurl),
                                              "description": str(description),
                                              "category": str(row[4]),
                                              "identification":  str(random()*1000),
                                              "location": str(row[5]),
                                              "country": "USA",
                                              "latitude": str("NA"),
                                              "longitude": str("NA"),
                                              "zipcode":str("NA"),
                                              "timestamp": str(row[0]),
                                              "s3timestamp": str(datetime.now()),
                                              "vector": image_embedding[0],
                                              "text_vector": sparse_embedding,
                                              "text_vector2": text_embedding,
                                              "text_vector3": bgem3_embedding["dense"][0]} )
            insertedCount = insertedCount + 1
        except Exception as e:
            # Best effort: report the failure and continue with the next answer.
            print("save exception: ")
            print(e)

### full load stop
loadt1 = time.time()

print(f"Inserted Time: {round(loadt1-loadt0, 4)} seconds\n\n")
print("Inserted " + str(insertedCount) + " of " + str(rowCount) + " rows.")

Skipping First Row of CSV - Header
Class III https://drive.google.com/open?id=1kc8sMST5C6vWnEZraIVJWVNmi9M7lflO
Image sucessfully Downloaded:  tempimage.png
Art https://drive.google.com/open?id=1YKcJ47Ekdx0jV1AFO_-G9pjZEes5YVgl
Image sucessfully Downloaded:  tempimage.png
Class II https://github.com/tspannhw/AIM-Ghosts/blob/main/happyhalloween2024.jpg?raw=true
Inserted Time: 17.4714 seconds


Inserted 3 rows.
