In [None]:
# Quantitative quality evaluation of the generated RNs

from config import init_env, settings

# Set up the project environment for the 'evaluation' stage before anything else runs.
# NOTE(review): what init_env actually configures lives in the local `config` module — confirm there.
init_env('evaluation')

In [None]:
# Code to attempt to detect AI-generated text [relatively] quickly via compression ratios
# (C) 2023 Thinkst Applied Research, PTY
# Author: Jacob Torrey <jacob@thinkst.com>

import lzma
from brotli import compress as brotli_compress, MODE_TEXT
from numpy import array_split
import re
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
from typing import List, Optional, Tuple, TypeAlias
from importlib.resources import files

def clean_text(s : str) -> str:
    '''
    Strips layout noise from ``s`` so that compression ratios reflect content only.

    The substitutions run in a fixed order: collapse runs of spaces, delete tabs,
    collapse runs of newlines, drop one space directly after and directly before a
    newline, and finally remove every character that is not a digit, an ASCII
    letter, a comma, a period, a parenthesis, a space or a newline.
    '''
    substitutions = (
        (' +', ' '),                       # runs of spaces -> one space
        ('\t', ''),                        # tabs are deleted outright
        ('\n+', '\n'),                     # runs of newlines -> one newline
        ('\n ', '\n'),                     # space right after a newline
        (' \n', '\n'),                     # space right before a newline
        (r'[^0-9A-Za-z,\.\(\) \n]', ''),   # anything outside the allowed charset
    )

    cleaned = s
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned

class LlmDetector:
    CHUNK_SIZE = 1500
    prelude_ratio = 0.0
    
    def _compress(self, s : str) -> float:
        pass
    
    def score_text(self, sample: str) -> float | None:
        '''
        Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated 
        by either an AI or human. Returns None if it cannot make a determination
        '''
        if self.prelude_ratio == 0.0:
            return None
        # clean sample
        sample = clean_text(sample)
        sample_score = self._compress(self.prelude_str + sample)
        print(self.__class__.__name__ + ': ' + str((self.prelude_ratio, sample_score)))
        return (self.prelude_ratio - sample_score) * 100

    def score_long_text(self, sample: str) -> float | None:
        '''
        Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated 
        by either an AI or human. Returns None if it cannot make a determination
        '''
        if self.prelude_ratio == 0.0:
            return None
        # clean sample
        sample = clean_text(sample)
        sample_scores = []
        for chunk in [sample[i:i + self.CHUNK_SIZE] for i in range(0, len(sample), self.CHUNK_SIZE)]:
            sample_scores.append(self._compress(self.prelude_str + chunk))
        print(self.__class__.__name__ + ': ' + str((self.prelude_ratio, np.mean(sample_scores))))
        return (self.prelude_ratio - np.mean(sample_scores)) * 100

class BrotliLlmDetector(LlmDetector):
    '''Class providing functionality to attempt to detect LLM/generative AI generated text using the brotli compression algorithm'''
    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, \
                 prelude_ratio : Optional[float] = None, preset : int = 8):
        '''
        Configures the detector from a prelude file or a prelude string.

        prelude_file: path to a UTF-8 text file; its content is passed through
            ``clean_text`` and its compression ratio is always measured.
        prelude_str: reference text used as-is when no file is given.
        prelude_ratio: known compression ratio of ``prelude_str``; when supplied
            (and non-zero) it is kept instead of being re-measured.
        preset: brotli quality level passed to the compressor.
        '''
        self.PRESET = preset
        self.WIN_SIZE = 24   # passed as brotli's lgwin
        self.BLOCK_SIZE = 0  # passed as brotli's lgblock
        self.CHUNK_SIZE = 1500
        # Always defined, so the attribute exists even when no prelude is supplied.
        self.prelude_str = None
        self.prelude_ratio = 0.0
        if prelude_ratio is not None:
            self.prelude_ratio = prelude_ratio

        if prelude_file is not None:
            with open(prelude_file, encoding='utf-8') as fp:
                self.prelude_str = clean_text(fp.read())
            self.prelude_ratio = self._compress(self.prelude_str)
        elif prelude_str is not None:
            self.prelude_str = prelude_str
            # Honour an explicitly supplied ratio, matching LzmaLlmDetector.
            if self.prelude_ratio == 0.0:
                self.prelude_ratio = self._compress(self.prelude_str)

    def _compress(self, s : str) -> float:
        '''Returns compressed_size / original_size of ``s`` in UTF-8 bytes (0.0 for empty input).'''
        raw = s.encode()
        if not raw:
            # Nothing to compress; avoid dividing by zero.
            return 0.0
        compressed = brotli_compress(raw, mode=MODE_TEXT, quality=self.PRESET, lgwin=self.WIN_SIZE, lgblock=self.BLOCK_SIZE)
        return len(compressed) / len(raw)
    
