In [1]:
import phoenix as px

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import llama_index
from llama_index.core import Document
from llama_index.indices.managed.vectara import VectaraIndex,VectaraAutoRetriever
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import MetadataInfo, VectorStoreInfo
from llama_index.core.indices.service_context import ServiceContext
from llama_index.llms.together import TogetherLLM

from common_imports import *
import json , os , sys , time , re
import dotenv
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [3]:
# Start the local Phoenix app, then register "arize_phoenix" as LlamaIndex's global handler.
px.launch_app()
llama_index.core.set_global_handler("arize_phoenix")

🌍 To view the Phoenix app in your browser, visit http://localhost:6006/
📺 To view the Phoenix app in a notebook, run `px.active_session().view()`
📖 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix


In [4]:
# Load environment variables via python-dotenv; TOGETHER_API_KEY is read from os.environ further down.
dotenv.load_dotenv()

True

## Review Embeddings

In [5]:
def convert_reviews_into_textnodes(reviews):
    """Turn raw review dicts into ``TextNode`` objects, one node per review.

    Args:
        reviews: iterable of review dicts. Each dict is read through the
            top-level keys ``text``, ``source`` and ``app_name`` and through a
            nested ``metadata`` dict with the keys ``at``, ``reviewId``,
            ``userName``, ``score``, ``thumbsUpCount``, ``appVersion``,
            ``replyContent`` and ``repliedAt``.

    Returns:
        A list of ``TextNode`` in input order. Every metadata value is passed
        through ``str()``; the node id is the raw ``reviewId`` value.
    """
    # (node metadata key, key inside review['metadata']) in output order.
    nested_fields = (
        ('date_time', 'at'),
        ('reviewId', 'reviewId'),
        ('userName', 'userName'),
        ('rating', 'score'),
        ('thumbsUpCount', 'thumbsUpCount'),
        ('appVersion', 'appVersion'),
        ('replyContent', 'replyContent'),
        ('repliedAt', 'repliedAt'),
    )

    review_nodes = []
    for review in reviews:
        node_text = review['text']
        node_metadata = {'source': str(review['source'])}
        for target_key, source_key in nested_fields:
            node_metadata[target_key] = str(review['metadata'][source_key])
        # The review text is duplicated into the metadata, as in the stored schema.
        node_metadata['text'] = str(review['text'])
        node_metadata['app_name'] = str(review['app_name'])

        review_nodes.append(
            TextNode(
                text=node_text,
                metadata=node_metadata,
                id_=review['metadata']['reviewId'],
            )
        )

    return review_nodes

