requirements

In [None]:
!pip install --upgrade google-cloud-aiplatform pymongo requests

Project configuration (GCP project ID and region)

In [None]:
from google.cloud import aiplatform

# GCP project and Vertex AI region for this notebook.
# NOTE: "projectid" is a placeholder; replace it with your own project ID.
PROJECT_ID = "projectid"
REGION = "us-central1"

# Point the Vertex AI SDK at that project/region.
aiplatform.init(location=REGION, project=PROJECT_ID)

This is our ingestion agent, which analyzes and stores scraped data from different social media platforms and user-submitted reports.

In [None]:
#ingestion agent
# --- Setup ---
import requests, json, imghdr, re
from vertexai.preview.generative_models import GenerativeModel, Part
from google.cloud import firestore

# --- Firestore handle ---
# Every ingested post is written to (and de-duplicated against) this collection.
db = firestore.Client()
collection = db.collection("ingestion")

# --- Gemini model handles ---
# Both names wrap the same generative model. `embedding_model` is only used to
# generate clustering tags via prompting; it is not an embeddings endpoint.
gemini_model = GenerativeModel("gemini-2.5-flash")
embedding_model = GenerativeModel("gemini-2.5-flash")


# --- Helper: Insert If Not Exists ---
def insert_if_not_exists(doc):
    """Store `doc` in the ingestion collection unless its media_url is already there.

    Looks up at most one existing document with the same "media_url".

    Returns:
        True if the document was added, False if a duplicate was found.
    """
    media_url = doc["media_url"]
    existing = collection.where("media_url", "==", media_url).limit(1).stream()
    already_stored = any(existing)

    if not already_stored:
        collection.add(doc)
        print(" Inserted into Firestore.")
        return True

    print(" Duplicate media_url found. Skipping insertion.")
    return False


# --- Helper: Build Tags for Clustering ---
def build_tokens(doc):
    """Ask the model for 20-50 clustering tags describing an ingested document.

    The tag text is built from the document's "summary", "sentiment" and
    "media_insight" fields; missing or None fields are treated as empty.

    Args:
        doc: Mapping holding the fields listed above.

    Returns:
        A list of 20 to 50 non-empty, whitespace-trimmed tag strings.

    Raises:
        ValueError: If the document has no text to tag, the model reply is not
            a JSON array, or the number of usable tags is outside 20-50.
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    text = " ".join(
        doc.get(field) or "" for field in ("summary", "sentiment", "media_insight")
    ).strip()
    # Nothing to tag: fail before spending a model call on an empty prompt.
    if not text:
        raise ValueError("Cannot build tokens: document has no text to tag")

    prompt = f"""
You are a text-to-tags generator for clustering.
Extract exactly 20 to 50 discriminating keywords or short phrases (one word only) from the text below.
Return ONLY a valid JSON array of strings, nothing else.

Text:
{text}
"""
    raw = ""
    try:
        raw = embedding_model.generate_content(prompt).text.strip()
        # The model often wraps JSON in Markdown code fences; strip them.
        raw = re.sub(r"```(?:json)?", "", raw, flags=re.I).strip()
        tokens = json.loads(raw)

        if not isinstance(tokens, list):
            raise ValueError("Gemini did not return a JSON array")
        # Keep only real, non-blank strings: a stray number or null in the array
        # must not crash with AttributeError on .strip().
        tokens = [t.strip() for t in tokens if isinstance(t, str) and t.strip()]
        if not (20 <= len(tokens) <= 50):
            raise ValueError(f"Token count {len(tokens)} is outside 20–50")
        return tokens
    except json.JSONDecodeError as e:
        print(f"JSON parsing error in build_tokens: {e}. Raw content: {raw}")
        raise
    except Exception as e:
        print(f"Error building tokens: {e}")
        raise


# --- Main Function: Image Post Ingestion ---
def process_instagram_image_post(post: dict):
    """
    Ingest one Instagram image post and store the enriched record in Firestore.

    Expected input:
    {
        "id": "123456",
        "media_url": "https://...jpg",
        "caption": "Some caption here...",
        "timestamp": "2025-07-27T08:00:00Z"
    }
    "media_url", "caption" and "timestamp" are required; "id" is optional.

    Steps: download and validate the image, ask Gemini for a scene description
    and a time-of-day estimate, summarise the caption, extract a location,
    classify news-worthiness sentiment, build clustering tags, then insert the
    document unless the same media_url is already stored.

    Returns:
        The enriched document dict (including "tags"), or None if the post is
        classified as "invalid" or any step fails. The document is returned
        even when the insert is skipped because of a duplicate media_url.
    """
    try:
        # Fail fast on incomplete posts so no download or model call is wasted.
        missing = [key for key in ("media_url", "caption", "timestamp") if key not in post]
        if missing:
            raise ValueError(f"post is missing required field(s): {', '.join(missing)}")

        # 1. Download & validate image.
        # No stream=True: the whole body is read immediately via .content.
        resp = requests.get(post["media_url"], timeout=30)
        resp.raise_for_status()
        img_bytes = resp.content
        img_fmt = imghdr.what(None, img_bytes)
        if not img_fmt:
            raise ValueError("Downloaded bytes are not a recognised image format")
        mime = f"image/{img_fmt}"
        img_part = Part.from_data(img_bytes, mime)

        # 2. Generate image insight + time.
        # The prompt describes a still image (no video or audio cues).
        vision_prompt = """
