# Simple Classifier for LLM Prompts

### 🔨 **Setup**

In [1]:
# Language.build_library compiles the vendored grammar(s) into a shared library that is usable from Python.
# On its own it returns immediately if the library has already been compiled since the last time its
# source code was modified; this cell deletes any existing library first, so it always recompiles.

from tree_sitter import Language, Parser
import os

# Force a fresh build: remove any previously compiled library so the call
# below recompiles the grammar every time this cell is executed.
library_path = "build/my-languages.so"
grammar_dirs = ["vendor/tree-sitter-python"]

if os.path.exists(library_path):
    os.remove(library_path)

# Arguments: where to store the compiled library, then the grammar
# repositories to include in it. Kept as the cell's final expression so the
# notebook still displays the call's return value.
Language.build_library(library_path, grammar_dirs)

True

### 🔍 **Modified Parser for Training**

Assumption: a string literal that is assigned to a variable and contains a newline character is a prompt.

In [2]:
def parse_strings(filename):
    """Collect the source text of every string node in a Python file.

    The file is parsed with the tree-sitter Python grammar loaded from
    ``./build/my-languages.so`` and the syntax tree is visited in pre-order
    (a node first, then its children from left to right).

    Args:
        filename: Path of the Python source file to parse.

    Returns:
        A list with the UTF-8 decoded text of each node whose type is
        ``"string"``, in visiting order. Nodes with empty text are skipped.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If a string node's bytes are not valid UTF-8.
    """
    PY_LANGUAGE = Language('./build/my-languages.so', 'python')
    parser = Parser()
    parser.set_language(PY_LANGUAGE)
    result = []

    with open(filename, "rb") as f:
        tree = parser.parse(f.read())

    # Walk the tree with an explicit stack rather than recursion so deeply
    # nested syntax trees (e.g. very long chains of concatenated literals)
    # cannot exceed Python's recursion limit.
    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type == "string":
            # convert bytes to string, and add to list
            string = node.text.decode("utf-8")
            if string:
                result.append(string)
        # Children are pushed in reverse so they are popped left to right,
        # which keeps the same pre-order as a recursive traversal.
        pending.extend(reversed(node.children))

    return result

def parse_prompts(filename):
    """Return the string literals in a Python file that look like prompts.

    A tree-sitter query selects strings that form the right-hand side of an
    assignment to a plain identifier. Of those, only strings containing a
    newline character are kept (heuristic: multi-line strings assigned to a
    variable are probably prompts).

    Args:
        filename: Path of the Python source file to parse.

    Returns:
        A list with the UTF-8 decoded text of every matching string node, in
        the order the query reports them.
    """
    python_language = Language('./build/my-languages.so', 'python')
    ts_parser = Parser()
    ts_parser.set_language(python_language)

    with open(filename, "rb") as source_file:
        syntax_tree = ts_parser.parse(source_file.read())

    assignment_query = python_language.query("""
        (expression_statement
            (assignment
                left: (identifier) @var.name
                right: (string) @var.value
            )
        )
    """)

    # Each capture is indexed as (node, capture name); only the value side of
    # the assignment is of interest here.
    assigned_texts = (
        capture[0].text.decode("utf-8")
        for capture in assignment_query.captures(syntax_tree.root_node)
        if capture[1] == "var.value"
    )

    return [text for text in assigned_texts if "\n" in text]

In [3]:

# Using the parser to generate training data for the prompt classifier.
# Every string literal found in a file becomes one row [text, label], where
# label is 1 when parse_prompts returned that exact text for the same file.
root_dir = "repos"
count = 0        # files for which parse_prompts returned at least one prompt
total_files = 0  # entries visited inside the repo directories
all_prompts = set()
all_prompt_classifications = []
for repo in os.listdir(root_dir):
    repo_path = os.path.join(root_dir, repo)
    # Skip stray files directly under root_dir: listing them would raise
    # outside the try block below and abort the whole run.
    if not os.path.isdir(repo_path):
        continue
    for file in os.listdir(repo_path):
        file_path = os.path.join(repo_path, file)
        total_files += 1
        try:
            # A set gives constant-time membership tests in the labelling loop.
            prompts = set(parse_prompts(file_path))
            strings = parse_strings(file_path)
            if len(prompts) > 0:
                count += 1
                all_prompts.update(prompts)
            for string in strings:
                all_prompt_classifications.append([string, int(string in prompts)])
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            print(e)
            print("Error: ", repo_path, file_path)

