In [1]:
# Endpoint settings for the server; the connection cell reads both values back through os.environ.
%env VALKEY_HOST=localhost
%env VALKEY_PORT=6379

env: VALKEY_HOST=localhost
env: VALKEY_PORT=6379


# Real-time fraud detection with vector search for Valkey

## 1. Architecture
![Architecture](img/AWS-OnAir_01-Architecture.png)

## 2. Install packages
![Packages](img/AWS-OnAir_02-Packages.jpeg)

In [2]:
# Print the version of the Python interpreter that backs the current Jupyter kernel
import sys
!{sys.executable} -V

Python 3.12.11


# Install a pip package in the current Jupyter kernel
```python
import sys
!{sys.executable} -m pip install pandas
!{sys.executable} -m pip install numpy
!{sys.executable} -m pip install valkey
!{sys.executable} -m pip install redis
```

In [3]:
import pandas as pd

In [None]:
import numpy as np
import uuid
#from sentence_transformers import SentenceTransformer
import datetime
import os
import redis
import time
from redis.commands.search.field import VectorField
from redis.commands.search.field import TextField, NumericField
from redis.commands.search.field import TagField
from redis.commands.search.query import Query
from redis.commands.search.result import Result
from redis.cluster import RedisCluster as MemoryDB
from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

## 3. Connect to MemoryDB

![Connection](img/AWS-OnAir_03-Connection.jpeg)

In [None]:
%%time
memorydb_host = os.environ.get("VALKEY_HOST", "localhost")
memorydb_port = os.environ.get("VALKEY_PORT", 6379)
print(f"Valkey URI = {memorydb_host}:{memorydb_port}")
rc = Valkey(host=memorydb_host, port=memorydb_port, ssl=True, decode_responses=False, ssl_cert_reqs="none")
rc.ping()
# rc.flushall()

## 4. [Credit Card Fraud Detection Source](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud)

This dataset presents transactions that occurred over two days, with __492__ _frauds_ out of __284,807__ _transactions_.  
The dataset is highly unbalanced: the positive class (_frauds_) accounts for __0.172%__ of all _transactions_.

It contains only numerical input variables:
- Features __V1__, __V2__, … __V28__, 'Time', 'Amount' and 'Class'.
- Feature __'Time'__ contains the seconds elapsed between each transaction and the first transaction in the dataset.
- Feature __'Amount'__ is the transaction amount; this feature can be used, for example, for example-dependent cost-sensitive learning.
- Feature __'Class'__ is the response variable and it takes value 1 in case of fraud and 0 otherwise.

![CreditCardFraud](img/AWS-OnAir_04-NeedleHaystack.jpeg)

In [None]:
# Read the transactions file from the working directory and preview the first rows
csv_path = "creditcard.csv"
df = pd.read_csv(csv_path)
df.head()

In [None]:
# Total number of transactions read from the CSV
num_rows = len(df)
print(f"Number of rows in dataset {num_rows:,}")

In [None]:
# Keep everything except the last HOLDOUT_ROWS transactions.
HOLDOUT_ROWS = 10_000
# Size the cut from `df` itself (not the mutable `num_rows`) so re-running this
# cell always yields the same frame. `.copy()` gives newDF its own data, so adding
# a column to it later does not raise pandas' SettingWithCopyWarning.
newDF = df.iloc[:len(df) - HOLDOUT_ROWS].copy()
num_rows = newDF.shape[0]
print(f"Number of rows in dataset {num_rows:,}")

In [None]:
%%time
embedding_columns = [f'V{i}' for i in range(1, 29)]
# Ensure the specified columns exist in the DataFrame
missing_columns = [col for col in embedding_columns if col not in df.columns]
if missing_columns:
    raise ValueError(f"The following embedding columns are missing from the DataFrame: {missing_columns}")
# Convert the specified columns into a list of lists (each row is a list)
vectors = newDF[embedding_columns].values.tolist()