You are an image analyst for a breaking news agency.

Study the image carefully and report:
1. What is happening in the scene? Mention actions (protest, celebration, accident), banners, signs, police, etc.
2. How many people are visible? Small, medium, or large crowd?
3. Mention vehicles or symbols (flags, logos, etc.) visible.
4. If any protest or movement, mention likely cause (if banners or text suggest it).

Be concise but factual. Use your visual understanding.
Output must be under 120 words.
"""
        time_prompt = """
Estimate time-of-day from shadows and lighting. Format: "07AM-12PM", "12PM-05PM", etc.
"""

        media_insight = gemini_model.generate_content([vision_prompt, img_part]).text.strip()
        time_of_day = gemini_model.generate_content([time_prompt, img_part]).text.strip()

        # 3. Analyze caption for summary + sentiment
        caption_prompt = f"""
You are an event intelligence analyst.
Output JSON: {{"summary": "One-sentence event summary in English", "sentiment": "positive | negative | neutral"}}
Caption: {post['caption']}
"""
        summary = _gemini_json(caption_prompt)["summary"]

        # 4. Extract location from caption + image
        loc_prompt = f"""
Extract the most specific Indian location from caption + image.
Return JSON: {{"location": "City, State"}}
Caption: {post['caption']}
Image Description: {media_insight}
"""
        location = _gemini_json(loc_prompt)["location"]

        # 5. Re-validate sentiment for news-worthiness
        sentiment_prompt = f"""
Classify sentiment for news-worthiness. Return JSON: {{"sentiment": "positive|negative|neutral|invalid"}}
Caption: {post['caption']}
Image Description: {media_insight}
"""
        # Normalise case/whitespace so replies like "Invalid" are still filtered out.
        final_sentiment = str(_gemini_json(sentiment_prompt)["sentiment"]).strip().lower()
        if final_sentiment == "invalid":
            print(" Post classified as 'invalid'. Skipping.")
            return None

        # 6. Build document and store
        doc = {
            "id": post.get("id"),
            "summary": summary,
            "sentiment": final_sentiment,
            "media_insight": media_insight,
            "time_of_day": time_of_day,
            "location": location,
            "media_url": post["media_url"],
            "timestamp": post["timestamp"],
        }

        doc["tags"] = build_tokens(doc)
        insert_if_not_exists(doc)
        return doc

    except Exception as e:
        # Best-effort ingestion: one bad post must not abort the whole batch.
        print(f" Image ingestion error: {e}")
        return None


def _parse_model_json(raw):
    """Parse a JSON object from a model reply, tolerating Markdown code fences."""
    cleaned = re.sub(r"```(?:json)?", "", raw, flags=re.I).strip()
    parsed = json.loads(cleaned)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Expected a JSON object from the model, got {type(parsed).__name__}"
        )
    return parsed


def _gemini_json(prompt):
    """Send a text prompt to the Gemini model and return its reply parsed as a JSON object."""
    return _parse_model_json(gemini_model.generate_content(prompt).text)


Example post.

In [None]:
# One hand-written sample post used to exercise the ingestion agent.
test_posts = [
    dict(
        id="405029",
        caption="Hundreds of farmers came on roads to protest on national highway in ghaziabad",
        media_url="https://newsus.cgtn.com/news/2021-02-04/Indian-farmers-protest-and-highway-blockade-continues-XAM6kIF8vC/img/2d536ed6969341bca0ddecdb36754c8e/2d536ed6969341bca0ddecdb36754c8e.jpeg",
        timestamp="2025-07-26T08:09:00Z",
    ),
]

# Run each sample through the agent and print the resulting document (or None).
for post in test_posts:
    result = process_instagram_image_post(post)
    print(result)

 Image inserted
{'id': '405029', 'summary': 'Hundreds of farmers are protesting on a national highway in Ghaziabad.', 'sentiment': 'negative', 'media_insight': '1.  A vast crowd of predominantly men is gathered outdoors in Kandela, India. Many raise their hands, some open-palmed, others in fists, beneath a prominently displayed Indian flag. Figures are also visible on a distant vehicle and a raised platform.\n2.  The large assembly, unified gestures, and prominent national flag strongly indicate an ongoing rally, demonstration, or protest. The raised hands suggest active participation and collective solidarity or dissent.\n3.  Bright, natural daylight illuminates the scene. Many participants wear multiple layers, including sweaters and shawls, suggesting cool, dry weather, likely during late autumn or winter.', 'time_of_day': '12PM-05PM', 'location': 'Ghaziabad, Uttar Pradesh', 'media_url': 'https://newsus.cgtn.com/news/2021-02-04/Indian-farmers-protest-and-highway-blockade-continues-X