print(f"Parser Returns result for {count} files out of {total_files} files")

Parser Returns result for 560 files out of 1444 files


In [4]:
import pandas as pd

# Load all_prompt_classifications into a dataframe (one row per string literal).
df = pd.DataFrame(all_prompt_classifications, columns=["text", "is_prompt"])
print(f"df_size: {len(df)} ; Prompt Count: {df['is_prompt'].sum()} ; Non-Prompt Count: {len(df) - df['is_prompt'].sum()}")

# Downsample the dataframe to have an equal number of prompts and non-prompts.
# The per-class size is the smaller of the two class counts, so this also works
# if prompts ever outnumber non-prompts; the fixed seed makes the saved CSV
# reproducible across runs.
print("Downsampling")
prompt_count = int(df["is_prompt"].sum())
samples_per_class = min(prompt_count, len(df) - prompt_count)
df = (
    df.groupby("is_prompt")
    .sample(n=samples_per_class, random_state=42)
    .reset_index(drop=True)
)

# Save the prompt classifications as a csv file
df.to_csv('prompt_classifications.csv', index=False)

print(f"df_size: {len(df)} ; Prompt Count: {df['is_prompt'].sum()} ; Non-Prompt Count: {len(df) - df['is_prompt'].sum()}")

df_size: 64128 ; Prompt Count: 1127 ; Non-Prompt Count: 63001
Downsampling
df_size: 2254 ; Prompt Count: 1127 ; Non-Prompt Count: 1127


### **Approach 1**: Binary Classification using Logistic Regression 🪵

In [5]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report

# Read the balanced dataset written to prompt_classifications.csv and pull out
# the texts and their 0/1 labels as arrays.
df = pd.read_csv("prompt_classifications.csv")
data, labels = df["text"].values, df["is_prompt"].values

def train_llm_prompt_classifier(data, labels, test_size=0.3, random_state=42):
    """Train and evaluate a TF-IDF + logistic-regression prompt classifier.

    The data is shuffled and split into a training and a held-out test set,
    a pipeline is fitted on the training part, and accuracy plus a
    classification report for the test part are printed.

    Args:
        data: Sequence of text samples.
        labels: Sequence of class labels aligned with ``data``.
        test_size: Share of the samples held out for evaluation, forwarded to
            ``train_test_split``. Defaults to 0.3.
        random_state: Seed forwarded to ``train_test_split`` so the split is
            repeatable. Defaults to 42.

    Returns:
        The fitted ``Pipeline`` (steps ``'tfidf'`` and ``'classifier'``).
    """
    # Split the data into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        data, labels, test_size=test_size, random_state=random_state, shuffle=True
    )

    # TF-IDF turns the text into numerical features, logistic regression
    # classifies them.
    # May not be the best idea for detecting LLM prompts 😬. Let's see how it goes.
    pipeline = Pipeline([
        ('tfidf', TfidfVectorizer()),
        ('classifier', LogisticRegression()),
    ])

    # Train the model
    pipeline.fit(X_train, y_train)

    # Evaluate the model on the test data
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print("\n\nClassifier Performance\n")
    print(f"Accuracy: {accuracy:.2f}\n")
    print(classification_report(y_test, y_pred))

    return pipeline

# Fit the pipeline on the loaded texts and labels; the fitted pipeline is kept
# as log_classifier and reused by the cells below.
log_classifier = train_llm_prompt_classifier(data, labels)



Classifier Performance

Accuracy: 0.88

              precision    recall  f1-score   support

           0       0.87      0.91      0.89       355
           1       0.90      0.85      0.87       322

    accuracy                           0.88       677
   macro avg       0.88      0.88      0.88       677
weighted avg       0.88      0.88      0.88       677



In [6]:
def is_llm_prompt(text, classifier):
    """Tell whether ``classifier`` assigns ``text`` to the prompt class (1).

    Args:
        text: The string to classify.
        classifier: Object with a ``predict`` method that accepts a list of
            texts and returns an indexable sequence of predicted labels.

    Returns:
        The result of comparing the first predicted label with ``1``.
    """
    # predict works on a batch, so the single text is wrapped in a list and
    # the one label belonging to it is read back.
    return classifier.predict([text])[0] == 1

# Quick check of the logistic-regression classifier on a few-shot style
# template with placeholders.
example_text = """\
<< Example {i}. >>
Data Source:
{data_source}

User Query:
{user_query}

Structured Request:
{structured_request}
"""
example_is_prompt = is_llm_prompt(example_text, log_classifier)
print(example_is_prompt)

