In [None]:
!pip install tweepy transformers numpy scikit-learn certifi numpy

In [None]:
import os
import json
import sqlite3
import time
from datetime import datetime
import requests
import tweepy
from transformers import pipeline
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class CompanyAgent:
    """Per-company agent that gathers tweets and news and drafts engagement.

    Every instance owns an isolated SQLite database (``memory_<name>.db``)
    holding collected data, processed tweet IDs, recorded engagements,
    collected news and stored texts with their embeddings. The stored
    embeddings form a small in-memory vector store -- ``self.texts[i]`` always
    corresponds to ``self.embeddings[i]`` -- used to retrieve context for
    generated replies.

    Note: "Comment"/"Like" actions are only *recorded* in the database; no
    method here publishes anything through the Tweepy client.
    """

    def __init__(self, company_name, description, twitter_handle,
                 twitter_api_key=None, twitter_api_secret=None,
                 twitter_access_token=None, twitter_access_token_secret=None,
                 mediastack_api_key=None, serpapi_api_key=None):
        """Create the agent, its database and its (optional) ML pipelines.

        Args:
            company_name: Display name; also used to derive the DB filename.
            description: Free-text company description used in prompts.
            twitter_handle: Handle used to build the SerpApi tweet query.
            twitter_api_key, twitter_api_secret, twitter_access_token,
            twitter_access_token_secret, mediastack_api_key, serpapi_api_key:
                Credentials; each falls back to the matching environment
                variable when not given.

        Raises:
            Whatever ``sqlite3`` raises if the database cannot be created.
            Model-loading failures are reported and leave the corresponding
            pipeline set to ``None`` instead.
        """
        self.company_name = company_name
        self.description = description
        self.twitter_handle = twitter_handle

        # API keys: explicit arguments win, otherwise fall back to env vars.
        self.mediastack_api_key = mediastack_api_key or os.environ.get("MEDIASTACK_API_KEY")
        self.serpapi_api_key = serpapi_api_key or os.environ.get("SERPAPI_API_KEY")
        self.twitter_api_key = twitter_api_key or os.environ.get("TWITTER_API_KEY")
        self.twitter_api_secret = twitter_api_secret or os.environ.get("TWITTER_API_SECRET")
        self.twitter_access_token = twitter_access_token or os.environ.get("TWITTER_ACCESS_TOKEN")
        self.twitter_access_token_secret = twitter_access_token_secret or os.environ.get("TWITTER_ACCESS_TOKEN_SECRET")

        # In-memory vector store; the two lists are kept index-aligned.
        self.texts = []
        self.embeddings = []

        # Each company gets its own database (isolated knowledge). Path
        # separators are replaced so a name like "A/B" cannot point outside
        # the working directory; names without them map exactly as before.
        safe_name = (self.company_name.replace(' ', '_')
                     .replace('/', '_').replace('\\', '_').lower())
        self.memory_db = f"memory_{safe_name}.db"
        self.initialize_db()

        # Initialize sentiment analysis pipeline
        try:
            self.sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
            print(f"[{self.company_name}] Sentiment analyzer initialized successfully.")
        except Exception as e:
            print(f"[{self.company_name}] Error initializing sentiment analyzer: {e}")
            self.sentiment_analyzer = None

        # Initialize Twitter API
        self.initialize_twitter_api()

        # Initialize text generation model
        try:
            self.text_generator = pipeline("text-generation", model="gpt2")
            print(f"[{self.company_name}] Text generation model initialized successfully.")
        except Exception as e:
            print(f"[{self.company_name}] Error initializing text generation model: {e}")
            self.text_generator = None

        # Initialize embedding model (using distilbert for a lightweight approach)
        try:
            self.embedding_model = pipeline('feature-extraction', model='distilbert-base-uncased')
            print(f"[{self.company_name}] Embedding model initialized successfully.")
        except Exception as e:
            print(f"[{self.company_name}] Error initializing embedding model: {e}")
            self.embedding_model = None

        # Load previously stored texts (must run after the embedding model
        # exists so missing embeddings can be backfilled).
        self.load_texts_from_db()

        if self.texts:
            self.initialize_vector_store()

    def initialize_db(self):
        """Open ``self.memory_db`` and create all tables if they are missing.

        Sets ``self.conn`` and ``self.cursor``. Re-raises any error after
        reporting it, because the agent cannot work without its database.
        """
        try:
            self.conn = sqlite3.connect(self.memory_db)
            self.cursor = self.conn.cursor()

            # Table for general company data (JSON blobs)
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS company_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    data TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Table to track processed tweets
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS processed_tweets (
                    tweet_id TEXT PRIMARY KEY,
                    processed_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Table for stored texts; embedding is a raw float32 byte buffer
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS stored_texts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    text TEXT UNIQUE,
                    embedding BLOB,
                    added_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Table for news items (deduplicated by URL)
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS news_items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT,
                    description TEXT,
                    source TEXT,
                    url TEXT UNIQUE,
                    published_at TEXT,
                    collected_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Table for social engagements (one row per tweet_id)
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS social_engagements (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tweet_id TEXT UNIQUE,
                    action TEXT,
                    content TEXT,
                    performed_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            self.conn.commit()
            print(f"[{self.company_name}] Database initialized successfully.")
        except Exception as e:
            print(f"[{self.company_name}] Error initializing database: {e}")
            raise

    def load_texts_from_db(self):
        """Load stored texts and embeddings, backfilling missing embeddings.

        A row without an embedding has one computed (and written back) when
        the embedding model is available. A row whose embedding cannot be
        obtained is left out of the in-memory store. Otherwise
        ``self.texts`` and ``self.embeddings`` would drift apart and
        ``similarity_search`` would return the wrong text.
        """
        try:
            self.cursor.execute("SELECT text, embedding FROM stored_texts")
            rows = self.cursor.fetchall()
            texts, embeddings = [], []
            skipped = 0
            for text, blob in rows:
                if blob is not None:
                    embedding = np.frombuffer(blob, dtype=np.float32)
                else:
                    embedding = self.get_embedding(text)
                    if embedding is None:
                        skipped += 1
                        continue
                    self.cursor.execute(
                        "UPDATE stored_texts SET embedding = ? WHERE text = ?",
                        (embedding.tobytes(), text),
                    )
                texts.append(text)
                embeddings.append(embedding)
            self.conn.commit()
            self.texts = texts
            self.embeddings = embeddings
            if rows:
                print(f"[{self.company_name}] Loaded {len(self.texts)} texts from database.")
            if skipped:
                print(f"[{self.company_name}] Skipped {skipped} stored texts without embeddings.")
        except Exception as e:
            print(f"[{self.company_name}] Error loading texts from database: {e}")
            self.texts = []
            self.embeddings = []

    def get_embedding(self, text):
        """Return a float32 sentence embedding for *text*, or None.

        The embedding is the mean of the token vectors returned by the
        feature-extraction pipeline. Returns None when no model is loaded
        or the model call fails.
        """
        if not self.embedding_model:
            return None
        try:
            features = self.embedding_model(text, truncation=True, max_length=512)
            # Use the mean of token embeddings as the sentence embedding
            embedding = np.mean(features[0], axis=0)
            return embedding.astype(np.float32)
        except Exception as e:
            print(f"[{self.company_name}] Error generating embedding: {e}")
            return None

    def save_texts_to_db(self, new_texts, embeddings=None):
        """Insert *new_texts* (and their embeddings) into ``stored_texts``.

        Args:
            new_texts: Texts to store; texts already present are ignored.
            embeddings: Optional list aligned with *new_texts*. When given,
                it is used instead of recomputing each embedding; ``None``
                entries are stored without an embedding.

        Returns:
            The number of rows actually inserted (0 on error).
        """
        if embeddings is None:
            embeddings = [self.get_embedding(text) for text in new_texts]
        inserted = 0
        try:
            for text, embedding in zip(new_texts, embeddings):
                if embedding is not None:
                    self.cursor.execute(
                        "INSERT OR IGNORE INTO stored_texts (text, embedding) VALUES (?, ?)",
                        (text, embedding.tobytes())
                    )
                else:
                    self.cursor.execute(
                        "INSERT OR IGNORE INTO stored_texts (text) VALUES (?)",
                        (text,)
                    )
                # rowcount is 0 when OR IGNORE skipped a duplicate.
                inserted += max(self.cursor.rowcount, 0)
            self.conn.commit()
            print(f"[{self.company_name}] Saved {inserted} new texts to database.")
        except Exception as e:
            print(f"[{self.company_name}] Error saving texts to database: {e}")
            return 0
        return inserted

    def initialize_vector_store(self):
        """Report the size of the in-memory store built from stored embeddings."""
        if not self.texts or not self.embeddings:
            print(f"[{self.company_name}] No texts or embeddings available to create vector store.")
            return
        print(f"[{self.company_name}] Vector store initialized with {len(self.texts)} texts.")

    def initialize_twitter_api(self):
        """Initialize the Twitter API connection using Tweepy.

        Sets ``self.twitter_api`` to a verified ``tweepy.API`` client, or to
        None when credentials are incomplete or verification fails.
        """
        if not all([self.twitter_api_key, self.twitter_api_secret,
                    self.twitter_access_token, self.twitter_access_token_secret]):
            print(f"[{self.company_name}] Twitter API credentials not provided or incomplete.")
            self.twitter_api = None
            return
        try:
            auth = tweepy.OAuthHandler(self.twitter_api_key, self.twitter_api_secret)
            auth.set_access_token(self.twitter_access_token, self.twitter_access_token_secret)
            self.twitter_api = tweepy.API(auth)
            self.twitter_api.verify_credentials()
            print(f"[{self.company_name}] Twitter API initialized successfully.")
        except Exception as e:
            print(f"[{self.company_name}] Error initializing Twitter API: {e}")
            self.twitter_api = None

    def add_texts_to_vector_store(self, texts):
        """Persist *texts* and add those with embeddings to the in-memory store.

        Blank strings and texts already in the store (or repeated within
        *texts*) are skipped. Each embedding is computed once and reused for
        both the database row and the in-memory store.
        """
        if not texts:
            return
        seen = set(self.texts)
        fresh = []
        for text in texts:
            if text and text.strip() and text not in seen:
                seen.add(text)
                fresh.append(text)
        if not fresh:
            return
        new_embeddings = [self.get_embedding(text) for text in fresh]
        self.save_texts_to_db(fresh, new_embeddings)
        added = 0
        for text, embedding in zip(fresh, new_embeddings):
            if embedding is not None:
                self.texts.append(text)
                self.embeddings.append(embedding)
                added += 1
        if added == len(fresh):
            print(f"[{self.company_name}] Added {added} texts to vector store.")
        else:
            print(f"[{self.company_name}] Only {added} embeddings generated for {len(fresh)} texts.")

    def similarity_search(self, query_text, k=3):
        """Return up to *k* stored texts ranked by cosine similarity to *query_text*.

        Returns:
            A list of ``{"text": str, "similarity": float}`` dicts, best
            first (ties keep insertion order). It is empty when the store is
            empty, ``k <= 0``, or the query cannot be embedded.
        """
        if not self.embeddings or not self.texts or k <= 0:
            return []
        query_embedding = self.get_embedding(query_text)
        if query_embedding is None:
            return []
        matrix = np.vstack(self.embeddings).astype(np.float64)
        query = np.asarray(query_embedding, dtype=np.float64).ravel()
        # Cosine similarity in one vectorized pass. A zero-norm vector gets
        # similarity 0 (the denominator is replaced by 1).
        denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
        denom[denom == 0] = 1.0
        sims = (matrix @ query) / denom
        order = np.argsort(-sims, kind="stable")[:k]
        return [{"text": self.texts[i], "similarity": float(sims[i])} for i in order]

    def is_tweet_processed(self, tweet_id):
        """Return True if *tweet_id* is already in ``processed_tweets``."""
        self.cursor.execute("SELECT tweet_id FROM processed_tweets WHERE tweet_id = ?", (tweet_id,))
        return self.cursor.fetchone() is not None

    def mark_tweet_processed(self, tweet_id):
        """Record *tweet_id* in ``processed_tweets`` (no-op if already there)."""
        self.cursor.execute("INSERT OR IGNORE INTO processed_tweets (tweet_id) VALUES (?)", (tweet_id,))
        self.conn.commit()

    @staticmethod
    def _extract_tweet_id(tweet_url):
        """Return the numeric status id from a tweet URL, or "" if absent.

        Handles query strings, fragments and trailing path segments such as
        ``.../status/123/photo/1``.
        """
        if not tweet_url or "status/" not in tweet_url:
            return ""
        tail = tweet_url.split("status/")[-1]
        return tail.split("?")[0].split("#")[0].split("/")[0]

    def collect_social_media_data(self, location="Austin, Texas, United States"):
        """Collect tweets about ``self.twitter_handle`` from SerpApi search results.

        Args:
            location: SerpApi ``location`` value. Pass it as plain text;
                ``requests`` URL-encodes it, so a pre-encoded value like
                ``United+States`` would arrive with a literal ``+``.

        Returns:
            A list of dicts with ``tweet_id``, ``post``, ``created_at`` and
            ``url``. The list is empty on missing key or any failure.
        """
        print(f"[{self.company_name}] Collecting social media data using SerpApi...")
        if not self.serpapi_api_key:
            print(f"[{self.company_name}] SerpApi API key not provided.")
            return []
        params = {
            "q": f"{self.twitter_handle} twitter",
            "location": location,
            "hl": "en",
            "gl": "us",
            "api_key": self.serpapi_api_key
        }
        url = "https://serpapi.com/search.json"
        tweets = []
        try:
            response = requests.get(url, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
            if "twitter_results" not in data:
                print(f"[{self.company_name}] No twitter_results found in API response.")
                return tweets
            twitter_results = data["twitter_results"]
            if "tweets" not in twitter_results:
                print(f"[{self.company_name}] No tweets found in the twitter_results.")
                return tweets
            for tweet in twitter_results["tweets"]:
                tweet_url = tweet.get("link", "")
                tweets.append({
                    "tweet_id": self._extract_tweet_id(tweet_url),
                    "post": tweet.get("snippet", ""),
                    "created_at": tweet.get("published_date", ""),
                    "url": tweet_url
                })
            print(f"[{self.company_name}] Collected {len(tweets)} tweets.")
        except Exception as e:
            print(f"[{self.company_name}] Error fetching tweets using SerpApi: {e}")
        return tweets

    def collect_news_data(self):
        """Fetch up to 10 recent English news items mentioning the company.

        Items lacking a title or description are dropped. The rest are
        stored in ``news_items``, where duplicate URLs are ignored, and
        returned as dicts.
        """
        print(f"[{self.company_name}] Collecting news using MediaStack API...")
        if not self.mediastack_api_key:
            print(f"[{self.company_name}] MediaStack API key not provided.")
            return []
        # NOTE(review): plain HTTP sends the access key unencrypted; switch to
        # https if the MediaStack plan in use supports it.
        url = "http://api.mediastack.com/v1/news"
        params = {
            'access_key': self.mediastack_api_key,
            'keywords': self.company_name,
            'languages': 'en',
            'limit': 10,
            'sort': 'published_desc'
        }
        news_data = []
        try:
            response = requests.get(url, params=params, timeout=15)
            if response.status_code != 200:
                print(f"[{self.company_name}] MediaStack API returned status code {response.status_code}.")
                return news_data
            items = response.json().get('data')
            if not items:
                print(f"[{self.company_name}] No news data found in the API response.")
                return news_data
            for item in items:
                if not item.get('title') or not item.get('description'):
                    continue
                news_item = {
                    "title": item.get('title', ''),
                    "description": item.get('description', ''),
                    "source": item.get('source', ''),
                    "url": item.get('url', ''),
                    "published_at": item.get('published_at', ''),
                    "collected_at": datetime.now().isoformat()
                }
                news_data.append(news_item)
                self.cursor.execute("""
                    INSERT OR IGNORE INTO news_items
                    (title, description, source, url, published_at, collected_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    news_item["title"],
                    news_item["description"],
                    news_item["source"],
                    news_item["url"],
                    news_item["published_at"],
                    news_item["collected_at"]
                ))
            self.conn.commit()
            print(f"[{self.company_name}] Collected {len(news_data)} news items.")
        except Exception as e:
            print(f"[{self.company_name}] Error fetching news: {e}")
        return news_data

    def analyze_sentiment(self, text):
        """Classify *text*; returns a ``{"label", "score"}`` dict.

        Falls back to ``{"label": "NEUTRAL", "score": 0.5}`` when no analyzer
        is loaded or classification fails. Input longer than the model limit
        is truncated instead of raising.
        """
        if not self.sentiment_analyzer:
            return {"label": "NEUTRAL", "score": 0.5}
        try:
            result = self.sentiment_analyzer(text, truncation=True)
            return result[0]
        except Exception as e:
            print(f"[{self.company_name}] Error in sentiment analysis: {e}")
            return {"label": "NEUTRAL", "score": 0.5}

    @staticmethod
    def _positivity(sentiment):
        """Map a classifier result to P(positive) in [0, 1].

        The SST-2 pipeline's ``score`` is the confidence of the predicted
        ``label``, not a positivity score. A confident NEGATIVE therefore has
        a *high* score and must be inverted. Unknown labels (e.g. the NEUTRAL
        fallback) map to 0.5.
        """
        label = str(sentiment.get("label", "")).upper()
        score = float(sentiment.get("score", 0.5))
        if label == "POSITIVE":
            return score
        if label == "NEGATIVE":
            return 1.0 - score
        return 0.5

    @classmethod
    def _choose_action(cls, sentiment):
        """Return "Comment", "Like" or "Monitor" for a sentiment result.

        Comment on positive tweets, like mildly negative/uncertain ones
        (positivity > 0.3), and only monitor clearly negative ones.
        """
        positivity = cls._positivity(sentiment)
        if sentiment.get("label") == "POSITIVE" or positivity > 0.6:
            return "Comment"
        if positivity > 0.3:
            return "Like"
        return "Monitor"

    def _generate_continuation(self, prompt, max_new_tokens, marker=None):
        """Run the text generator and return only the text appended to *prompt*.

        The pipeline's ``generated_text`` starts with the echoed prompt, which
        is sliced off. If it is not an exact prefix, fall back to the text
        after the last *marker*, then to the last three sentences.
        """
        generation = self.text_generator(
            prompt, max_new_tokens=max_new_tokens, num_return_sequences=1
        )[0]['generated_text']
        if generation.startswith(prompt):
            continuation = generation[len(prompt):]
        elif marker and marker in generation:
            continuation = generation.split(marker)[-1]
        else:
            continuation = '.'.join(generation.split('.')[-3:])
        continuation = continuation.strip()
        if continuation.startswith(":"):
            continuation = continuation[1:].strip()
        return continuation

    def generate_engagement_response(self, tweet_text, max_new_tokens=60):
        """
        Generates a context-aware comment using a RAG-like approach:
        retrieves similar stored texts and prompts GPT-2 for a friendly reply.

        Args:
            tweet_text: The tweet being answered.
            max_new_tokens: Generation budget. The default fits the
                ~30-word target; the result is capped at 40 words.

        Returns:
            The reply, suffixed with " - <company>" if the company is not
            mentioned. If no generator is loaded, a canned template is
            returned. If generation fails or yields nothing, a fixed
            thank-you message is returned.
        """
        fallback = f"Thank you for your message. {self.company_name} appreciates your feedback."
        if not self.text_generator:
            import random
            templates = [
                f"Thanks for sharing! {self.company_name} values your feedback.",
                f"Interesting point! We at {self.company_name} are always listening.",
                f"Thank you for engaging with {self.company_name}!"
            ]
            return random.choice(templates)
        try:
            similar_docs = self.similarity_search(tweet_text, k=3)
            context_text = " ".join(doc["text"] for doc in similar_docs)[:300]
            prompt = f"""
Tweet: {tweet_text}

Company: {self.company_name}
Company Description: {self.description}

Relevant context: {context_text}

Generate a friendly and professional comment (around 30 words) as {self.company_name}'s response:
"""
            response_only = self._generate_continuation(
                prompt, max_new_tokens, marker=f"{self.company_name}'s response")
            words = response_only.split()
            if len(words) > 40:
                response_only = " ".join(words[:40]) + "..."
            if not response_only:
                response_only = fallback
            elif self.company_name not in response_only:
                response_only += f" - {self.company_name}"
            print(f"[{self.company_name}] Generated response: {response_only}")
            return response_only
        except Exception as e:
            print(f"[{self.company_name}] Error generating engagement response: {e}")
            return fallback

    def has_engaged_with_tweet(self, tweet_id, action=None):
        """Return True if an engagement (optionally of *action*) is recorded for *tweet_id*."""
        query = "SELECT action FROM social_engagements WHERE tweet_id = ?"
        params = (tweet_id,)
        if action:
            query += " AND action = ?"
            params = (tweet_id, action)
        self.cursor.execute(query, params)
        return self.cursor.fetchone() is not None

    def record_engagement(self, tweet_id, action, content=None):
        """Record an engagement; ignored if *tweet_id* already has one (UNIQUE)."""
        self.cursor.execute(
            "INSERT OR IGNORE INTO social_engagements (tweet_id, action, content) VALUES (?, ?, ?)",
            (tweet_id, action, content)
        )
        self.conn.commit()

    def engage_social_media(self, social_data, news_data):
        """Decide and record actions for new tweets and log news items.

        For each tweet not yet processed, it runs sentiment analysis, picks
        an action, records it (Comment/Like) and marks the tweet processed.
        A tweet without a status id uses its URL as its key, so such tweets
        do not all collide on the empty string; tweets with neither are
        skipped. Tweet texts and news titles/descriptions are then added to
        the vector store.
        """
        texts_to_add = []
        for post in social_data:
            tweet_id = post.get("tweet_id") or post.get("url") or ""
            if not tweet_id:
                print(f"[{self.company_name}] Skipping tweet without id or URL.")
                continue
            if self.is_tweet_processed(tweet_id):
                continue
            text = post.get("post", "")
            texts_to_add.append(text)
            sentiment = self.analyze_sentiment(text)
            action = "Monitor"
            generated_response = None
            if not self.has_engaged_with_tweet(tweet_id):
                action = self._choose_action(sentiment)
                if action == "Comment":
                    generated_response = self.generate_engagement_response(text)
                    self.record_engagement(tweet_id, action, generated_response)
                elif action == "Like":
                    self.record_engagement(tweet_id, action)
            print(f"[{self.company_name}] Processed Tweet (ID: {tweet_id[:6]}..., URL: {post.get('url', '')}): Action: {action}")
            self.mark_tweet_processed(tweet_id)
            data_json = json.dumps({
                "source": "twitter",
                "tweet_id": tweet_id,
                "url": post.get("url", ""),
                "post": text,
                "sentiment": sentiment,
                "action_taken": action,
                "response_content": generated_response,
                "processed_at": datetime.now().isoformat()
            })
            self.cursor.execute("INSERT INTO company_data (data) VALUES (?)", (data_json,))
            self.conn.commit()
        for news in news_data:
            texts_to_add.append(news.get("title", ""))
            if news.get("description"):
                texts_to_add.append(news["description"])
            data_json = json.dumps({
                "source": "news",
                "title": news.get("title", ""),
                "description": news.get("description", ""),
                "source_name": news.get("source", ""),
                "url": news.get("url", ""),
                "collected_at": news.get("collected_at", datetime.now().isoformat())
            })
            self.cursor.execute("INSERT INTO company_data (data) VALUES (?)", (data_json,))
            self.conn.commit()
            print(f"[{self.company_name}] Processed news item: '{news.get('title', '')}', URL: {news.get('url', '')}")
        if texts_to_add:
            self.add_texts_to_vector_store(texts_to_add)

    def generate_scheduled_post(self, max_new_tokens=80):
        """Draft a post of at most 280 characters from the 5 most recent news items.

        Only the model's continuation is returned. The echoed prompt, which
        includes the instruction line, is removed.

        Returns:
            The post text, or None if no generator is loaded, generation
            fails, or the model produced nothing.
        """
        if not self.text_generator:
            return None
        try:
            self.cursor.execute("""
                SELECT title, description FROM news_items
                ORDER BY collected_at DESC LIMIT 5
            """)
            recent_news = self.cursor.fetchall()
            news_context = "".join(
                f"- {title}: {description[:100]}...\n"
                for title, description in recent_news
                if title and description
            )
            prompt = f"""
Company: {self.company_name}
Description: {self.description}

Recent news:
{news_context}

Generate a short, engaging social media post (maximum 280 characters) for {self.company_name} based on recent developments:
"""
            post_content = self._generate_continuation(
                prompt, max_new_tokens, marker="based on recent developments:")
            if not post_content:
                return None
            if len(post_content) > 280:
                post_content = post_content[:277] + "..."
            return post_content
        except Exception as e:
            print(f"[{self.company_name}] Error generating scheduled post: {e}")
            return None

    def create_scheduled_posts(self, num_posts=1):
        """Generate up to *num_posts* posts, store each in ``company_data`` and return them."""
        created_posts = []
        for _ in range(num_posts):
            post_content = self.generate_scheduled_post()
            if not post_content:
                continue
            post_data = {
                "type": "scheduled_post",
                "content": post_content,
                "created_at": datetime.now().isoformat()
            }
            self.cursor.execute("INSERT INTO company_data (data) VALUES (?)", (json.dumps(post_data),))
            self.conn.commit()
            created_posts.append(post_content)
            print(f"[{self.company_name}] Created scheduled post: {post_content[:50]}...")
        return created_posts

    def run(self):
        """Run one cycle: collect tweets and news, engage, then create one post."""
        try:
            print(f"\n[{self.company_name}] Running agent at {datetime.now().isoformat()}")
            social_data = self.collect_social_media_data()
            news_data = self.collect_news_data()
            self.engage_social_media(social_data, news_data)
            self.create_scheduled_posts(1)
            print(f"[{self.company_name}] Run completed successfully.\n")
        except Exception as e:
            print(f"[{self.company_name}] Error during run: {e}")

    def close(self):
        """Close the database connection; safe to call more than once."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        name = getattr(self, "company_name", "?")
        try:
            conn.close()
            print(f"[{name}] Database connection closed.")
        except Exception as e:
            print(f"[{name}] Error closing database connection: {e}")
        finally:
            self.conn = None

    def __del__(self):
        """Best-effort cleanup when the agent is garbage-collected."""
        try:
            self.close()
        except Exception:
            # Destructors must never raise (e.g. during interpreter shutdown).
            pass

    # Backward-compatible alias for the former (misspelled) cleanup hook.
    _del_ = close

def trigger_training(agents):
    """Run one collection/engagement cycle for every agent in *agents*.

    Each agent's ``run()`` is isolated: if one raises, the error is printed
    together with that agent's ``company_name`` and the loop moves on.
    """
    started = datetime.now().isoformat()
    print(f"\n===== Starting agent runs at {started} =====")
    for current in agents:
        try:
            current.run()
        except Exception as err:
            print(f"Error running agent for {current.company_name}: {err}")
    finished = datetime.now().isoformat()
    print(f"===== All agent runs completed at {finished} =====")

def main():
    """Prompt for a company, build its CompanyAgent and run exactly one cycle.

    The description defaults to "<name> is a well-known company." when left
    blank, and the Twitter handle is the company name with spaces removed.
    Credentials are read from the environment.
    """
    company_name = input("Enter company name: ")
    description = (input("Enter company description (press Enter for default): ")
                   or f"{company_name} is a well-known company.")
    handle = company_name.replace(" ", "")

    env = os.environ.get
    agent = CompanyAgent(
        company_name,
        description,
        handle,
        twitter_api_key=env("TWITTER_API_KEY"),
        twitter_api_secret=env("TWITTER_API_SECRET"),
        twitter_access_token=env("TWITTER_ACCESS_TOKEN"),
        twitter_access_token_secret=env("TWITTER_ACCESS_TOKEN_SECRET"),
        mediastack_api_key=env("MEDIASTACK_API_KEY"),
        serpapi_api_key=env("SERPAPI_API_KEY"),
    )

    # One cycle only; the process exits afterwards.
    trigger_training([agent])

# Script entry point: run one interactive agent cycle when executed directly.
# NOTE: inside a Jupyter/IPython kernel __name__ is also "__main__", so
# executing this cell starts the interactive prompt immediately.
if __name__ == "__main__":
    main()
