In [2]:
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain
from langchain.chains.mapreduce import MapReduceChain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
from langchain.llms import OpenAI
from langchain import PromptTemplate, LLMChain
import glob

load_dotenv()

True

In [3]:
from langchain.chains.combine_documents.refine import RefineDocumentsChain


In [4]:
# prompt = PromptTemplate(template=map_template_string, input_variables=["code"])
# llm_chain = LLMChain(prompt=prompt, llm=llm)
# llm_chain.run(code=code)

In [5]:
"""Give the following code information, for the following and access the code quality in terms of following points.
    Answer the following if possible with a score 0-10.
        1. lines of code
        2. cyclomatic complexity
        3. nesting depth
        4. code duplication
        5. code coupling
        6. Code readability
        7. Code maintanibility
        8. Proper documentation
        9. Proper function doc strings
        10. Proper maintained readme.

    The code is given in following format.
    Format:
        filename
        ##########
        code

    Code:
        {code}
    """

'Give the following code information, for the following and access the code quality in terms of following points.\n    Answer the following if possible with a score 0-10.\n        1. lines of code\n        2. cyclomatic complexity\n        3. nesting depth\n        4. code duplication\n        5. code coupling\n        6. Code readability\n        7. Code maintanibility\n        8. Proper documentation\n        9. Proper function doc strings\n        10. Proper maintained readme.\n\n    The code is given in following format.\n    Format:\n        filename\n        ##########\n        code\n\n    Code:\n        {code}\n    '

In [19]:
import re
import json
import random
import os

EXTENSION_SET = {
    ".py",  # Python
    ".java",  # Java
    ".js",  # JavaScript
    ".cpp",  # C++
    ".c",  # C
    ".html",  # HTML
    ".css",  # CSS
    ".php",  # PHP
    ".rb",  # Ruby
    ".swift",  # Swift
    ".go",  # Go
    ".ts",  # TypeScript
    ".sh",  # Shell script
    ".pl",  # Perl
    ".r",  # R
    ".scala",  # Scala
    ".lua",  # Lua
    ".md",  # Markdown
    ".json",  # JSON
    ".xml",  # XML
    ".yaml",  # YAML
    ".sql",  # SQL
    ".h",  # Header file
    ".hpp",  # C++ header file
    ".cs",  # C#
    ".vb",  # Visual Basic
    ".asm",  # Assembly
    ".dockerfile",  # Dockerfile
    ".yml",  # YAML (alternative extension)
    ".kt",  # Kotlin
    ".jl",  # Julia
    ".groovy",  # Groovy
    ".pl",  # Prolog
    ".ps1",  # PowerShell
    ".tex",  # LaTeX
    ".matlab",  # MATLAB
    ".m",  # MATLAB (alternative extension)
    ".dart",  # Dart
    ".bash",  # Bash script
    ".jsx",  # JSX (JavaScript extension)
    ".tsx",  # TSX (TypeScript extension)
    ".cfg",  # Configuration file
    ".ini",  # INI file
    ".md",  # Markdown file
}


class LLMCodeAnalyser:
    map_template_string = """Give the following code information, for the following and access the code quality in terms of following points.
    Answer the following if possible with a score 0-10.
        1. lines of code
        2. cyclomatic complexity
        3. nesting depth
        4. code duplication
        5. code coupling
        6. Code readability
        7. Code maintanibility
        8. Proper documentation
        9. Proper function doc strings
        10. Proper maintained readme.

    Code:
        {code}
    """

    reduce_template_string = """Given the information about the code quality, 
    Aggregate the results below and convert it to python dict
        {code_description}
        Answer:
    """

    def __init__(self) -> None:
        self.llm = OpenAI()

    def _convert_notebook_to_code_string(notebook_path: str):
        # Open the Jupyter Notebook file
        with open(notebook_path, "r") as f:
            notebook_content = json.load(f)

        code_cells = []
        for cell in notebook_content["cells"]:
            if cell["cell_type"] == "code":
                code = "".join(cell["source"])
                code_cells.append(code)
            elif cell["cell_type"] == "mardown":
                comment = "".join(cell["source"])
                code_cells.append("'''" + comment + "'''")

        code_string = "\n".join(code_cells)
        return code_string

    def _load_text_files(self, file):
        # handle notebooks
        extension = "." + file.split(".")[-1]
        if extension == ".ipynb":
            code = self._convert_notebook_to_code_string(file)
            code = f"------------------------------------\n{file}\n{code}"
            return code
        elif extension in EXTENSION_SET:
            try:
                f = open(file, "r")
                code = f.read()
                code = f"------------------------------------\n{file}\n{code}"
                return code
            except:
                return None
    
    def _get_file_paths(self, directory):
        file_paths = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                file_paths.append(file_path)
        return file_paths

    def get_code(self, repo_path: str, sampling_rate: float = 0.25):
        files = self._get_file_paths(repo_path)
        files = sorted(files, key=lambda x: len(x.split("/")))
        min_level = min(map(lambda x: len(x.split("/")), files))
        files_at_level0 = list(filter(lambda x: len(x.split("/")) == min_level, files))
        random_sampled_files = random.sample(
            list(filter(lambda x: (len(x.split("/")) != min_level) and '.' in x, files)),
            k=min(10, int(sampling_rate * len(files))),
        )
        files = files_at_level0 + random_sampled_files
        files = list(
            filter(
                lambda x: x is not None and len(x) > 0,
                map(lambda x: self._load_text_files(x), files),
            )
        )
        files = "".join(files)
        return files

    def analyse_repo_gpt(self, repo_path: str) -> dict:
        codebase = self.get_code(repo_path)
        map_prompt = PromptTemplate(
            input_variables=["code"], template=self.map_template_string
        )
        map_llm_chain = LLMChain(llm=self.llm, prompt=map_prompt)
        reduce_prompt = PromptTemplate(
            input_variables=["code_description"], template=self.reduce_template_string
        )
        reduce_llm_chain = LLMChain(llm=self.llm, prompt=reduce_prompt)
        generative_result_reduce_chain = StuffDocumentsChain(
            llm_chain=reduce_llm_chain,
            document_variable_name="code_description",
        )
        combine_documents = MapReduceDocumentsChain(
            llm_chain=map_llm_chain,
            combine_document_chain=generative_result_reduce_chain,
            document_variable_name="code",
        )
        map_reduce = MapReduceChain(
            combine_documents_chain=combine_documents,
            text_splitter=RecursiveCharacterTextSplitter(
                chunk_size=2000,
                chunk_overlap=10,
                length_function=len,
            ),
        )
        result = map_reduce.run(input_text=codebase, verbose=True)
        return result

