In [1]:
import requests
import json
import os
from datetime import datetime, timedelta
import time
import boto3
from botocore.exceptions import ClientError
from bs4 import BeautifulSoup
import shutil


**Retrieve NVD API Key from AWS Secrets**

In [2]:
def get_secret():

    secret_name = "NVD_API"
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']

**Retrieve NVD Records from past 120 days**
REPLACE "OUTPUT_DIR" WITH DESIRED TEMPORARY LOCATION

In [3]:

# Define constants
API_KEY = get_secret()
BASE_URL = 'https://services.nvd.nist.gov/rest/json/cves/2.0'
HEADERS = {'apiKey': API_KEY}
RESULTS_PER_PAGE = 100  # NVD's recommended value to balance the load
OUTPUT_DIR = datetime.now().strftime("%Y-%m-%d_%H%M%S")
SLEEP_TIME = 6  # Delay in seconds between requests

# Create output directory if not exists
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# Define the date range for the past 120 days
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=120)

# Format dates to ISO 8601 format
start_date_str = start_date.strftime('%Y-%m-%dT%H:%M:%S.000Z')
end_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%S.000Z')

# Initialize pagination parameters
start_index = 0
total_results = 1  # Initialize to a non-zero value to enter the loop

# Function to save a CVE record to a file
def save_cve_record(cve_record, output_dir):
    cve_id = cve_record['cve']['id']
    with open(f'{output_dir}/{cve_id}.json', 'w') as file:
        json.dump(cve_record, file, indent=2)

# Retrieve and save CVE records
while start_index < total_results:
    params = {
        'startIndex': start_index,
        'resultsPerPage': RESULTS_PER_PAGE,
        'pubStartDate': start_date_str,
        'pubEndDate': end_date_str,
        'noRejected': None,
        'cvssV3Severity': 'HIGH'  # Filter for high severity
    }
    response = requests.get(BASE_URL, headers=HEADERS, params=params)
    
    if response.status_code == 200:
        try:
            data = response.json()
        except json.JSONDecodeError:
            print(f'Error decoding JSON response at index {start_index}.')
            print('Response text:', response.text)
            break
        
        # Update pagination info
        total_results = data.get('totalResults', 0)
        start_index += RESULTS_PER_PAGE
        
        # Save each CVE record
        for cve in data.get('vulnerabilities', []):
            save_cve_record(cve, OUTPUT_DIR)
        
        print(f'Retrieved {len(data.get("vulnerabilities", []))} CVE records. Total so far: {start_index}.')
    else:
        print(f'Failed to retrieve data: {response.status_code}')
        print('Response text:', response.text)
        break
    
    # Sleep for 6 seconds before the next request
    time.sleep(SLEEP_TIME)

print('Data retrieval and storage complete.')


Retrieved 100 CVE records. Total so far: 100.
Retrieved 100 CVE records. Total so far: 200.
Retrieved 100 CVE records. Total so far: 300.
Retrieved 100 CVE records. Total so far: 400.
Retrieved 100 CVE records. Total so far: 500.
Retrieved 100 CVE records. Total so far: 600.
Retrieved 100 CVE records. Total so far: 700.
Retrieved 100 CVE records. Total so far: 800.
Retrieved 100 CVE records. Total so far: 900.
Retrieved 100 CVE records. Total so far: 1000.
Retrieved 100 CVE records. Total so far: 1100.
Retrieved 72 CVE records. Total so far: 1200.
Data retrieval and storage complete.


**Pull English language description from CVE JSON file**
**Pull "References" from CVE JSON file**
**Save pulled data to new JSON file in "cleaned" directory**

In [4]:
def clean_json_files(tobecleaned_directory, cleaned_directory):
    # Create output directory if it doesn't exist
    if not os.path.exists(cleaned_directory):
        os.makedirs(cleaned_directory)

    for filename in os.listdir(tobecleaned_directory):
        if filename.endswith(".json"):
            input_filepath = os.path.join(tobecleaned_directory, filename)
            output_filepath = os.path.join(cleaned_directory, filename)
            
            with open(input_filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)

            cleaned_data = {
                "descriptions": [],
                "references": data.get("cve", {}).get("references", [])
            }
            
            # Filter descriptions where lang is "en"
            descriptions = data.get("cve", {}).get("descriptions", [])
            for description in descriptions:
                if description.get("lang") == "en":
                    cleaned_data["descriptions"].append(description["value"])

            # Save cleaned data to new JSON file in the output directory
            with open(output_filepath, 'w', encoding='utf-8') as file:
                json.dump(cleaned_data, file, indent=4)

# Specify the input and output directories
tobecleaned_directory = OUTPUT_DIR
cleaned_directory = f"{OUTPUT_DIR}_cleaned"

clean_json_files(tobecleaned_directory, cleaned_directory)


**Extract exploit URLs from JSON**
**Scrape exploit URLs**
**Saved scraped data to txt file**

In [5]:
# Directory containing JSON files
json_dir = cleaned_directory

# Function to extract URLs tagged as "exploit" from a JSON file
def extract_exploit_urls_from_json(json_file):
    with open(json_file, 'r', encoding='utf-8') as file:
        data = json.load(file)
        urls = []
        references = data.get('references', [])
        print(f"Processing {json_file}: Found {len(references)} references.")
        for ref in references:
            if 'tags' in ref and 'Exploit' in ref['tags']:
                urls.append(ref['url'])
                print(f"Found exploit URL: {ref['url']}")
        return urls

