In [10]:
import openai
import tiktoken
from typing import Optional

In [11]:
# Load the article text that the rest of the notebook summarizes.
# The encoding is given explicitly so the read does not depend on the platform's
# default locale encoding.
# NOTE(review): assumes the file is UTF-8 encoded — confirm against the data source.
with open("data/united_states_wikipedia.txt", "r", encoding="utf-8") as file:
    united_states_wikipedia_text = file.read()

In [12]:
# Count how many tokens the loaded article occupies under the gpt-3.5-turbo encoding.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
len(encoding.encode(united_states_wikipedia_text))

16446

In [6]:
def get_chat_completion(messages, model='gpt-3.5-turbo'):
    """Send `messages` to the chat completion endpoint and return the reply text.

    Args:
        messages: Chat messages passed through unchanged as the `messages`
            argument of `openai.ChatCompletion.create`.
        model: Name of the model to query.

    Returns:
        The `content` of the message in the first returned choice.
    """
    # NOTE(review): `openai.ChatCompletion` looks like the pre-1.0 SDK interface —
    # confirm the installed openai version before re-running.
    request_options = {
        "model": model,
        "messages": messages,
        "temperature": 0,  # temperature is pinned to 0 for every request
    }
    completion = openai.ChatCompletion.create(**request_options)
    first_choice = completion.choices[0]
    return first_choice.message['content']