True


**Saving parsing results for log classifier (for later comparison)**

In [7]:
def parse_log_classifier(filename, classifier=None):
    """Return the string literals in a file that a classifier flags as prompts.

    The file is parsed with the tree-sitter Python grammar loaded from
    ``./build/my-languages.so``; every non-empty ``"string"`` node is decoded
    and passed to ``is_llm_prompt``.

    Args:
        filename: Path of the Python source file to parse.
        classifier: Object accepted by ``is_llm_prompt``. When omitted, the
            notebook-level ``log_classifier`` is used.

    Returns:
        A list with the text of every string node classified as a prompt, in
        pre-order of the syntax tree.
    """
    if classifier is None:
        # Looked up at call time, so the most recently trained
        # logistic-regression pipeline is the one that gets used.
        classifier = log_classifier

    PY_LANGUAGE = Language('./build/my-languages.so', 'python')
    parser = Parser()
    parser.set_language(PY_LANGUAGE)
    result = []

    with open(filename, "rb") as f:
        tree = parser.parse(f.read())

    # Explicit stack instead of recursion: deeply nested syntax trees cannot
    # exceed Python's recursion limit this way.
    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type == "string":
            # convert bytes to string once, and classify it if non-empty
            string = node.text.decode("utf-8")
            if string and is_llm_prompt(string, classifier):
                result.append(string)
        # Reverse push keeps the left-to-right, pre-order visiting sequence.
        pending.extend(reversed(node.children))

    return result


import json

# Run the logistic-regression parser over every file of every repo and group
# the detected prompts by repo.
root_dir = "repos"
repo_to_prompts = {}
count = 0        # files with at least one detected prompt
repo_count = 0   # repos processed so far, for progress output
total_files = 0  # entries visited inside the repo directories
for repo in os.listdir(root_dir):
    repo_path = os.path.join(root_dir, repo)
    # Skip stray files directly under root_dir: listing them would raise
    # outside the try block below and abort the whole run.
    if not os.path.isdir(repo_path):
        continue
    for file in os.listdir(repo_path):
        file_path = os.path.join(repo_path, file)
        total_files += 1
        try:
            prompt = parse_log_classifier(file_path)
            if len(prompt) > 0:
                count += 1
                repo_to_prompts.setdefault(repo, []).extend(prompt)
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            print(e)
            print("Error: ", repo_path, file_path)
    repo_count += 1
    if repo_count % 10 == 0:
        print(f"Finished {repo_count} repos")

# Save repo_to_prompts (according to the logistic-regression classifier) as a json file
with open('repo_to_prompts_logClassifier.json', 'w') as f:
    json.dump(repo_to_prompts, f)


print(repo_to_prompts)
print(f"Parser Returns result for {count} files out of {total_files} files")

Finished 10 repos
Finished 20 repos
Finished 30 repos
Finished 40 repos
Finished 50 repos
Finished 60 repos
Finished 70 repos
Finished 80 repos
Finished 90 repos
Finished 100 repos
Finished 110 repos
Finished 120 repos
Finished 130 repos
Finished 140 repos
Finished 150 repos
Finished 160 repos
Finished 170 repos
Finished 180 repos
Finished 190 repos
Finished 200 repos
Finished 210 repos
Finished 220 repos
Finished 230 repos
Finished 240 repos
Finished 250 repos
Finished 260 repos
Finished 270 repos
Finished 280 repos
Finished 290 repos
Finished 300 repos
Finished 310 repos
Finished 320 repos
Finished 330 repos
Finished 340 repos
Finished 350 repos
Finished 360 repos
Finished 370 repos
Parser Returns result for 1129 files out of 1444 files


### **Approach 2**: Text Classification with Flair 🤖

In [8]:
import pandas as pd

df = pd.read_csv("prompt_classifications.csv")

# Create 60-20-20 train-dev-test split: 20% is held out for test first, then
# 25% of the remaining 80% (i.e. 20% of the whole) becomes the dev set.
train, test = train_test_split(df, test_size=0.2, random_state=42, shuffle=True)
train, dev = train_test_split(train, test_size=0.25, random_state=42, shuffle=True)

# to_csv does not create missing parent directories, so make sure the corpus
# folder exists before writing into it.
os.makedirs('flair_corpus', exist_ok=True)

