<a href="https://colab.research.google.com/github/tractorjuice/Building_Wardley_BoK/blob/main/Building_AI_Body_of_Knowledge_Part_17_Websites.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Set Up


Mount Google Drive

In [None]:
# Mount Google Drive
# The import sits inside the try block, so an ImportError (for example when
# google.colab is unavailable) is reported the same way as a mount failure
# instead of stopping the cell.
try:
    from google.colab import drive
    drive.mount('/content/gdrive')
except Exception as e:
    print(f"Failed to mount Google Drive. Reason: {e}")

Get the required secrets and set up any API keys

In [None]:
import os
from google.colab import userdata
# Copy the OPENAI_API_KEY notebook secret into the process environment.
# NOTE(review): presumably the LangChain OpenAI clients created later read the
# key from this environment variable — confirm.
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

Check / create directory structure

In [None]:
# Debug switch.
# NOTE(review): not referenced by the code visible in this part of the notebook.
debug = False  # set this to True if you only want the first 5 files

# Path, under the Google Drive mount point, of the settings.ini file that is
# read into `config` below.
SETTINGS = "/content/gdrive/MyDrive/AI/WardleyMapsKB/settings.ini"

In [None]:
# Read the settings.ini file to get key configuration
import configparser
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the settings were not
# loaded (for example because Google Drive is not mounted).
if not config.read(SETTINGS):
    print(f"Warning: could not read settings file: {SETTINGS}")

In [None]:
import os
import datetime

# Get the current time at the start of the program and format as a string
start_time = datetime.datetime.now()
start_time_str = start_time.strftime("%Y%m%d%H%M")

# Read the knowledge-base root folder from settings.ini.
# Every path below is derived from KB_FOLDER, so after reporting the problem
# the error is re-raised; otherwise the cell would fail a few lines later with
# an unrelated NameError on KB_FOLDER.
try:
    KB_FOLDER = config.get('System', 'KB_FOLDER').strip('"')
except configparser.NoOptionError:
    print("Missing configuration key.")
    raise
except Exception as e:
    print(f"Failed to read configuration. Reason: {e}")
    raise

YT = os.path.join(KB_FOLDER, "youtube")  # All YouTube files
YT_DATASTORE = os.path.join(YT, "datastore")  # Sub-directory for YouTube FAISS datastore files
YT_VIDEOS = os.path.join(YT, "videos")  # Sub-directory for video-related files (audio and transcripts live below it)
YT_AUDIO = os.path.join(YT_VIDEOS, "audio")  # Sub-directory for audio files
YT_TRANSCRIPTS = os.path.join(YT_VIDEOS, "transcripts")  # Sub-directory for transcripts of audio files
YT_TRANSCRIPTS_TEXT = os.path.join(YT_TRANSCRIPTS, "full_text")  # Sub-directory for text of audio files
YT_TRANSCRIPTS_WHISPER = os.path.join(YT_TRANSCRIPTS, "whisper_chunks")  # Sub-directory for Whisper chunks of audio files
YT_TRANSCRIPTS_WHISPER_DISTIL = os.path.join(YT_TRANSCRIPTS, "distil_whisper_chunks")  # Sub-directory for Distil Whisper chunks of audio files
YT_TRANSCRIPTS_COMBINED = os.path.join(YT_TRANSCRIPTS, "combined_transcripts")  # Sub-directory for combined transcripts
YT_TRANSCRIPTS_DATASTORE = os.path.join(YT_TRANSCRIPTS, "datastore")  # Sub-directory for transcript datastore files

PODCAST = os.path.join(KB_FOLDER, "podcast")  # All podcast files
PODCAST_DATASTORE = os.path.join(PODCAST, "datastore")  # Sub-directory for podcast datastore files
PODCAST_AUDIO = os.path.join(PODCAST, "audio")  # Sub-directory for podcast audio files
PODCAST_TRANSCRIPTS = os.path.join(PODCAST, "transcripts")  # Sub-directory for podcast transcripts

MAPS = os.path.join(KB_FOLDER, "maps")  # All maps files
MAPS_DATASTORE = os.path.join(MAPS, "datastore")  # Sub-directory for maps FAISS datastore files

WEBSITES_DATA = os.path.join(KB_FOLDER, "websites")  # All website files
WEBSITES_CONTENTS = os.path.join(WEBSITES_DATA, "contents")  # Sub-directory for content extracted from websites
WEBSITES_DATASTORE = os.path.join(WEBSITES_DATA, "datastore")  # Sub-directory for website datastore files

