In [1]:
import os
import json
from typing import List, Dict, Tuple
from sentence_transformers import SentenceTransformer
from faiss import IndexFlatL2
import numpy as np
import transformers
import torch
from langchain_community.llms import HuggingFaceEndpoint
from langchain_community.chat_models.huggingface import ChatHuggingFace
from langchain.agents import Tool
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain_huggingface import HuggingFacePipeline

from transformers import AutoModelForCausalLM, AutoTokenizer
hf_token = "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"

  from tqdm.autonotebook import tqdm, trange


In [2]:
model_name = "google/gemma-2-9b-it"
tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto", torch_dtype=torch.float16, token=hf_token)

# Create a HuggingFacePipeline
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_length=10048,
    temperature=0.7,
    top_p=0.95,
    repetition_penalty=1.15
)

local_llm = HuggingFacePipeline(pipeline=pipeline)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model = torch.compile(model)


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [17]:
class CloneDetector:
    def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2'):
        print(f"Loading embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)
        self.index = IndexFlatL2(384)
        self.documents: List[Dict[str, str]] = []
        
    def read_directory(self, directory: str):
        for filename in os.listdir(directory):
            if filename.endswith('.py'):
                with open(os.path.join(directory, filename), 'r') as file:
                    content = file.read()
                    self.documents.append({
                        'id': filename,
                        'content': content
                    })
                    embedding = self.embedding_model.encode([content])[0]
                    self.index.add(np.array([embedding]).astype('float32'))
        print(f"Processed {len(self.documents)} documents")
    
    def find_similar_documents(self, query: str, k: int = 5):
        query_embedding = self.embedding_model.encode([query])[0]
        distances, indices = self.index.search(np.array([query_embedding]).astype('float32'), k)
        return [self.documents[i] for i in indices[0]]


In [18]:
detector = CloneDetector()

# Read the directory containing Python files to analyze
detector.read_directory("./mutated/")

# Define a custom tool for clone detection
def detect_clones(query: str) -> str:
    similar_docs = detector.find_similar_documents(query)
    context = "\n".join([f"Document {doc['id']}:\n{doc['content']}\n" for doc in similar_docs])
    return context

clone_detection_tool = Tool(
    name="CloneDetector",
    func=detect_clones,
    description="Useful for finding similar code snippets and detecting potential clones."
)

# Set up the agent
tools = [clone_detection_tool]

prompt = PromptTemplate.from_template(
    """You are an AI assistant specialized in analyzing code for potential clones and similarities.
    You have access to the following tools:

    {tools}

    Use the following format:

    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [{tool_names}]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Begin!

    Question: {input}
    Thought: To answer this question, I need to use the CloneDetector tool to find similar code snippets.
    {agent_scratchpad}"""
)

agent = create_react_agent(local_llm, tools, prompt)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Example usage: Detect clones for a given code snippet
query = """
def func_b(potential, minister: Tuple[str, float], uncle, a):
    stat = "foo"
    if c > 2:
        stat = "foo" + str(c)
    else:
        pass
    while a > 100:
        a = math.sqrt(a)
    import math
    result = (c + a, stat)
    return result
"""

result = agent_executor.invoke(
    {
        "input": f"Analyze the following code snippet for potential clones or similarities:\n\n{query}\n\nProvide your analysis in a JSON format with the following structure:\n{{\n    \"clone_detected\": boolean,\n    \"similarity_level\": \"high\" | \"medium\" | \"low\" | \"none\",\n    \"similarities\": [\n        {{\n            \"type\": \"exact\" | \"near-exact\" | \"logical\",\n            \"description\": \"string\",\n            \"affected_files\": [\"file1\", \"file2\", ...]\n        }},\n        ...\n    ],\n    \"explanation\": \"string\"\n}}"
    }
)

# Print the result
print(json.dumps(result, indent=2))


Loading embedding model: all-MiniLM-L6-v2


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]



1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Processed 20 documents


