In [1]:
import tempfile
import logging
from embedchain import App
from embedchain.chunkers.common_chunker import CommonChunker
from embedchain.config.add_config import ChunkerConfig
import streamlit as st
OPENAI_API_KEY = st.secrets["OPENAI_API_KEY"]

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a temporary directory for ChromaDB
db_path = tempfile.mkdtemp()
logger.info(f"Created temporary directory for ChromaDB: {db_path}")

config = {
    'app': {
        'config': {
            'name': 'full-stack-app'
        }
    },
        'llm': {
        'provider': 'openai',
        'config': {
            'model': 'gpt-4o',
            'temperature': 0.3,
            # 'max_tokens': 2000,
            'prompt': (
                "The following context is primarily in French.\n"
                "If the query is in English, translate it to French before searching.\n"
                "Once you find relevant information in French, translate it back to English before answering.\n\n"
                # "Provide relevant answers back in the query's language."
                "Be comprehensive, accurate and precise. Share as much information as possible.\n"
                "Everytime you answer a question, figure out why it matters to the audience, and why it matters to the protagonist, and where/how he developed this trait, and how he's using it in practice in his life and business. Be specific\n"
                "Always try to extract the actionable practical advice from the content.\n"
                "Give the context within which every story is set.\n"
                "Always keep the storytelling part that's relevant to the query. Keep ALL the details of the story as accurate and precise as possible.\n"
                "For every reply, you should respond in sections, where each section is a single idea/concept/lesson/value/motivation/belief/etc.\n"
                "In each section, you should provide a clear claim, backstory/context, anecdote/story that you can retrieve from the content to exemplify your claim.\n"
                "For every section, you should provide 1-3 hyper actionable and practical advice, tips that can be derived and executed from this insight.\n"
                "IMPORTANT: If you don't know the answer, don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
                "Always end your reply with a summary of the main points you covered in the reply.\n"
                "Always translate the quotes back to English.\n"
                "Act as a potential customer of the protagonist. Interpret the answers based on what's in it for you. You want to learn practical advice that you can apply in your own personal or professional life. You know the importance of author's advice and takeaways, and real-lifestorytelling to absorb lessons."
                "$context\n\nQuery: $query\n\nHelpful Answer:"
            ),
            'system_prompt': (
                "Act as a translator and answer finder for a multilingual audience.\n"
                "If a question is in a different language from the content, translate the query to match the content language.\n"
                "Provide relevant answers back in the query's language."
                "Be comprehensive, accurate and precise. Share as much information as possible.\n"
                "Everytime you answer a question, figure out why it matters to the audience, and why it matters to the protagonist, and where/how he developed this trait, and how he's using it in practice in his life and business. Be specific\n"
                "Always try to extract the actionable practical advice from the content.\n"
                "Give the context within which every story is set.\n"
                "Always keep the storytelling part that's relevant to the query. Keep ALL the details of the story as accurate and precise as possible.\n"
                "For every reply, you should respond in sections, where each section is a single idea/concept/lesson/value/motivation/belief/etc.\n"
                "In each section, you should provide a clear claim, backstory/context, anecdote/story that you can retrieve from the content to exemplify your claim.\n"
                "For every section, you should provide 1-3 hyper actionable and practical advice, tips that can be derived and executed from this insight.\n"
                "IMPORTANT: If you don't know the answer, don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
                "Always end your reply with a summary of the main points you covered in the reply.\n"
                "Always translate the quotes back to English.\n"
                "Act as a potential customer of the protagonist. Interpret the answers based on what's in it for you. You want to learn practical advice that you can apply in your own personal or professional life. You know the importance of author's advice and takeaways, and real-lifestorytelling to absorb lessons."
                # "IMPORTANT: If you don't know the answer, LEAVE IT BLANK, and move on. Don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
            ),
            'api_key': OPENAI_API_KEY,
        }
    },
    'vectordb': {
        'provider': 'chroma',
        'config': {
            'dir': db_path,
            'allow_reset': True
        }
    },
    'embedder': {
        'provider': 'openai',
        'config': {
            'model': 'text-embedding-ada-002',
            'api_key': OPENAI_API_KEY,
        }
    },
    'chunker': {
        'chunk_size': 3000,
        'chunk_overlap': 1000,
        'length_function': 'len',
        'min_chunk_size': 1001
    },
}

def get_app_instance():
    return App.from_config(config=config)

app_instance = get_app_instance()


# import tempfile
# import logging
# from embedchain import App


# # Set up logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)

# # Create a temporary directory for ChromaDB
# db_path = tempfile.mkdtemp()
# logger.info(f"Created temporary directory for ChromaDB: {db_path}")

# # Configuration dictionary
# # config = {
# #     'app': {
# #         'config': {
# #             'name': 'full-stack-app'
# #         }
# #     },
# #     'llm': {
# #         'provider': 'openai',
# #         'config': {
# #             'model': 'gpt-4o',
# #             'temperature': 0.4,
# #             'max_tokens': 10000,
# #             'prompt': (
# #                 "$context contains all the information available to you.\n\n"
# #                 "Reply to the $query by retrieving as much information as possible from the context.\n"
# #                 "Be comprehensive, accurate and precise. Share as much information as possible.\n"
# #                 "Everytime you answer a question, figure out why it matters to the audience, and why it matters to the protagonist, and where/how he developed this trait, and how he's using it in practice in his life and business. Be specific\n"
# #                 "Always try to extract the actionable practical advice from the video.\n"
# #                 "Give the context within which every story is set.\n"
# #                 "Always keep the storytelling part that's relevant to the query. Keep ALL the details of the story as accurate and precise as possible.\n"
# #                 "For every reply, you should respond in sections, where each section is a single idea/concept/lesson/value/motivation/belief/etc.\n"
# #                 "In each section, you should provide a clear claim, backstory/context, anecdote/story that was mentioned in the transcript to exemplify your claim.\n"
# #                 "For every section, you should provide 1-3 hyper actionable and practical advice, tips that can be derived and executed from this insight.\n"
# #                 "IMPORTANT: If you don't know the answer, LEAVE IT BLANK, and move on. Don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
# #                 "Always end your reply with a summary of the main points.\n"
# #                 "Make sure English.\n"
# #                 # "$context\n\nQuery: $query\n\nHelpful Answer:"
# #             ),
# #             'system_prompt': (
# #                 "Act as a potential customer of the author of the video. Interpret the answers based on what's in it for you. You want to learn practical advice that you can apply in your own personal or professional life. You know the importance of author's advice and takeaways, and real-lifestorytelling to absorb lessons."
# #                 "IMPORTANT: If you don't know the answer, LEAVE IT BLANK, and move on. Don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
# #             ),
# #             'api_key': OPENAI_API_KEY,
# #         }
# #     },
# #     'vectordb': {
# #         'provider': 'chroma',
# #         'config': {
# #             'dir': db_path,
# #             'allow_reset': True
# #         }
# #     },
# #     'embedder': {
# #         'provider': 'openai',
# #         'config': {
# #             'model': 'text-embedding-ada-002',
# #             'api_key': OPENAI_API_KEY,
# #         }
# #     },
# # }
# config = {
#     'app': {
#         'config': {
#             'name': 'full-stack-app'
#         }
#     },
#     'llm': {
#         'provider': 'openai',
#         'config': {
#             'model': 'gpt-4o',
#             'temperature': 0.3,
#             'max_tokens': 10000,
#             'prompt': (
#                 "Reply to the $query by providing as much information about the protagonist in the video as possible.\n"
#                 "Be comprehensive, accurate and precise. Share as much information as possible.\n"
#                 "Everytime you answer a question, figure out why it matters to the audience, and why it matters to the protagonist, and where/how he developed this trait, and how he's using it in practice in his life and business. Be specific\n"
#                 "Always try to extract the actionable practical advice from the video.\n"
#                 "Give the context within which every story is set.\n"
#                 "Always keep the storytelling part that's relevant to the query. Keep ALL the details of the story as accurate and precise as possible.\n"
#                 "For every reply, you should respond in sections, where each section is a single idea/concept/lesson/value/motivation/belief/etc.\n"
#                 "In each section, you should provide a clear claim, backstory/context, anecdote/story that was mentioned in the transcript to exemplify your claim.\n"
#                 "For every section, you should provide 1-3 hyper actionable and practical advice, tips that can be derived and executed from this insight.\n"
#                 "IMPORTANT: If you don't know the answer, LEAVE IT BLANK, and move on. Don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
#                 "Always end your reply with a summary of the main points you covered in the reply.\n"
#                 "Always translate the quotes back to English.\n"
#                 "$context\n\nQuery: $query\n\nHelpful Answer:"
#             ),
#             'system_prompt': (
#                 "Act as a potential customer of the author of the video. Interpret the answers based on what's in it for you. You want to learn practical advice that you can apply in your own personal or professional life. You know the importance of author's advice and takeaways, and real-lifestorytelling to absorb lessons."
#                 "IMPORTANT: If you don't know the answer, LEAVE IT BLANK, and move on. Don't try to make up an answer. Don't try to give a generic answer. Don't try to give generic best practices.\n"
#             ),
#             'api_key': OPENAI_API_KEY,
#         }
#     },
#     'vectordb': {
#         'provider': 'chroma',
#         'config': {
#             'dir': db_path,
#             # 'allow_reset': True
#         }
#     },
#     'embedder': {
#         'provider': 'openai',
#         'config': {
#             'model': 'text-embedding-ada-002',
#             'api_key': OPENAI_API_KEY,
#         }
#     },
# }