In [None]:
# Store each row's embedding list in a new 'vector' column.
# NOTE(review): this mutates newDF in place; re-running is harmless only while
# `vectors` still matches newDF row-for-row.
newDF['vector'] = vectors
# Preview the frame including the new column
newDF.head()

In [None]:
# Keep only the columns that get written to the store: the embedding plus Amount and Class
subset_df = newDF[['vector', 'Amount', 'Class']]
num_rows = subset_df.shape[0]
print(f"Number of rows in dataset {num_rows:,}")
# A bare expression is rendered only when it is the last line of the cell,
# so the preview has to come after the print.
subset_df.head()

## 5. Create index in MemoryDB

![Create-Index](img/AWS-OnAir_05-Index.jpeg)

In [None]:
def generate_key(prefix = ""):
    """Return ``prefix`` followed by a freshly generated random UUID4 string."""
    unique_suffix = str(uuid.uuid4())
    return prefix + unique_suffix

In [None]:
def create_hnsw_index(rc, index_name, vector_field_name, number_of_vectors, vector_dimensions=28, distance_metric='L2', M=16, EF=512, key_prefix=''):
    """Create an HNSW vector index with numeric ``amount`` and ``class`` fields.

    Args:
        rc: Client exposing ``ft(index_name).create_index(...)``.
        index_name: Name of the search index to create.
        vector_field_name: Hash field that holds the FLOAT32 vector bytes.
        number_of_vectors: Passed to the server as ``INITIAL_CAP``.
        vector_dimensions: Passed as ``DIM``.
        distance_metric: Passed as ``DISTANCE_METRIC`` (e.g. 'L2', 'Cosine').
        M: Passed as the HNSW ``M`` parameter.
        EF: Passed as ``EF_CONSTRUCTION``.
        key_prefix: Only keys starting with this prefix are indexed.

    Raises:
        redis.ResponseError: If the server rejects the request for any reason
            other than the index already existing.
    """
    vector_field = VectorField(vector_field_name, "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": vector_dimensions,
        "DISTANCE_METRIC": distance_metric,
        "INITIAL_CAP": number_of_vectors,
        "M": M,
        "EF_CONSTRUCTION": EF
    })
    schema = [vector_field, NumericField("amount"), NumericField("class")]
    try:
        rc.ft(index_name).create_index(schema, definition=IndexDefinition(prefix=[key_prefix]))
        print(f"Index {index_name} created successfully.")
    # The bare name `ResponseError` is not imported in this notebook; reach it
    # through the already-imported `redis` package instead.
    except redis.ResponseError as e:
        # Only an "already exists" reply means the index was created earlier;
        # anything else (bad schema, unsupported option, ...) is a real failure.
        if "already exists" not in str(e).lower():
            raise
        print(f"Index {index_name} created previously: {str(e)}")

## Behind the scenes

![KNNdistanceMetrics](img/AWS-OnAir_08-KNNdistanceMetrics.png)

```
FT.CREATE "ccfd_hnsw_index" 
PREFIX "1" "tsx:" 
SCORE "1.0" 
SCHEMA "vector" 
VECTOR "HNSW" "12" "TYPE" "FLOAT32" "DIM" "28" "DISTANCE_METRIC" "Cosine" 
INITIAL_CAP "274807" "M" "16" 
EF_CONSTRUCTION "512" "amount" "NUMERIC" "class" "NUMERIC"
```

In [None]:
# Settings shared by index creation, data loading and search
index_name = "ccfd_hnsw_index"
vector_field_name = "vector"
KEY_PREFIX = "tsx:"
# One vector per row of the frame that will be loaded
number_of_vectors = subset_df.shape[0]
print(f"Creating Index {index_name} on Field {vector_field_name} expecting {number_of_vectors:,} vectors")