# Save the train, dev, test splits as csv files
train.to_csv('flair_corpus/train.csv', index=False)
dev.to_csv('flair_corpus/dev.csv', index=False)
test.to_csv('flair_corpus/test.csv', index=False)

In [9]:
from flair.data import Corpus
from flair.datasets import CSVClassificationCorpus
from flair.embeddings import TransformerDocumentEmbeddings
from flair.models import TextClassifier
from flair.trainers import ModelTrainer

def create_classifier():
    """Fine-tune a DistilBERT-based Flair text classifier on the CSV corpus.

    Loads ``train.csv``, ``dev.csv`` and ``test.csv`` from ``./flair_corpus``
    (column 0 is the text, column 1 the label, header row skipped), prints
    corpus statistics and the label dictionary, and fine-tunes a
    ``TextClassifier`` with ``resources/classifiers/dj_classifier`` as the
    trainer's output path.

    Returns:
        None. The function is called for its training side effects.
    """
    # Corpus: three CSV files in one folder, labels stored under type "class".
    corpus_dir = "./flair_corpus"
    csv_columns = {0: "text", 1: "label"}
    corpus = CSVClassificationCorpus(
        corpus_dir,
        csv_columns,
        delimiter=",",
        train_file="train.csv",
        dev_file="dev.csv",
        test_file="test.csv",
        label_type="class",
        skip_header=True,
    )

    # Label dictionary built from the corpus, printed together with corpus
    # statistics (for debugging purposes).
    label_dictionary = corpus.make_label_dictionary("class")
    print(corpus.obtain_statistics())
    print(label_dictionary)

    # Transformer document embeddings (many models are available).
    # Refer to this for other models: https://huggingface.co/transformers/v2.3.0/pretrained_models.html
    embeddings = TransformerDocumentEmbeddings('distilbert-base-uncased', fine_tune=True)

    # Text classifier on top of the embeddings.
    model = TextClassifier(embeddings, label_dictionary=label_dictionary, label_type="class")

    # Run training with fine-tuning.
    ModelTrainer(model, corpus).fine_tune(
        'resources/classifiers/dj_classifier',
        learning_rate=5.0e-5,
        mini_batch_size=4,
        max_epochs=10,
    )
    
# create_classifier()  # Uncomment this line to create the classifier

  from .autonotebook import tqdm as notebook_tqdm


✨ Trying out the new Classifier! :-D

In [10]:
from flair.data import Sentence

# Load the fine-tuned model from the classifier's output folder.
classifier = TextClassifier.load('resources/classifiers/dj_classifier/final-model.pt')

# Example text to classify.
sample_prompt = """You're an elite algorithm, answering queries based solely on given context. If the context lacks the answer, state ignorance. If you are not 100% sure tell the user.

        Context:
        {context}"""
sentence = Sentence(sample_prompt)

# predict is called on the sentence; the predicted labels are then read from
# the sentence object itself.
classifier.predict(sentence)

print(sentence.labels)

for predicted in sentence.labels:
    print(predicted.value, predicted.score)

# Final expression of the cell: number of labels attached to the sentence.
len(sentence.labels)