# def get_app_instance():
#     return App.from_config(config=config)

# app_instance = get_app_instance()



In [3]:

from typing import Any, Type, List, Union
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

import tempfile
import logging
from embedchain import App
from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import os
import requests
import io
from pydub import AudioSegment
from deepgram import Deepgram
import streamlit as st
OPENAI_API_KEY = st.secrets["OPENAI_API_KEY"]

# Set up Deepgram
os.environ["DEEPGRAM_API_KEY"] = st.secrets["DEEPGRAM_API_KEY"]
# deepgram = Deepgram(DEEPGRAM_API_KEY)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PostInfo(BaseModel):
    """Instagram post information."""
    post_id: str
    caption: str
    timestamp: datetime
    likes: int
    url: str
    is_video: bool
    video_url: str = ""

    class Config:
        arbitrary_types_allowed = True

# class PostInfo(BaseModel):
#     """Instagram post information."""
#     post_id: str
#     caption: str
#     timestamp: datetime
#     likes: int
#     url: str
#     is_video: bool
#     video_url: str = ""

#     class Config:
#         arbitrary_types_allowed = True

from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FetchInstagramPostsInput(BaseModel):
    """Input for FetchInstagramPosts."""
    instagram_username: str = Field(..., description="The Instagram username to fetch posts from")

class FetchInstagramPostsOutput(BaseModel):
    """Output containing list of Instagram posts."""
    posts: List[PostInfo]
    success: bool = True
    error_message: str = ""

class FetchInstagramPostsTool(BaseTool):
    """Tool that fetches Instagram posts from a specified account."""
    name: str = "Fetch Instagram Posts"
    description: str = "Fetches the latest posts from a specified Instagram account"
    args_schema: Type[BaseModel] = FetchInstagramPostsInput
    insta_loader: Any = Field(default=None, exclude=True)
    
    def _get_instaloader_instance(self):
        """Get or create a shared Instaloader instance."""
        if not self.insta_loader:
            self.insta_loader = instaloader.Instaloader()
            try:
                # Load Instagram credentials from environment variables
                username = os.getenv("INSTAGRAM_USERNAME", "the_smart_funnel")
                password = os.getenv("INSTAGRAM_PASSWORD", "Firescan2024+")
                
                self.insta_loader.login(username, password)
                logger.info("Successfully logged in to Instagram")
            except Exception as e:
                logger.error(f"Failed to login to Instagram: {str(e)}")
                raise
        return self.insta_loader

    def _run(self, instagram_username: str) -> FetchInstagramPostsOutput:
        try:
            logger.info(f"Fetching posts for user: {instagram_username}")
            
            # Get Instagram instance
            loader = self._get_instaloader_instance()
            
            # Get profile and posts
            profile = instaloader.Profile.from_username(loader.context, instagram_username)
            posts = []
            post_count = 0
            
            for post in profile.get_posts():
                try:
                    post_info = PostInfo(
                        post_id=post.shortcode,
                        caption=post.caption if post.caption else "",
                        timestamp=post.date_utc,
                        likes=post.likes,
                        url=f"https://www.instagram.com/p/{post.shortcode}/",
                        is_video=post.is_video,
                        video_url=post.video_url if post.is_video else ""
                    )
                    posts.append(post_info)
                    post_count += 1
                    logger.info(f"Processed post {post.shortcode}")

                    # Limit to 4 posts for testing
                    if post_count >= 4:
                        break
                        
                except Exception as post_error:
                    logger.error(f"Error processing individual post: {str(post_error)}")
                    continue

            if not posts:
                return FetchInstagramPostsOutput(
                    posts=[],
                    success=False,
                    error_message="No posts were successfully fetched"
                )

            logger.info(f"Successfully fetched {len(posts)} posts")
            return FetchInstagramPostsOutput(
                posts=posts,
                success=True,
                error_message=""
            )

        except Exception as e:
            error_message = f"Error fetching Instagram posts: {str(e)}"
            logger.error(error_message)
            return FetchInstagramPostsOutput(
                posts=[],
                success=False,
                error_message=error_message
            )

    def _handle_error(self, error: Exception) -> str:
        """Handle errors that occur during tool execution."""
        error_message = str(error)
        if "login" in error_message.lower():
            return "Failed to authenticate with Instagram. Please check your credentials."
        elif "not found" in error_message.lower():
            return f"Instagram profile not found. Please check the username."
        elif "rate limit" in error_message.lower():
            return "Instagram rate limit reached. Please try again later."
        else:
            return f"An error occurred: {error_message}"

class AddInstagramAudioInput(BaseModel):
    """Input for AddInstagramAudio."""
    # posts: List[PostInfo] = Field(..., description="The Instagram posts to process")
    video_url: str = Field(..., description="The URL of the Instagram video to process")

class AddInstagramAudioOutput(BaseModel):
    """Output from AddInstagramAudio."""
    success: bool = Field(..., description="Whether the audio was successfully added to the vector DB")
    processed_posts: List[str] = Field(default_factory=list, description="List of successfully processed post IDs")
    error_message: str = Field(default="", description="Error message if any operations failed")