class LzmaLlmDetector(LlmDetector):
    '''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm'''
    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 4, normalize : bool = False) -> None:
        '''
        Configures the detector from a prelude file or a prelude string.

        prelude_file: path to a UTF-8 text file; read verbatim (no ``clean_text``)
            and its compression ratio is always measured.
        prelude_str: reference text used when no file is given.
        prelude_ratio: known compression ratio of ``prelude_str``; when supplied
            (and non-zero) it is kept instead of being re-measured.
        preset: LZMA preset passed to ``lzma.LZMACompressor``.
        normalize: when True, ``nf`` is set to ``prelude_ratio / len(prelude_str)``.
        '''
        self.PRESET : int = preset
        self.c_buf : List[bytes] = []
        self.in_bytes : int = 0
        # Always defined, so the attribute exists even when no prelude is supplied.
        self.prelude_str : Optional[str] = None
        self.prelude_ratio : float = 0.0
        self.nf : float = 0.0

        if prelude_ratio is not None:
            self.prelude_ratio = prelude_ratio

        if prelude_file is not None:
            # Read it once to get the default compression ratio for the prelude
            with open(prelude_file, 'r', encoding='utf-8') as fp:
                self.prelude_str = fp.read()
            self.prelude_ratio = self._compress(self.prelude_str)
        elif prelude_str is not None:
            self.prelude_str = prelude_str
            if self.prelude_ratio == 0.0:
                self.prelude_ratio = self._compress(prelude_str)

        # Runs for both prelude sources (no early return above) and is skipped
        # when there is no prelude text to normalise by.
        if normalize and self.prelude_str:
            self.nf = self.prelude_ratio / len(self.prelude_str)

    def _compress(self, s : str) -> float:
        '''Returns compressed_size / original_size of ``s`` in UTF-8 bytes (0.0 for empty input).'''
        raw = s.encode()
        if not raw:
            # Nothing to compress; avoid dividing by zero.
            return 0.0
        compressor = lzma.LZMACompressor(preset=self.PRESET)
        compressed = compressor.compress(raw) + compressor.flush()
        return len(compressed) / len(raw)


# Build the Brotli-based detector with ./ai-generated.txt as the prelude and
# smoke-test it on a short sample.
det_br = BrotliLlmDetector('./ai-generated.txt')
det_br.score_text('This is a test')

In [None]:
# Same sample through the chunked scorer.
det_br.score_long_text('This is a test')

In [None]:
# LZMA-based detector over the same prelude file, smoke-tested on the same sample.
det_lzma = LzmaLlmDetector('./ai-generated.txt', normalize=True)
det_lzma.score_text('This is a test')

In [None]:
from functools import lru_cache
import tiktoken

@lru_cache
def get_tokenizer(model_name: str) -> tiktoken.Encoding:
    """Return the tiktoken encoding for ``model_name``; results are memoised per model name by ``lru_cache``."""
    return tiktoken.encoding_for_model(model_name)

def count_tokens(text: str, model_name: str = "gpt-4o") -> int:
    """
    Number of tokens ``text`` encodes to under the tokenizer of ``model_name``.

    ``disallowed_special=()`` is passed to ``encode`` so that no special-token
    text is on the disallowed list.
    """
    token_ids = get_tokenizer(model_name).encode(text, disallowed_special=())
    return len(token_ids)

# Smoke test with the default model name.
count_tokens('This is a test')

In [None]:
from math import log2

