# CU Recommender System

In this example we build a CU recommender system by extracting feature vectors with a sentence-embedding model, importing those vectors into Milvus, and then searching in Milvus and Redis.

## Data
In this project, we use qa3 tenant data for 2021. This dataset contains approximately 4,900 CUs created under 160 books and 2,500 solutions.

We use the following files:
- cu.dat: Contains Gsi -> Cus mapping information.
- bet.dat: Contains Cu -> GEs mapping information.


## Requirements

Due to package constraints, this notebook needs to be run using Python 3.6 or 3.7. We recommend using a virtual environment such as Conda; instructions for installing Conda can be found [here](https://conda.io/projects/conda/en/latest/user-guide/install/index.html).

Currently, there is a dirty workaround that you can use for Python 3.8+. When installing from `requirements.txt`, pip will fail to install `sentencepiece`. If you rerun the notebook after the install fails, without redownloading the packages, the rest of the notebook should run without any hiccups.

| Packages | Servers |
| --------------- | -------------- |
| pymilvus==2.0.0rc5 | milvus-2.0.0-rc5 |
| pymongo | mongodb |
| paddle_serving_app | |
| paddlepaddle==2.1.1 | |


A `requirements.txt` file is included so that the required packages can be installed in one step.

## Up and Running

### Installing Packages
Install the required Python packages from `requirements.txt`. If you are using Python 3.8+, see the workaround in the Requirements section. Uninstall any previously installed pymilvus-orm if needed.

In [None]:
! pip3 install -r requirements.txt

### Getting Milvus Server

This demo uses Milvus 2.0 Standalone with docker-compose. Please refer to [Install Milvus 2.0](https://milvus.io/docs/v2.0.0/install_standalone-docker.md) for other installation options (on Kubernetes, or using Milvus Cluster).

Alternatively, we have deployed a standalone Milvus instance in the dev Kubernetes cluster. To find its endpoint, look up the pod IP:
`kubectl get po -o wide | grep milvus-standalone`

In [None]:
!helm install milvus milvus/milvus --set cluster.enabled=false \
--set minio.persistence.storageClass=ebs-sc --set log.persistence.persistentVolumeClaim.storageClass=ebs-sc \
--set standalone.persistence.persistentVolumeClaim.storageClass=ebs-sc --set minio.persistence.size=50Gi \
--set etcd.global.storageClass=ebs-sc --set pulsar.default_storage.existingStorageClassName=ebs-sc

In [None]:
!docker-compose up -d

### Starting Redis Server
We use Redis as a metadata storage service. The code can easily be modified to use a Python dictionary instead, but that rarely works outside of quick examples. We need a metadata storage service in order to map between embeddings and their corresponding data.

In [None]:
!docker run  --name redis -d -p 6379:6379 redis
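
For quick local experiments, the dictionary alternative mentioned above could look roughly like the sketch below. `DictMetadataStore` is a hypothetical name, not part of this project; it only mimics the `set` and `get` calls that this notebook makes against Redis, including the fact that redis-py returns values as bytes.

```python
import json


class DictMetadataStore:
    """Hypothetical in-memory stand-in for the two Redis calls used in this notebook."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        # redis-py hands values back as bytes, so store strings encoded.
        if isinstance(value, str):
            value = value.encode("utf-8")
        self._data[str(key)] = value
        return True

    def get(self, key):
        # Like redis-py, return None when the key does not exist.
        return self._data.get(str(key))


store = DictMetadataStore()
payload = {"gsi": "gsi-1", "cus": "Customer Menu Options", "book": "book-1"}
store.set("gsi-1", json.dumps(payload))

record = json.loads(store.get("gsi-1").decode("utf-8"))
print(record["cus"])
```

Unlike Redis, this store lives only inside the notebook process, which is why it does not carry over to a deployed service.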

### Confirm Running Servers

In [None]:
! docker ps

### Downloading Data from MongoDB (Bets Store)

In [None]:
import pandas as pd
import urllib.parse
from pymongo import MongoClient

def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
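        # Percent-escape the credentials so characters such as '@', ':' or '/' do not break the URI
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s?authSource=%s&readPreference=primary&ssl=false' % (
            urllib.parse.quote_plus(username), urllib.parse.quote_plus(password), host, port, db, 'nsl_bet_db_qa3')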
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)

    return conn[db]
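
To see why the credentials need escaping, here is a small standalone sketch. `build_mongo_uri` and every value passed to it are hypothetical and serve only as illustration; the only library call is `urllib.parse.quote_plus` from the standard library.

```python
from urllib.parse import quote_plus


def build_mongo_uri(host, port, username, password, db, auth_source):
    """Hypothetical helper: build a MongoDB URI with percent-escaped credentials."""
    return "mongodb://{}:{}@{}:{}/{}?authSource={}".format(
        quote_plus(username),
        quote_plus(password),
        host,
        port,
        db,
        quote_plus(auth_source),
    )


# Made-up credentials containing characters that are reserved in a URI.
uri = build_mongo_uri("localhost", 27017, "user", "p@ss:word/1", "mydb", "admin")
print(uri)
```

Without the escaping, the `@` inside the password would be read as the separator between the credentials and the host.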


In [None]:
def read_mongo(db, collection, query=None, host='10.220.98.254', port=27017, username='bet', password='bet@123', no_id=False):
    """ Run an aggregation pipeline against Mongo and store the result in a DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Run the aggregation pipeline (a list of stages) on the given collection
    cursor = db[collection].aggregate(query or [])

    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))

    # Delete the _id column if requested and present
    if no_id and '_id' in df.columns:
        del df['_id']

    return df

In [None]:
gsi_pipeline = [
    {
        "$project": {"nb": "$$ROOT", "_id": 0}
    },
    {
        "$lookup": {
            "localField": "nb.gsiList.id",
            "from": "nsl_gsi",
            "foreignField": "id",
            "as": "ng"
        }
    },
    {
        "$unwind": {
            "path": "$ng",
            "preserveNullAndEmptyArrays": False
        }
    },
    {
        "$lookup": {
            "localField": "ng.solutionLogic.referencedChangeUnit",
            "from": "nsl_change_unit",
            "foreignField": "id",
            "as": "cu"
        }
    },
    {
        "$unwind": {
            "path": "$cu",
            "preserveNullAndEmptyArrays": False
        }
    },
    {
        "$addFields": {
            "bookId": "$nb.id",
            "gsiId": "$ng.id",
            "gsiName": "$ng.displayName"
        }
    },
    {
        "$match": {
            "$and": [
                {
                    "nb.tenantId": {
                        "$in": [
                            "ProjectCarnivals",
                            "Banking",
                            "projectmanagement",
                            "Brane-Finance",
                            "FinanceSolBrane",
                            "AILABS",
                            "BRF2008",
                            "Finance2008",
                            "FamilyApp2008",
                            "Learning2008",
                            "GRC",
                            "Insurance",
                            "Healthcare",
                            "SupplyChain",
                            "Pharma",
                            "CustomerSuccess"]
                    }
                },
                {"nb.displayName": {"$regex": "^((?!test).)*$", "$options": "i"}},
                {"nb.displayName": {"$regex": "^((?!book).)*$", "$options": "i"}},
                {"ng.displayName": {"$regex": "^((?!test).)*$", "$options": "i"}}
            ]
        }
    },
    {
        "$group": {
            "_id": {"bookId": "$bookId", "gsiId": "$gsiId", "gsiName": "$gsiName"},
            "addToSet(cu_name)": {"$addToSet": "$cu.displayName"}
        }
    },
    {
        "$project": {
            "bookId": "$_id.bookId",
            "gsiId": "$_id.gsiId",
            "gsiName": "$_id.gsiName",
            "cuNames": "$addToSet(cu_name)",
            "_id": 0
        }
    }
]
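
The `$regex` filters in the `$match` stage use a negative lookahead to keep only names that do not contain a given word. The sketch below reproduces that pattern with Python's `re` module on a few made-up names. It assumes Python's regex engine treats this pattern the same way MongoDB's does, which should hold for a simple lookahead like this one.

```python
import re

# Same pattern as in the $match stage; re.IGNORECASE mirrors "$options": "i".
NOT_TEST = re.compile(r"^((?!test).)*$", re.IGNORECASE)

# Made-up display names for illustration only.
names = ["Order Management", "Test Book", "my_TEST_gsi", "Contest Rules"]

# A name is kept only if "test" does not start at any position in it.
kept = [name for name in names if NOT_TEST.match(name)]
print(kept)
```

Note that the pattern rejects any name containing the substring, so a name such as "Contest Rules" is filtered out along with real test data.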

In [None]:
df = read_mongo(db='nsl_bet_db_soln',collection='nsl_book',query=gsi_pipeline)
df.to_csv("data/cu.csv")


## Code Overview


### Importing Gsi into Milvus

#### 1. Connecting to Milvus and Redis
Both servers are running as Docker containers on the localhost with their corresponding default ports.

In [None]:
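from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType
import redis

# Connect to the Milvus server on localhost with the default port
connections.connect("default", host="localhost", port="19530")

EXPIRATION_SECONDS = 600
r = redis.StrictRedis(host="localhost", port=6379)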


#### 2. Loading CU into Redis
We begin by loading the GSI-to-CU mappings from `data/cu.csv` into Redis.

In [None]:
#r.flushall()

In [None]:
import pandas as pd
import json
from ast import literal_eval

# cu.csv was written with the default integer index, so read the index back from the first column.
df2 = pd.read_csv("data/cu.csv", index_col=0, converters={'cuNames': literal_eval})
# Join each GSI's list of CU names into a single sentence-like string.
df2["cuNames"] = df2["cuNames"].str.join(". ")

# Example row: 0|Health insurance_377|CustomerPortal_HealthInsurance|['Customer Menu Options', 'Customer Accessing Portal']
pipe = r.pipeline()
for i in range(len(df2)):
    temp = {"gsi": df2['gsiId'].iloc[i], "cus": df2['cuNames'].iloc[i], "book": df2['bookId'].iloc[i]}
    pipe.set(df2['gsiId'].iloc[i], json.dumps(temp))
inserts = pipe.execute()

#### 3.1 Creating Partition and Collection in Milvus

In [None]:
COLLECTION_NAME = 'demo_cu'
PARTITION_NAME = 'GsiCu'
FIELD_NAME = 'vec'

pk = FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=False)
field = FieldSchema(name=FIELD_NAME, dtype=DataType.FLOAT_VECTOR, dim=768)
schema = CollectionSchema(fields=[pk, field], description="cu recommender: demo_cu")

if utility.has_collection(COLLECTION_NAME):  # drop a collection left over from a previous run
    Collection(COLLECTION_NAME).drop()

# Always (re)create the collection so that the following cells have something to work on
collection = Collection(name=COLLECTION_NAME, schema=schema)
partition = collection.create_partition(PARTITION_NAME)
print("Collection & partition are successfully created.")

#### 3.2 Setting an Index
After creating the collection, we assign it an index type. This can be done before or after inserting the data; when done before, indexes are built as data comes in and fills the data segments. In this example we use IVF_SQ8, which requires the `nlist` parameter.

In [None]:
# Flush collection with inserted vectors to disk
# utility.get_connection().flush([COLLECTION_NAME])

index_param = {
        "metric_type":"L2",
        "index_type":"IVF_SQ8",
        "params":{"nlist":1024}
    }
collection.create_index(field_name=FIELD_NAME, index_params=index_param)
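
To give an intuition for `nlist` (and for the `nprobe` search parameter used later), the toy NumPy sketch below imitates the inverted-file idea: vectors are grouped around `nlist` centroids, and a query scans only the `nprobe` closest groups. This is an illustration under simplifying assumptions, not how Milvus is implemented: the centroids are picked at random instead of being trained with k-means, and the 8-bit scalar quantization of IVF_SQ8 is not modelled.

```python
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(1000, 16)).astype(np.float32)
nlist = 8

# Assumption: random data points serve as centroids (a real index trains them with k-means).
centroids = vectors[rng.choice(len(vectors), size=nlist, replace=False)]

# Assign every vector to its nearest centroid (its "inverted list").
dist_to_centroids = np.linalg.norm(vectors[:, None, :] - centroids[None, :, :], axis=2)
assignments = np.argmin(dist_to_centroids, axis=1)


def ivf_search(query, top_k, nprobe):
    """Scan only the nprobe inverted lists whose centroids are closest to the query."""
    probe = np.argsort(np.linalg.norm(centroids - query, axis=1))[:nprobe]
    candidates = np.flatnonzero(np.isin(assignments, probe))
    dists = np.linalg.norm(vectors[candidates] - query, axis=1)
    return candidates[np.argsort(dists)[:top_k]]


query = rng.normal(size=16).astype(np.float32)
exact = ivf_search(query, top_k=5, nprobe=nlist)  # probing every list is a full scan
approx = ivf_search(query, top_k=5, nprobe=2)     # faster, but may miss true neighbours
print(exact, approx)
```

A larger `nprobe` trades speed for recall; with `nprobe` equal to `nlist` the search degenerates to an exact scan.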

#### 4.1 Generating Embeddings
In this example we use the sentence-transformers library to encode each sentence into a vector. The library uses a modified BERT model to generate the embeddings; here we use the `paraphrase-mpnet-base-v2` model, which is built on Microsoft's MPNet. More information can be found in the sentence-transformers documentation.

In [None]:
from sentence_transformers import SentenceTransformer
import pandas as pd
from sklearn.preprocessing import normalize

model = SentenceTransformer('paraphrase-mpnet-base-v2')
# Get the GSI IDs and the related CU name strings.
title_data = df2['gsiId'].tolist()
text_data = df2['cuNames'].tolist()

sentence_embeddings = model.encode(text_data)
sentence_embeddings = normalize(sentence_embeddings)
print(type(sentence_embeddings))
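
The embeddings are L2-normalized before indexing. One consequence is that, for unit vectors, the squared L2 distance equals `2 - 2 * cosine similarity`, so ranking by L2 distance gives the same order as ranking by cosine similarity. The pure-Python sketch below checks this identity on two made-up vectors; the helper names are hypothetical.

```python
import math


def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Made-up vectors for illustration only.
a = l2_normalize([1.0, 2.0, 3.0])
b = l2_normalize([3.0, 1.0, 0.5])

lhs = squared_l2(a, b)
rhs = 2 - 2 * cosine(a, b)
print(lhs, rhs)
```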


In [None]:
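# Alternative encoder (not used in the rest of this notebook): BERT from TensorFlow Hub.
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text  # registers the ops required by the preprocessing model

text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
preprocessor = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3")
encoder_inputs = preprocessor(text_input)
encoder = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4", trainable=True)
outputs = encoder(encoder_inputs)
pooled_output = outputs["pooled_output"]      # [batch_size, 768].
sequence_output = outputs["sequence_output"]  # [batch_size, seq_length, 768].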

#### 4.2 Getting Embeddings and IDs
Since the current dataset contains only 550 vectors, we insert all of them in a single batch.

In [None]:
embeddings = sentence_embeddings.tolist()
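
A single batch is fine at this size. If the dataset grows, the inserts could be split into fixed-size chunks. The sketch below shows one way to do that; `batched` is a hypothetical helper, the IDs and vectors are placeholders, and the actual `collection.insert(...)` call is left as a comment.

```python
def batched(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder IDs and 4-dimensional vectors for illustration only.
ids = list(range(10))
vectors = [[float(i)] * 4 for i in ids]

batch_sizes = []
for id_batch, vec_batch in zip(batched(ids, 4), batched(vectors, 4)):
    # In the notebook, this is where collection.insert(...) would be called per batch.
    batch_sizes.append((len(id_batch), len(vec_batch)))
print(batch_sizes)
```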


#### 4.3 Importing Vectors into Milvus
Import vectors into the partition **GsiCu** under the collection **demo_cu**.

In [None]:
if collection.num_entities != 0:
    print(COLLECTION_NAME + " is not empty!")
else:
    # The primary key field is INT64, so use each row's position in df2 as its ID.
    ids = list(range(len(embeddings)))
    mr = collection.insert(data=[ids, embeddings], partition_name=PARTITION_NAME)
    print(str(len(mr.primary_keys)) + " ids:\n", mr.primary_keys[:10])

print("Record count in collection: " + str(collection.num_entities))

### Recalling Vectors in Milvus

#### 2. Searching
Pass in the user vector, and then recall vectors in the previously imported data collection and partition.

In [None]:
gsi = "Order Management"
# Encode the query text and normalize it the same way as the indexed vectors
embed = model.encode(gsi)
embed = embed.reshape(1, -1)
embed = normalize(embed)
query_embeddings = embed.tolist()
topK = 20
SEARCH_PARAM = {"metric_type": "L2", "params": {"nprobe": 20}}

collection.load()  # load the collection into memory before searching
# The second argument is the name of the vector field to search on
results = collection.search(query_embeddings, FIELD_NAME, param=SEARCH_PARAM, limit=topK, expr=None)


In [None]:
for x in results:
    for y in x:
        print(y.id, y.distance)

#### 3. Returning Information by IDs

In [None]:
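# Collect the primary keys of all hits
I = []
for x in results:
    for y in x.ids:
        I.append(y)

# Assumption: each Milvus primary key is a row position in df2, and Redis is keyed by gsiId.
recall_results = []
for x in I:
    raw = r.get(df2['gsiId'].iloc[x])
    if raw is not None:  # skip IDs that have no metadata in Redis
        recall_results.append(raw.decode('utf-8'))
recall_results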
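
The recalled values are JSON strings. One possible way to turn them into a flat list of CU suggestions is sketched below. `collect_cus` is a hypothetical helper, the payloads are made up but shaped like the values written to Redis earlier, and splitting on `". "` is only safe if no CU name itself contains that separator.

```python
import json

# Made-up payloads shaped like the values stored in Redis; None models a missing key.
raw_hits = [
    b'{"gsi": "g1", "cus": "Create Order. Approve Order", "book": "b1"}',
    b'{"gsi": "g2", "cus": "Approve Order. Ship Order", "book": "b1"}',
    None,
]


def collect_cus(raw_values):
    """Return the distinct CU names in the order they were first recalled."""
    seen = set()
    ordered = []
    for raw in raw_values:
        if raw is None:
            continue
        record = json.loads(raw.decode("utf-8"))
        for cu in record["cus"].split(". "):
            if cu and cu not in seen:
                seen.add(cu)
                ordered.append(cu)
    return ordered


suggestions = collect_cus(raw_hits)
print(suggestions)
```

Because Milvus returns hits ordered by distance, keeping first-seen order means CUs from the closest GSIs come first.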

## Conclusion

After completing the recall service, the results can be further sorted using a ranking model, and then the CUs with high similarity scores can be recommended to users. You can try this deployable recommender system using this [quick start](QUICK_START.md).