In [20]:
code_analyser = LLMCodeAnalyser()

In [21]:
repo_path = 'repos/darts'
codebase = code_analyser.get_code(repo_path)

In [8]:
code_analyser.analyse_repo_gpt(repo_path)

IndexError: list index out of range

In [None]:
result = map_reduce.run(
    input_text=codebase
)

InvalidRequestError: This model's maximum context length is 2049 tokens, however you requested 3091 tokens (2835 in your prompt; 256 for the completion). Please reduce your prompt; or completion length.

In [None]:
from langchain.chains.mapreduce import MapReduceChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.combine_documents.refine import RefineDocumentsChain

map_template_string = """Given the following python code information, generate a description that explains what the code does and also mention the time complexity.
Code:
{code}

Return the the description in the following format:
name of the function: description of the function
"""


reduce_template_string = """Given the following python function names and descriptions, answer the following question
{code_description}
Question: {question}
Answer:
"""

MAP_PROMPT = PromptTemplate(input_variables=["code"], template=map_template_string)
REDUCE_PROMPT = PromptTemplate(input_variables=["code_description", "question"], template=reduce_template_string)

llm = OpenAI()

map_llm_chain = LLMChain(llm=llm, prompt=MAP_PROMPT)
reduce_llm_chain = LLMChain(llm=llm, prompt=REDUCE_PROMPT)

generative_result_reduce_chain = StuffDocumentsChain(
    llm_chain=reduce_llm_chain,
    document_variable_name="code_description",
)

combine_documents = RefineDocumentsChain(
    llm_chain=map_llm_chain,
    combine_document_chain=generative_result_reduce_chain,
    document_variable_name="code",
)

map_reduce = MapReduceChain(
    combine_documents_chain=combine_documents,
    text_splitter=RecursiveCharacterTextSplitter(
                chunk_size=2000,
                chunk_overlap=10,
                length_function=len,
            ),
)

code = """
def bubblesort(list):
   for iter_num in range(len(list)-1,0,-1):
      for idx in range(iter_num):
         if list[idx]>list[idx+1]:
            temp = list[idx]
            list[idx] = list[idx+1]
            list[idx+1] = temp
    return list
##
def insertion_sort(InputList):
   for i in range(1, len(InputList)):
      j = i-1
      nxt_element = InputList[i]
   while (InputList[j] > nxt_element) and (j >= 0):
      InputList[j+1] = InputList[j]
      j=j-1
   InputList[j+1] = nxt_element
   return InputList
##
def shellSort(input_list):
   gap = len(input_list) // 2
   while gap > 0:
      for i in range(gap, len(input_list)):
         temp = input_list[i]
         j = i
   while j >= gap and input_list[j - gap] > temp:
      input_list[j] = input_list[j - gap]
      j = j-gap
      input_list[j] = temp
   gap = gap//2
   return input_list
"""



KeyError: 'initial_llm_chain'

In [22]:
map_reduce.run(input_text=codebase, question="What are the function names?")

NameError: name 'map_reduce' is not defined

In [None]:
codebase



In [36]:
from langchain.chains.summarize import load_summarize_chain
from langchain.docstore.document import Document
from langchain.chat_models.openai import ChatOpenAI