def count_info_entropy(md: str) -> float:
    """
    Shannon entropy (in bits) of how list entries are distributed over the
    sections of a markdown release note.

    A section starts at every line whose stripped text begins with ``#``; an
    entry is every line whose stripped text begins with ``-`` or ``*``. Entries
    that appear before the first heading belong to no section and are ignored,
    both as a category and in the total, so the per-section shares always sum
    to 1. Empty sections contribute nothing.

    Returns 0.0 when there are no sectioned entries or all of them sit in a
    single section.
    """
    section_sizes = []
    current_size = 0
    seen_heading = False
    for line in md.split('\n'):
        stripped = line.strip()
        if stripped.startswith('#'):
            if seen_heading:
                section_sizes.append(current_size)
            seen_heading = True
            current_size = 0
        elif seen_heading and stripped.startswith(('-', '*')):
            current_size += 1
    if seen_heading:
        section_sizes.append(current_size)

    total = sum(section_sizes)
    entropy = 0.0
    for size in section_sizes:
        if size == 0:
            continue
        share = size / total
        # share * log2(share) is <= 0, so subtracting keeps the sum non-negative
        # and never produces -0.0.
        entropy -= share * log2(share)
    return entropy


# Demo input: three headings whose sections hold 4, 2 and 0 entries.
# Expected entropy is about 0.918 bits.
a = """# Title
- Entry 1
- Entry 2
- Entry 3
- Entry 4
## Title 2
- Entry 5
- Entry 6
# Title 3
"""
count_info_entropy(a)   


In [None]:
from github import Github
from tqdm import tqdm

# GitHub API client; the token comes from the project settings object rather than being hard-coded here.
gh = Github(settings.GITHUB_TOKEN)

@lru_cache
def _get_commits_and_prs_between_versions(
        name_with_owner: str,
        from_version: str,
        to_version: str,
):
    """
    Fetch the commits between two refs of a GitHub repository and the pull
    requests associated with them.

    Args:
        name_with_owner: repository in ``owner/name`` form.
        from_version: base ref of the comparison.
        to_version: head ref of the comparison.

    Returns:
        ``(pr_to_commits, commits)``: a dict mapping each PR number to the list
        of commit objects associated with it, and the list of all commit objects
        returned by the comparison.

    The result is memoised by ``lru_cache``, so every caller with the same
    arguments receives the same dict and list objects; treat them as read-only.
    """
    gh_repo = gh.get_repo(name_with_owner)
    # get commits list
    _commits = list(gh_repo.compare(from_version, to_version).commits)
    _pr_to_commits = {}

    for c in tqdm(_commits):
        # Ask the commit object from the comparison for its PRs directly;
        # re-fetching it via gh_repo.get_commit(c.sha) costs one extra API
        # request per commit.
        for pr in c.get_pulls():
            _pr_to_commits.setdefault(pr.number, []).append(c)

    return _pr_to_commits, _commits

# Smoke test against a real repository (performs GitHub API requests).
_get_commits_and_prs_between_versions('stakater/Reloader', 'v1.0.120', 'v1.0.121')

In [None]:
import re

SHA_REGEX = re.compile(r'([0-9a-f]{7,40})')
PR_NUMBER_REGEX = re.compile(r'#(\d+)')
PR_URL_REGEX = re.compile(r'/pull/(\d+)')

def _match_commits_and_prs(
        rn_md: str,
):
    _commits = SHA_REGEX.findall(rn_md)
    _prs = PR_NUMBER_REGEX.findall(rn_md)
    _pr_urls = PR_URL_REGEX.findall(rn_md)
    return (
        set(_c[:7] for _c in _commits),
        set(int(p) for p in _prs) | set(int(p) for p in _pr_urls),
    )

# Demo covering every reference style: "#N", a /pull/N URL, a hex SHA inside a
# URL, and a bare 10-digit number (which also qualifies as a SHA candidate).
# Expected: ({'1234ff6', '1234567'}, {24, 25}).
_match_commits_and_prs(
"""# Title
- Entry 1 #24
- Entry 2 https://github.com/stakater/Reloader/pull/25
- Entry 3 https://github.com/stakater/Reloader/comit/1234ff67ff90
- Entry 4 1234567890
"""
)