In [6]:
def get_reviews_from_file(file_path):
    """Load reviews from a JSON file.

    Args:
        file_path: path to the JSON file containing the reviews.

    Returns:
        The decoded JSON content (the callers in this notebook slice it as a
        list of reviews).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Review text contains emoji and other non-ASCII characters; read as UTF-8
    # explicitly instead of relying on the platform's default encoding.
    with open(file_path, 'r', encoding='utf-8') as review_file:
        return json.load(review_file)

In [7]:
class ReviewVectaraEngine:
    """Vectara-backed review index exposing three ways to query it.

    The constructor creates an empty ``VectaraIndex``, a ``VectorStoreInfo``
    describing the review metadata, and a ``TogetherLLM`` (API key read from
    ``os.environ['TOGETHER_API_KEY']``), then calls :meth:`build`.

    NOTE(review): ``rating`` and ``thumbsUpCount`` are declared here as
    ``float``/``int`` while ``convert_reviews_into_textnodes`` stores every
    metadata value as a string — confirm this is what Vectara expects.
    """

    # (name, type, description) of every metadata field advertised to the auto-retriever.
    _METADATA_FIELDS = (
        ("source", "string", "Source of the review like playstore,appstore etc"),
        ("date_time", "string", "Date and time of the review"),
        ("reviewId", "string", "Review Id"),
        ("userName", "string", "User name of the reviewer"),
        ("rating", "float", "Rating given by the reviewer"),
        ("thumbsUpCount", "int", "Number of thumbs up i.e. the relevance of the review"),
        ("appVersion", "string", "App version of the app for which review is given"),
        ("replyContent", "string", "Reply content to the review by any other user"),
        ("repliedAt", "string", "Date and time of the reply"),
        ("app_name", "string", "App name for which review is given"),
    )

    def __init__(
            self,
            verbose=True,
            similarity_top_k=2,
            summary_enabled=False,
            summary_response_lang="eng",
            summary_num_results=7,
            llm_model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
            ):
        """Create the index, schema description and LLM, then build the retrievers."""
        self.index = VectaraIndex(show_progress=True)

        field_infos = [
            MetadataInfo(name=field_name, type=field_type, description=field_description)
            for field_name, field_type, field_description in self._METADATA_FIELDS
        ]
        self.vector_store_info = VectorStoreInfo(
            content_info="App reviews from different sources",
            metadata_info=field_infos,
        )

        together_key = os.environ['TOGETHER_API_KEY']
        self.llm = TogetherLLM(model=llm_model_name, api_key=together_key)

        self.verbose = verbose
        self.similarity_top_k = similarity_top_k
        self.summary_enabled = summary_enabled
        self.summary_response_lang = summary_response_lang
        self.summary_num_results = summary_num_results

        self.build()

    def build(self):
        """(Re)create the auto-retriever, retriever and query engine for ``self.index``."""
        # Summary settings are shared by the auto-retriever and the plain retriever.
        summary_options = {
            "summary_enabled": self.summary_enabled,
            "summary_response_lang": self.summary_response_lang,
            "summary_num_results": self.summary_num_results,
        }

        self.auto_retriever = VectaraAutoRetriever(
            vector_store_info=self.vector_store_info,
            llm=self.llm,
            index=self.index,
            show_progress=True,
            verbose=self.verbose,
            **summary_options,
        )
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.similarity_top_k,
            llm=self.llm,
            **summary_options,
        )
        # The query engine uses a fixed top-k of 5, independent of similarity_top_k.
        self.query_engine = self.index.as_query_engine(similarity_top_k=5)

    def ingest_reviews(self, review_file_path, start=None, end=None):
        """Load reviews from a JSON file, keep the ``[start:end]`` slice and index them."""
        loaded_reviews = get_reviews_from_file(review_file_path)
        self.reviews = loaded_reviews[start:end]
        self.review_nodes = convert_reviews_into_textnodes(self.reviews)
        self.ingest_nodes(self.review_nodes)

    def ingest_nodes(self, nodes):
        """Replace the index with a new ``VectaraIndex`` built from ``nodes`` and rebuild."""
        self.index = VectaraIndex(nodes=nodes, show_progress=True)
        self.build()

    def run(self, query, mode: str):
        """Run ``query`` through the component selected by ``mode``.

        Args:
            query: query string.
            mode: ``'autoretriever'``, ``'retriever'`` or ``'query_engine'``.

        Returns:
            The selected component's result, or the string ``"Invalid mode"``
            for any other ``mode`` value.
        """
        if mode == 'autoretriever':
            return self.auto_retriever.retrieve(query)
        if mode == 'retriever':
            return self.retriever.retrieve(query)
        if mode == 'query_engine':
            return self.query_engine.query(query)
        return "Invalid mode"

In [8]:
# _review_file_path = './datas/api_result_reviews_relv_Google_Pay_Secure_UPI_payment_v0.json'
# _reviews = get_reviews_from_file(_review_file_path)
# _review_nodes = convert_reviews_into_textnodes(_reviews)

In [None]:
# _review_nodes[0]

In [None]:
# review_vectara_engine = ReviewVectaraEngine(
#     verbose=True, 
#     similarity_top_k=2,
#     summary_enabled=False,
#     summary_response_lang="eng",
#     summary_num_results=7,
#     llm_model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
# )

In [None]:
# review_file_path = './datas/api_result_reviews_relv_Grand_Theft_Auto_San_Andreas_v0.json'
# review_vectara_engine.ingest_reviews(review_file_path,end=20)

In [None]:
# ans = review_vectara_engine.run('What are the reviews from playstore',mode='autoretriever')

## Generic JSON Engine

In [9]:
class JSON_Engine(BaseTool):
    """Wrapper over a LlamaIndex ``QueryPipeline`` that yields structured output.

    The constructor takes a prompt template and a pydantic class. The prompt is
    passed through the ``PydanticOutputParser``'s ``format`` and sent to OpenAI
    (``gpt-3.5-turbo``).

    ``run`` fills in the prompt variables and returns the pipeline output; with
    ``parse=True`` the output parser is the last stage of the pipeline.

    Args:
        prompt: prompt template string; its ``{placeholders}`` are supplied as
            keyword arguments to :meth:`run`.
        class_name: pydantic class describing the expected output.
        llm_model_name: currently unused — only referenced by the disabled
            TogetherLLM line below.
        temperature: sampling temperature passed to the LLM.
        api_key_name: currently unused — only referenced by the disabled
            TogetherLLM line below.
        parse: when True the output parser is appended to the pipeline.
        gemma_mode: when True, :meth:`run` extracts the largest ``{...}`` span
            from ``response.message.content`` and parses it explicitly.
            NOTE(review): this reads ``response.message``, which presumably
            requires ``parse=False`` — confirm.
    """

    def __init__(self, prompt, class_name, llm_model_name: str = "mistralai/Mixtral-8x7B-Instruct-v0.1",temperature=0.1, api_key_name = "TOGETHER_API_KEY",parse=True,gemma_mode=False):
        self.output_parser = PydanticOutputParser(class_name)
        # self.llm = TogetherLLM(model=llm_model_name, api_key=os.environ[api_key_name], temperature=temperature)
        self.llm = OpenAI(model="gpt-3.5-turbo", temperature=temperature)
        self.class_name = class_name
        # Let the parser extend the raw prompt before it becomes a template.
        self.json_prompt_str = self.output_parser.format(prompt)
        self.json_prompt_tmpl = PromptTemplate(self.json_prompt_str)
        chain = [self.json_prompt_tmpl, self.llm]
        if parse:
            chain.append(self.output_parser)
        self.p = QueryPipeline(chain=chain, verbose=False)
        self.gemma_mode = gemma_mode

    def run(self, **kwargs):
        """Run the pipeline with ``kwargs`` as prompt variables.

        Returns:
            The pipeline output; in ``gemma_mode`` the result of parsing the
            largest ``{...}`` span found in the response text.

        Raises:
            ValueError: in ``gemma_mode`` when the response contains no
                ``{...}`` span at all.
        """
        response = self.p.run(**kwargs)
        if self.gemma_mode:
            # Greedy match with DOTALL: spans from the first "{" to the last "}".
            pattern = r'{.*}'
            matches = re.findall(pattern, response.message.content, re.DOTALL)
            largest_match = max(matches, key=len, default=None)
            if largest_match is None:
                # Previously None was handed to the parser, which failed with an opaque error.
                raise ValueError("No JSON object found in the LLM response")
            print(largest_match)
            response = self.output_parser.parse(largest_match)
        return response

## Data Ingestion Pipeline

In [10]:
class Isssue(BaseModel):
    # One negative aspect extracted from a review, plus two flags classifying it.
    # Documented with comments rather than a class docstring so the generated
    # schema is unchanged. The Field descriptions are runtime text (presumably
    # surfaced to the LLM via the output parser) and are kept verbatim.

    # The negative aspect itself, as a sentence.
    issue: str = Field(
        ...,
        description="""
    A negative aspect extracted from the review context. It should be a full sentence and complete.
    """,
    )

    # True when the aspect reports a concrete problem (glitch, crash, lag, ...).
    containsAnyReport: bool = Field(
        ...,
        description="""
    Whether the issue contains any particular report or not. It may user is reporting some issue or bug like any glitch, crash, lag, something not working etc. 
    True means the issue contains any particular report
    False means the issue does not contain any particular report
    """,
    )

    # True when the aspect asks for a new feature or an upgrade.
    featureRequest: bool = Field(
        ...,
        description="""
    Whether the issue is a feature request or not. It may user is unsatisfied with the current features and wants some new features / upgrade.
    True means the issue is a feature request
    False means the issue is not a feature request
    """,
    )

class IssueList(BaseModel):
    # Container model: the issues extracted from one review context.
    # Documented with comments (not a docstring) so the generated schema is unchanged.
    issues: List[Isssue] = Field(
        ...,
        description="""
    List of different Issues from review context
    """,
    )

class IssueEngine:
    """Callable that extracts negative aspects from a review.

    Builds a ``JSON_Engine`` with the extraction prompt below and ``IssueList``
    as the output class; calling the instance forwards its keyword arguments
    (the prompt expects ``text``) to ``JSON_Engine.run``.
    """

    def __init__(self):
        extraction_prompt = """
        Given a review context, extract the list of different negative aspects mentioned in the review context:
        <<<
        {text}
        >>>

        Note: 
        These aspects should be all different negative aspects of the review context and should be full sentences.
        Also they must be complete i.e. they should not be a part of bigger aspects.
        If there are no negative aspects, return an empty list. 

        """
        self.engine = JSON_Engine(
            extraction_prompt,
            class_name=IssueList,
            temperature=0.2,
            gemma_mode=False,
            parse=True,
        )

    def __call__(self, **kwargs):
        engine_output = self.engine.run(**kwargs)
        return engine_output

In [11]:
class ReviewDegenAndSentimentPipeline:
    """Breaks one review into its list of issues using an ``IssueEngine``."""

    def __init__(self):
        self.issue_engine = IssueEngine()

    def __call__(self, review):
        """Return the ``'issues'`` entry of the engine result for ``review``, via ``.dict()``."""
        extraction = self.issue_engine(text=review)
        return extraction.dict()['issues']

In [13]:
# SECURITY: never hardcode API keys in a notebook. The key previously committed
# on this line must be treated as leaked and revoked.
# OPENAI_API_KEY is expected to come from the environment, e.g. the .env file
# picked up by dotenv.load_dotenv() earlier in this notebook.
if not os.environ.get('OPENAI_API_KEY'):
    print(
        "WARNING: OPENAI_API_KEY is not set; add it to your .env file or export it "
        "before running the cells below.",
        file=sys.stderr,
    )

In [14]:
# Smoke-test the issue extraction on a single sample review (this calls the LLM).
_sub_contexts = ReviewDegenAndSentimentPipeline()(review="""
Hi, After the last update, I'm not able to open the app. When I click on the login id, the UI disappear's and close the application. Kindly fix the issue ASAP. Secondly, the experience was excellent and swift before, but the cash back is very low. Everyone needs cash backs instead of irrelevant vouchers and coupons. Kindly bring these fixes ASAP. Keep growing guys. Thank you google pay.!!
""")

In [None]:
# _sub_contexts.message.content

In [None]:
# Regex pattern to match the largest content within curly braces


In [None]:
# print(largest_match)

### Creating Routes for Different Teams

In [15]:
class UtterenceList(BaseModel):
    # Output model for UtterenceEngine: a flat list of utterance strings.
    # Documented with comments (not a docstring) so the generated schema is unchanged.
    utterenceList: List[str] = Field(
        ...,
        description="""List of different utterances / use case related to the given team's work""",
    )