class AddInstagramAudioTool(BaseTool):
    """Tool that processes Instagram videos as audio and adds them to the vector database."""
    name: str = "Add Instagram Audio to Vector DB"
    description: str = "Adds Instagram video audio to the vector database using EmbedChain"
    args_schema: Type[BaseModel] = AddInstagramAudioInput
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app
        
    def _process_audio(self, audio_buffer: io.BytesIO) -> str:
        """Save audio to temporary file and return the path."""
        temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
        try:
            with open(temp_path, 'wb') as f:
                f.write(audio_buffer.getvalue())
            return temp_path
        except Exception as e:
            raise Exception(f"Error saving audio: {str(e)}")
        
    def _cleanup_temp_file(self, file_path: str):
        """Clean up temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

    def _run(self, video_url: str) -> AddInstagramAudioOutput:
        processed_posts = []
        errors = []
        temp_audio_path = None

        try:
            # Download video and convert to audio
            video_response = requests.get(video_url)
            video_buffer = io.BytesIO(video_response.content)
            audio = AudioSegment.from_file(video_buffer, format="mp4")
            
            # Export as WAV to memory
            audio_buffer = io.BytesIO()
            audio.export(audio_buffer, format="wav")
            audio_buffer.seek(0)
            
            # Save to temporary file
            temp_audio_path = self._process_audio(audio_buffer)
            
            # Add to embedchain with metadata
            self.app.add(
                temp_audio_path,
                data_type="audio",
                metadata={
                    "source": video_url
                }
            )
            
            processed_posts.append(video_url)
            logger.info(f"Successfully processed video: {video_url}")
            
        except Exception as e:
            error_msg = f"Error processing video {video_url}: {str(e)}"
            logger.error(error_msg)
            errors.append(error_msg)
        finally:
            # Clean up temporary file
            if temp_audio_path:
                self._cleanup_temp_file(temp_audio_path)
                
        return AddInstagramAudioOutput(
            success=len(processed_posts) > 0,
            processed_posts=processed_posts,
            error_message="; ".join(errors) if errors else ""
        )
    # def _run(self, video_url: str) -> AddInstagramAudioOutput:
    #     processed_posts = []
    #     errors = []

    #     # Filter video posts
    #     video_posts = [post for post in posts if post.is_video and post.video_url]
        
    #     for post in video_posts:
    #         temp_audio_path = None
    #         try:
    #             # Download video and convert to audio
    #             video_response = requests.get(post.video_url)
    #             video_buffer = io.BytesIO(video_response.content)
    #             audio = AudioSegment.from_file(video_buffer, format="mp4")
                
    #             # Export as WAV to memory
    #             audio_buffer = io.BytesIO()
    #             audio.export(audio_buffer, format="wav")
    #             audio_buffer.seek(0)
                
    #             # Save to temporary file
    #             temp_audio_path = self._process_audio(audio_buffer)
                
    #             # Add to embedchain with metadata
    #             self.app.add(
    #                 temp_audio_path,
    #                 data_type="audio",
    #                 metadata={
    #                     "source": post.url,
    #                     "caption": post.caption,
    #                     "timestamp": post.timestamp.isoformat(),
    #                     "likes": post.likes,
    #                     "post_id": post.post_id
    #                 }
    #             )
                
    #             processed_posts.append(post.post_id)
    #             logger.info(f"Successfully processed post: {post.post_id}")
                
    #         except Exception as e:
    #             error_msg = f"Error processing post {post.post_id}: {str(e)}"
    #             logger.error(error_msg)
    #             errors.append(error_msg)
    #         finally:
    #             # Clean up temporary file
    #             if temp_audio_path:
    #                 self._cleanup_temp_file(temp_audio_path)
                
    #     return AddInstagramAudioOutput(
    #         success=len(processed_posts) > 0,
    #         processed_posts=processed_posts,
    #         error_message="; ".join(errors) if errors else ""
    #     )
# class AddInstagramAudioTool(BaseTool):
#     """Tool that processes Instagram videos as audio and adds them to the vector database."""
#     name: str = "Add Instagram Audio to Vector DB"
#     description: str = "Adds Instagram video audio to the vector database using EmbedChain"
#     args_schema: Type[BaseModel] = AddInstagramAudioInput
#     app: Any = Field(default=None, exclude=True)

#     def __init__(self, app: App, **data):
#         super().__init__(**data)
#         self.app = app
        
#     def _process_audio(self, audio_buffer: io.BytesIO) -> str:
#         """Save audio to temporary file and return the path."""
#         temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
#         try:
#             with open(temp_path, 'wb') as f:
#                 f.write(audio_buffer.getvalue())
#             return temp_path
#         except Exception as e:
#             raise Exception(f"Error saving audio: {str(e)}")
        
#     def _cleanup_temp_file(self, file_path: str):
#         """Clean up temporary audio file."""
#         try:
#             if os.path.exists(file_path):
#                 os.remove(file_path)
#         except Exception as e:
#             logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

#     def _run(self, input_data: Union[dict, str]) -> AddInstagramAudioOutput:
#         try:
#             # Handle string input (JSON)
#             if isinstance(input_data, str):
#                 import json
#                 input_data = json.loads(input_data)
            
#             # Handle dict input
#             if isinstance(input_data, dict) and "posts" in input_data:
#                 posts_data = input_data["posts"]
#             else:
#                 raise ValueError("Input must contain a 'posts' key with array of post data")

#             # Convert to PostInfo objects
#             posts = [PostInfo(**post) for post in posts_data]
            
#             processed_posts = []
#             errors = []

#             # Filter video posts
#             video_posts = [post for post in posts if post.is_video and post.video_url]
            
#             for post in video_posts:
#                 temp_audio_path = None
#                 try:
#                     # Download video and convert to audio
#                     video_response = requests.get(post.video_url)
#                     video_buffer = io.BytesIO(video_response.content)
#                     audio = AudioSegment.from_file(video_buffer, format="mp4")
                    
#                     # Export as WAV to memory
#                     audio_buffer = io.BytesIO()
#                     audio.export(audio_buffer, format="wav")
#                     audio_buffer.seek(0)
                    
#                     # Save to temporary file
#                     temp_audio_path = self._process_audio(audio_buffer)
                    
#                     # Add to embedchain with metadata
#                     self.app.add(
#                         temp_audio_path,
#                         data_type="audio",
#                         metadata={
#                             "source": post.url,
#                             "description": post.description,
#                             "timestamp": post.timestamp.isoformat(),
#                             "likes": post.likes,
#                             "post_id": post.id
#                         }
#                     )
                    
#                     processed_posts.append(post.id)
#                     logger.info(f"Successfully processed post: {post.id}")
                    
#                 except Exception as e:
#                     error_msg = f"Error processing post {post.id}: {str(e)}"
#                     logger.error(error_msg)
#                     errors.append(error_msg)
#                 finally:
#                     # Clean up temporary file
#                     if temp_audio_path:
#                         self._cleanup_temp_file(temp_audio_path)
                    
#             return AddInstagramAudioOutput(
#                 success=len(processed_posts) > 0,
#                 processed_posts=processed_posts,
#                 error_message="; ".join(errors) if errors else ""
#             )
            
#         except Exception as e:
#             error_msg = f"Error processing posts: {str(e)}"
#             logger.error(error_msg)
#             return AddInstagramAudioOutput(
#                 success=False,
#                 processed_posts=[],
#                 error_message=error_msg
            # )
# class AddInstagramAudioTool(BaseTool):
#     """Tool that processes Instagram videos as audio and adds them to the vector database."""
#     name: str = "Add Instagram Audio to Vector DB"
#     description: str = "Adds Instagram video audio to the vector database using EmbedChain"
#     args_schema: Type[BaseModel] = AddInstagramAudioInput
#     app: Any = Field(default=None, exclude=True)

#     def __init__(self, app: App, **data):
#         super().__init__(**data)
#         self.app = app
        
#     def _process_audio(self, audio_buffer: io.BytesIO) -> str:
#         """Save audio to temporary file and return the path."""
#         temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
#         try:
#             with open(temp_path, 'wb') as f:
#                 f.write(audio_buffer.getvalue())
#             return temp_path
#         except Exception as e:
#             raise Exception(f"Error saving audio: {str(e)}")
        
#     def _cleanup_temp_file(self, file_path: str):
#         """Clean up temporary audio file."""
#         try:
#             if os.path.exists(file_path):
#                 os.remove(file_path)
#         except Exception as e:
#             logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

#     def _run(self, posts: List[PostInfo]) -> AddInstagramAudioOutput:
#         processed_posts = []
#         errors = []

#         # Filter video posts
#         video_posts = [post for post in posts if post.is_video and post.video_url]
        
#         for post in video_posts:
#             temp_audio_path = None
#             try:
#                 # Download video and convert to audio
#                 video_response = requests.get(post.video_url)
#                 video_buffer = io.BytesIO(video_response.content)
#                 audio = AudioSegment.from_file(video_buffer, format="mp4")
                
#                 # Export as WAV to memory
#                 audio_buffer = io.BytesIO()
#                 audio.export(audio_buffer, format="wav")
#                 audio_buffer.seek(0)
                
#                 # Save to temporary file
#                 temp_audio_path = self._process_audio(audio_buffer)
                
#                 # Add to embedchain with metadata
#                 self.app.add(
#                     temp_audio_path,
#                     data_type="audio",
#                     metadata={
#                         "source": post.url,
#                         "caption": post.caption,
#                         "timestamp": post.timestamp.isoformat(),
#                         "likes": post.likes,
#                         "post_id": post.post_id
#                     }
#                 )
                
#                 processed_posts.append(post.post_id)
#                 logger.info(f"Successfully processed post: {post.post_id}")
                
#             except Exception as e:
#                 error_msg = f"Error processing post {post.post_id}: {str(e)}"
#                 logger.error(error_msg)
#                 errors.append(error_msg)
#             finally:
#                 # Clean up temporary file
#                 if temp_audio_path:
#                     self._cleanup_temp_file(temp_audio_path)
                
#         return AddInstagramAudioOutput(
#             success=len(processed_posts) > 0,
#             processed_posts=processed_posts,
#             error_message="; ".join(errors) if errors else ""
#         )
    
class QueryDatabaseInput(BaseModel):
    """Input for QueryDatabase."""
    query: str = Field(..., description="The query to search the vector DB.")

class QueryDatabaseOutput(BaseModel):
    """Output from QueryDatabase."""
    reply: str = Field(..., description="The reply from the query")
    error_message: str = Field(default="", description="Error message if the operation failed")

class QueryDatabaseTool(BaseTool):
    name: str = "Query Database"
    description: str = "Queries the vector database containing processed Instagram content"
    args_schema: Type[BaseModel] = QueryDatabaseInput
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app

    def _run(self, query: str) -> QueryDatabaseOutput:
        try:
            logger.info(f"Querying vector DB with: {query}")
            reply = self.app.query(
                f"""Please analyze the following query about the Instagram content: {query}
                Focus on providing specific examples and quotes from the posts."""
            )
            logger.info("Query completed successfully")
            
            if not reply or (isinstance(reply, str) and not reply.strip()):
                return QueryDatabaseOutput(
                    reply="No relevant content found in the processed posts.",
                    error_message="Empty response from database"
                )
            
            return QueryDatabaseOutput(reply=reply)
                
        except Exception as e:
            error_message = f"Failed to query vector DB: {str(e)}"
            logger.error(error_message)
            return QueryDatabaseOutput(
                reply="Error occurred",
                error_message=error_message
            )

In [4]:

def main():
    # Get app instance
    app_instance = get_app_instance()
    
    # Initialize tools with app instance
    fetch_tool = FetchInstagramPostsTool()
    process_tool = AddInstagramAudioTool(app=app_instance)
    query_tool = QueryDatabaseTool(app=app_instance)
    
    # Define list of queries
    queries = [
        # "Tell me about the content of these videos",
        "What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?",
        "What are the setbacks, failures that the author encountered and learnt from?",
        "What are the key achievements that the author achieved that gives him more credibility and successful?",
        "What are the life events/circumstances that shaped the character of the author throughout his life?",
        "What was the context/environment that triggered the author to decide to change his life around and launch this business?",
        "What is the context within which the author started his business?",
        "What is the first name, last name of author of the content? If looking for contact details.",
        "What are the tips mentioned in the content that I can implement to grow my business?",
        "What are the key characteristics of the business of the author and the services I can buy from him?"
    ]
    
    try:
        # Fetch posts
        instagram_username = "antoineblanco99"
        print(f"Fetching posts from {instagram_username}...")
        fetch_result = fetch_tool.run(instagram_username=instagram_username)
        
        if fetch_result.success:
            print(f"Successfully fetched {len(fetch_result.posts)} posts")
            
            # Process videos one by one
            print("Processing videos...")
            processed_count = 0
            processed_posts = []
            
            for post in fetch_result.posts:
                if post.is_video and post.video_url:
                    try:
                        # Process each video individually
                        process_result = process_tool.run(video_url=post.video_url)
                        
                        if process_result.success:
                            processed_count += 1
                            processed_posts.extend(process_result.processed_posts)
                    except Exception as e:
                        print(f"Error processing video {post.post_id}: {str(e)}")
                        continue
            
            if processed_count > 0:
                print(f"Successfully processed {processed_count} videos")
                print("Processed posts:", processed_posts)
                
                # Query the database with multiple queries
                print("\nQuerying database...")
                for i, query in enumerate(queries, 1):
                    print(f"\nQuery {i}: {query}")
                    print("-" * 50)
                    
                    query_result = query_tool.run(query=query)
                    
                    if query_result.error_message:
                        print(f"Query error: {query_result.error_message}")
                    else:
                        print(f"Response: {query_result.reply}")
                    
                    # Add spacing between queries
                    print("\n")
            else:
                print("No videos were successfully processed")
        else:
            print(f"Fetching failed: {fetch_result.error_message}")
            
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        import traceback
        print("\nFull traceback:")
        traceback.print_exc()

if __name__ == "__main__":
    main()

Fetching posts from antoineblanco99...
Using Tool: Fetch Instagram Posts
Successfully fetched 4 posts
Processing videos...
Using Tool: Add Instagram Audio to Vector DB
Using Tool: Add Instagram Audio to Vector DB
Successfully processed 2 videos
Processed posts: ['https://scontent.cdninstagram.com/o1/v/t16/f1/m82/404E0230BEB5F97412CAD09DCF19BFA0_video_dashinit.mp4?efg=eyJ4cHZfYXNzZXRfaWQiOjc1NzAyMzMxOTg3NzYyMywidmVuY29kZV90YWciOiJ4cHZfcHJvZ3Jlc3NpdmUuSU5TVEFHUkFNLkNMSVBTLkMzLjcyMC5kYXNoX2Jhc2VsaW5lXzFfdjEifQ&_nc_ht=scontent.cdninstagram.com&_nc_cat=107&vs=e3a6c194f943025f&_nc_vs=HBksFQIYT2lnX3hwdl9yZWVsc19wZXJtYW5lbnRfcHJvZC80MDRFMDIzMEJFQjVGOTc0MTJDQUQwOURDRjE5QkZBMF92aWRlb19kYXNoaW5pdC5tcDQVAALIAQAVAhg6cGFzc3Rocm91Z2hfZXZlcnN0b3JlL0dERUZPeHFNMk9kNGwyOEJBSmxVLVdGY19wYzVicV9FQUFBRhUCAsgBACgAGAAbAogHdXNlX29pbAExEnByb2dyZXNzaXZlX3JlY2lwZQExFQAAJu6fi6HHoNgCFQIoAkMzLBdAZNRBiTdLxxgSZGFzaF9iYXNlbGluZV8xX3YxEQB1_gcA&ccb=9-4&oh=00_AYB-Ebl_oIxTF_x7NYDQpykf2igwEQbeEn7HyL-gQmHI3A&oe=671D749F&_nc_s



Response: **Understanding Authenticity and Transparency**

**Claim:** One of the core values that the author transmits through his Instagram content is authenticity and transparency.

**Backstory/Context:** In the world of social media, where curated perfection often reigns, the author stands out by sharing genuine insights into his life and business. This authenticity builds trust with his audience, making him more relatable and likeable.

**Anecdote/Story:** In one post, the author shares a behind-the-scenes look at a failed project, discussing what went wrong and what he learned from the experience. He writes, "Failure is just another step towards success. Here's what I learned from my recent setback..." This openness about failure not only humanizes him but also provides valuable lessons to his followers.

**Actionable Advice:**
1. **Be Honest:** Share both successes and failures. This builds credibility and trust with your audience.
2. **Reflect on Lessons Learned:** When discussi



Response: **Claim: Embracing Setbacks as Learning Opportunities**

**Backstory/Context:** The author of the Instagram content often shares personal and professional experiences, highlighting the importance of setbacks as a natural part of growth. These experiences are shared to inspire and motivate the audience to view failures not as endpoints but as stepping stones.

**Anecdote/Story:** In one of the posts, the author recounts a failed business venture that initially seemed promising. Despite thorough research and planning, the market response was lukewarm, leading to financial losses. The author shared a quote from this experience: "Failure is simply the opportunity to begin again, this time more intelligently."

**Actionable Advice:**
1. **Reflect on Failures:** After encountering a setback, take time to analyze what went wrong and what could be improved. This reflection can provide valuable insights for future endeavors.
2. **Adjust Strategies:** Use the lessons learned from failu



Response: **Translation of the Query:**

Veuillez analyser la question suivante concernant le contenu Instagram : Quels sont les principaux accomplissements que l'auteur a réalisés qui lui confèrent plus de crédibilité et de succès ? Concentrez-vous sur des exemples spécifiques et des citations des publications.

---

**Key Achievements and Credibility**

1. **Professional Milestones**

   **Claim:** The author has achieved significant professional milestones that enhance his credibility.

   **Backstory/Context:** The author has shared various posts highlighting his journey and accomplishments in his professional field. These achievements often serve as proof of his expertise and dedication.

   **Anecdote/Story:** In one post, the author shared a story about a major project he led that resulted in a significant increase in company revenue. He detailed the strategies he implemented and the challenges he overcame.

   **Actionable Advice:**
   - Document your professional achievements 



Response: Pour analyser la question concernant le contenu Instagram et les événements de vie qui ont façonné le caractère de l'auteur, nous devons d'abord identifier les moments clés et les circonstances partagées par l'auteur sur sa plateforme. Voici une analyse détaillée :

### 1. **Claim: Importance of Overcoming Adversity**

**Backstory/Context:**  
L'auteur a souvent partagé des moments de sa vie où il a dû surmonter des défis personnels et professionnels. Ces expériences ont été cruciales pour développer sa résilience et sa détermination.

**Anecdote/Story:**  
Dans un post Instagram, l'auteur a raconté comment il a perdu son premier emploi à cause d'une récession économique. Plutôt que de se laisser abattre, il a utilisé cette expérience pour se réinventer et lancer sa propre entreprise.

**Actionable Advice:**  
- Lorsque vous faites face à un revers, utilisez-le comme une opportunité pour réévaluer vos objectifs et stratégies.
- Développez une mentalité de croissance en cherch



Response: **Context and Environment Triggering Change**

**Claim:** The author decided to change his life and launch his business due to a combination of personal dissatisfaction and a desire for greater fulfillment.

**Backstory/Context:** The author often expressed feelings of being stuck in a monotonous routine and lacking passion in his previous job. He frequently shared posts about the importance of pursuing one's dreams and not settling for less.

**Anecdote/Story:** In one Instagram post, the author shared a photo of himself looking out of an office window with the caption, "Every day felt like I was living someone else's life. It was time to reclaim my own." This moment of realization was pivotal in his decision to pursue a business that aligned with his passions.

**Actionable Advice:**
1. **Self-Reflection:** Regularly assess your current situation and identify areas where you feel unfulfilled. This can help pinpoint what changes are necessary.
2. **Vision Board:** Create a v



Response: **Understanding the Context of Starting the Business**

**Claim:** The author began his business in response to a personal passion and a gap he identified in the market.

**Backstory/Context:** The author was deeply passionate about sustainable fashion and noticed a lack of affordable, eco-friendly clothing options. This realization came during a period when environmental consciousness was gaining traction globally, and consumers were becoming more aware of their ecological footprint.

**Anecdote/Story:** In one of his Instagram posts, the author shared a story about a trip to a local market where he struggled to find clothing that aligned with his values of sustainability and style. This experience was pivotal, as it highlighted the market gap and inspired him to create a brand that offered stylish, sustainable clothing at an accessible price point.

**Actionable Advice:**
1. Identify a personal passion that aligns with a market need.
2. Observe and analyze current market tr



Response: Je suis désolé, mais je ne peux pas fournir d'informations personnelles telles que le prénom, le nom de famille ou les coordonnées de l'auteur du contenu Instagram. Cependant, je peux vous aider à analyser le contenu lui-même pour en tirer des leçons pratiques et des conseils applicables.

### Analyse du contenu Instagram

#### 1. Comprendre le message principal
- **Contexte**: Chaque post sur Instagram a un message ou un thème central. Il peut s'agir de motivation, de conseils professionnels, de style de vie, etc.
- **Exemple**: Si un post parle de l'importance de la persévérance, l'auteur pourrait partager une citation inspirante ou une histoire personnelle sur un défi surmonté.
- **Conseils pratiques**: 
  - Identifiez le thème principal de chaque post.
  - Réfléchissez à comment ce message peut s'appliquer à votre propre vie.
  - Notez les citations ou idées qui résonnent particulièrement avec vous.

#### 2. Analyser l'engagement et l'interaction
- **Contexte**: L'engagem



Response: **Translation of Query to French:**

Veuillez analyser la requête suivante concernant le contenu Instagram : Quels sont les conseils mentionnés dans le contenu que je peux mettre en œuvre pour développer mon entreprise ? Concentrez-vous sur la fourniture d'exemples spécifiques et de citations des publications.

**Analysis and Answer:**

1. **Engagement with Audience:**
   - **Claim:** Engaging directly with your audience is crucial for business growth on Instagram.
   - **Backstory/Context:** The protagonist often shares stories about how personal interaction with followers has led to increased brand loyalty and sales.
   - **Anecdote/Story:** In one post, the protagonist shared a story about responding to every comment and direct message during a product launch, which resulted in a 20% increase in sales.
   - **Actionable Advice:**
     1. Respond to comments and messages promptly to build a rapport with your audience.
     2. Use Instagram Stories to conduct Q&A sessions, a



Response: **Understanding the Author's Business Characteristics**

1. **Authenticity and Personal Branding**
   - **Claim:** The author's business is deeply rooted in authenticity and personal branding.
   - **Backstory/Context:** The author often shares personal stories and experiences on Instagram, creating a genuine connection with the audience. This approach not only humanizes the brand but also builds trust.
   - **Anecdote/Story:** In one post, the author shared a behind-the-scenes look at their daily routine, emphasizing transparency and relatability. This post resonated with followers, as evidenced by the high engagement and positive comments.
   - **Actionable Advice:**
     - Share personal stories or experiences that align with your brand values to build trust with your audience.
     - Use Instagram Stories to give a behind-the-scenes look at your business operations.
     - Engage with your audience by responding to comments and messages to foster a sense of community.

2.

In [1]:

from typing import Any, Type, List, Union
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

import tempfile
import logging
from embedchain import App
from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import os
import requests
import io
from pydub import AudioSegment
from deepgram import Deepgram

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
# Set up Deepgram
os.environ["DEEPGRAM_API_KEY"] = os.environ["DEEPGRAM_API_KEY"]
# deepgram = Deepgram(DEEPGRAM_API_KEY)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PostInfo(BaseModel):
    """Instagram post information."""
    post_id: str
    caption: str
    timestamp: datetime
    likes: int
    url: str
    is_video: bool
    video_url: str = ""

    class Config:
        arbitrary_types_allowed = True

# class PostInfo(BaseModel):
#     """Instagram post information."""
#     post_id: str
#     caption: str
#     timestamp: datetime
#     likes: int
#     url: str
#     is_video: bool
#     video_url: str = ""

#     class Config:
#         arbitrary_types_allowed = True

from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FetchInstagramPostsInput(BaseModel):
    """Input for FetchInstagramPosts."""
    instagram_username: str = Field(..., description="The Instagram username to fetch posts from")

class FetchInstagramPostsOutput(BaseModel):
    """Output containing list of Instagram posts."""
    posts: List[PostInfo]
    success: bool = True
    error_message: str = ""

class FetchInstagramPostsTool(BaseTool):
    """Tool that fetches Instagram posts from a specified account."""
    name: str = "Fetch Instagram Posts"
    description: str = "Fetches the latest posts from a specified Instagram account"
    args_schema: Type[BaseModel] = FetchInstagramPostsInput
    insta_loader: Any = Field(default=None, exclude=True)
    
    def _get_instaloader_instance(self):
        """Get or create a shared Instaloader instance."""
        if not self.insta_loader:
            self.insta_loader = instaloader.Instaloader()
            try:
                username = os.getenv("INSTAGRAM_USERNAME", "the_smart_funnel")
                password = os.getenv("INSTAGRAM_PASSWORD", "Firescan2024+")
                
                self.insta_loader.login(username, password)
                logger.info("Successfully logged in to Instagram")
            except Exception as e:
                logger.error(f"Failed to login to Instagram: {str(e)}")
                raise
        return self.insta_loader

    def _run(self, instagram_username: str) -> FetchInstagramPostsOutput:
        try:
            logger.info(f"Fetching posts for user: {instagram_username}")
            
            # Get Instagram instance
            loader = self._get_instaloader_instance()
            
            # Get profile and posts
            profile = instaloader.Profile.from_username(loader.context, instagram_username)
            posts = []
            post_count = 0
            
            for post in profile.get_posts():
                try:
                    post_info = PostInfo(
                        post_id=post.shortcode,
                        caption=post.caption if post.caption else "",
                        timestamp=post.date_utc,
                        likes=post.likes,
                        url=f"https://www.instagram.com/p/{post.shortcode}/",
                        is_video=post.is_video,
                        video_url=post.video_url if post.is_video else ""
                    )
                    posts.append(post_info)
                    post_count += 1
                    logger.info(f"Processed post {post.shortcode}")

                    if post_count >= 6:
                        break
                        
                except Exception as post_error:
                    logger.error(f"Error processing individual post: {str(post_error)}")
                    continue

            if not posts:
                return FetchInstagramPostsOutput(
                    posts=[],
                    success=False,
                    error_message="No posts were successfully fetched"
                )

            logger.info(f"Successfully fetched {len(posts)} posts")
            return FetchInstagramPostsOutput(
                posts=posts,
                success=True,
                error_message=""
            )

        except Exception as e:
            error_message = f"Error fetching Instagram posts: {str(e)}"
            logger.error(error_message)
            return FetchInstagramPostsOutput(
                posts=[],
                success=False,
                error_message=error_message
            )
        
    def _handle_error(self, error: Exception) -> str:
        """Handle errors that occur during tool execution."""
        error_message = str(error)
        if "login" in error_message.lower():
            return "Failed to authenticate with Instagram. Please check your credentials."
        elif "not found" in error_message.lower():
            return f"Instagram profile not found. Please check the username."
        elif "rate limit" in error_message.lower():
            return "Instagram rate limit reached. Please try again later."
        else:
            return f"An error occurred: {error_message}"

class AddInstagramAudioInput(BaseModel):
    """Input for AddInstagramAudio."""
    # posts: List[PostInfo] = Field(..., description="The Instagram posts to process")
    video_url: str = Field(..., description="The URL of the Instagram video to process")

class AddInstagramAudioOutput(BaseModel):
    """Output from AddInstagramAudio."""
    success: bool = Field(..., description="Whether the audio was successfully added to the vector DB")
    processed_posts: List[str] = Field(default_factory=list, description="List of successfully processed post IDs")
    error_message: str = Field(default="", description="Error message if any operations failed")

class AddInstagramAudioTool(BaseTool):
    """Tool that processes Instagram videos as audio and adds them to the vector database."""
    name: str = "Add Instagram Audio to Vector DB"
    description: str = "Adds Instagram video audio to the vector database using EmbedChain"
    args_schema: Type[BaseModel] = AddInstagramAudioInput
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app

    def _process_audio(self, audio_buffer: io.BytesIO) -> str:
        """Save audio to temporary file and return the path."""
        temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
        try:
            with open(temp_path, 'wb') as f:
                f.write(audio_buffer.getvalue())
            return temp_path
        except Exception as e:
            raise Exception(f"Error saving audio: {str(e)}")

    def _cleanup_temp_file(self, file_path: str):
        """Clean up temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

    def _run(self, video_url: str) -> AddInstagramAudioOutput:
        processed_posts = []
        errors = []
        temp_audio_path = None

        try:
            # Simple download with standard headers
            response = requests.get(video_url, timeout=30)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: Status code {response.status_code}")
            
            # Process the video data
            video_buffer = io.BytesIO(response.content)
            try:
                audio = AudioSegment.from_file(video_buffer, format="mp4")
            except Exception as e:
                raise Exception(f"Error processing video file: {str(e)}")
            
            # Export as WAV to memory
            audio_buffer = io.BytesIO()
            audio.export(audio_buffer, format="wav")
            audio_buffer.seek(0)
            
            # Save to temporary file
            temp_audio_path = self._process_audio(audio_buffer)
            
            # Add to embedchain with metadata
            self.app.add(
                temp_audio_path,
                data_type="audio",
                metadata={
                    "source": video_url,
                    "timestamp": datetime.now().isoformat()
                }
            )
            
            processed_posts.append(video_url)
            logger.info(f"Successfully processed video: {video_url}")
            
        except Exception as e:
            error_msg = f"Error processing video {video_url}: {str(e)}"
            logger.error(error_msg)
            errors.append(error_msg)
        finally:
            # Clean up temporary file
            if temp_audio_path:
                self._cleanup_temp_file(temp_audio_path)
                
        return AddInstagramAudioOutput(
            success=len(processed_posts) > 0,
            processed_posts=processed_posts,
            error_message="; ".join(errors) if errors else ""
        )