In [None]:
def calculate_commit_coverage(
        name_with_owner: str,
        from_version: str,
        to_version: str,
        rn_md: str,
):
    """
    Fraction of the commits between two versions that a release note refers to.

    A commit counts as covered when the release note mentions its 7-character
    short SHA, or mentions a pull request associated with it.

    Args:
        name_with_owner: repository in ``owner/name`` form.
        from_version: base ref of the range.
        to_version: head ref of the range.
        rn_md: release-note markdown to scan for references.

    Returns:
        A float in ``[0, 1]``, or ``nan`` when the range contains no commits
        (coverage is undefined there; previously this raised ZeroDivisionError).
    """
    _pr_to_commits, _commits_to_hit = _get_commits_and_prs_between_versions(name_with_owner, from_version, to_version)
    # replace all commits objects with short sha
    _commits_to_hit = set(c.sha[:7] for c in _commits_to_hit)
    _n_commits = len(_commits_to_hit)
    _pr_to_commits = {k: [c.sha[:7] for c in v] for k, v in _pr_to_commits.items()}

    print("PRs", _pr_to_commits, "Commits", _commits_to_hit)

    if _n_commits == 0:
        return float('nan')

    _commits_in_rn, _prs_in_rn = _match_commits_and_prs(rn_md)
    # Drop commits referenced directly by SHA ...
    _commits_to_hit -= _commits_in_rn
    # ... and commits reachable through a referenced pull request.
    for pr in _prs_in_rn:
        _commits_to_hit.difference_update(_pr_to_commits.get(pr, ()))

    return 1 - len(_commits_to_hit) / _n_commits

# Smoke test: a release note that references one commit by SHA and one pull
# request by number (performs GitHub API requests).
calculate_commit_coverage('stakater/Reloader', 'v1.0.120', 'v1.0.121', """# v1.0.121
## 🔧 chore
- Updated artifacts and changed resource field references in environment variables. [70ab566](https://github.com/stakater/Reloader/commit/70ab56606df1f9fd4877b0f615b0b929f8269511) <span style='color:grey;'>(significance=0.61)</span>
- Fixed incorrect environment variables when enableHA is true. [#723](https://github.com/stakater/Reloader/pull/723) <span style='color:grey;'>(significance=0.57)</span>
""")
                     


In [None]:
import os

# "-- owner/repo": the repo part may also contain dots (e.g. "next.js").
REGEX_NAME = re.compile(r'-- ([\w-]+)/([\w.-]+)')
# Any amount of whitespace is accepted between the flags and their values.
REGEX_VERSION = re.compile(r'--previous-release\s+(\S+)\s+--current-release\s+(\S+)')

s = 'smartdraft.generator -- stakater/Reloader --previous-release v1.0.120 --current-release v1.0.121 '

def parse_cli_args(s: str):
    """
    Extract the repository and version range from a recorded generator command line.

    Args:
        s: command line containing ``-- owner/repo``, ``--previous-release X``
            and ``--current-release Y``.

    Returns:
        ``(repo_name, from_version, to_version)`` where ``repo_name`` is ``owner/repo``.

    Raises:
        ValueError: if the repository or the version flags cannot be found.
    """
    name_match = REGEX_NAME.search(s)
    version_match = REGEX_VERSION.search(s)
    if name_match is None or version_match is None:
        raise ValueError(f'Cannot parse generator command line: {s!r}')

    repo_name = '/'.join(name_match.groups())
    from_version, to_version = version_match.groups()
    return repo_name, from_version, to_version

# Expected: ('stakater/Reloader', 'v1.0.120', 'v1.0.121')
parse_cli_args(s)

In [None]:
import textstat

def calculate_readability(text: str) -> float:
    """
    Readability of ``text`` as the Flesch Reading Ease score computed by textstat.

    The previous body was a bare ``return`` and therefore yielded None despite
    the ``float`` annotation.
    NOTE(review): Flesch Reading Ease was chosen as the metric; swap in another
    textstat score if a different one was intended.
    """
    return textstat.flesch_reading_ease(text)

# Smoke test on a short sample.
calculate_readability('This is a test')

In [None]:
# prepare the tokenizer for token classification
# cite: https://github.com/taidnguyen/software_entity_recognition

from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

# Load the tokenizer and token-classification model from the same
# "taidng/wikiser-bert-base" checkpoint.
tokenizer = AutoTokenizer.from_pretrained("taidng/wikiser-bert-base")
model = AutoModelForTokenClassification.from_pretrained("taidng/wikiser-bert-base")

# "ner" pipeline over that model, placed on the device configured in settings.
nlp = pipeline("ner", model=model, tokenizer=tokenizer, device=settings.TORCH_DEVICE)
example = "This release fixes a bug in the previous version."

# count the number of tokens in the example
_tokenized = tokenizer(example)
print(len(_tokenized['input_ids']))

# Run the pipeline on the example and list the 'word' field of every hit.
ner_results = nlp(example)
print(list(l['word'] for l in ner_results))
def entity_density(text: str) -> float:
    """
    Ratio of NER pipeline hits to tokenizer ids for ``text``.

    The numerator is the number of items returned by ``nlp(text)``; the
    denominator is the length of ``tokenizer(text)['input_ids']``.
    NOTE(review): that id count presumably includes the tokenizer's special
    tokens — confirm whether they should be part of the denominator.
    """
    return len(nlp(text)) / len(tokenizer(text)['input_ids'])