directories = [
    YT,
    YT_DATASTORE,
    YT_VIDEOS,
    YT_AUDIO,
    YT_TRANSCRIPTS,
    YT_TRANSCRIPTS_TEXT,
    YT_TRANSCRIPTS_WHISPER,
    YT_TRANSCRIPTS_WHISPER_DISTIL,
    YT_TRANSCRIPTS_COMBINED,
    YT_TRANSCRIPTS_DATASTORE,
    PODCAST,
    PODCAST_DATASTORE,
    PODCAST_AUDIO,
    PODCAST_TRANSCRIPTS,
    MAPS,
    MAPS_DATASTORE,
    WEBSITES_DATA,
    WEBSITES_CONTENTS,
    WEBSITES_DATASTORE,
]

# Create any directory that does not exist yet. A failure is reported and the
# loop carries on with the remaining directories.
for directory in directories:
    print(directory)
    try:
        # exist_ok=True makes this a no-op for directories that already exist
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        print(f"Failed to create {directory}. Reason: {e}")


Install required dependencies

In [None]:
!pip install -q -U langchain-core
!pip install -q -U langchain-community
!pip install -q -U langchain-openai
!pip install -q -U langchain
!pip install -q -U fake-useragent # Try to give a random fake user header for html calls

Set up OpenAI Model and necessary variables

In [None]:
# OpenAI chat model name passed to ChatOpenAI further down.
# Alternatives are kept commented out; leave exactly one MODEL line active.
#MODEL = "gpt-3.5-turbo-16k" # Legacy
MODEL = "gpt-3.5-turbo-1106" # Current model
#MODEL = "gpt-3.5-turbo-0125" # Latest model
#MODEL = "gpt-4-0125-preview" # Latest model

## Scrape Websites

In [None]:
!pip install -q -U playwright beautifulsoup4 html2text
!playwright install


Read the website URLs from the configuration file

In [None]:
# Read the comma-separated list of website URLs from settings.ini
urls = []  # Stays empty if the configuration cannot be read, so the print below still works
try:
    website_urls = config.get('Websites', 'URLS').strip('"')
    # Remove stray quotes first and whitespace second, so that an entry such as
    # '" https://example.com"' does not keep its leading space.
    cleaned_urls = (entry.replace('"', '').strip() for entry in website_urls.split(','))
    # Drop empty entries (for example those produced by a trailing comma)
    urls = [entry for entry in cleaned_urls if entry]
except configparser.NoOptionError:
    print("Missing configuration key.")
except Exception as e:
    print(f"Failed to read configuration. Reason: {e}")

print(urls)

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

# Fixed desktop-Chrome User-Agent header sent with the requests.get call in
# get_first_level_urls.
# NOTE(review): fake-useragent is installed earlier in the notebook but is not
# used here — confirm whether a random User-Agent was intended.
headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36"}

def get_first_level_urls(site_url):
    """Return the same-site, first-level links found on a page.

    Fetches ``site_url``, resolves every ``<a href>`` against it and keeps the
    links whose host equals the host of ``site_url`` and whose path contains
    exactly one ``/`` (for example ``/about``, but not ``/about/`` or
    ``/blog/post``).

    Args:
        site_url: Absolute URL of the page to scan.

    Returns:
        A list of unique absolute URLs, in no particular order. An empty list
        is returned when the request fails, so callers can always iterate
        over the result.
    """
    try:
        # Send a request to the website
        response = requests.get(site_url, headers=headers, timeout=5)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error during requests to {site_url} : {str(e)}")
        return []

    # Parse the content with BeautifulSoup
    soup = BeautifulSoup(response.content, 'html.parser')

    # The host of the start page does not change per link, so parse it once
    site_netloc = urlparse(site_url).netloc

    # Collect into a set so that repeated links are only reported once
    links = set()
    for a_tag in soup.find_all('a', href=True):
        url = urljoin(site_url, a_tag['href'])
        parsed_url = urlparse(url)
        # Check if the link is a first-level URL on the same host
        if parsed_url.netloc == site_netloc and parsed_url.path.count('/') == 1:
            links.add(url)

    return list(links)

# Example usage: print the first-level pages of the first configured site
site_url = urls[0]
# Fall back to an empty list when the helper yields None (failed request),
# so the loop below does not raise a TypeError
first_level_urls = get_first_level_urls(site_url) or []
for pages in first_level_urls:
    print(pages)


In [None]:
import os
import re
import json
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain.document_transformers import Html2TextTransformer
from aiohttp.client_exceptions import TooManyRedirects, InvalidURL

# Setup OpenAI embeddings for vector database
from langchain_openai import OpenAIEmbeddings
# Constructed with default arguments.
# NOTE(review): not used by the code visible in this part of the notebook;
# presumably consumed when the datastore is built later — confirm.
embeddings = OpenAIEmbeddings()

def is_valid_url(url):
    """Return True when ``url`` begins with the ``http://`` or ``https://`` prefix."""
    # str.startswith accepts a tuple and matches if any of the prefixes fits
    return url.startswith(('http://', 'https://'))