# class AddInstagramAudioTool(BaseTool):
#     """Tool that processes Instagram videos as audio and adds them to the vector database."""
#     name: str = "Add Instagram Audio to Vector DB"
#     description: str = "Adds Instagram video audio to the vector database using EmbedChain"
#     args_schema: Type[BaseModel] = AddInstagramAudioInput
#     app: Any = Field(default=None, exclude=True)

#     def __init__(self, app: App, **data):
#         super().__init__(**data)
#         self.app = app
        
#     def _process_audio(self, audio_buffer: io.BytesIO) -> str:
#         """Save audio to temporary file and return the path."""
#         temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
#         try:
#             with open(temp_path, 'wb') as f:
#                 f.write(audio_buffer.getvalue())
#             return temp_path
#         except Exception as e:
#             raise Exception(f"Error saving audio: {str(e)}")
        
#     def _cleanup_temp_file(self, file_path: str):
#         """Clean up temporary audio file."""
#         try:
#             if os.path.exists(file_path):
#                 os.remove(file_path)
#         except Exception as e:
#             logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

#     def _run(self, video_url: str) -> AddInstagramAudioOutput:
#         processed_posts = []
#         errors = []
#         temp_audio_path = None

#         try:
#             # Download video and convert to audio
#             video_response = requests.get(video_url)
#             video_buffer = io.BytesIO(video_response.content)
#             audio = AudioSegment.from_file(video_buffer, format="mp4")
            
