In [1]:
from langchain.agents import AgentType, initialize_agent
from langchain_community.llms import Ollama
from langchain_community.tools import ShellTool



LOCAL_LLM = 'llama3'
llm = Ollama(model=LOCAL_LLM, temperature=0)

# shell_tool = ShellTool()
# shell_tool.description = shell_tool.description + f"args {shell_tool.args}".replace(
#     "{", "{{"
# ).replace("}", "}}")
# self_ask_with_search = initialize_agent(
#     [shell_tool], llm, agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True
# )
# self_ask_with_search.run(
#     "Download the langchain.com webpage and grep for all urls. Return only a sorted list of them. Be sure to use double quotes."
# )

  llm = Ollama(model=LOCAL_LLM, temperature=0)


In [2]:
from pinecone.grpc import PineconeGRPC as Pinecone
import os
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer

# Define Pinecone API Key
load_dotenv()
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

# Initialize Pinecone Client
pc = Pinecone(api_key=PINECONE_API_KEY)

index_name = "cis-aws-benchmark"
index = pc.Index(index_name)

# Load embedding model for queries
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

def get_relevant_cis_sections(aws_asset):
    query_text = f"CIS AWS Benchmark best practices for {aws_asset}"
    query_embedding = embedding_model.encode([query_text])
    results = index.query(
        namespace="capstone-g3",
        vector=query_embedding[0].tolist(),
        top_k=5,
        include_values=False,
        include_metadata=True
    )
    return results  

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import boto3
import os
from dotenv import load_dotenv

load_dotenv()
sts_client = boto3.client("sts")
print(sts_client.get_caller_identity())  # Should print your AWS account details

{'UserId': 'AIDA4HWJUGWB6YNABUKFH', 'Account': '841162700163', 'Arn': 'arn:aws:iam::841162700163:user/test-user', 'ResponseMetadata': {'RequestId': '93fc0cda-ed53-40ec-9ed5-036ee6bf3ae6', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '93fc0cda-ed53-40ec-9ed5-036ee6bf3ae6', 'content-type': 'text/xml', 'content-length': '406', 'date': 'Sun, 16 Feb 2025 15:15:48 GMT'}, 'RetryAttempts': 0}}


In [4]:
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

def list_aws_assets():
    try:
        # Initialize a session
        session = boto3.Session()
        detected_assets = {}

        # AWS IAM
        iam_client = session.client("iam")
        users = iam_client.list_users()
        roles = iam_client.list_roles()
        detected_assets["IAM Users"] = [user["UserName"] for user in users.get("Users", [])]
        detected_assets["IAM Roles"] = [role["RoleName"] for role in roles.get("Roles", [])]

        # IAM Access Analyzer
        analyzer_client = session.client("accessanalyzer")
        analyzers = analyzer_client.list_analyzers()
        detected_assets["IAM Access Analyzers"] = [a["name"] for a in analyzers.get("analyzers", [])]

        # AWS Config
        config_client = session.client("config")
        config_rules = config_client.describe_config_rules()
        detected_assets["AWS Config Rules"] = [rule["ConfigRuleName"] for rule in config_rules.get("ConfigRules", [])]

        # AWS CloudTrail
        cloudtrail_client = session.client("cloudtrail")
        trails = cloudtrail_client.describe_trails()
        detected_assets["CloudTrail Trails"] = [trail["Name"] for trail in trails.get("trailList", [])]

        # AWS CloudWatch
        cloudwatch_client = session.client("cloudwatch")
        alarms = cloudwatch_client.describe_alarms()
        detected_assets["CloudWatch Alarms"] = [alarm["AlarmName"] for alarm in alarms.get("MetricAlarms", [])]

        # AWS Simple Notification Service (SNS)
        sns_client = session.client("sns")
        topics = sns_client.list_topics()
        detected_assets["SNS Topics"] = [t["TopicArn"] for t in topics.get("Topics", [])]

        # AWS Simple Storage Service (S3)
        s3_client = session.client("s3")
        s3_buckets = s3_client.list_buckets()
        detected_assets["S3 Buckets"] = [b["Name"] for b in s3_buckets.get("Buckets", [])]

        # Elastic Compute Cloud (EC2)
        ec2_client = session.client("ec2")
        instances = ec2_client.describe_instances()
        detected_assets["EC2 Instances"] = [i["InstanceId"] for r in instances.get("Reservations", []) for i in r.get("Instances", [])]

        # Relational Database Service (RDS)
        rds_client = session.client("rds")
        rds_instances = rds_client.describe_db_instances()
        detected_assets["RDS Instances"] = [db["DBInstanceIdentifier"] for db in rds_instances.get("DBInstances", [])]

        # AWS VPC
        vpc_client = session.client("ec2")
        vpcs = vpc_client.describe_vpcs()
        detected_assets["VPCs"] = [vpc["VpcId"] for vpc in vpcs.get("Vpcs", [])]

        return detected_assets

    except (NoCredentialsError, PartialCredentialsError):
        print("AWS credentials not provided or are incorrect.")
        return {}


