# Load Environment
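The `.env` file is expected to include a valid OpenAI API key, along with the Astra settings read later in this notebook: `ASTRA_SECUREBUNDLE_PATH`, `ASTRA_CLIENT_ID`, `ASTRA_CLIENT_SECRET`, `ASTRA_KEYSPACE` and `ASTRA_TABLE`.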

In [1]:
import os
from dotenv import load_dotenv, find_dotenv

# Locate the .env file first, then load it; override=True is passed so values
# from the file take precedence over variables that are already set.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path, override=True)

True

# Select a Product and Embedding Model
This is the product and embedding model we'll use:

In [2]:
# Product whose reviews are used throughout this notebook (value of the `asin` column).
example_asin = "B0015UC17E"

# Embedding model named in the embeddings file, and the vector length used
# when the table's VECTOR column is declared further down.
embed_model = "text-embedding-ada-002"
embed_dimensions = 1536

We expect a Parquet file of review embeddings for this product and model to exist, named `<asin>.reviews-embeddings-<model>.parquet.gz`:

In [3]:
import pandas as pd
# The file name is derived from the product and embedding model chosen above.
embed_file = f"{example_asin}.reviews-embeddings-{embed_model}.parquet.gz"

# Load the reviews (with their embeddings) and preview the first five rows.
df = pd.read_parquet(path=embed_file)
df.head(5)

Unnamed: 0,overall,verified,reviewTime,reviewerID,asin,style,reviewerName,reviewText,summary,unixReviewTime,vote,image,truncated,embeddings
1484623,5.0,False,"06 2, 2008",A39XFGZ0ASWT7O,B0015UC17E,"{'Color Name:': None, 'Color:': ' Black w/Whit...",Amazon Customer,Totally impressed with this little flashlight....,love my streamlight stylus,1212364800,8.0,,Totally impressed with this little flashlight....,"[-0.002683741971850395, 0.014071956276893616, ..."
1484624,5.0,True,"05 31, 2008",A1UA6TH2XRK9ZX,B0015UC17E,"{'Color Name:': None, 'Color:': ' Black w/Whit...",Everyman,"Lightweight, intense light, easy to operate wi...",Great light Great price,1212192000,73.0,,"Lightweight, intense light, easy to operate wi...","[-0.015947110950946808, 0.007599272765219212, ..."
1484625,5.0,True,"05 14, 2008",A8XV9SMUW4OT4,B0015UC17E,"{'Color Name:': None, 'Color:': ' Black w/Whit...",Happily Married,"<div id=""video-block-R14P1E1E7LGJEV"" class=""a-...",VIDEO and review for Stylus Pro light,1210723200,803.0,,"<div id=""video-block-R14P1E1E7LGJEV"" class=""a-...","[0.00011079138494096696, -0.013300847262144089..."
1484626,5.0,False,"12 13, 2007",A1PY6V6J7K5G84,B0015UC17E,,grumpy,This is a great light for the price. It's the ...,great light,1197504000,3.0,,This is a great light for the price. It's the ...,"[0.009861239232122898, -0.00737977446988225, -..."
1484627,5.0,True,"05 22, 2018",A1SXP6OD7J3A91,B0015UC17E,"{'Color Name:': None, 'Color:': ' Blue w/White...",John A.,Just runs and runs...great light,Fantastic flashlight,1526947200,,,Just runs and runs...great light,"[-0.011605827137827873, -0.0030300442595034838..."


# Create a Metadata Object
This object will be stored in Astra as metadata. We're following the CassIO data model, as that will be easier to use with
LangChain, so this needs to be a JSON map, or an empty map if we're not using metadata.

In [4]:
def _review_metadata(row):
    """Pick the review fields that are stored with each document as metadata."""
    return {
        'rating': row['overall'],
        'asin': row['asin'],
        'reviewerID': row['reviewerID'],
        'reviewerName': row['reviewerName'],
        'summary': row['summary'],
        'unixReviewTime': row['unixReviewTime'],
    }

# One metadata dict per review row.
df['metadata'] = df.apply(_review_metadata, axis=1)

# Load into Astra
We're going to load Astra 'directly' rather than via the Cassio driver, as we already have embeddings. We can
also load in parallel, which is faster.

In [5]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# All connection settings are read from environment variables.
cloud_config = dict(secure_connect_bundle=os.environ['ASTRA_SECUREBUNDLE_PATH'])
auth_provider = PlainTextAuthProvider(
    os.environ['ASTRA_CLIENT_ID'],
    os.environ['ASTRA_CLIENT_SECRET'],
)

# `cluster` is reused later to open one session per worker thread;
# `session` is the notebook-level session used for DDL and the vector store.
cluster = Cluster(auth_provider=auth_provider, cloud=cloud_config)
session = cluster.connect()

In [11]:
# Target keyspace and table, both taken from the environment.
KEYSPACE_NAME = os.environ['ASTRA_KEYSPACE']
TABLE_NAME = os.environ['ASTRA_TABLE']

# This table structure is compatible with CassIO 0.0.6
import json
import threading
import time
from math import ceil
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from tqdm.auto import tqdm
from multiprocessing import Value

# Integer counters shared by the worker threads; each is updated under its own
# lock (obtained with get_lock()) wherever it is incremented below.
processed_counter, error_counter, retry_counter, db_init_counter = (
    Value('i', 0) for _ in range(4)
)

def check_and_close_init_bar():
    """Close the thread-initialization progress bar once all threads have reported.

    While holding the lock of ``db_init_counter``, compares the counter with
    ``num_threads`` and closes ``db_init_progress`` only when they are equal.
    """
    with db_init_counter.get_lock():
        all_threads_reported = db_init_counter.value == num_threads
        if all_threads_reported:
            db_init_progress.close()

# Create the target table if it is missing. It is keyed by document_id and its
# vector column is declared with embed_dimensions entries.
session.execute(f"""
    CREATE TABLE IF NOT EXISTS {KEYSPACE_NAME}.{TABLE_NAME} 
    (document_id text, embedding_vector VECTOR<FLOAT, {embed_dimensions}>, document TEXT, metadata_blob TEXT, PRIMARY KEY(document_id))
""")

# Create the storage-attached index on the vector column if it is missing.
# This index definition is compatible with CassIO 0.0.6, and excludes
#    WITH OPTIONS = {{ 'similarity_function': 'dot_product' }}
session.execute(f"""
    CREATE CUSTOM INDEX IF NOT EXISTS {TABLE_NAME}_ann 
    ON {KEYSPACE_NAME}.{TABLE_NAME} (embedding_vector) 
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' 
""")

class DB:
    """Database handle: one session on the cluster plus a prepared insert statement."""

    class JSONEncoder(json.JSONEncoder):
        """JSON encoder that additionally serialises ``pd.Timestamp`` via ``isoformat()``."""

        def default(self, obj):
            if not isinstance(obj, pd.Timestamp):
                return super().default(obj)
            return obj.isoformat()

    def __init__(self, cluster: Cluster):
        """Connect to ``cluster``, prepare the insert, and report the initialization."""
        self.session = cluster.connect()
        self.pinsert = self.session.prepare(f"""
            INSERT INTO {KEYSPACE_NAME}.{TABLE_NAME}
            (document_id, embedding_vector, document, metadata_blob)
            VALUES (?, ?, ?, ?)
        """)
        self.encoder = self.JSONEncoder()
        # Count this handle, advance the initialization bar and run the close
        # check, all while holding the counter's lock.
        with db_init_counter.get_lock():
            db_init_counter.value += 1
            db_init_progress.update()
            check_and_close_init_bar()

    def upsert_one(self, row):
        """Execute the prepared insert for one review row.

        ``row`` is indexed by key and must provide ``asin``, ``reviewerID``,
        ``unixReviewTime``, ``embeddings``, ``truncated`` and ``metadata``.
        The document id joins the first three with dots; the metadata is
        JSON-encoded before being bound.
        """
        document_id = f"{row['asin']}.{row['reviewerID']}.{row['unixReviewTime']}"
        bound_values = [
            document_id,
            row['embeddings'],
            row['truncated'],
            self.encoder.encode(row['metadata']),
        ]
        self.session.execute(self.pinsert, bound_values)

# Holds one DB handle per thread.
thread_local_storage = threading.local()

def get_db():
    """Return the calling thread's ``DB`` handle, creating it on first use."""
    try:
        return thread_local_storage.db_handle
    except AttributeError:
        # First call on this thread: build the handle and remember it.
        thread_local_storage.db_handle = DB(cluster)
        return thread_local_storage.db_handle

def upsert_row(indexed_row):
    """Write one ``(index, row)`` pair to the database, retrying on failure.

    The row is converted to a dict and its ``embeddings`` entry to a list
    before the write. A failed write is retried up to five times with a
    one-second pause; after that the row is counted as an error and dropped.
    Successes, retries and errors are tallied in ``processed_counter``,
    ``retry_counter`` and ``error_counter``.
    """
    _index, series = indexed_row
    db = get_db()
    record = series.to_dict()
    record['embeddings'] = record['embeddings'].tolist()

    max_retries = 5
    failures = 0
    while True:
        try:
            db.upsert_one(record)
            with processed_counter.get_lock():  # counter is shared between threads
                processed_counter.value += 1
            break
        except Exception as exc:
            if failures >= max_retries:
                # Out of retries: record the error and give up on this row.
                with error_counter.get_lock():
                    error_counter.value += 1
                print(f"Error processing row: {exc}. Fatal.")
                break
            print(f"Error processing row: {exc}. Retrying...")
            failures += 1
            with retry_counter.get_lock():
                retry_counter.value += 1
            time.sleep(1)

num_threads = 25

# Tracks how many worker threads have created their DB handle.
db_init_progress = tqdm(total=num_threads, desc="Thread Initialization")

# Initialize a single progress bar with total number of records
progress_bar = tqdm(total=df.shape[0], desc="Record Loading Progress")

try:
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # One task per DataFrame row; each task handles its own retries.
        futures = [executor.submit(upsert_row, indexed_row) for indexed_row in df.iterrows()]
        for future in as_completed(futures):
            future.result()  # this raises any exceptions that occurred in the function
            progress_bar.update()
finally:
    # Close both bars even when a task raised. The initialization bar is also
    # closed here because the executor may start fewer than num_threads
    # workers, in which case check_and_close_init_bar() never closes it.
    progress_bar.close()
    db_init_progress.close()

# After all the data is processed
print(f"Total rows processed: {processed_counter.value}")
print(f"Retries: {retry_counter.value}")
print(f"Error rows: {error_counter.value}")


Thread Initialization:   0%|          | 0/20 [00:00<?, ?it/s]

Record Loading Progress:   0%|          | 0/11110 [00:00<?, ?it/s]

Total rows processed: 11110
Retries: 0
Error rows: 0


In [17]:
import os
import json
import threading
import time
import pandas as pd
from math import ceil
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Value, Lock
from cassandra.cluster import Cluster

# Keyspace and table to load into; both come from the environment.
KEYSPACE_NAME = os.environ['ASTRA_KEYSPACE']
TABLE_NAME = os.environ['ASTRA_TABLE']

# Outcome counters shared by the worker threads (each updated under its own lock).
processed_counter, error_counter, retry_counter = (Value('i', 0) for _ in range(3))

# Submission/completion bookkeeping; both counters are updated under task_lock.
total_tasks = Value('i', 0)
completed_tasks = Value('i', 0)
task_lock = Lock()

# Make sure the target table exists: keyed by document_id, with a vector column
# of embed_dimensions floats, the document text and a metadata blob.
session.execute(f"""
    CREATE TABLE IF NOT EXISTS {KEYSPACE_NAME}.{TABLE_NAME} 
    (document_id text, embedding_vector VECTOR<FLOAT, {embed_dimensions}>, document TEXT, metadata_blob TEXT, PRIMARY KEY(document_id))
""")

# Make sure the storage-attached index on the vector column exists.
session.execute(f"""
    CREATE CUSTOM INDEX IF NOT EXISTS {TABLE_NAME}_ann 
    ON {KEYSPACE_NAME}.{TABLE_NAME} (embedding_vector) 
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' 
""")

class DB:
    """Database handle holding one session and the prepared review insert.

    Each instance opens its own session on the given cluster and prepares the
    INSERT statement once, so it can be reused for every row.
    """

    class JSONEncoder(json.JSONEncoder):
        """JSON encoder that also accepts ``pd.Timestamp`` (emitted via ``isoformat()``)."""

        def default(self, obj):
            if isinstance(obj, pd.Timestamp):
                return obj.isoformat()
            return super().default(obj)

    # The annotation is written as a string so that defining the class does
    # not itself require ``Cluster`` to be resolvable.
    def __init__(self, cluster: "Cluster"):
        """Connect to ``cluster`` and prepare the insert statement."""
        self.session = cluster.connect()
        self.pinsert = self.session.prepare(f"""
            INSERT INTO {KEYSPACE_NAME}.{TABLE_NAME}
            (document_id, embedding_vector, document, metadata_blob)
            VALUES (?, ?, ?, ?)
        """)
        # Was ``this.JSONEncoder()``: Python has no ``this``, so constructing a
        # DB raised NameError.
        self.encoder = self.JSONEncoder()

    def upsert_one(self, row):
        """Execute the prepared insert for one review row.

        ``row`` is indexed by key and must provide ``asin``, ``reviewerID``,
        ``unixReviewTime``, ``embeddings``, ``truncated`` and ``metadata``.
        The document id joins the first three with dots; the metadata is
        JSON-encoded before being bound.
        """
        document_id = f"{row['asin']}.{row['reviewerID']}.{row['unixReviewTime']}"
        self.session.execute(self.pinsert, [
                document_id,
                row['embeddings'],
                row['truncated'],
                # Was ``this.encoder``: same NameError as in __init__.
                self.encoder.encode(row['metadata'])]
        )

# Per-thread storage for the DB handle.
thread_local_storage = threading.local()

def get_db():
    """Return this thread's ``DB`` handle, building and caching it if absent."""
    handle = getattr(thread_local_storage, 'db_handle', None)
    if handle is None:
        handle = DB(cluster)
        thread_local_storage.db_handle = handle
    return handle

def upsert_row(indexed_row):
    """Write one ``(index, row)`` pair to the database, retrying on failure.

    The row is converted to a dict and its ``embeddings`` entry to a list
    before the write. A failed write is retried up to five times with a
    one-second pause; after that the row is counted in ``error_counter``.
    Successful writes are counted in ``processed_counter`` and retries in
    ``retry_counter``.

    ``completed_tasks`` is incremented exactly once per call, whatever the
    outcome, so that a progress loop comparing it with ``total_tasks`` can
    always reach its end. Previously it was only incremented on success, which
    left such a loop waiting forever after any failed row.
    """
    try:
        _, row = indexed_row  # unpack tuple
        db = get_db()
        row = row.to_dict()
        row['embeddings'] = row['embeddings'].tolist()

        retries = 5
        # One initial attempt plus up to `retries` retries.
        for attempt in range(retries + 1):
            try:
                db.upsert_one(row)
                with processed_counter.get_lock():  # ensure thread-safety with a lock
                    processed_counter.value += 1
                break
            except Exception as e:
                if attempt < retries:
                    with retry_counter.get_lock():  # ensure thread-safety with a lock
                        retry_counter.value += 1
                    time.sleep(1)
                else:
                    with error_counter.get_lock():  # ensure thread-safety with a lock
                        error_counter.value += 1
                    print(f"Error processing row: {e}. Fatal.")
    finally:
        # Runs on success, on fatal failure, and when get_db() or the row
        # conversion raises (that exception still propagates to the caller).
        with task_lock:
            completed_tasks.value += 1

BATCH_SIZE = 50
num_threads = 50

# Always ask for at least one section: array_split rejects zero sections,
# which is what an empty frame would otherwise request.
num_batches = max(1, ceil(df.shape[0] / BATCH_SIZE))
batches = np.array_split(df, num_batches)  # splits the dataframe into smaller batches

with ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = []
    for batch in batches:
        for indexed_row in batch.iterrows():
            with task_lock:
                total_tasks.value += 1
            futures.append(executor.submit(upsert_row, indexed_row))

    # Report progress while the workers are still running. This has to happen
    # inside the `with` block, because leaving it waits for every task. The
    # loop ends when all futures are done rather than when the counters match,
    # so a task that failed cannot keep it spinning forever.
    while not all(future.done() for future in futures):
        print(f"Completed {completed_tasks.value}/{total_tasks.value} tasks")
        time.sleep(1)  # Update progress every second.

# Exceptions that escaped upsert_row are stored on the futures; surface them
# instead of dropping them silently.
unhandled = [future.exception() for future in futures if future.exception() is not None]
if unhandled:
    print(f"Tasks that raised: {len(unhandled)} (first: {unhandled[0]!r})")

print(f"Completed {completed_tasks.value}/{total_tasks.value} tasks")
print(f"Total rows processed: {processed_counter.value}")
print(f"Retries: {retry_counter.value}")
print(f"Error rows: {error_counter.value}")


In [11]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Cassandra

# `langchain_embeddings` was used below without being defined in any visible
# cell, so this cell failed on a fresh kernel. Build it from the same
# embedding model that produced the stored vectors so query vectors match.
# NOTE(review): assumes the OpenAI key loaded from .env is picked up from the
# environment by OpenAIEmbeddings -- confirm.
langchain_embeddings = OpenAIEmbeddings(model=embed_model)

# Vector store over the table loaded above, reusing the notebook-level session.
vstore = Cassandra(
    embedding=langchain_embeddings,
    session=session,
    keyspace=os.environ['ASTRA_KEYSPACE'],
    table_name=os.environ['ASTRA_TABLE']
    )

In [69]:
# Fetch the 100 reviews closest to the query text.
# Original note: Cassandra does not support a metadata filter here, e.g. filter={"rating":5.0}
docs = vstore.similarity_search(query="This flashlight is fantastic", k=100)

# from collections import Counter
# docs_all=vstore.similarity_search("This flashlight is fantastic", k=100)
# ratings = [doc.metadata['rating'] for doc in docs_all]
# rating_counts = Counter(ratings)
# print(f"docs_all: {rating_counts}")
      
# docs_5=vstore.similarity_search("This flashlight is fantastic", k=100, filter={"rating":5.0})
# ratings = [doc.metadata['rating'] for doc in docs_5]
# rating_counts = Counter(ratings)
# print(f"docs_5: {rating_counts}")     

In [70]:
docs

[Document(page_content='This flashlight is amazing!', metadata={'rating': 5.0, 'asin': 'B0015UC17E', 'reviewerID': 'AI9FQ2H1RCLKE', 'reviewerName': 'Ruben Guerrios', 'summary': 'Five Stars', 'unixReviewTime': 1433980800}),
 Document(page_content='Excellent flashlight', metadata={'rating': 5.0, 'asin': 'B0015UC17E', 'reviewerID': 'A23848Q6OZO9Z5', 'reviewerName': 'John E. Salvati', 'summary': 'Five Stars', 'unixReviewTime': 1523318400}),
 Document(page_content='Great flashlight', metadata={'rating': 5.0, 'asin': 'B0015UC17E', 'reviewerID': 'A7RVD8FDSOUC0', 'reviewerName': 'Amazon Customer', 'summary': 'Five Stars', 'unixReviewTime': 1518048000}),
 Document(page_content='Great flashlight', metadata={'rating': 5.0, 'asin': 'B0015UC17E', 'reviewerID': 'AIHFQATLF3P44', 'reviewerName': 'Marcus M. Zeno Sr.', 'summary': 'Five Stars', 'unixReviewTime': 1501804800}),
 Document(page_content='Great flashlight', metadata={'rating': 5.0, 'asin': 'B0015UC17E', 'reviewerID': 'ACIE25PFTYW8A', 'reviewer

In [54]:
# Load Chat Model and summarize chain for writing summary of reviews and ad copy
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain

# Chat model used by the summarize chains below. The GPT-4 variant is kept for reference:
#chat = ChatOpenAI(model_name="gpt-4",temperature=0.2)
chat = ChatOpenAI(
    temperature=0.2,
    model_name="gpt-3.5-turbo",
)

In [71]:
# Write summary of reviews
# Prompt for the review summary; {text} receives the retrieved reviews.
prompt_template_summary = """
Write a summary of the reviews:

{text}

The summary should be about ten lines long.
"""

# "stuff" chain: run the prompt over the documents in `docs` and keep the result.
PROMPT = PromptTemplate(input_variables=["text"], template=prompt_template_summary)
chain = load_summarize_chain(llm=chat, chain_type="stuff", prompt=PROMPT)
summary = chain.run(docs)

In [56]:
summary

'The majority of the reviews for this flashlight are overwhelmingly positive. Many customers describe it as a great, excellent, or awesome flashlight that is very bright and durable. Some reviewers mention that it is the best flashlight they have ever owned and highly recommend it. Customers appreciate the compact size and portability of the flashlight, as well as its strong and sturdy build quality. Many reviewers mention that the flashlight is bright for its size and has good battery life. Overall, customers are very satisfied with this flashlight and find it to be a great value for the price.'

In [84]:
# Write ad copy for Facebook ad

# Prompt for the Facebook ad copy; {text} receives the retrieved reviews.
prompt_template_fb = """
Write the copy for a facebook ad based on the reviews:

{text}

As far as text goes, you can have up to 40 characters in your headline, 
125 characters in your primary text, and 30 characters in your description. 
Include emoji. The description should have a quote from a reviewer.
"""

# Note: PROMPT and chain are rebound here, replacing the summary versions.
PROMPT = PromptTemplate(input_variables=["text"], template=prompt_template_fb)
chain = load_summarize_chain(llm=chat, chain_type="stuff", prompt=PROMPT)
fb_copy = chain.run(docs)

In [85]:
fb_copy

'🔦 The Best Flashlight Ever! 💡\n⭐⭐⭐⭐⭐ "This flashlight is amazing!" - John\nGet yours today and experience the power and durability of our top-rated flashlight. Don\'t miss out, shop now! 💪🔦✨\n#Flashlight #BrightLights #MustHave'

In [36]:
from faker import Faker
faker = Faker()

def _random_first_name(_row):
    """Return a freshly generated fake first name; the row is not consulted."""
    return faker.first_name()

df['first_name'] = df.apply(_random_first_name, axis=1)

# Every address is a plus-alias of one mailbox, tagged with the demo id and
# the reviewer's id.
gmail_username = 'phil.miesle'
gmail_domain = 'datastax.com'
demo_id = 'VoiceOfCustomer'

def _demo_email(row):
    """Build the plus-alias address for the reviewer in ``row``."""
    return f"{gmail_username}+{demo_id}_{row['reviewerID']}@{gmail_domain}"

df['email'] = df.apply(_demo_email, axis=1)
df['email'].head()

166134    phil.miesle+VoiceOfCustomer_A39XFGZ0ASWT7O@dat...
166135    phil.miesle+VoiceOfCustomer_A1UA6TH2XRK9ZX@dat...
166136    phil.miesle+VoiceOfCustomer_A8XV9SMUW4OT4@data...
166137    phil.miesle+VoiceOfCustomer_A1PY6V6J7K5G84@dat...
166138    phil.miesle+VoiceOfCustomer_A1SXP6OD7J3A91@dat...
Name: email, dtype: object

In [60]:
def formulate_email(email,name,review,summary):
    """Build the prompt asking for an email to ``email`` about ``name``'s review.

    The prompt quotes ``review``, asks for the email to take the overall
    ``summary`` into account, and asks for it to be signed "Benjamin".
    Returns the prompt text; nothing is sent here.
    """
    # Spelled out line by line so the whitespace of the prompt is explicit.
    prompt_lines = (
        "\n"
        f"    The customer {name} just gave the following review:\n"
        "    \n"
        f"    {review}\n"
        "    \n"
        f"    Formulate and send an email to {email} based on the review that {name} gave\n"
        f"    and take into account the overall summary of the review given here: '{summary}'. \n"
        "    The email should be signed with the name Benjamin\n"
        "    "
    )
    return prompt_lines

In [61]:
# Load tools needed for connecting LangChain and Zapier

from langchain.agents.agent_toolkits import ZapierToolkit
from langchain.utilities.zapier import ZapierNLAWrapper
from langchain.agents import initialize_agent
# Wrapper around Zapier NLA, created with no arguments.
# NOTE(review): presumably it reads its credentials from the environment -- confirm.
zapier = ZapierNLAWrapper()
# Toolkit built from the wrapper; its tools are handed to the agent later.
toolkit = ZapierToolkit.from_zapier_nla_wrapper(zapier)

In [64]:
# Instantiate agent and send emails 

from langchain.llms import OpenAI
llm = OpenAI(temperature=0.2)

# Agent that can call the Zapier tools; verbose=True prints its reasoning trace.
agent = initialize_agent(
    tools=toolkit.get_tools(),
    llm=llm,
    agent="zero-shot-react-description",
    verbose=True,
)

def _email_reviewer(row):
    """Run the agent on the email prompt built for the reviewer in ``row``."""
    prompt = formulate_email(row['email'], row['first_name'], row['reviewText'], summary)
    return agent.run(prompt)

# Only the first row is used, so the agent is run once.
df.head(1).apply(_email_reviewer, axis=1)



[1m> Entering new  chain...[0m
[32;1m[1;3m I need to craft an email that summarizes the customer's review and reflects the overall sentiment of the reviews.

Action: Gmail: Send Email
Action Input: To: phil.miesle+VoiceOfCustomer_A39XFGZ0ASWT7O@datastax.com, Cc: , Subject: Amazon Customer's Review of Flashlight, Body: Dear Phil, 

Amazon Customer recently gave a review of a flashlight that was overwhelmingly positive. They described it as a great, excellent, or awesome flashlight that is very bright and durable. Customers appreciate the compact size and portability of the flashlight, as well as its strong and sturdy build quality. Many reviewers mention that the flashlight is bright for its size and has good battery life. 

Overall, customers are very satisfied with this flashlight and find it to be a great value for the price.

Sincerely,
Benjamin[0m
Observation: [36;1m[1;3m{"id": "1893fa4edf0af925", "threadId": "1893fa4edf0af925", "labelIds": ["UNREAD", "SENT", "INBOX"]}[0m

166134    The email has been sent to phil.miesle+VoiceOf...
166135    An email has been sent to phil.miesle+VoiceOfC...
166136    The email has been sent to phil.miesle+VoiceOf...
dtype: object