class UtterenceEngine:
    """Callable that asks the LLM for utterances / use cases matching a team's scope.

    Builds a ``JSON_Engine`` with the prompt below and ``UtterenceList`` as the
    output class; calling the instance forwards its keyword arguments (the
    prompt expects ``app_description`` and ``scopes``) to ``JSON_Engine.run``.
    """

    def __init__(self):
        utterance_prompt = """
        Given an app description (between ### and ###):
        ###
        {app_description}
        ###


        Given a team's scope of work (between <<< and >>>):
        <<<
        {scopes}
        >>>

        List down the more than 15 different utterances / use cases related to the given team's work and the app_description where they work.
        These utterances should be full sentences and refers to the different bugs, features, improvements, etc. related to the team's work.
        Strictly stick to the scope of work of the team.
        """
        self.engine = JSON_Engine(
            utterance_prompt,
            class_name=UtterenceList,
            temperature=0.1,
        )

    def __call__(self, **kwargs):
        engine_output = self.engine.run(**kwargs)
        return engine_output
    

In [16]:
class TechnicalSummary(BaseModel):
    # Output model for TechnicalSummaryEngine: a single free-text summary.
    # Documented with comments (not a docstring) so the generated schema is unchanged.
    summary: str = Field(
        ...,
        description="""
            Summary of all technical details associated with the app
            """,
    )

