In [None]:
import os

# Make sure both key variables exist without clobbering values that were
# already exported in the environment. Assigning "" unconditionally would wipe
# a real key and make CustomHTTPGemini raise "GEMINI_API_KEY must be provided".
# Never paste a real key into the notebook; export it in the shell instead.
for _key_name in ("GOOGLE_API_KEY", "GEMINI_API_KEY"):
    os.environ.setdefault(_key_name, "")

In [2]:
import os
import requests
import json
from typing import Any, List, Optional

from langchain_core.prompts import PromptTemplate
from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import LLMResult

from pydantic import BaseModel, Field, validator
from langchain_core.output_parsers import StrOutputParser

from langchain_core.runnables import RunnableLambda # Added for mapping chain outputs

In [3]:

# --- 1. Custom LLM Implementation ---

class CustomHTTPGemini(BaseLLM):
    """
    A custom LangChain LLM wrapper that interacts with the Google Gemini API
    using direct HTTP requests (POST to the generateContent endpoint), with
    optional support for JSON output via ``response_schema``.

    Construction raises ``ValueError`` when no API key is passed and the
    ``GEMINI_API_KEY`` environment variable is empty or unset. Every failure
    while calling the API is reported as ``RuntimeError``.
    """

    # Accept both the field name (``model_name=...``) and its alias
    # (``model=...``) at construction time. With only ``alias="model"`` pydantic
    # v2 populates the field from the alias alone, so a ``model_name=`` keyword
    # would not be applied. NOTE(review): assumes the pydantic v2 based
    # langchain-core; confirm against the installed version.
    model_config = {"populate_by_name": True}

    # Model and API Configuration
    api_key: Optional[str] = None
    model_name: str = Field(default="gemini-2.5-flash", alias="model")
    base_url: str = "https://generativelanguage.googleapis.com/v1beta/models/"
    # JSON schema forwarded as generationConfig.responseSchema; JSON mode is
    # only enabled when this is set.
    response_schema: Optional[dict] = None
    # Seconds to wait for the HTTP call; None disables the timeout entirely.
    request_timeout: Optional[float] = 60.0

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        # Prefer the explicitly passed key, then fall back to the environment.
        if not self.api_key:
            self.api_key = os.getenv("GEMINI_API_KEY")

        if not self.api_key:
            raise ValueError("GEMINI_API_KEY must be provided or set as an environment variable.")

    @property
    def _llm_type(self) -> str:
        """Return type of LLM."""
        return "custom_http_gemini"

    def _build_request_body(self, prompt: str, stop: Optional[List[str]] = None) -> dict:
        """Assemble the generateContent JSON payload for a single prompt."""
        body: dict = {"contents": [{"parts": [{"text": prompt}]}]}

        generation_config: dict = {}
        if self.response_schema:
            generation_config["responseMimeType"] = "application/json"
            generation_config["responseSchema"] = self.response_schema
        if stop:
            # Forward LangChain stop sequences instead of silently ignoring them.
            generation_config["stopSequences"] = list(stop)
        if generation_config:
            body["generationConfig"] = generation_config
        return body

    @staticmethod
    def _extract_text(response_json: Any) -> str:
        """Pull the generated text out of a generateContent response body.

        Raises:
            RuntimeError: if the response has no candidates or no text, with
                the API's own feedback/finish reason included in the message.
        """
        if not isinstance(response_json, dict):
            raise RuntimeError(
                f"Gemini API returned an unexpected payload of type {type(response_json).__name__}."
            )

        candidates = response_json.get("candidates") or []
        if not candidates:
            feedback = response_json.get("promptFeedback")
            raise RuntimeError(f"Gemini API returned no candidates (promptFeedback: {feedback}).")

        candidate = candidates[0]
        parts = (candidate.get("content") or {}).get("parts") or []
        # A reply may be split over several parts; join them and skip any part
        # flagged as a thought rather than answer text.
        text = "".join(
            part.get("text", "")
            for part in parts
            if isinstance(part, dict) and not part.get("thought")
        )
        if not text:
            raise RuntimeError(
                f"Gemini API returned no text (finishReason: {candidate.get('finishReason')})."
            )
        return text

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs: Any,
    ) -> str:
        """
        Send one prompt to the Gemini generateContent endpoint and return its text.

        Args:
            prompt: The fully formatted prompt text.
            stop: Optional stop sequences, forwarded as generationConfig.stopSequences.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The generated text. In JSON mode this is the raw JSON string.

        Raises:
            RuntimeError: on HTTP errors, transport failures, or a response
                that carries no usable text.
        """
        url = f"{self.base_url}{self.model_name}:generateContent"

        # The key travels in a header rather than the query string so it cannot
        # end up in exception messages or logs that echo the request URL.
        headers = {
            "Content-Type": "application/json",
            "x-goog-api-key": self.api_key,
        }

        request_data = self._build_request_body(prompt, stop)

        try:
            response = requests.post(
                url=url,
                headers=headers,
                json=request_data,
                timeout=self.request_timeout,  # never block forever on a stalled connection
            )
            response.raise_for_status()  # Raise exception for bad status codes
            response_json = response.json()
        except requests.exceptions.HTTPError as err:
            error_message = f"Gemini API HTTP Error ({err.response.status_code}): {err.response.text}"
            raise RuntimeError(error_message) from err
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred during API call: {e}") from e

        # Parsed outside the try block so the specific RuntimeError raised by
        # _extract_text is not re-wrapped as an "unexpected error".
        return self._extract_text(response_json)

    # Batch entry point: runs _call once per prompt and wraps each result.
    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Call the LLM on a list of prompts, one HTTP request per prompt."""
        # Each prompt yields a list with one {"text": ...} entry; presumably
        # LLMResult validates these dicts into generation objects.
        generations = [
            [{"text": self._call(prompt, stop, run_manager, **kwargs)}]
            for prompt in prompts
        ]
        return LLMResult(generations=generations)


# --- 2. LCEL Chain Integration (Cascading Agents with JSON) ---

# --- JSON Schema Definitions (CustomHTTPGemini enables JSON mode only when a schema is supplied) ---

# Schema for Agent 1: Score
# Agent 1 must answer with a single numeric "score" field.
SCORE_SCHEMA = dict(
    type="OBJECT",
    properties=dict(
        score=dict(
            type="NUMBER",
            description="A random technical score between 0.0 and 1.0.",
        ),
    ),
    required=["score"],
    propertyOrdering=["score"],
)

# Schema for Agent 2: Detailed Review (free-text review plus a category label)
REVIEW_SCHEMA = dict(
    type="OBJECT",
    properties=dict(
        review_text=dict(
            type="STRING",
            description="A concise, technical review.",
        ),
        category=dict(
            type="STRING",
            description="The category of the review (e.g., 'Positive', 'Neutral', 'Negative').",
        ),
    ),
    required=["review_text", "category"],
    propertyOrdering=["review_text", "category"],
)



In [4]:

if __name__ == "__main__":
    # --- Setup ---
    print("--- LangChain Custom HTTP Gemini Cascading Agent with JSON Output Example ---")

    # NOTE: Set your API Key in your environment before running:
    # export GEMINI_API_KEY="YOUR_API_KEY_HERE"

    # Initialize the custom LLM instances, each with its required JSON schema.
    # The model is passed through the field's alias (`model=`): the field is
    # declared with alias="model", and the alias is the keyword that is always
    # accepted for it.
    try:
        # LLM for the Score Generator (Agent 1)
        llm_score_generator = CustomHTTPGemini(model="gemini-2.5-flash", response_schema=SCORE_SCHEMA)
        # LLM for the Review Generator (Agent 2)
        llm_review_generator = CustomHTTPGemini(model="gemini-2.5-flash", response_schema=REVIEW_SCHEMA)
    except ValueError as e:
        print(f"\nERROR: {e}")
        print("Please set the GEMINI_API_KEY environment variable and try again.")
        # Stop this cell/script with a failure status. `exit()` is avoided: it
        # is an interactive helper that shuts the kernel down inside Jupyter.
        raise SystemExit(1) from e

    # --- AGENT 1: Score Generator (Input: JSON-like context, Output: JSON string with score) ---

    prompt_1 = PromptTemplate.from_template(
        "You are a technical analyst. Your task is to assign a random score between 0.0 and 1.0 to the '{item_name}' based on its complexity '{complexity_level}'. Output the result strictly in JSON format according to the schema."
    )

    # Chain 1: Score Generator (outputs a raw JSON string)
    chain_1 = prompt_1 | llm_score_generator | StrOutputParser()

    # --- Mapping Step ---
    # 1. Take the original input (which has 'item_name' and 'complexity_level') AND
    #    the JSON score string output from Chain 1.
    # 2. Parse the JSON score string.
    # 3. Combine both into a single dictionary required by Chain 2.

    def map_score_to_review_context(input_dict: dict) -> dict:
        """
        Merge Agent 1's JSON score into the context that Agent 2's prompt needs.

        Input dictionary looks like:
        {'item_name': '...', 'complexity_level': '...', 'output': '{"score": 0.95}'}

        Returns a new dict with 'item_name', 'score' and 'complexity_level'.
        The input dictionary is left unmodified.

        Score resolution:
          * valid JSON object with a 'score' key -> that value
          * valid JSON object without 'score'    -> 0.0
          * anything that is not a JSON object   -> 0.5 (fallback), after
            printing the decoding problem

        Raises:
            KeyError: if 'output', 'item_name' or 'complexity_level' is missing.
        """
        fallback_score = 0.5  # Default used when Agent 1's output is unusable

        # Read without popping so the caller's dictionary is not mutated.
        score_json_str = input_dict["output"]

        try:
            score_data = json.loads(score_json_str)
            if not isinstance(score_data, dict):
                # e.g. '[0.9]' or '0.9' is valid JSON but has no 'score' field.
                raise ValueError(f"expected a JSON object, got {type(score_data).__name__}")
            score = score_data.get("score", 0.0)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError; TypeError covers non-string output.
            print(f"Error decoding JSON from Agent 1: {e}")
            score = fallback_score

        # Combine the original input context with the score for Chain 2.
        return {
            "item_name": input_dict["item_name"],
            "score": score,
            "complexity_level": input_dict["complexity_level"],
        }

    # Fan-out step: a plain dict piped into a runnable is treated as a parallel
    # map, so each key below is computed from the same original input. The
    # 'output' key is the one map_score_to_review_context reads.
    score_with_context_chain = dict(
        item_name=RunnableLambda(lambda x: x["item_name"]),
        complexity_level=RunnableLambda(lambda x: x["complexity_level"]),
        output=chain_1,  # Output of Chain 1: raw JSON score string
    )

    score_mapper = score_with_context_chain | RunnableLambda(map_score_to_review_context)


    # --- AGENT 2: Review Generator (Input: score, item, complexity, Output: JSON string review) ---

    prompt_2 = PromptTemplate.from_template(
        "Generate a technical review for the item '{item_name}' which has a "
        "complexity of '{complexity_level}' and received a technical score of "
        "{score}. Your review must reflect this score. Output the review strictly "
        "in JSON format according to the schema."
    )

    # Chain 2: Review Generator (outputs a raw JSON string)
    chain_2 = prompt_2 | llm_review_generator | StrOutputParser()

    # --- FINAL CASCADING CHAIN ---
    # score_mapper runs Chain 1 and merges its score with the original inputs;
    # chain_2 then turns that merged context into the final review.
    final_chain = score_mapper | chain_2

    # --- Example Invocation ---
    # Only the variables of the first prompt are needed as input to the whole chain.
    input_topic = dict(
        item_name="Quantum Entanglement Module v1.2",
        complexity_level="High/Experimental",
    )

    print("\nInvoking the two-agent cascading chain: Score Generator -> Review Generator...")
    response = final_chain.invoke(input_topic)

    print(f"\nOriginal Input:\n{json.dumps(input_topic, indent=2)}")
    print("\n--- Final Output (Generated by Agent 2 based on Agent 1's score) ---")

    # Pretty-print the review when it is valid JSON, otherwise show it verbatim.
    try:
        parsed_review = json.loads(response)
    except json.JSONDecodeError:
        print(f"Raw Output:\n{response}")
    else:
        print(json.dumps(parsed_review, indent=2))

    print("\n--- End of Cascading Chain Execution ---")

--- LangChain Custom HTTP Gemini Cascading Agent with JSON Output Example ---

Invoking the two-agent cascading chain: Score Generator -> Review Generator...

Original Input:
{
  "item_name": "Quantum Entanglement Module v1.2",
  "complexity_level": "High/Experimental"
}

--- Final Output (Generated by Agent 2 based on Agent 1's score) ---
{
  "review_text": "The Quantum Entanglement Module v1.2 demonstrates promising entanglement fidelity (0.7825 E_F) within controlled experimental parameters. Its high complexity is inherent to the experimental nature, showing significant potential for quantum key distribution. However, further iteration is needed to enhance stability and reduce sensitivity to environmental noise for practical integration.",
  "category": "Positive"
}

--- End of Cascading Chain Execution ---