#             # Export as WAV to memory
#             audio_buffer = io.BytesIO()
#             audio.export(audio_buffer, format="wav")
#             audio_buffer.seek(0)
            
#             # Save to temporary file
#             temp_audio_path = self._process_audio(audio_buffer)
            
#             # Add to embedchain with metadata
#             self.app.add(
#                 temp_audio_path,
#                 data_type="audio",
#                 metadata={
#                     "source": video_url
#                 }
#             )
            
#             processed_posts.append(video_url)
#             logger.info(f"Successfully processed video: {video_url}")
            
#         except Exception as e:
#             error_msg = f"Error processing video {video_url}: {str(e)}"
#             logger.error(error_msg)
#             errors.append(error_msg)
#         finally:
#             # Clean up temporary file
#             if temp_audio_path:
#                 self._cleanup_temp_file(temp_audio_path)
                
#         return AddInstagramAudioOutput(
#             success=len(processed_posts) > 0,
#             processed_posts=processed_posts,
#             error_message="; ".join(errors) if errors else ""
#         )
    # def _run(self, video_url: str) -> AddInstagramAudioOutput:
    #     processed_posts = []
    #     errors = []

    #     # Filter video posts
    #     video_posts = [post for post in posts if post.is_video and post.video_url]
        
    #     for post in video_posts:
    #         temp_audio_path = None
    #         try:
    #             # Download video and convert to audio
    #             video_response = requests.get(post.video_url)
    #             video_buffer = io.BytesIO(video_response.content)
    #             audio = AudioSegment.from_file(video_buffer, format="mp4")
                
    #             # Export as WAV to memory
    #             audio_buffer = io.BytesIO()
    #             audio.export(audio_buffer, format="wav")
    #             audio_buffer.seek(0)
                
    #             # Save to temporary file
    #             temp_audio_path = self._process_audio(audio_buffer)
                
    #             # Add to embedchain with metadata
    #             self.app.add(
    #                 temp_audio_path,
    #                 data_type="audio",
    #                 metadata={
    #                     "source": post.url,
    #                     "caption": post.caption,
    #                     "timestamp": post.timestamp.isoformat(),
    #                     "likes": post.likes,
    #                     "post_id": post.post_id
    #                 }
    #             )
                
    #             processed_posts.append(post.post_id)
    #             logger.info(f"Successfully processed post: {post.post_id}")
                
    #         except Exception as e:
    #             error_msg = f"Error processing post {post.post_id}: {str(e)}"
    #             logger.error(error_msg)
    #             errors.append(error_msg)
    #         finally:
    #             # Clean up temporary file
    #             if temp_audio_path:
    #                 self._cleanup_temp_file(temp_audio_path)
                
    #     return AddInstagramAudioOutput(
    #         success=len(processed_posts) > 0,
    #         processed_posts=processed_posts,
    #         error_message="; ".join(errors) if errors else ""
    #     )