# Smoke test on the example sentence.
entity_density('This release fixes a bug in the previous version.')

In [None]:
import os
import pandas as pd

# Single output directory used to try the evaluation before the batch run.
out_path = 'Software_Tools/reloader'

def eval_path(out_path):
    """
    Compute the quantitative metrics for every markdown release note in a directory.

    ``out_path`` must contain a ``details.txt`` holding the generator command
    line (parsed with ``parse_cli_args``) next to the ``*.md`` release notes.

    Returns:
        A DataFrame with one row per markdown file, processed in sorted file
        name order so the row order is reproducible.
    """
    _cli_file_path = os.path.join(out_path, 'details.txt')

    # Context managers close the files; UTF-8 is set explicitly because release
    # notes may contain non-ASCII characters.
    with open(_cli_file_path, encoding='utf-8') as fp:
        repo_name, from_version, to_version = parse_cli_args(fp.read())

    # list markdown files
    _markdown_files = sorted(f for f in os.listdir(out_path) if f.endswith('.md'))

    results = []
    for _f in _markdown_files:
        print(f'Processing {_f}...')
        with open(os.path.join(out_path, _f), encoding='utf-8') as fp:
            _rn_md = fp.read()
        # Clean once and reuse for every text metric below.
        _rn_clean = clean_text(_rn_md)
        results.append({
            'repo_name': repo_name,
            # Strip only the trailing extension, not every '.md' in the name.
            'rn_name': _f[:-len('.md')],
            # 'commit_coverage': calculate_commit_coverage(repo_name, from_version, to_version, _rn_md),
            # 'info_entropy': count_info_entropy(_rn_md),
            # 'tokens_count': count_tokens(_rn_md),
            # 'llm_brotli': det_br.score_text(_rn_md),
            # 'llm_lzma': det_lzma.score_text(_rn_md),
            # 'reading_ease': textstat.flesch_reading_ease(clean_text(_rn_md)),
            'automated_readability_index': textstat.automated_readability_index(_rn_clean),
            'dale_chall_readability': textstat.dale_chall_readability_score(_rn_clean),
            # 'smog_readability': textstat.smog_index(clean_text(_rn_md)),
            # 'coleman_liau_index': textstat.coleman_liau_index(clean_text(_rn_md)),
            'entity_percent': entity_density(_rn_clean) * 100,
            'entity_count': len(nlp(_rn_clean)),
        })

    return pd.DataFrame(results)

# Evaluate the single sample directory and display the resulting table.
eval_path(out_path)

In [None]:
from glob import glob

# Evaluate every "<project_domain>/<project>" directory that has a details.txt,
# write a per-directory CSV, and collect everything into one frame.
# NOTE: the loop variable rebinds the module-level `out_path` from the cell above.
_l_ev = []
for out_path in sorted(glob('*/*')):  # sorted for a reproducible row order
    if 'node_modules' in out_path:
        continue
    if not os.path.exists(os.path.join(out_path, 'details.txt')):
        continue
    print(f'Processing {out_path}...')
    df_rn = eval_path(out_path)
    # The first path component is the project domain; dirname works with either
    # path separator, unlike splitting on a hard-coded '/'.
    df_rn['project_domain'] = os.path.dirname(out_path)
    df_rn.to_csv(os.path.join(out_path, 'quant_eval.csv'), index=False)
    _l_ev.append(df_rn)

if not _l_ev:
    # pd.concat would raise a less specific ValueError on an empty list.
    raise ValueError("No '*/*' directory containing details.txt was found to evaluate")

df_ev = pd.concat(_l_ev)
df_ev.to_csv('quant_eval.csv', index=False)

In [None]:
# Display the combined metrics table.
df_ev

In [None]:
# Group by rn_name and average every numeric metric.
# numeric_only=True leaves out the string columns (repo_name, project_domain),
# which pandas 2.x refuses to average.
df_ev.groupby(['rn_name']).mean(numeric_only=True)

In [None]:
# Render the per-rn_name means as a markdown table.
# numeric_only=True leaves out the string columns (repo_name, project_domain),
# which pandas 2.x refuses to average.

print(df_ev.groupby(['rn_name']).mean(numeric_only=True).to_markdown())