In [5]:
list_aws_assets()

{'IAM Users': ['admin-user', 'cli-user', 'test-user'],
 'IAM Roles': ['AWSServiceRoleForAPIGateway',
  'AWSServiceRoleForOrganizations',
  'AWSServiceRoleForRDS',
  'AWSServiceRoleForSSO',
  'AWSServiceRoleForSupport',
  'AWSServiceRoleForTrustedAdvisor',
  'rds-monitoring-role'],
 'IAM Access Analyzers': [],
 'AWS Config Rules': [],
 'CloudTrail Trails': [],
 'CloudWatch Alarms': [],
 'SNS Topics': [],
 'S3 Buckets': ['testbucketsiol'],
 'EC2 Instances': [],
 'RDS Instances': ['database-1'],
 'VPCs': ['vpc-0f8144ea4375c89ad']}

In [6]:
results = get_relevant_cis_sections("AWS CloudTrail")
results

{'matches': [{'id': 'chunk_29',
              'metadata': {'source': '2.1.1_Ensure_S3_Bucket_Policy_is_set_to_deny_HTTP_requests.md',
                           'text': '(Automated)\n'
                                   '\n'
                                   'Profile Applicability:\n'
                                   '\n'
                                   '•  Level 2\n'
                                   '\n'
                                   'Description:\n'
                                   '\n'
                                   'At the Amazon S3 bucket level, you can '
                                   'configure permissions through a bucket '
                                   'policy,\n'
                                   'making the objects accessible only through '
                                   'HTTPS.\n'
                                   '\n'
                                   'Rationale:\n'
                                   '\n'
                                 

In [None]:
import os
import subprocess
from langchain.schema import SystemMessage, HumanMessage


# Query Pinecone for CloudTrail-related security sections
results = get_relevant_cis_sections("AWS CloudTrail")

# Iterate over each CIS Benchmark section
for match in results["matches"]:
    section_title = match["metadata"].get("title", "No Title")
    section_text = match["metadata"].get("text", "No content available.")
    similarity_score = match["score"]

    print(f"\n📄 Section: {section_title}")
    print(f"🔹 Similarity Score: {similarity_score}")
    print(section_text)

    # Start 10 refinement iterations for this section
    execution_history = ""

    for i in range(2):
        print(f"\n🔄 Iteration {i+1} - Improving AWS CLI Command")

        # Generate AWS CLI script using LLM
        system_prompt = """
        You are a cloud security expert. Based on the CIS AWS Benchmark recommendation,
        generate a full AWS CLI script to apply the security best practice. If multiple commands are needed,
        format them as a Bash script.

        Take into account previous execution results to improve command accuracy.
        """

        user_prompt = f"""
        The following CIS Benchmark recommendation needs an AWS CLI script:
        ---
        {section_text}
        ---

        Previous execution results:
        {execution_history}

        Generate an improved AWS CLI script based on previous output.
        """

        response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)])
        generated_script = response.content.strip()

        print(f"\n⚡ Generated AWS CLI Script:\n{generated_script}")

        # Save the script to a temporary file
        script_filename = f"cis_benchmark_script_{section_title.replace(' ', '_')}_iter{i+1}.sh"
        with open(script_filename, "w", encoding="utf-8") as script_file:
            script_file.write("#!/bin/bash\n")
            script_file.write(generated_script)

        # Make script executable (Linux/macOS)
        os.chmod(script_filename, 0o755)

        # Ask user before running the script
        confirm = input("\nRun this script on AWS? (y/n): ").strip().lower()
        if confirm == "y":
            result = subprocess.run(["bash", script_filename], capture_output=True, text=True)
            output_text = f"✅ Output:\n{result.stdout}\n❌ Errors:\n{result.stderr if result.stderr else 'None'}"

            # Append execution results to context for next iteration
            execution_history += f"\nIteration {i+1} Execution Results:\n{output_text}\n"

            print(output_text)

        print(f"\n⚠ Script saved as: {script_filename} (Run manually if needed)")

        # Stop early if an answer has been reached
        stop_prompt = f"""
        Given the following execution history:

        {execution_history}

        Has the problem been resolved? If yes, respond with "YES". If more iterations are needed, respond with "NO".
        """

        stop_response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=stop_prompt)]).content.strip()
        
        if "YES" in stop_response.upper():
            print("\n✅ Solution achieved for this section, stopping iterations.")
            print(execution_history)
            break



📄 Section: No Title
🔹 Similarity Score: 0.3773635
No content available.

🔄 Iteration 1 - Improving AWS CLI Command


KeyboardInterrupt: 