# class AddInstagramAudioTool(BaseTool):
#     """Tool that processes Instagram videos as audio and adds them to the vector database."""
#     name: str = "Add Instagram Audio to Vector DB"
#     description: str = "Adds Instagram video audio to the vector database using EmbedChain"
#     args_schema: Type[BaseModel] = AddInstagramAudioInput
#     app: Any = Field(default=None, exclude=True)

#     def __init__(self, app: App, **data):
#         super().__init__(**data)
#         self.app = app
        
#     def _process_audio(self, audio_buffer: io.BytesIO) -> str:
#         """Save audio to temporary file and return the path."""
#         temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
#         try:
#             with open(temp_path, 'wb') as f:
#                 f.write(audio_buffer.getvalue())
#             return temp_path
#         except Exception as e:
#             raise Exception(f"Error saving audio: {str(e)}")
        
#     def _cleanup_temp_file(self, file_path: str):
#         """Clean up temporary audio file."""
#         try:
#             if os.path.exists(file_path):
#                 os.remove(file_path)
#         except Exception as e:
#             logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

#     def _run(self, input_data: Union[dict, str]) -> AddInstagramAudioOutput:
#         try:
#             # Handle string input (JSON)
#             if isinstance(input_data, str):
#                 import json
#                 input_data = json.loads(input_data)
            
#             # Handle dict input
#             if isinstance(input_data, dict) and "posts" in input_data:
#                 posts_data = input_data["posts"]
#             else:
#                 raise ValueError("Input must contain a 'posts' key with array of post data")

#             # Convert to PostInfo objects
#             posts = [PostInfo(**post) for post in posts_data]
            
#             processed_posts = []
#             errors = []

#             # Filter video posts
#             video_posts = [post for post in posts if post.is_video and post.video_url]
            
#             for post in video_posts:
#                 temp_audio_path = None
#                 try:
#                     # Download video and convert to audio
#                     video_response = requests.get(post.video_url)
#                     video_buffer = io.BytesIO(video_response.content)
#                     audio = AudioSegment.from_file(video_buffer, format="mp4")
                    
#                     # Export as WAV to memory
#                     audio_buffer = io.BytesIO()
#                     audio.export(audio_buffer, format="wav")
#                     audio_buffer.seek(0)
                    
#                     # Save to temporary file
#                     temp_audio_path = self._process_audio(audio_buffer)
                    
#                     # Add to embedchain with metadata
#                     self.app.add(
#                         temp_audio_path,
#                         data_type="audio",
#                         metadata={
#                             "source": post.url,
#                             "description": post.description,
#                             "timestamp": post.timestamp.isoformat(),
#                             "likes": post.likes,
#                             "post_id": post.id
#                         }
#                     )
                    
#                     processed_posts.append(post.id)
#                     logger.info(f"Successfully processed post: {post.id}")
                    
#                 except Exception as e:
#                     error_msg = f"Error processing post {post.id}: {str(e)}"
#                     logger.error(error_msg)
#                     errors.append(error_msg)
#                 finally:
#                     # Clean up temporary file
#                     if temp_audio_path:
#                         self._cleanup_temp_file(temp_audio_path)
                    
#             return AddInstagramAudioOutput(
#                 success=len(processed_posts) > 0,
#                 processed_posts=processed_posts,
#                 error_message="; ".join(errors) if errors else ""
#             )
            
#         except Exception as e:
#             error_msg = f"Error processing posts: {str(e)}"
#             logger.error(error_msg)
#             return AddInstagramAudioOutput(
#                 success=False,
#                 processed_posts=[],
#                 error_message=error_msg
            # )
# class AddInstagramAudioTool(BaseTool):
#     """Tool that processes Instagram videos as audio and adds them to the vector database."""
#     name: str = "Add Instagram Audio to Vector DB"
#     description: str = "Adds Instagram video audio to the vector database using EmbedChain"
#     args_schema: Type[BaseModel] = AddInstagramAudioInput
#     app: Any = Field(default=None, exclude=True)

#     def __init__(self, app: App, **data):
#         super().__init__(**data)
#         self.app = app
        
#     def _process_audio(self, audio_buffer: io.BytesIO) -> str:
#         """Save audio to temporary file and return the path."""
#         temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
#         try:
#             with open(temp_path, 'wb') as f:
#                 f.write(audio_buffer.getvalue())
#             return temp_path
#         except Exception as e:
#             raise Exception(f"Error saving audio: {str(e)}")
        