# Function to scrape content from a URL with a delay and save titles, headings, and body text to a file
def scrape_url(url, output_file, delay=1):
    if 'github.com' in url or 'chromium.org' in url:
        print(f"Skipping URL: {url}")
        return
    
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        
        title = soup.title.string if soup.title else 'No title found'
        print(f"Title: {title}")
        
        headings = [heading.get_text() for heading in soup.find_all(['h1', 'h2', 'h3'])]
        body = soup.get_text()
        
        with open(output_file, 'a', encoding='utf-8') as f:
            f.write(f"Title: {title}\n")
            f.write("Headings:\n")
            for heading in headings:
                f.write(f"{heading}\n")
            f.write("Body:\n")
            f.write(f"{body}\n\n")
    except requests.RequestException as e:
        print(f"Failed to retrieve {url}: {e}")
    finally:
        time.sleep(delay)  # Delay between requests

# Collect and process each JSON file
for filename in os.listdir(json_dir):
    if filename.endswith('.json'):
        json_file = os.path.join(json_dir, filename)
        exploit_urls = extract_exploit_urls_from_json(json_file)
        if exploit_urls:
            output_file = os.path.join(json_dir, f"{os.path.splitext(filename)[0]}.txt")
            print(f"Extracted {len(exploit_urls)} exploit URLs from {filename}.")
            
            # Ensure the output file is empty before writing
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write('')
            
            # Scrape content from all extracted "exploit" URLs with a delay and save titles, headings, and body text to the file
            for url in exploit_urls:
                scrape_url(url, output_file)

# Debug: Print a message if no exploit URLs were found
        else:
            print(f"No exploit URLs found in {filename}.")

Processing 2024-08-01_214719_cleaned/CVE-2024-30084.json: Found 1 references.
No exploit URLs found in CVE-2024-30084.json.
Processing 2024-08-01_214719_cleaned/CVE-2024-21417.json: Found 1 references.
No exploit URLs found in CVE-2024-21417.json.
Processing 2024-08-01_214719_cleaned/CVE-2023-35952.json: Found 2 references.
No exploit URLs found in CVE-2023-35952.json.
Processing 2024-08-01_214719_cleaned/CVE-2023-49593.json: Found 1 references.
No exploit URLs found in CVE-2023-49593.json.
Processing 2024-08-01_214719_cleaned/CVE-2024-25575.json: Found 1 references.
No exploit URLs found in CVE-2024-25575.json.
Processing 2024-08-01_214719_cleaned/CVE-2024-35778.json: Found 1 references.
No exploit URLs found in CVE-2024-35778.json.
Processing 2024-08-01_214719_cleaned/CVE-2022-48834.json: Found 5 references.
No exploit URLs found in CVE-2022-48834.json.
Processing 2024-08-01_214719_cleaned/CVE-2024-6338.json: Found 4 references.
No exploit URLs found in CVE-2024-6338.json.
Processing

**Push files to S3**

In [6]:

# Function to retrieve secrets from AWS Secrets Manager
def get_secret2(secret_name2):
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name2)
    secret = json.loads(response['SecretString'])
    return secret

# Retrieve secrets
secret_name2 = "S3InputBucket-RAG"  # Replace with your secret name
secrets = get_secret2(secret_name2)

# Extract secrets
bucket_name = secrets['bucket_name']
role_arn = secrets['role_arn']

# Set the local folder path
local_folder = cleaned_directory

# Create an STS client
sts_client = boto3.client('sts')

# Assume the role
response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName='AssumeRoleSession')
credentials = response['Credentials']

# Configure AWS SDK with temporary credentials
s3_client = boto3.client('s3',
                         aws_access_key_id=credentials['AccessKeyId'],
                         aws_secret_access_key=credentials['SecretAccessKey'],
                         aws_session_token=credentials['SessionToken'])

def upload_folder_to_s3(local_folder, bucket_name, s3_client):
    for root, dirs, files in os.walk(local_folder):
        # Skip the '.ipynb_checkpoints' directory
        if '.ipynb_checkpoints' in dirs:
            dirs.remove('.ipynb_checkpoints')
        
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, local_folder)
            s3_path = relative_path.replace("\\", "/")  # Ensure S3 path uses forward slashes
            try:
                s3_client.upload_file(local_path, bucket_name, s3_path)
                print(f'Successfully uploaded {local_path} to s3://{bucket_name}/{s3_path}')
            except Exception as e:
                print(f'Failed to upload {local_path} to s3://{bucket_name}/{s3_path}: {e}')

# Call the function to upload the folder
upload_folder_to_s3(local_folder, bucket_name, s3_client)


Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-30084.json to s3://rag-llm-docs/CVE-2024-30084.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-21417.json to s3://rag-llm-docs/CVE-2024-21417.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2023-35952.json to s3://rag-llm-docs/CVE-2023-35952.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2023-49593.json to s3://rag-llm-docs/CVE-2023-49593.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-25575.json to s3://rag-llm-docs/CVE-2024-25575.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-35778.json to s3://rag-llm-docs/CVE-2024-35778.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2022-48834.json to s3://rag-llm-docs/CVE-2022-48834.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-6338.json to s3://rag-llm-docs/CVE-2024-6338.json
Successfully uploaded 2024-08-01_214719_cleaned/CVE-2024-30042.json to s3://rag-llm-docs/CVE-2024-30042.json
Successfully uploaded

**Once data is pushed to S3, move to archive**

In [7]:

def move_directories_to_archive(dir1, dir2, archive_dir):
    # Ensure the archive directory exists
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    
    # Move the first directory
    shutil.move(dir1, os.path.join(archive_dir, os.path.basename(dir1)))
    
    # Move the second directory
    shutil.move(dir2, os.path.join(archive_dir, os.path.basename(dir2)))

# Example usage
dir1 = OUTPUT_DIR
dir2 = cleaned_directory
archive_dir = 'archive'

move_directories_to_archive(dir1, dir2, archive_dir)