In [None]:
%%time
# Create index in MemoryDB
create_hnsw_index(rc, index_name, vector_field_name, number_of_vectors, 
                  vector_dimensions=28, distance_metric='Cosine', M=16, EF=512, key_prefix=KEY_PREFIX)

## 6. Load vector embeddings into MemoryDB

![Index](img/AWS-OnAir_06-Load.jpeg)

In [None]:
%%time
# Load data into MemoryDB
BATCH_SIZE = 100
pipe = rc.pipeline()
for index, row in subset_df.iterrows():
    key = generate_key(prefix=KEY_PREFIX)
    vector = np.array(row['vector'], dtype=np.float32).tobytes()
    pipe.hset(key, mapping={
        'vector': vector,
        'amount': row['Amount'],
        'class': row['Class']
    })
    if index % BATCH_SIZE == 0 or index == number_of_vectors - 1:
        pipe.execute()
        pipe = rc.pipeline()
print("Data indexed successfully.")

In [None]:
%%time
# Add a python script to find a random key that stats with the prefix and fetch the value and show it
count = 0
while True:
    count += 1
    keyname = rc.randomkey()
    keyname = keyname.decode('utf-8')
    print(str(keyname))
    if keyname.startswith(KEY_PREFIX) == True:
        print(rc.hgetall(keyname))
        break
    elif count > 10:
        break


## 7. Find fraudulent transactions

![Find-Tsx](img/AWS-OnAir_07-Find.jpeg)

In [None]:
# Last ten rows whose Class equals 1
is_fraud = subset_df['Class'] == 1
df_class_1 = subset_df[is_fraud].tail(10)
df_class_1


In [None]:
# Positional index of the row whose embedding is used as the query.
# NOTE(review): presumably one of the Class == 1 rows listed above - confirm.
QUERY_ROW_POSITION = 263324
selected_row = subset_df.iloc[QUERY_ROW_POSITION]
query_vector = selected_row['vector']
print(selected_row)

In [None]:
def similarity_search(redis_client, index_name, query_vector, top_n=5):
    """Run a KNN query on the ``vector`` field of ``index_name``.

    Args:
        redis_client: Client exposing ``ft(index_name).search(...)``.
        index_name: Name of the search index to query.
        query_vector: Sequence of numbers; serialised as float32 bytes.
        top_n: Number of nearest neighbours requested and page size.

    Returns:
        Whatever ``search`` returns for the query; each hit is asked to carry
        the ``amount`` and ``class`` fields.
    """
    # The vector travels as a raw float32 byte string bound to $query_vec
    query_vector_bytes = np.array(query_vector, dtype=np.float32).tobytes()

    # Build the query step by step
    knn_query = Query(f"*=>[KNN {top_n} @vector $query_vec AS score]")
    knn_query = knn_query.sort_by("score")
    knn_query = knn_query.return_fields("amount", "class")
    knn_query = knn_query.paging(0, top_n)
    knn_query = knn_query.dialect(2)

    search_index = redis_client.ft(index_name)
    return search_index.search(knn_query, query_params={"query_vec": query_vector_bytes})

### Query behind the scenes

```
FT.SEARCH "ccfd_hnsw_index" "*=>[KNN 5 @vector $query_vec AS score]" 
RETURN "2" "amount" "class" 
SORTBY "score" "ASC" "DIALECT" "2" "LIMIT" "0" "5" 
"params" "2" "query_vec" "\x1e!2\xbf\x0b\xef\x14?\x184\x18@\xbd\xd5\x81=?\x82\xa8>\xa5T\xe6\xbe\...\xbf"
```

In [None]:
%%time
results = similarity_search(rc, index_name, query_vector, top_n=5)

In [None]:
%%time

for doc in results.docs:
    doc_id = doc.id
  #  score = doc.score
    amount = doc.amount
    class_value = doc['class']  # Accessing with square brackets
    print(f"ID: {doc_id}, Amount: {amount}, Class: {class_value}")