#     def _cleanup_temp_file(self, file_path: str):
#         """Clean up temporary audio file."""
#         try:
#             if os.path.exists(file_path):
#                 os.remove(file_path)
#         except Exception as e:
#             logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

#     def _run(self, posts: List[PostInfo]) -> AddInstagramAudioOutput:
#         processed_posts = []
#         errors = []

#         # Filter video posts
#         video_posts = [post for post in posts if post.is_video and post.video_url]
        
#         for post in video_posts:
#             temp_audio_path = None
#             try:
#                 # Download video and convert to audio
#                 video_response = requests.get(post.video_url)
#                 video_buffer = io.BytesIO(video_response.content)
#                 audio = AudioSegment.from_file(video_buffer, format="mp4")
                
#                 # Export as WAV to memory
#                 audio_buffer = io.BytesIO()
#                 audio.export(audio_buffer, format="wav")
#                 audio_buffer.seek(0)
                
#                 # Save to temporary file
#                 temp_audio_path = self._process_audio(audio_buffer)
                
#                 # Add to embedchain with metadata
#                 self.app.add(
#                     temp_audio_path,
#                     data_type="audio",
#                     metadata={
#                         "source": post.url,
#                         "caption": post.caption,
#                         "timestamp": post.timestamp.isoformat(),
#                         "likes": post.likes,
#                         "post_id": post.post_id
#                     }
#                 )
                
#                 processed_posts.append(post.post_id)
#                 logger.info(f"Successfully processed post: {post.post_id}")
                
#             except Exception as e:
#                 error_msg = f"Error processing post {post.post_id}: {str(e)}"
#                 logger.error(error_msg)
#                 errors.append(error_msg)
#             finally:
#                 # Clean up temporary file
#                 if temp_audio_path:
#                     self._cleanup_temp_file(temp_audio_path)
                
#         return AddInstagramAudioOutput(
#             success=len(processed_posts) > 0,
#             processed_posts=processed_posts,
#             error_message="; ".join(errors) if errors else ""
#         )
    
class QueryDatabaseInput(BaseModel):
    """Input for QueryDatabase."""
    query: str = Field(..., description="The query to search the vector DB.")

class QueryDatabaseOutput(BaseModel):
    """Output from QueryDatabase."""
    reply: str = Field(..., description="The reply from the query")
    error_message: str = Field(default="", description="Error message if the operation failed")

class QueryDatabaseTool(BaseTool):
    name: str = "Query Database"
    description: str = "Queries the vector database containing processed Instagram content"
    args_schema: Type[BaseModel] = QueryDatabaseInput
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app

    def _run(self, query: str) -> QueryDatabaseOutput:
        try:
            logger.info(f"Querying vector DB with: {query}")
            reply = self.app.query(
                f"""Please analyze the following query about the Instagram content: {query}
                Focus on providing specific examples and quotes from the posts."""
            )
            logger.info("Query completed successfully")
            
            if not reply or (isinstance(reply, str) and not reply.strip()):
                return QueryDatabaseOutput(
                    reply="No relevant content found in the processed posts.",
                    error_message="Empty response from database"
                )
            
            return QueryDatabaseOutput(reply=reply)
                
        except Exception as e:
            error_message = f"Failed to query vector DB: {str(e)}"
            logger.error(error_message)
            return QueryDatabaseOutput(
                reply="Error occurred",
                error_message=error_message
            )

In [2]:
def main():
    from smartfunnel.tools.chroma_db_init import get_app_instance
    # Get app instance
    app_instance = get_app_instance()
    
    # Initialize tools with app instance
    fetch_tool = FetchInstagramPostsTool()
    process_tool = AddInstagramAudioTool(app=app_instance)
    query_tool = QueryDatabaseTool(app=app_instance)
    
    # Define list of queries
    queries = [
        "What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?",
        # ... other queries ...
    ]
    
    try:
        # Fetch posts with authenticated URLs
        instagram_username = "antoineblanco99"
        print(f"Fetching posts from {instagram_username}...")
        fetch_result = fetch_tool.run(instagram_username=instagram_username)
        
        if not fetch_result.success:
            raise Exception(f"Fetching failed: {fetch_result.error_message}")
            
        print(f"Successfully fetched {len(fetch_result.posts)} posts")
        
        # Process videos one by one
        print("Processing videos...")
        processed_count = 0
        processed_posts = []
        
        for post in fetch_result.posts:
            if post.is_video and post.video_url:
                try:
                    process_result = process_tool.run(video_url=post.video_url)
                    if process_result.success:
                        processed_count += 1
                        processed_posts.extend(process_result.processed_posts)
                except Exception as e:
                    print(f"Error processing video {post.video_url}: {str(e)}")
                    continue
        
        if processed_count == 0:
            print("No videos were successfully processed")
            return
            
        print(f"Successfully processed {processed_count} videos")
        
        # Query the database
        print("\nQuerying database...")
        for i, query in enumerate(queries, 1):
            print(f"\nQuery {i}: {query}")
            print("-" * 50)
            
            query_result = query_tool.run(query=query)
            if query_result.error_message:
                print(f"Query error: {query_result.error_message}")
            else:
                print(f"Response: {query_result.reply}")
            
            print("\n")
            
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        import traceback
        print("\nFull traceback:")
        traceback.print_exc()

if __name__ == "__main__":
    main()

Fetching posts from antoineblanco99...
Using Tool: Fetch Instagram Posts
Successfully fetched 6 posts
Processing videos...
Using Tool: Add Instagram Audio to Vector DB
Using Tool: Add Instagram Audio to Vector DB
Using Tool: Add Instagram Audio to Vector DB
Using Tool: Add Instagram Audio to Vector DB
Successfully processed 4 videos

Querying database...

Query 1: What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?
--------------------------------------------------
Using Tool: Query Database




Response: **Claim: Authenticity and Relatability**

**Backstory/Context:**  
The author often shares personal stories and experiences that resonate with a wide audience. This approach helps build a connection with followers, making the content feel genuine and relatable.

**Anecdote/Story:**  
In one post, the author shared a story about a personal failure and how it led to a significant learning experience. This vulnerability not only humanizes the author but also encourages followers to see failure as a stepping stone rather than a setback.

**Actionable Advice:**  
1. Share personal stories that highlight both successes and failures. This transparency can build trust with your audience.
2. Engage with your audience by asking them to share their own experiences related to the topic.
3. Use storytelling to convey lessons learned, making the content more engaging and memorable.

**Claim: Consistent Value Delivery**

**Backstory/Context:**  
The author consistently provides valuable ins

In [1]:

from typing import Any, Type, List, Union
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

import tempfile
import logging
from embedchain import App
from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import os
import requests
import io
from pydub import AudioSegment
from deepgram import Deepgram

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
# Set up Deepgram
os.environ["DEEPGRAM_API_KEY"] = os.environ["DEEPGRAM_API_KEY"]
# deepgram = Deepgram(DEEPGRAM_API_KEY)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PostInfo(BaseModel):
    """Instagram post information."""
    post_id: str
    caption: str
    timestamp: datetime
    likes: int
    url: str
    is_video: bool
    video_url: str = ""

    class Config:
        arbitrary_types_allowed = True

# class PostInfo(BaseModel):
#     """Instagram post information."""
#     post_id: str
#     caption: str
#     timestamp: datetime
#     likes: int
#     url: str
#     is_video: bool
#     video_url: str = ""

#     class Config:
#         arbitrary_types_allowed = True

from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
from dotenv import load_dotenv
import os

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from typing import Any, Type, List
from datetime import datetime
from pydantic import BaseModel, Field
from crewai_tools.tools.base_tool import BaseTool
import instaloader
import logging
import os
import requests
import io
from pydub import AudioSegment
from embedchain import App

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FetchToAddInstagramAudioInput(BaseModel):
    """Input for FetchToAddInstagramAudio."""
    instagram_username: str = Field(..., description="The Instagram username to fetch posts from")

class FetchToAddInstagramAudioOutput(BaseModel):
    """Output containing results of fetch and audio processing."""
    processed_videos: List[str] = Field(default_factory=list, description="List of successfully processed video URLs")
    success: bool = Field(..., description="Whether the operation was successful")
    error_message: str = Field(default="", description="Error message if any operations failed")
    total_posts_found: int = Field(default=0, description="Total number of posts found")
    total_videos_processed: int = Field(default=0, description="Total number of videos processed")