class TechnicalSummaryEngine:
    """Callable that condenses a raw app description into a technical summary.

    Builds a ``JSON_Engine`` with the prompt below and ``TechnicalSummary`` as
    the output class; calling the instance forwards its keyword arguments (the
    prompt expects ``app_description``) to ``JSON_Engine.run``.
    """

    def __init__(self):
        summary_prompt = """
        Given a raw app description (between <<< and >>>):
        <<<
        {app_description}
        >>>
        
        Summarize all the technical details associated with the app into a brief summary. Only include techincal features. 
        """
        self.engine = JSON_Engine(
            summary_prompt,
            class_name=TechnicalSummary,
            temperature=0.1,
        )

    def __call__(self, **kwargs):
        engine_output = self.engine.run(**kwargs)
        return engine_output

In [17]:
class TeamAndReview(BaseModel):
    # One routing decision: which team a review goes to, and why.
    # Documented with comments (not a docstring) so the generated schema is unchanged.

    # Team id; ReviewRoutePipeline uses it as an index into team_details["teams"].
    team: int = Field(
        ...,
        description="""
    Team's ID
    """,
    )

    # Free-text justification produced alongside the team id.
    reason_for_assignment: str = Field(
        ...,
        description="""
    Reason for assigning the team to the review
    """,
    )