['Sentence[40]: "You're an elite algorithm, answering queries based solely on given context. If the context lacks the answer, state ignorance. If you are not 100% sure tell the user.          Context:         {context}"'/'1' (0.9999)]
1 0.9999115467071533


1

### **Conclusion**: Flair is better than Logistic Regression. I wonder if the results would be similar if we used the flair embeddings with Logistic Regression. 🤷

### **NLP Powered Parser!** 🤖💪

In [11]:
# Load the fine-tuned Flair model; it is passed to parse_flair in the calls below.
classifier = TextClassifier.load('resources/classifiers/dj_classifier/final-model.pt')

def parse_flair(filename, classifier, threshold=0.95):
    """Return the string literals in a file that the Flair model flags as prompts.

    The file is parsed with the tree-sitter Python grammar loaded from
    ``./build/my-languages.so``. Every non-empty ``"string"`` node is wrapped
    in a ``Sentence`` and classified; it is kept when its single label has
    value ``'1'`` and a score above ``threshold``.

    Args:
        filename: Path of the Python source file to parse.
        classifier: Model whose ``predict`` method annotates a ``Sentence``
            with labels.
        threshold: Score the ``'1'`` label must exceed for the string to be
            kept. Defaults to 0.95.

    Returns:
        A list with the text of every string node accepted as a prompt, in
        pre-order of the syntax tree.

    Raises:
        Exception: If a sentence ends up with more than one label.
    """
    PY_LANGUAGE = Language('./build/my-languages.so', 'python')
    parser = Parser()
    parser.set_language(PY_LANGUAGE)
    result = []

    with open(filename, "rb") as f:
        tree = parser.parse(f.read())

    # Explicit stack instead of recursion: deeply nested syntax trees cannot
    # exceed Python's recursion limit this way.
    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type == "string":
            # convert bytes to string once; empty strings are not classified
            string = node.text.decode("utf-8")
            if string:
                # create sentence object and predict
                sentence = Sentence(string)
                classifier.predict(sentence)

                # check if sentence is a prompt
                labels = sentence.labels
                if len(labels) > 1:
                    raise Exception("More than one label")
                if labels and labels[0].value == '1' and labels[0].score > threshold:
                    result.append(string)
        # Reverse push keeps the left-to-right, pre-order visiting sequence.
        pending.extend(reversed(node.children))

    return result

# Test the parser on two sample files: for each one, print the number of
# detected prompts followed by the prompts themselves.
for sample_file in (
    "repos/0ptim~JellyChat/backend~tools~defichainpython_qa.py",
    "repos/su77ungr~CASALIOY/casalioy~CustomChains.py",
):
    res = parse_flair(sample_file, classifier)
    print(len(res))

    for prompt in res:
        print(prompt)

3
"""You're an elite algorithm, answering queries based solely on given context. If the context lacks the answer, state ignorance. If you are not 100% sure tell the user.

        Context:
        {context}"""
f"""Answer: {result.answer}
        Sources: {json.dumps(result.sources)}
        """
"""Use this if you need to answer any question reguarding python and coding in general. Keywords: python, script, coding, connection to a defichain node, connection to ocean API, creating a wallet, create custom transactions. Make sure to include the source of the answer in your response."""
7
"""base class for Question-Answering"""
"""ask a question, return results"""
"""custom QA close to a stuff chain
    compared to the default stuff chain which may exceed the context size, this chain loads as many documents as allowed by the context size.
    Since it uses all the context size, it's meant for a "one-shot" question, not leaving space for a follow-up question which exactly contains the previo

**Saving parsing results for the Flair classifier (for later comparison)**

In [12]:
import json

# Run the Flair-based parser over every file of every repo and group the
# detected prompts by repo.
root_dir = "repos"

repo_to_prompts = {}
count = 0        # files with at least one detected prompt
repo_count = 0   # repos processed so far, for progress output
total_files = 0  # entries visited inside the repo directories
for repo in os.listdir(root_dir):
    repo_path = os.path.join(root_dir, repo)
    # Skip stray files directly under root_dir: listing them would raise
    # outside the try block below and abort the whole run.
    if not os.path.isdir(repo_path):
        continue
    for file in os.listdir(repo_path):
        file_path = os.path.join(repo_path, file)
        total_files += 1
        try:
            prompt = parse_flair(file_path, classifier)
            if len(prompt) > 0:
                count += 1
                repo_to_prompts.setdefault(repo, []).extend(prompt)
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            print(e)
            print("Error: ", repo_path, file_path)
    repo_count += 1
    if repo_count % 10 == 0:
        print(f"Finished {repo_count} repos")

# Save repo_to_prompts (according to flair) as a json file
with open('repo_to_prompts_FLAIR.json', 'w') as f:
    json.dump(repo_to_prompts, f)


print(repo_to_prompts)
print(f"Parser Returns result for {count} files out of {total_files} files")

Finished 10 repos
Finished 20 repos
Finished 30 repos
Finished 40 repos
Finished 50 repos
Finished 60 repos
Finished 70 repos
Finished 80 repos
Finished 90 repos
Finished 100 repos
Finished 110 repos
Finished 120 repos
Finished 130 repos
Finished 140 repos
Finished 150 repos
Finished 160 repos
Finished 170 repos
Finished 180 repos
Finished 190 repos
Finished 200 repos
Finished 210 repos
Finished 220 repos
Finished 230 repos
Finished 240 repos
Finished 250 repos
Finished 260 repos
Finished 270 repos
Finished 280 repos
Finished 290 repos
Finished 300 repos
Finished 310 repos
Finished 320 repos
Finished 330 repos
Finished 340 repos
Finished 350 repos
Finished 360 repos
Finished 370 repos
Parser Returns result for 886 files out of 1444 files
