In [1]:
import json
import os
import re
import time

import nltk
import requests
import tiktoken
import torch
from text_generation import Client
# Fetch the NLTK 'punkt' package (skipped when already up to date).
# NOTE(review): nothing else visible in this notebook calls nltk — confirm this download is still needed.
nltk.download('punkt')


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
# Read the RunPod API key from the environment rather than committing it to the
# notebook. Set RUNPOD_API_KEY before starting the kernel. The key that was
# previously hardcoded here should be treated as leaked and rotated.
api_key = os.environ.get("RUNPOD_API_KEY", "")
if not api_key:
    print("RUNPOD_API_KEY is not set; RunPod requests will be rejected.")

In [8]:
class prep_and_prompt():
    """Turn a folder of story text files into chat-style JSONL records.

    For each file in ``folder_path`` the text is loaded, whitespace-normalised,
    tokenised with tiktoken's ``cl100k_base`` encoding and cut into fixed-size
    token segments. Every segment is sent to the endpoint at ``url``; the first
    blank-line-delimited block of the model's reply becomes the "user" message
    and the segment itself becomes the "assistant" message of a record that is
    appended to ``file_path``.
    """

    # Default maximum number of tokens in one story segment.
    SEGMENT_TOKENS = 1000
    # Job states after which polling can never produce an output.
    FAILED_STATUSES = ('FAILED', 'ERROR', 'CANCELLED', 'TIMED_OUT')

    def __init__(self, folder_path , url, api_key, file_path, enable_logging):
        """Store the run configuration.

        Args:
            folder_path: Directory whose files are processed by ``execute``.
            url: Endpoint URL that prompts are POSTed to.
            api_key: Value sent in the ``authorization`` header.
            file_path: Output JSONL file; records are appended to it.
            enable_logging: When true, counters are updated and a summary is
                printed at the end of ``execute``.
        """
        self.folder_path  = folder_path
        self.url = url
        self.api_key = api_key
        self.file_path = file_path
        self.enable_logging = enable_logging
        self.story = None
        self.encoding = None
        self.num_tokens = None  # list of token ids (not a count), set by encode_data
        self.segments = {}
        self.files_processed = 0
        self.json_objects_created = 0

    def load_data(self, file_path):
        """Read the UTF-8 text file at ``file_path`` into ``self.story``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            self.story = f.read()

    def clean_whitespace(self):
        """Collapse blank lines and space runs in ``self.story``, then strip it."""
        self.story = re.sub(r'\n{2,}', '\n', self.story)
        # NOTE(review): runs of two or more spaces become a newline, not a
        # single space — confirm this is intended.
        self.story = re.sub(r' {2,}', '\n', self.story)
        self.story = self.story.strip()

    def encode_data(self):
        """Tokenise ``self.story`` with ``cl100k_base`` into ``self.num_tokens``."""
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self.num_tokens = self.encoding.encode(self.story)

    def segment_text(self, segment_size=SEGMENT_TOKENS):
        """Split the encoded story into decoded segments of ``segment_size`` tokens.

        Args:
            segment_size: Maximum tokens per segment (default 1000). The final
                segment holds whatever tokens remain.

        Returns:
            ``self.segments``: a dict mapping ``'Segment: N'`` (N from 1) to the
            decoded text of that chunk. Empty when there are no tokens.

        Raises:
            ValueError: If ``segment_size`` is smaller than 1.
        """
        if segment_size < 1:
            raise ValueError("segment_size must be a positive integer")

        tokens = self.num_tokens
        chunk_starts = range(0, len(tokens), segment_size)
        self.segments = {
            f'Segment: {seg_num}': self.encoding.decode(tokens[start:start + segment_size])
            for seg_num, start in enumerate(chunk_starts, start=1)
        }
        return self.segments

    def generate_prompt(self, input_text):
        """Submit one story segment to the endpoint and return the job id.

        Args:
            input_text: Story segment embedded in the instruction prompt.

        Returns:
            The ``id`` field of the JSON response, or ``None`` when the request
            did not return HTTP 200 with a JSON object body.
        """
        prompt= f"""
        Based on the following story segment '{input_text}', directly create a brief sci-fi story prompt. 
        Start the prompt immediately without any introduction, explanation, or additional words. 
        End the prompt without any concluding remarks or questions. Provide only the prompt, exactly as requested, nothing more, nothing less.
        """

        payload = { "input": {
            "prompt": prompt,
            "sampling_params": {
                "max_tokens": 1000,
                "n": 1,
                "best_of": None,
                "presence_penalty": 0,
                "frequency_penalty": 0.2,
                "temperature": 0.6,
                "top_p": 1,
                "top_k": -1,
                "use_beam_search": False,
                "ignore_eos": False,
                "logprobs": None
            }
        } }
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "authorization": self.api_key
        }

        response = requests.post(self.url, json=payload, headers=headers)

        # Error responses are not guaranteed to carry JSON; fall back to the raw
        # body so the failure is reported instead of raising a decode error.
        try:
            response_data = response.json()
        except ValueError:
            response_data = response.text

        if response.status_code == 200 and isinstance(response_data, dict):
            return response_data.get('id')

        print("Failed to generate prompt:", response_data)
        return None

    def _status_url(self, job_id):
        """Build the status URL for ``job_id`` on the same endpoint as ``self.url``.

        Assumes ``self.url`` ends with an operation segment such as ``/runsync``
        or ``/run``; that segment is replaced by ``/status/<job_id>``.
        """
        endpoint_root = self.url.rstrip('/').rsplit('/', 1)[0]
        return f"{endpoint_root}/status/{job_id}"

    def poll_for_result(self, job_id, interval=5):
        """Poll the job's status URL until it reaches a terminal state.

        Args:
            job_id: Id returned by ``generate_prompt``.
            interval: Seconds to sleep between polls.

        Returns:
            The ``output`` field of the status response when the job is
            ``COMPLETED``. For a failed, cancelled or timed-out job, or a
            response with no ``status`` field, the whole response dict is
            printed and returned.
        """
        status_url = self._status_url(job_id)
        headers = {
            "accept": "application/json",
            "authorization": self.api_key
        }

        while True:
            response = requests.get(status_url, headers=headers)
            result = response.json()
            status = result.get('status')

            if status == 'COMPLETED':
                return result.get('output')
            # A missing status (e.g. an auth error body) is treated as terminal
            # too, otherwise this loop would never end.
            if status is None or status in self.FAILED_STATUSES:
                print("Error or Failed Status:", result)
                return result

            time.sleep(interval)  # Wait before polling again

    def dump_jsonl(self, json_object):
        """Append ``json_object`` as one JSON line to ``self.file_path``.

        The file is opened in append mode, so re-running adds duplicate records.
        """
        with open(self.file_path, 'a', encoding='utf-8') as f:
            json_s = json.dumps(json_object)
            f.write(json_s + '\n')
        if self.enable_logging:
            self.json_objects_created += 1

    @staticmethod
    def _first_generated_text(output):
        """Return ``output['text'][0]`` when present, otherwise ``None``."""
        if not isinstance(output, dict):
            return None
        texts = output.get('text')
        return texts[0] if texts else None

    def extract_to_prompt(self):
        '''
        Loops over the segmented story, asks the model for a prompt for each
        segment ("generate_prompt" then "poll_for_result"), extracts the first
        block of the reply that sits between blank lines, and writes a chat
        record pairing that prompt with the original segment.
        Segments whose job fails or whose reply has no such block are skipped.
        '''
        for input_text in self.segments.values():
            job_id = self.generate_prompt(input_text)
            if not job_id:
                continue

            # poll_for_result returns the raw status dict on failure, which has
            # no usable 'text' entry; the helper maps that case to None.
            generated = self._first_generated_text(self.poll_for_result(job_id))
            if not generated:
                continue

            # extract text between \n\n
            matches = re.findall(r"\n\n(.*?)(?:\n\n|$)", generated, re.DOTALL)
            if not matches:
                continue

            json_obj = {
              "messages": [
                {"role": "system", "content": "You are the greatest sci-fi story author in the universe."},
                {"role": "user", "content": matches[0].strip()},
                {"role": "assistant", "content": input_text}
              ]
            }
            self.dump_jsonl(json_obj)

    def process_file(self, file_path):
        """Run load, clean, encode, segment and prompt extraction for one file."""
        self.load_data(file_path)
        self.clean_whitespace()
        self.encode_data()
        self.segment_text()
        self.extract_to_prompt()
        if self.enable_logging:
            self.files_processed += 1

    def execute(self):
        '''
        Processes every regular file directly inside folder_path (in sorted
        name order so runs are reproducible) and, when logging is enabled,
        prints the totals at the end.
        '''
        for filename in sorted(os.listdir(self.folder_path)):
            full_path = os.path.join(self.folder_path, filename)
            if os.path.isfile(full_path):
                self.process_file(full_path)
        if self.enable_logging:
            print(f"Total files processed: {self.files_processed}")
            print(f"Total JSON objects created: {self.json_objects_created}")



          
# Run the full pipeline over the story folder. Raw strings keep the Windows
# backslashes literal (sequences like "\c" or "\s" are invalid escapes), and the
# key comes from the `api_key` variable defined earlier instead of being
# repeated inline.
process = prep_and_prompt(folder_path=r"D:\coding\llms\sci_storys",
                          url="https://api.runpod.ai/v2/llama2-13b-chat/runsync",
                          api_key=api_key,
                          file_path=r"D:\coding\llms\output.jsonl",
                          enable_logging=True)
process.execute()

In [7]:
# Read the generated JSONL back and preview its start. A bare `f.read()` inside
# the `with` block is not the cell's last expression, so it displayed nothing.
with open(r"D:\coding\llms\output.jsonl", 'r', encoding='utf-8') as f:
    output_jsonl = f.read()
output_jsonl[:1000]

In [12]:
def clean_whitespace(text):
    """Return ``text`` with blank-line runs and space runs collapsed.

    Two or more consecutive newlines become one newline, two or more
    consecutive spaces become a newline, and the result is stripped of
    leading and trailing whitespace.
    """
    single_spaced = re.sub(r'\n{2,}', '\n', text)
    return re.sub(r' {2,}', '\n', single_spaced).strip()

def load_data(data):
    """Read a story file and return it as decoded segments of up to 1000 tokens.

    The file at ``data`` is read as UTF-8, passed through ``clean_whitespace``
    and tokenised with tiktoken's ``cl100k_base`` encoding. The total token
    count is printed. The result maps ``'Segment: N'`` (N from 1) to the
    decoded text of each consecutive 1000-token chunk; the last segment holds
    the remaining tokens.
    """
    with open(data, 'r', encoding='utf-8') as story_file:
        raw_story = story_file.read()

    encoder = tiktoken.get_encoding("cl100k_base")
    token_ids = encoder.encode(clean_whitespace(raw_story))
    print(len(token_ids))

    # Slice the token list at every 1000th position; the final slice is
    # naturally shorter when the count is not a multiple of 1000.
    chunk_starts = range(0, len(token_ids), 1000)
    return {
        f'Segment: {index}': encoder.decode(token_ids[start:start + 1000])
        for index, start in enumerate(chunk_starts, start=1)
    }

# Raw string: "\c", "\l" and "\s" are invalid escape sequences in a normal literal.
segmented_data = load_data(r"D:\coding\llms\sci_storys\story4.txt")

3642


In [46]:
def gen_prompt(input_text, api_key):
    """Submit one story segment to the llama2-13b-chat endpoint.

    Args:
        input_text: Story segment embedded in the instruction prompt.
        api_key: Value sent in the ``authorization`` header.

    Returns:
        The ``id`` field of the JSON response, or ``None`` when the request did
        not return HTTP 200 with a JSON object body (the failure is printed).
    """
    url = "https://api.runpod.ai/v2/llama2-13b-chat/runsync"

    prompt= f"""
    Based on the following story segment '{input_text}', directly create a brief sci-fi story prompt. 
    Start the prompt immediately without any introduction, explanation, or additional words. 
    End the prompt without any concluding remarks or questions. Provide only the prompt, exactly as requested, nothing more, nothing less.
    """

    payload = { "input": {
        "prompt": prompt,
        "sampling_params": {
            "max_tokens": 1000,
            "n": 1,
            "best_of": None,
            "presence_penalty": 0,
            "frequency_penalty": 0.2,
            "temperature": 0.6,
            "top_p": 1,
            "top_k": -1,
            "use_beam_search": False,
            "ignore_eos": False,
            "logprobs": None
        }
    } }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": api_key
    }

    response = requests.post(url, json=payload, headers=headers)

    # Error responses are not guaranteed to carry JSON; keep the raw body so the
    # failure can be reported instead of raising a decode error.
    try:
        response_data = response.json()
    except ValueError:
        response_data = response.text

    if response.status_code != 200 or not isinstance(response_data, dict):
        print("Failed to generate prompt:", response_data)
        return None

    return response_data.get('id')
    

def poll_for_result(request_id, api_key, interval=5):
    """Poll the llama2-13b-chat status endpoint until the job is finished.

    Args:
        request_id: Job id returned by ``gen_prompt``.
        api_key: Value sent in the ``authorization`` header.
        interval: Seconds to sleep between polls.

    Returns:
        The ``output`` field of the status response when the job is
        ``COMPLETED``. For a failed, cancelled or timed-out job, or a response
        with no ``status`` field, the whole response dict is printed and
        returned.
    """
    status_url = f"https://api.runpod.ai/v2/llama2-13b-chat/status/{request_id}"
    headers = {
        "accept": "application/json",
        "authorization": api_key
    }
    # States after which the job can never produce an output.
    failed_statuses = ('FAILED', 'ERROR', 'CANCELLED', 'TIMED_OUT')

    while True:
        response = requests.get(status_url, headers=headers)
        result = response.json()
        status = result.get('status')

        if status == 'COMPLETED':
            return result.get('output')
        # A body without a status (e.g. an auth error) is treated as terminal
        # as well, otherwise this loop would never end.
        if status is None or status in failed_statuses:
            print("Error or Failed Status:", result)
            return result

        time.sleep(interval)  # Wait before polling again

In [59]:
def extract_to_prompt(segmented_data):
    '''
    Builds one chat-format record per story segment.

    For every value in segmented_data (a dict of segment label -> text) a
    prompt is requested with "gen_prompt" and awaited with "poll_for_result".
    The first block of the reply that sits between blank lines becomes the
    "user" message and the segment itself becomes the "assistant" message.
    Segments whose job fails, or whose reply has no such block, are skipped.

    Returns the list of records (empty when nothing could be built).
    '''
    json_objects = []

    for input_text in segmented_data.values():
        job_id = gen_prompt(input_text, api_key)
        if not job_id:
            continue

        output = poll_for_result(job_id, api_key)
        # On failure poll_for_result returns the raw status dict (no 'text'),
        # and a completed job may carry no output at all.
        texts = output.get('text') if isinstance(output, dict) else None
        generated = texts[0] if texts else None
        if not generated:
            continue

        # extract text between \n\n
        matches = re.findall(r"\n\n(.*?)(?:\n\n|$)", generated, re.DOTALL)
        if not matches:
            continue

        json_objects.append({
          "messages": [
            {"role": "system", "content": "You are the greatest sci-fi story author in the universe."},
            {"role": "user", "content": matches[0].strip()},
            {"role": "assistant", "content": input_text}
          ]
        })

    # Return only after every segment has been handled.
    return json_objects
# Run the prompt-extraction step over `segmented_data` (built by the load_data
# cell) and display whatever records it returns as this cell's output.
json_output = extract_to_prompt(segmented_data)
json_output