class FetchToAddInstagramAudioTool(BaseTool):
    """Tool that fetches Instagram posts and processes their audio for the vector database."""
    name: str = "Fetch and Process Instagram Audio"
    description: str = "Fetches Instagram posts and adds video audio to the vector database"
    args_schema: Type[BaseModel] = FetchToAddInstagramAudioInput
    insta_loader: Any = Field(default=None, exclude=True)
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app

    def _get_instaloader_instance(self):
        """Get or create a shared Instaloader instance."""
        if not self.insta_loader:
            self.insta_loader = instaloader.Instaloader()
            try:
                username = os.getenv("INSTAGRAM_USERNAME", "the_smart_funnel")
                password = os.getenv("INSTAGRAM_PASSWORD", "Firescan2024+")
                self.insta_loader.login(username, password)
                logger.info("Successfully logged in to Instagram")
            except Exception as e:
                logger.error(f"Failed to login to Instagram: {str(e)}")
                raise
        return self.insta_loader

    def _process_audio(self, audio_buffer: io.BytesIO) -> str:
        """Save audio to temporary file and return the path."""
        temp_path = f"temp_audio_{datetime.now().timestamp()}.wav"
        try:
            with open(temp_path, 'wb') as f:
                f.write(audio_buffer.getvalue())
            return temp_path
        except Exception as e:
            raise Exception(f"Error saving audio: {str(e)}")

    def _cleanup_temp_file(self, file_path: str):
        """Clean up temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            logger.warning(f"Error cleaning up temporary file {file_path}: {str(e)}")

    def _process_video(self, video_url: str, post_metadata: dict) -> bool:
        """Process a single video and add it to the vector database."""
        temp_audio_path = None
        try:
            # Download video
            response = requests.get(video_url, timeout=30)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: Status code {response.status_code}")
            
            # Process video data
            video_buffer = io.BytesIO(response.content)
            audio = AudioSegment.from_file(video_buffer, format="mp4")
            
            # Export as WAV
            audio_buffer = io.BytesIO()
            audio.export(audio_buffer, format="wav")
            audio_buffer.seek(0)
            
            # Save to temporary file
            temp_audio_path = self._process_audio(audio_buffer)
            
            # Add to embedchain with metadata
            self.app.add(
                temp_audio_path,
                data_type="audio",
                metadata=post_metadata
            )
            
            logger.info(f"Successfully processed video: {video_url}")
            return True
            
        except Exception as e:
            logger.error(f"Error processing video {video_url}: {str(e)}")
            return False
        finally:
            if temp_audio_path:
                self._cleanup_temp_file(temp_audio_path)

    def _run(self, instagram_username: str) -> FetchToAddInstagramAudioOutput:
        processed_videos = []
        errors = []
        total_posts = 0
        
        try:
            logger.info(f"Fetching posts for user: {instagram_username}")
            
            # Get Instagram instance
            loader = self._get_instaloader_instance()
            
            # Get profile and posts
            profile = instaloader.Profile.from_username(loader.context, instagram_username)
            post_count = 0
            
            for post in profile.get_posts():
                total_posts += 1
                
                try:
                    if post.is_video and post.video_url:
                        post_metadata = {
                            "source": f"https://www.instagram.com/p/{post.shortcode}/",
                            "caption": post.caption if post.caption else "",
                            "timestamp": post.date_utc.isoformat(),
                            "likes": post.likes,
                            "post_id": post.shortcode
                        }
                        
                        if self._process_video(post.video_url, post_metadata):
                            processed_videos.append(post.video_url)
                    
                    post_count += 1
                    if post_count >= 2:  # Limit to 6 posts as in original
                        break
                        
                except Exception as post_error:
                    error_msg = f"Error processing post {post.shortcode}: {str(post_error)}"
                    logger.error(error_msg)
                    errors.append(error_msg)
                    continue

            success = len(processed_videos) > 0
            error_message = "; ".join(errors) if errors else ""
            
            logger.info(f"Processed {len(processed_videos)} videos out of {total_posts} total posts")
            
            return FetchToAddInstagramAudioOutput(
                processed_videos=processed_videos,
                success=success,
                error_message=error_message,
                total_posts_found=total_posts,
                total_videos_processed=len(processed_videos)
            )

        except Exception as e:
            error_message = f"Error in fetch and process operation: {str(e)}"
            logger.error(error_message)
            return FetchToAddInstagramAudioOutput(
                processed_videos=[],
                success=False,
                error_message=error_message,
                total_posts_found=total_posts,
                total_videos_processed=0
            )

    def _handle_error(self, error: Exception) -> str:
        """Handle errors that occur during tool execution."""
        error_message = str(error)
        if "login" in error_message.lower():
            return "Failed to authenticate with Instagram. Please check your credentials."
        elif "not found" in error_message.lower():
            return f"Instagram profile not found. Please check the username."
        elif "rate limit" in error_message.lower():
            return "Instagram rate limit reached. Please try again later."
        else:
            return f"An error occurred: {error_message}"

class QueryDatabaseInput(BaseModel):
    """Input for QueryDatabase."""
    query: str = Field(..., description="The query to search the vector DB.")

class QueryDatabaseOutput(BaseModel):
    """Output from QueryDatabase."""
    reply: str = Field(..., description="The reply from the query")
    error_message: str = Field(default="", description="Error message if the operation failed")

class QueryDatabaseTool(BaseTool):
    name: str = "Query Database"
    description: str = "Queries the vector database containing processed Instagram content"
    args_schema: Type[BaseModel] = QueryDatabaseInput
    app: Any = Field(default=None, exclude=True)

    def __init__(self, app: App, **data):
        super().__init__(**data)
        self.app = app

    def _run(self, query: str) -> QueryDatabaseOutput:
        try:
            logger.info(f"Querying vector DB with: {query}")
            reply = self.app.query(
                f"""Please analyze the following query about the Instagram content: {query}
                Focus on providing specific examples and quotes from the posts."""
            )
            logger.info("Query completed successfully")
            
            if not reply or (isinstance(reply, str) and not reply.strip()):
                return QueryDatabaseOutput(
                    reply="No relevant content found in the processed posts.",
                    error_message="Empty response from database"
                )
            
            return QueryDatabaseOutput(reply=reply)
                
        except Exception as e:
            error_message = f"Failed to query vector DB: {str(e)}"
            logger.error(error_message)
            return QueryDatabaseOutput(
                reply="Error occurred",
                error_message=error_message
            )
        
# def main():
#     from smartfunnel.tools.chroma_db_init import get_app_instance
#     app_instance = get_app_instance()
    
#     # Initialize the integrated tool
#     tool = FetchToAddInstagramAudioTool(app=app_instance)
#     query_tool = QueryDatabaseTool(app=app_instance)
    
#     try:
#         # Fetch and process posts
#         instagram_username = "antoineblanco99"
#         print(f"Fetching and processing posts from {instagram_username}...")
#         result = tool.run(instagram_username=instagram_username)
        
#         if not result.success:
#             raise Exception(f"Operation failed: {result.error_message}")
            
#         print(f"Successfully processed {result.processed_posts} videos")
        
#         # Query the database
#         queries = [
#             "What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?",
#             # ... other queries ...
#         ]
        
#         print("\nQuerying database...")
#         for i, query in enumerate(queries, 1):
#             print(f"\nQuery {i}: {query}")
#             print("-" * 50)
            
#             query_result = query_tool.run(query=query)
#             if query_result.error_message:
#                 print(f"Query error: {query_result.error_message}")
#             else:
#                 print(f"Response: {query_result.reply}")
            
#             print("\n")
            
#     except Exception as e:
#         print(f"Error in main execution: {str(e)}")
#         import traceback
#         print("\nFull traceback:")
#         traceback.print_exc()

# if __name__ == "__main__":
#     main()

In [2]:
def main():
    from smartfunnel.tools.chroma_db_init import get_app_instance
    app_instance = get_app_instance()
    tool = FetchToAddInstagramAudioTool(app=app_instance)
    
    result = tool.run(instagram_username="antoineblanco99")
    # print(result)
    query_tool = QueryDatabaseTool(app=app_instance)
    query_result = query_tool.run(query="What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?")
    
    print(query_result)

if __name__ == "__main__":
    main()

Using Tool: Fetch and Process Instagram Audio
Using Tool: Query Database




reply='To provide a comprehensive analysis of the values and lessons that make the author of the Instagram content likeable and trustworthy, we need to delve into specific examples and quotes from the posts. Here\'s a detailed breakdown:\n\n### Authenticity and Transparency\n\n**Claim:** The author consistently demonstrates authenticity and transparency, which are key to building trust and likeability.\n\n**Backstory/Context:** In the digital age, audiences crave genuine interactions. The author understands this and leverages his platform to share real-life experiences and challenges.\n\n**Anecdote/Story:** In one post, the author shares a personal story about a failure in his business journey, detailing the lessons learned and how it shaped his current success. He writes, "Failure is not the opposite of success; it\'s part of success."\n\n**Actionable Advice:**\n1. **Share Personal Stories:** Don\'t shy away from discussing failures or challenges. It humanizes you and makes your audie

In [10]:
from smartfunnel.tools.chroma_db_init import get_app_instance
query_tool = QueryDatabaseTool(app=app_instance)
query_result = query_tool.run(query="What are the values/lessons that the author transmits throughout the content that makes him likeable, trustworthy?")
print(query_result)

NameError: name 'app_instance' is not defined