class TeamAndReviewList(BaseModel):
    # Container model: every team assignment produced for one review context.
    # Documented with comments (not a docstring) so the generated schema is unchanged.
    teams: List[TeamAndReview] = Field(
        ...,
        description="""
    List of different teams assigned to the reviews
    """,
    )

class TeamAndReviewEngine:
    """Callable that asks the LLM which teams a review should be assigned to.

    Builds a ``JSON_Engine`` with the prompt below and ``TeamAndReviewList`` as
    the output class; calling the instance forwards its keyword arguments (the
    prompt expects ``teams`` and ``review``) to ``JSON_Engine.run``.
    """

    def __init__(self):
        routing_prompt = """
        Given a list of teams along with their IDs and scope of work (between <<< and >>>):
        <<<
        {teams}
        >>>
                                  

        Given a review context (between ### and ###):
        ###
        {review}
        ###

        This review can be assigned to multiple teams based on the context of the review and the scope of work of the teams.
        List down the different teams along with their IDs to which the review can be assigned based on the review context.
        Also provide the reason for assigning the review to the team. 
        Strictly stick to the scope of work of the teams and check if the context of the review matches with the scope of work of the teams.
        """
        self.engine = JSON_Engine(
            routing_prompt,
            class_name=TeamAndReviewList,
            temperature=0.1,
        )

    def __call__(self, **kwargs):
        engine_output = self.engine.run(**kwargs)
        return engine_output