In [26]:
def summarize(text: str,
              detail: float,
              model: str = 'gpt-3.5-turbo',
              special_instructions: Optional[str] = None,
              minimum_chunk_size: Optional[int] = 500,
              chunk_delimiter: str = ".",
              summarize_recursively = False):
    """Summarize `text` by splitting it into chunks and summarizing each chunk.

    The chunk size is interpolated linearly between `minimum_chunk_size`
    (at `detail == 0`) and half the document's token length (at `detail == 1`),
    so a higher `detail` value produces larger chunks.

    Args:
        text: The document to summarize.
        detail: Interpolation factor in the closed range [0, 1].
        model: Model name forwarded to `get_chat_completion`.
        special_instructions: Optional extra instructions appended to the
            system message of every request. Ignored when None or empty.
        minimum_chunk_size: Chunk size in tokens used at `detail == 0`.
            None falls back to the default of 500.
        chunk_delimiter: Delimiter on which the text is split into pieces.
        summarize_recursively: When true, every request after the first also
            includes the summaries produced so far.

    Returns:
        A tuple `(final_summary, accumulated_summaries)`: the partial summaries
        joined by blank lines, and the list of partial summaries themselves.

    Raises:
        AssertionError: If `detail` is outside [0, 1].
    """
    assert 0 <= detail <= 1

    # The annotation permits None, but the interpolation below needs a number.
    if minimum_chunk_size is None:
        minimum_chunk_size = 500

    document_length = len(tokenize(text))
    # interpolate chunk size between minimum_chunk_size and document_length // 2
    chunk_size = int(minimum_chunk_size + detail * (document_length // 2 - minimum_chunk_size))
    print(f"Using a chunk size of {chunk_size} tokens")
    text_chunks = chunk_on_delimiter(text, chunk_size, chunk_delimiter)

    # The system prompt is the same for every chunk, so build it once.
    system_message_content = "Summarize the following text."
    if special_instructions:
        system_message_content += f"\n\n{special_instructions}"

    accumulated_summaries = []

    for chunk in text_chunks:
        if summarize_recursively and accumulated_summaries:
            # Join outside the f-string: a backslash inside an f-string
            # replacement field is a SyntaxError before Python 3.12.
            previous_summaries = '\n\n'.join(accumulated_summaries)
            user_message_content = (
                f"Previous summaries:\n\n{previous_summaries}\n\n"
                f"Text to summarize next:\n\n{chunk}"
            )
        else:
            # No earlier summaries to provide as context (or recursion disabled).
            user_message_content = chunk

        messages = [
            {"role": "system", "content": system_message_content},
            {"role": "user", "content": user_message_content}
        ]

        response = get_chat_completion(messages, model=model)
        accumulated_summaries.append(response)

    # Compile final summary from partial summaries
    final_summary = '\n\n'.join(accumulated_summaries)

    return final_summary, accumulated_summaries

# Summarize the article at the midpoint detail setting.
# `summarize` calls `tokenize` and `chunk_on_delimiter`, so the cell defining
# them must have been executed before this one.
summarize(united_states_wikipedia_text, detail=0.5)

Chunk size: 4361


In [29]:
from typing import List, Tuple, Optional


def tokenize(text: str) -> List[int]:
    """Encode `text` with the gpt-3.5-turbo tiktoken encoding.

    Args:
        text: The string to encode.

    Returns:
        The token ids produced by the encoding; callers in this notebook use
        the length of this list as the token count.
    """
    encoding = tiktoken.encoding_for_model('gpt-3.5-turbo')
    return encoding.encode(text)


def chunk_on_delimiter(input_string, max_tokens, delimiter):
    """Split `input_string` on `delimiter` and regroup the pieces into chunks.

    The pieces are recombined by `combine_chunks_with_no_minimum` under a
    budget of `max_tokens` tokens per chunk, with an ellipsis marker requested
    where an oversized piece is dropped.

    Args:
        input_string: The text to split.
        max_tokens: Token budget passed to `combine_chunks_with_no_minimum`.
        delimiter: String to split on; also used to re-join pieces.

    Returns:
        A list of chunk strings, each with `delimiter` appended. The delimiter
        is appended after the budget check, so a returned chunk can exceed
        `max_tokens` by the delimiter's own token count.
    """
    chunks = input_string.split(delimiter)
    # The keyword must match the callee's parameter name
    # (`add_ellipsis_for_overflow`); `add_ellipsis` raises TypeError.
    combined_chunks, _, dropped_chunk_count = combine_chunks_with_no_minimum(
        chunks, max_tokens, chunk_delimiter=delimiter, add_ellipsis_for_overflow=True
    )
    # Only warn when something was actually dropped.
    if dropped_chunk_count > 0:
        print(f"warning: {dropped_chunk_count} chunks were dropped due to overflow")
    combined_chunks = [f"{chunk}{delimiter}" for chunk in combined_chunks]
    return combined_chunks


def combine_chunks_with_no_minimum(
    chunks: List[str],
    max_tokens: int,
    chunk_delimiter="\n\n",
    header: Optional[str] = None,
    add_ellipsis_for_overflow=False,
) -> Tuple[List[str], List[List[int]], int]:
    """Greedily pack consecutive `chunks` into combined chunks of at most `max_tokens`.

    Pieces are joined with `chunk_delimiter` and a combined chunk is closed as
    soon as adding the next piece would exceed `max_tokens` (measured with
    `tokenize`). A piece that exceeds the budget on its own (together with
    `header`, if given) is dropped.

    Args:
        chunks: The pieces to combine, in order.
        max_tokens: Maximum token count of a combined chunk.
        chunk_delimiter: String placed between pieces when joining.
        header: Optional string placed at the start of every combined chunk.
        add_ellipsis_for_overflow: When true, "..." is inserted in place of a
            dropped piece if it still fits within `max_tokens`.

    Returns:
        A tuple `(output, output_indices, dropped_chunk_count)`:
        the combined chunk strings, the indices into `chunks` that each
        combined chunk was built from, and the number of dropped pieces.
    """
    dropped_chunk_count = 0
    output = []  # list to hold the final combined chunks
    output_indices = []  # list to hold the indices of the final combined chunks
    candidate = (
        [] if header is None else [header]
    )  # list to hold the current combined chunk candidate
    candidate_indices = []
    for chunk_i, chunk in enumerate(chunks):
        chunk_with_header = [chunk] if header is None else [header, chunk]
        if len(tokenize(chunk_delimiter.join(chunk_with_header))) > max_tokens:
            # This piece can never fit in any combined chunk, so it is dropped.
            print("warning: chunk overflow")
            # Count every dropped piece, whether or not an ellipsis is inserted.
            dropped_chunk_count += 1
            if (
                add_ellipsis_for_overflow
                and len(tokenize(chunk_delimiter.join(candidate + ["..."]))) <= max_tokens
            ):
                candidate.append("...")
            continue  # this case would break downstream assumptions
        # estimate token count with the current chunk added
        extended_candidate_token_count = len(tokenize(chunk_delimiter.join(candidate + [chunk])))
        # If the token count exceeds max_tokens, add the current candidate to output and start a new candidate
        if extended_candidate_token_count > max_tokens:
            output.append(chunk_delimiter.join(candidate))
            output_indices.append(candidate_indices)
            candidate = chunk_with_header  # re-initialize candidate
            candidate_indices = [chunk_i]
        # otherwise keep extending the candidate
        else:
            candidate.append(chunk)
            candidate_indices.append(chunk_i)
    # add the remaining candidate to output if it's not empty
    # (a candidate holding only the header counts as empty)
    if (header is not None and len(candidate) > 1) or (header is None and len(candidate) > 0):
        output.append(chunk_delimiter.join(candidate))
        output_indices.append(candidate_indices)
    return output, output_indices, dropped_chunk_count

In [39]:
# Token count of every chunk produced with a 300-token budget and "." as the delimiter.
[
    len(tokenize(piece))
    for piece in chunk_on_delimiter(united_states_wikipedia_text, 300, ".")
]

[300,
 301,
 287,
 225,
 297,
 288,
 290,
 295,
 232,
 299,
 297,
 289,
 291,
 273,
 214,
 297,
 298,
 288,
 271,
 266,
 294,
 291,
 300,
 295,
 281,
 301,
 272,
 289,
 290,
 292,
 273,
 289,
 290,
 273,
 296,
 262,
 296,
 301,
 261,
 291,
 301,
 300,
 278,
 300,
 288,
 249,
 293,
 268,
 300,
 296,
 289,
 299,
 273,
 263,
 299,
 293,
 297,
 267]