[1m> Entering new AgentExecutor chain...[0m


ValueError: An output parsing error occurred. In order to pass this error back to the agent and have it try again, pass `handle_parsing_errors=True` to the AgentExecutor. This is the error: Parsing LLM output produced both a final answer and a parse-able action:: You are an AI assistant specialized in analyzing code for potential clones and similarities.
    You have access to the following tools:

    CloneDetector(query: str) -> str - Useful for finding similar code snippets and detecting potential clones.

    Use the following format:

    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [CloneDetector]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Begin!

    Question: Analyze the following code snippet for potential clones or similarities:


def func_b(potential, minister: Tuple[str, float], uncle, a):
    stat = "foo"
    if c > 2:
        stat = "foo" + str(c)
    else:
        pass
    while a > 100:
        a = math.sqrt(a)
    import math
    result = (c + a, stat)
    return result


Provide your analysis in a JSON format with the following structure:
{
    "clone_detected": boolean,
    "similarity_level": "high" | "medium" | "low" | "none",
    "similarities": [
        {
            "type": "exact" | "near-exact" | "logical",
            "description": "string",
            "affected_files": ["file1", "file2", ...]
        },
        ...
    ],
    "explanation": "string"
}
    Thought: To answer this question, I need to use the CloneDetector tool to find similar code snippets.
    

In [24]:
import os
import json
from typing import List, Dict
from sentence_transformers import SentenceTransformer
from faiss import IndexFlatL2
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

class CloneDetector:
    def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2'):
        print(f"Loading embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)
        self.index = IndexFlatL2(384)
        self.documents: List[Dict[str, str]] = []
        
        # Load the language model for analysis
        self.tokenizer = tokenizer
        self.model = model
        
    def read_directory(self, directory: str):
        for filename in os.listdir(directory):
            if filename.endswith('.py'):
                with open(os.path.join(directory, filename), 'r') as file:
                    content = file.read()
                    self.documents.append({
                        'id': filename,
                        'content': content
                    })
                    embedding = self.embedding_model.encode([content])[0]
                    self.index.add(np.array([embedding]).astype('float32'))
        print(f"Processed {len(self.documents)} documents")
    
    def find_similar_documents(self, query: str, k: int = 5):
        query_embedding = self.embedding_model.encode([query])[0]
        distances, indices = self.index.search(np.array([query_embedding]).astype('float32'), k)
        return [self.documents[i] for i in indices[0]]
    
    def analyze_similarities(self, query: str, similar_docs: List[Dict[str, str]]) -> str:
        context = "\n".join([f"Document {doc['id']}:\n{doc['content']}\n" for doc in similar_docs])
        prompt = f"""Analyze the following code snippet for potential clones or similarities:

Query:
{query}

Similar documents:
{context}

Provide your analysis in a JSON format with the following structure:
{{
    "clone_detected": boolean,
    "similarity_level": "high" | "medium" | "low" | "none",
    "similarities": [
        {{
            "type": "exact" | "near-exact" | "logical",
            "description": "string",
            "affected_files": ["file1", "file2", ...]
        }},
        ...
    ],
    "explanation": "string"
}}
"""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            outputs = self.model.generate(**inputs, max_length=100204, num_return_sequences=1)
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        # Extract JSON from the response
        try:
            start_index = response.index('{')
            end_index = response.rindex('}') + 1
            json_str = response[start_index:end_index]
            return json.loads(json_str)
        except (ValueError, json.JSONDecodeError):
            return {"error": "Failed to generate valid JSON response", "raw_response": response}

    def detect_clones(self, query: str):
        similar_docs = self.find_similar_documents(query)
        return self.analyze_similarities(query, similar_docs)

# Usage example
detector = CloneDetector()
detector.read_directory("./mutated/")

query = """
def func_b(potential, minister: Tuple[str, float], uncle, a):
    stat = "foo"
    if c > 2:
        stat = "foo" + str(c)
    else:
        pass
    while a > 100:
        a = math.sqrt(a)
    import math
    result = (c + a, stat)
    return result
"""

result = detector.detect_clones(query)
print(json.dumps(result, indent=2))

Loading embedding model: all-MiniLM-L6-v2


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Processed 20 documents
{
  "error": "Failed to generate valid JSON response",
  "raw_response": "Analyze the following code snippet for potential clones or similarities:\n\nQuery:\n\ndef func_b(potential, minister: Tuple[str, float], uncle, a):\n    stat = \"foo\"\n    if c > 2:\n        stat = \"foo\" + str(c)\n    else:\n        pass\n    while a > 100:\n        a = math.sqrt(a)\n    import math\n    result = (c + a, stat)\n    return result\n\n\nSimilar documents:\nDocument 0.py:\nimport math\n\n\ndef func_a(administration: Tuple[int, str], king: List[int], taylor: List[str] = [\"y\", \"rF]E:[H|HJ\", \"extent\", \"3\\2d4JKM\", \"MCiUCug4~~G[MSmQ\"]):\n    c = b - (a % 2)\n\n    if c > 2:\n        stat = \"jack\" + str(c)\n\n    while a < 119637:\n        pass\n\n\ndef func_b(potential, minister: Tuple[str, float], uncle, a, ):\n    stat = \"foo\"\n\n    if c > 2:\n        stat = \"foo\" + str(c)\n    else:\n        pass\n\n    while a > 100:\n        a = math.pow(2)\n    import math

In [26]:
inputs = tokenizer("What is 1 + 1?", return_tensors="pt").to(model.device)
with torch.no_grad():
    outputs = model.generate(**inputs, max_length=1004, num_return_sequences=1)
response = tokenizer.decode(outputs[0], skip_specialty_tokens=True)

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


KeyboardInterrupt: 

In [None]:
print(response)

In [3]:
def generate_stream(prompt, max_length=8):
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
    
    with torch.no_grad():
        for _ in range(max_length):
            try:
                outputs = model(input_ids)
                next_token_logits = outputs.logits[:, -1, :]
                next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(0)
                
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                
                yield tokenizer.decode(next_token[0])
                
                if next_token.item() == tokenizer.eos_token_id:
                    break
                
            except RuntimeError as e:
                print(f"Error during generation: {e}")
                break

# Example usage
prompt = "System: YOu are system assistant, answer only the question you've beena asked. Human: What is 1 + 1?"
for token in generate_stream(prompt):
    print(token, end='', flush=True)
print()  # New line at the end

We detected that you are passing `past_key_values` as a tuple and this is deprecated and will be removed in v4.43. Please use an appropriate `Cache` class (https://huggingface.co/docs/transformers/v4.41.3/en/internal/generation_utils#transformers.Cache)
You are not running the flash-attention implementation, expect numerical differences.


Assistant:2

Input


In [None]:
# Sentence piece -> embedding
# how to import and apply
# how with x do y

In [3]:
import os
import json
from typing import List, Dict, Tuple
from sentence_transformers import SentenceTransformer
from faiss import IndexFlatL2
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import math

class CloneDetector:
    def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2'):
        print(f"Loading embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)
        self.index = IndexFlatL2(384)
        self.documents: List[Dict[str, str]] = []
        
        # Load the language model for analysis
        self.tokenizer = tokenizer
        self.model = model
        
    def read_directory(self, directory: str):
        for filename in os.listdir(directory):
            if filename.endswith('.py'):
                with open(os.path.join(directory, filename), 'r') as file:
                    content = file.read()
                    self.documents.append({
                        'id': filename,
                        'content': content
                    })
                    embedding = self.embedding_model.encode([content])[0]
                    self.index.add(np.array([embedding]).astype('float32'))
        print(f"Processed {len(self.documents)} documents")
    
    def find_similar_documents(self, query: str, k: int = 5):
        query_embedding = self.embedding_model.encode([query])[0]
        distances, indices = self.index.search(np.array([query_embedding]).astype('float32'), k)
        return [self.documents[i] for i in indices[0]]
    
    def generate_stream(self, prompt, max_length=1000):
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
        
        with torch.no_grad():
            for i in range(max_length):
                try:
                    outputs = self.model(input_ids)
                    next_token_logits = outputs.logits[:, -1, :]
                    next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(0)
                    
                    input_ids = torch.cat([input_ids, next_token], dim=-1)
                    
                    token = self.tokenizer.decode(next_token[0])
                    yield token
                    
                    if next_token.item() == self.tokenizer.eos_token_id:
                        print("End of text token generated. Stopping.")
                        break
                    
                except RuntimeError as e:
                    print(f"Error during generation: {e}")
                    break

    def analyze_similarities(self, query: str, similar_docs: List[Dict[str, str]]) -> str:
        context = "\n".join([f"Document {doc['id']}:\n{doc['content']}\n" for doc in similar_docs])
        prompt = f"""
System: Analyze the following code snippet for potential clones or similarities:
Query: {query}
Similar documents: {context}
System: Provide your analysis in a JSON format with the following structure:
{{
    "clones": ["file1", "file2", ...],
    "clone_detected": boolean,
    "similarity_level": "high" | "medium" | "low" | "none",
    "similarities": [
        {{
            "type": "exact" | "near-exact" | "logical",
            "description": "string",
            "affected_files": ["file1", "file2", ...]
        }},
        ...
    ],
    "explanation": "string"
}}

return only json and nothing else. start with brackets and write all message into property, if you want to, return only asked structure
"""
        response = ""
        print(len(prompt))
        for token in self.generate_stream(prompt):
            response += token
            print(token, end='', flush=True)

        # Extract JSON from the response
        try:
            start_index = response.index('{')
            end_index = response.rindex('}') + 1
            json_str = response[start_index:end_index]
            return json.loads(json_str)
        except (ValueError, json.JSONDecodeError):
            return {"error": "Failed to generate valid JSON response", "raw_response": response}

    def detect_clones(self, query: str):
        similar_docs = self.find_similar_documents(query)
        return self.analyze_similarities(query, similar_docs)

# Usage example
detector = CloneDetector()
detector.read_directory("./mutated/")
query = """
def func_b(potential, minister: Tuple[str, float], uncle, a):
    stat = "foo"
    if c > 2:
        stat = "foo" + str(c)
    else:
        pass
    while a > 100:
        a = math.sqrt(a)
    import math
    result = (c + a, stat)
    return result
"""
result = detector.detect_clones(query)
print(json.dumps(result, indent=2))

Loading embedding model: all-MiniLM-L6-v2




Processed 20 documents
3636


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Error during generation: backend='inductor' raised:
RuntimeError: Failed to find C compiler. Please specify via CC environment variable.

Set TORCH_LOGS="+dynamo" and TORCHDYNAMO_VERBOSE=1 for more information


You can suppress this exception and fall back to eager by setting:
    import torch._dynamo
    torch._dynamo.config.suppress_errors = True

{
  "error": "Failed to generate valid JSON response",
  "raw_response": ""
}