def url_to_filename(url):
    """Turn a URL into a string that is safe to use as a file name.

    A leading ``http://`` or ``https://`` is dropped, then every run of
    characters that are not letters, digits or underscores is collapsed into a
    single underscore.
    """
    without_scheme = re.sub(r'^https?://', '', url)
    return re.sub(r'\W+', '_', without_scheme)


In [None]:
!pip install -q -U langchain_openai
from langchain_openai import ChatOpenAI

In [None]:
import os
from google.colab import userdata
# Same secret-to-environment copy as in the Set Up section, repeated here
# before the chat model is created.
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

# Chat model used by extract() below, created with temperature 0 and the MODEL
# name chosen earlier in the notebook.
llm = ChatOpenAI(temperature=0, model=MODEL)

In [None]:
from langchain.chains import create_extraction_chain

# Schema handed to the extraction chain: the fields the LLM is asked to fill in.
# All four fields are typed as strings and none of them is required
# ("required" is an empty list).
schema = {
    "properties": {
        "wardley_maps_concepts": {"type": "string"},
        "strategy_concepts": {"type": "string"},
        "wardley_doctrine": {"type": "string"},
        "components": {"type": "string"},
    },
    "required": [],
}

def extract(content: str, schema: dict):
    """Run the LLM extraction chain over ``content`` and return its result.

    Args:
        content: Text to extract from.
        schema: Schema passed to ``create_extraction_chain``.

    Returns:
        Whatever the chain's ``run`` call returns for ``content``.
    """
    print("Extracting content with LLM")
    extraction_chain = create_extraction_chain(schema=schema, llm=llm)
    return extraction_chain.run(content)

In [None]:
import pprint
import json
from langchain.text_splitter import RecursiveCharacterTextSplitter

def scrape_with_playwright(url, schema):
    """Load a page, convert it to text and run LLM extraction on its first chunk.

    Despite the name, the page is loaded with ``AsyncHtmlLoader``; Playwright
    is not called by this function.

    Args:
        url: Address handed to ``AsyncHtmlLoader``.
        schema: Extraction schema forwarded to ``extract``.

    Returns:
        The result of ``extract`` for the first chunk, or ``None`` when the
        page produces no chunks.
    """
    raw_docs = AsyncHtmlLoader(url).load()
    text_docs = Html2TextTransformer().transform_documents(raw_docs)

    # Split the text with a tiktoken-based splitter (chunk_size=10000, no overlap)
    chunker = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=10000, chunk_overlap=0
    )
    chunks = chunker.split_documents(text_docs)

    if not chunks:
        return None

    # Only the first chunk is sent to the LLM; later chunks are ignored
    result = extract(schema=schema, content=chunks[0].page_content)
    print(result)
    return result


In [None]:
# For every configured URL: scrape it, run the LLM extraction, flatten the
# result into one dict and save that dict as JSON in WEBSITES_CONTENTS.
for url in urls:
    # Derive both output paths from a single sanitized base name
    base_name = url_to_filename(url)
    filename_json = f"{base_name}.json"
    file_path_json = os.path.join(WEBSITES_CONTENTS, filename_json)
    filename_txt = f"{base_name}.txt"
    file_path_txt = os.path.join(WEBSITES_CONTENTS, filename_txt)

    # Skip URLs that already have output. This loop only writes the .json file,
    # so it is checked in addition to the .txt file; checking the .txt file
    # alone would never skip anything unless that file is produced elsewhere.
    already_saved = [
        path for path in (file_path_txt, file_path_json) if os.path.exists(path)
    ]
    if already_saved:
        print(f"File already exists, skipping scraping: {already_saved[0]}")
        continue

    extracted_content = scrape_with_playwright(url, schema=schema)

    # Nothing was extracted for this URL
    if not extracted_content:
        continue

    # Flatten the list of dictionaries and remove empty values.
    # If several dictionaries share a key, the last non-empty value wins.
    flattened_content = {}
    for content_dict in extracted_content:
        if not isinstance(content_dict, dict):
            continue
        for key, value in content_dict.items():
            if isinstance(value, str):
                # Filter out empty or whitespace-only strings
                if value.strip():
                    flattened_content[key] = value
            elif value:
                # The LLM may return non-string values (lists, numbers, None);
                # keep the non-empty ones instead of crashing on .strip()
                flattened_content[key] = value

    pprint.pprint(flattened_content)

    # Save to a separate file if there is content to save
    if flattened_content:
        print("Saving Contents      : ", file_path_json)
        try:
            # Serialize before opening the file so that a serialization error
            # cannot leave a truncated .json file behind
            payload = json.dumps(flattened_content, ensure_ascii=False, indent=4)
            with open(file_path_json, 'w', encoding='utf-8') as out_file:
                out_file.write(payload)
        except (OSError, TypeError, ValueError) as e:
            # json has no JSONEncodeError: serialization problems surface as
            # TypeError/ValueError and file problems as OSError
            print(f"Error saving JSON for {url}: {e}")