In [18]:
class TeamRoutePipeline:
    """Routes review text to teams based on each team's scope of work.

    Team definitions are read from a JSON file with a top-level ``"teams"``
    list whose entries carry ``"teamName"`` and ``"scopes"``. App details are
    read from a JSON file with ``"title"`` and ``"description"``.

    Attributes:
        team_details: decoded team-details JSON.
        app_name: the app's ``"title"``.
        app_description: the description passed in, or the app file's
            ``"description"`` when none was given.
        summarized_app_description: technical summary of the description; only
            computed when ``app_description`` was not passed, otherwise ``None``.
        team_subprompt: one line per team for the routing prompt; ``None``
            until :meth:`build_routes` has run.
    """

    def __init__(
            self,
            team_details_file_path:str,
            app_details_file_path:str,
            app_description=None,
            ):
        """Load team and app details and create the engines.

        Args:
            team_details_file_path: path to the team-details JSON file.
            app_details_file_path: path to the app-details JSON file.
            app_description: optional description overriding the one in the
                app-details file; when omitted, the file's description is used
                and summarised through ``TechnicalSummaryEngine``.
        """
        self.team_details_file_path = team_details_file_path
        # Read JSON as UTF-8 explicitly rather than the platform default.
        with open(app_details_file_path, 'r', encoding='utf-8') as f:
            app_details = json.load(f)
        self.team_details = self._load_team_details()

        self.app_name = app_details["title"]
        self.app_description = app_description
        # NOTE: utterance generation (UtterenceEngine) is currently disabled.
        self.technical_summary_engine = TechnicalSummaryEngine()

        self.team_route_engine = TeamAndReviewEngine()
        self.team_subprompt = None

        # Always define the attribute so readers never hit AttributeError when
        # an explicit app_description was supplied.
        self.summarized_app_description = None
        if app_description is None:
            self.app_description = app_details["description"]
            self.summarized_app_description = self.technical_summary_engine(app_description=self.app_description).summary

    def _load_team_details(self):
        """Read and decode the team-details JSON file."""
        with open(self.team_details_file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def build_routes(self):
        """Reload the team details and rebuild ``team_subprompt`` (one line per team)."""
        self.team_details = self._load_team_details()

        print("Building route layers ...")
        prompt_lines = []
        # The team's position in the list is the TeamID the LLM is asked to return.
        for i, team in tqdm(enumerate(self.team_details["teams"])):
            team_name = team["teamName"]
            scopes = team["scopes"]
            prompt_lines.append(f"TeamID:{i} , TeamName:{team_name} , Scopes:{scopes}\n")
        self.team_subprompt = "".join(prompt_lines)

    def route_text(self, text):
        """Return the routing engine's answer for ``text``, building the team prompt on first use."""
        if self.team_subprompt is None:
            self.build_routes()
        route_ans = self.team_route_engine(teams=self.team_subprompt, review=text)
        return route_ans

In [19]:
class ReviewRoutePipeline:
    """Extracts issues from review nodes and assigns each review to teams.

    Attributes:
        review_degen_pipeline: callable turning review text into a list of
            issue dicts (keys ``issue``, ``containsAnyReport``, ``featureRequest``).
        team_route_obj: router exposing ``route_text(text)`` and
            ``team_details["teams"]``.
        backup: the list of nodes processed so far in the latest call, kept so
            partial results survive an exception part-way through a batch.
    """

    def __init__(self, team_route_obj):
        self.review_degen_pipeline = ReviewDegenAndSentimentPipeline()
        self.team_route_obj = team_route_obj
        self.backup = []

    def __call__(self, review_nodes):
        """Annotate each node's metadata with issues, assigned teams and reasons.

        The nodes are mutated in place: ``metadata['issues']``,
        ``metadata['assigned_teams']`` and ``metadata['reasons']`` are reset
        and refilled for every node.

        Args:
            review_nodes: iterable of nodes exposing ``.text`` and ``.metadata``.

        Returns:
            The list of processed nodes, in input order.
        """
        review_nodes_assigned = []
        for review_node in review_nodes:
            issues = self.review_degen_pipeline(review_node.text)

            review_node.metadata['assigned_teams'] = []
            review_node.metadata['reasons'] = []
            review_node.metadata['issues'] = []

            # Only concrete reports and feature requests are routed to teams.
            review_sub_prompt = ""
            for issue in issues:
                if issue["containsAnyReport"] or issue['featureRequest']:
                    review_sub_prompt += f"issue:{issue['issue']}\n"
                    review_node.metadata['issues'].append(issue['issue'])

            print(review_sub_prompt)
            if review_sub_prompt != "":
                team_list = self.team_route_obj.team_details["teams"]
                list_of_team_and_reviews = self.team_route_obj.route_text(review_sub_prompt).teams
                for team_and_review in list_of_team_and_reviews:
                    team_id = team_and_review.team
                    # The id comes from the LLM: an out-of-range value used to raise
                    # IndexError and abort the batch, and a negative one silently
                    # picked a team from the end of the list.
                    if not (isinstance(team_id, int) and 0 <= team_id < len(team_list)):
                        print("... skipping unknown team id", team_id)
                        continue
                    team = team_list[team_id]["teamName"]
                    print("...", team, team_and_review)
                    review_node.metadata['assigned_teams'].append(team)
                    review_node.metadata['reasons'].append(team_and_review.reason_for_assignment)
            else:
                print("No issues found in the review")
            review_nodes_assigned.append(review_node)
            self.backup = review_nodes_assigned
        return review_nodes_assigned

In [22]:
# Build the team router from the Google Pay team-details and app-description JSON files.
team_route = TeamRoutePipeline(
    team_details_file_path = './datas/team_details_Google_Pay_Secure_UPI_payment.json',
    app_details_file_path = './datas/api_result_appdescr_Google_Pay_Secure_UPI_payment.json',
)

# # team_route.build()
# Build the team sub-prompt now instead of lazily on the first route_text() call.
team_route.build_routes()

Building route layers ...


6it [00:00, 14742.72it/s]


In [23]:
# Combine issue extraction with the team router created above.
_review_route_pipeline = ReviewRoutePipeline(team_route)

In [26]:
# Load the raw reviews and convert them to TextNodes. `_review_nodes` is used by
# the routing cell below, so it must be defined for Restart & Run All to work.
_review_file_path = './datas/api_result_reviews_relv_Google_Pay_Secure_UPI_payment_v0.json'
_reviews = get_reviews_from_file(_review_file_path)
_review_nodes = convert_reviews_into_textnodes(_reviews)

In [None]:
# Route only the first 30 review nodes; `_review_nodes` must already exist in the kernel.
_review_nodes_routed = _review_route_pipeline(_review_nodes[:30])

In [28]:
# review_vectara_engine = ReviewVectaraEngine(
#     verbose=True, 
#     similarity_top_k=2,
#     summary_enabled=False,
#     summary_response_lang="eng",
#     summary_num_results=7,
#     llm_model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
# )

# review_vectara_engine.ingest_nodes(_review_nodes_routed)

LLM is explicitly disabled. Using MockLLM.
Embeddings have been explicitly disabled. Using MockEmbedding.
LLM is explicitly disabled. Using MockLLM.
Embeddings have been explicitly disabled. Using MockEmbedding.


In [30]:
# Store review nodes in the JSON file
# ID and Metadata are stored in the JSON file
# Load existing review nodes from the JSON file and append the new review nodes to it

def store_review_nodes(review_nodes,file_path):
    """Append the id and metadata of ``review_nodes`` to a JSON file.

    Each node is stored as ``{'_id': ..., 'metadata': ...}``, taken from the
    node's ``.dict()``. If the file already exists its content (a JSON list) is
    loaded and the new records are appended; otherwise the file is created.

    Args:
        review_nodes: iterable of nodes exposing ``.dict()`` with the keys
            ``id_`` and ``metadata``.
        file_path: path of the JSON file to create or extend.

    Raises:
        ValueError: if the existing file is not valid JSON
            (``json.JSONDecodeError``) or does not hold a list. The file is
            left untouched in that case instead of being overwritten.
    """
    new_records = []
    for review_node in review_nodes:
        node_dict = review_node.dict()  # serialise once per node
        new_records.append({'_id': node_dict['id_'], 'metadata': node_dict['metadata']})

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            stored_records = json.load(f)
    except FileNotFoundError:
        # First run: nothing stored yet. Any other error propagates so an
        # unreadable file is never silently replaced.
        stored_records = []

    if not isinstance(stored_records, list):
        raise ValueError(f"Expected a JSON list in {file_path!r}, found {type(stored_records).__name__}")

    stored_records.extend(new_records)
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(stored_records, f, indent=4)

# Persist the routed nodes (id + metadata) to the team-assignment JSON file.
store_review_nodes(_review_nodes_routed,'./datas/team_assigned_result_reviews_relv_Google_Pay_Secure_UPI_payment.json')