prompt_template = """Given the following code information, for the following and access the code quality in terms of following points.
    Answer the following with a score 0-10.
        1. lines of code
        2. cyclomatic complexity
        3. nesting depth
        4. code duplication
        5. code coupling
        6. Code readability
        7. Code maintanibility
        8. Proper documentation
        9. Proper function doc strings
        10. Proper maintained readme (1=yes or 0=no).

    Code:
        {text}
    """

refine_template = """Given the information about the code quality till now,
{existing_answer}

Analyse the code below on the parameters given below and update them
1. lines of code
2. cyclomatic complexity
3. nesting depth
4. code duplication
5. code coupling
6. Code readability
7. Code maintanibility
8. Proper documentation
9. Proper function doc strings
10. Proper maintained readme (yes or no).

{text}

Answer:
"""

reduce_template_string = """Given the information about the code quality, 
Aggregate the results below and convert it to python dict
    {text}
    Answer:
"""

prompt = PromptTemplate(template=prompt_template, input_variables=["text"])
# refine_prompt = PromptTemplate(
#     input_variables=["existing_answer", "text"],
#     template=refine_template,
# )
reduce_prompt = PromptTemplate(
    input_variables=["text"],
    template=reduce_template_string,
)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=2000,
    chunk_overlap=10,
    length_function=len,
)
print(len(codebase))
texts = text_splitter.split_text(codebase)
docs = [Document(page_content=t) for t in texts]
print(len(docs))

llm = ChatOpenAI(model='gpt-3.5-turbo')
chain = load_summarize_chain(
    llm,
    chain_type="map_reduce",
    return_intermediate_steps=False,
    map_prompt=prompt,
    combine_prompt=reduce_prompt,
)
print(len(docs))


27210
17
17


In [37]:
# %time output = await chain.acall({"input_documents": docs}, return_only_outputs=True)
%time output = chain({"input_documents": docs}, return_only_outputs=True)

Retrying langchain.chat_models.openai.ChatOpenAI.completion_with_retry.<locals>._completion_with_retry in 1.0 seconds as it raised RateLimitError: That model is currently overloaded with other requests. You can retry your request, or contact us through our help center at help.openai.com if the error persists. (Please include the request ID d681ff3d148ffba33420097363ef4c47 in your message.).


CPU times: user 136 ms, sys: 30.7 ms, total: 167 ms
Wall time: 2min 29s


In [33]:
output

{'intermediate_steps': ['1. lines of code: 36\n2. cyclomatic complexity: 1\n3. nesting depth: 1\n4. code duplication: 0\n5. code coupling: 0\n6. Code readability: 8\n7. Code maintainability: 8\n8. Proper documentation: 1\n9. Proper function doc strings: 0\n10. Proper maintained readme (1=yes or 0=no): 1',
  '1. Lines of code: 10\n2. Cyclomatic complexity: 1\n3. Nesting depth: 1\n4. Code duplication: 0\n5. Code coupling: 0\n6. Code readability: 8\n7. Code maintainability: 8\n8. Proper documentation: 3\n9. Proper function doc strings: 0\n10. Proper maintained readme (1=yes or 0=no): 1',
  '1. lines of code - 15 (excluding comments)\n2. cyclomatic complexity - 1\n3. nesting depth - 1\n4. code duplication - 0\n5. code coupling - 0\n6. Code readability - 8\n7. Code maintainability - 8\n8. Proper documentation - 7\n9. Proper function doc strings - 0\n10. Proper maintained readme - 1',
  '1. lines of code: 42\n2. cyclomatic complexity: 1\n3. nesting depth: 1\n4. code duplication: 0\n5. code c

In [38]:
print(output['output_text'])

{
    "lines of code": [46, 11, 40, 43, 23, 39, 34, 11, 30, 20, 38, 25, 25, 7],
    "cyclomatic complexity": [1, 1, 1, 2, 1, 8, 1, null, 1, 2, 4, 5, 3, 1],
    "nesting depth": [2, 1, 1, 2, 1, 1, 2, 1, 1, 3, 2, 3, null, 1],
    "code duplication": [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
    "code coupling": [1, 0, 1, 2, 0, 6, 2, 2, 1, 3, 1, 3, 3, 0],
    "code readability": [8, 8, 7, 8, 8, 7, 8, 8, 8, 8, 7, 8, 8, 8],
    "code maintainability": [9, 8, 8, 9, 8, 8, 8, 8, 9, 7, 8, 7, 7, 8],
    "proper documentation": [1, 1, 6, 5, 7, 6, 3, 7, 7, 2, 7, 6, 5, 5],
    "proper function doc strings": [0, 0, 0, 0, 7, 0, 2, 0, 5, 1, 6, 5, 0, 5],
    "proper maintained readme": [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
}


In [40]:
import os



# Example usage:
directory_path = 'repos/darts/'
paths = get_file_paths(directory_path)


62