In [None]:
#@title install srcml
# -nc: skip the download when the .deb is already present, so re-running the cell is idempotent.
!wget -nc https://github.com/srcML/srcMLReleases/raw/main/srcml_1.0.0-1_ubuntu20.04.deb
# -y: notebook cells have no interactive stdin, so apt must not wait for a confirmation prompt.
!sudo apt install -y ./srcml_1.0.0-1_ubuntu20.04.deb

### Install V-SZZ

In [None]:
!git clone https://github.com/baolingfeng/V-SZZ.git

In [None]:
%cd /content/V-SZZ/ICSE2022ReplicationPackage/

In [None]:
!pip install -r requirements.txt

### Files overwrite/fix

In [None]:
%%writefile identify_duplicated_patch.py
import os
import sys
import json
import subprocess
import re
import hashlib
import git # get diff patch on Windows 
from unidiff import PatchSet
from io import StringIO
from log_generation import GitLog

from setting import *

from git_analysis.analyze_git_logs import retrieve_git_logs, retrieve_git_logs_dict, get_ancestors, get_parent_tags, get_son_tags
from data_loader import JAVA_CVE_FIX_COMMITS, C_CVE_FIX_COMMITS, read_cve_commits, REPOS_DIR, JAVA_PROJECTS, C_PROJECTS, ANNOTATED_CVES

repos_dir = REPOS_DIR
log_dir = LOG_DIR

def clear_patched_file(patched_file):
    """Strip position-dependent lines from the textual diff of one file.

    Lines starting with ``index `` (blob hashes) and ``@@`` (hunk headers with
    line numbers) are dropped, because cherry-picked copies of the same patch
    differ in exactly those lines. All other lines are kept in order.

    :param str patched_file: unified-diff text of a single file
    :returns: the diff text without ``index``/``@@`` lines
    """
    volatile_prefixes = ('index ', '@@')
    stable_lines = [
        diff_line
        for diff_line in patched_file.split('\n')
        if not diff_line.startswith(volatile_prefixes)
    ]
    return '\n'.join(stable_lines)

def is_target_file(file_path):
    """Decide whether a changed file is production Java/C/C++ source.

    A file qualifies when its extension (the part after the LAST dot) is one
    of ``java``, ``c``, ``cpp``, ``h``, ``hpp`` and it does not look like test
    code: no path component is exactly ``test`` and the file's base name
    (without extension) neither starts nor ends with ``test``. The comparison
    is case-insensitive.

    :param str file_path: '/'-separated path as reported in the diff
    :returns: True for a non-test source file, False otherwise
    """
    path_tokens = file_path.lower().split('/')
    file_name = path_tokens[-1]

    # Split on the last dot so multi-dot names such as "parser.tab.c" keep
    # their real extension ("c") instead of "tab.c".
    stem, dot, suffix = file_name.rpartition('.')
    if not dot or not stem:
        # No extension at all, or a dot-file such as ".c".
        return False

    if suffix not in ('java', 'c', 'cpp', 'h', 'hpp'):
        return False

    if 'test' in path_tokens:
        return False

    # Check the base name, not the full name: a full file name always ends
    # with its extension, so "FooTest.java" could never match endswith('test').
    if stem.startswith('test') or stem.endswith('test'):
        return False

    return True
    
def genereate_hashes_for_patch(repository, commit_id):
    """Fingerprint every relevant file changed by a commit.

    The commit is diffed against its first parent (``<commit>~1``), ignoring
    blank-line and end-of-line whitespace changes. For each file accepted by
    ``is_target_file`` the diff text is normalised with ``clear_patched_file``
    and hashed with SHA-1.

    :param repository: GitPython repository object exposing ``.git.diff``
    :param str commit_id: hash of the commit to fingerprint
    :returns: list of hex digests (one per target file), or None when the
              diff could not be produced or parsed
    """
    try:
        diff_text = repository.git.diff(
            commit_id + '~1',
            commit_id,
            ignore_blank_lines=True,
            ignore_space_at_eol=True,
        )
        parsed_patch = PatchSet(StringIO(diff_text))
    except Exception as e:
        # Best effort: report the problem and let the caller skip this commit.
        print(e)
        return None

    return [
        hashlib.sha1(
            clear_patched_file(str(file_patch)).encode('utf-8', 'ignore')
        ).hexdigest()
        for file_patch in parsed_patch
        if is_target_file(file_patch.path)
    ]

def identify_duplicate_patch(project):
    """Build commit<->patch-hash indexes for one project.

    Commits are read from ``<log_dir>/<project>-meta.log`` and each one is
    fingerprinted with ``genereate_hashes_for_patch`` against the repository
    at ``<repos_dir>/<project>``. Commits whose diff could not be computed
    are left out.

    :param str project: project name (log-file prefix and repository folder)
    :returns: tuple ``(commit_patch_map, patch_commit_map)`` where the first
              maps commit id -> list of file-patch hashes and the second maps
              hash -> list of commit ids containing that patch
    """
    meta_log_path = os.path.join(log_dir, project+"-meta.log")
    git_logs = retrieve_git_logs(meta_log_path, project)

    repository = git.Repo(os.path.join(repos_dir, project))

    commit_patch_map = {}
    for log_entry in git_logs:
        print(log_entry.commit_id)
        file_hashes = genereate_hashes_for_patch(repository, log_entry.commit_id)
        if file_hashes is None:
            continue
        commit_patch_map[log_entry.commit_id] = file_hashes

    # Invert the index: which commits share an identical file patch?
    patch_commit_map = {}
    for commit_id, file_hashes in commit_patch_map.items():
        for digest in file_hashes:
            patch_commit_map.setdefault(digest, []).append(commit_id)

    return commit_patch_map, patch_commit_map

def batch_duplicate_detection(projects):
    """Run duplicate-patch detection for every project and store the results.

    For each project two JSON files are written into the
    ``data_commit_patch_map`` folder (relative to the current directory):
    ``<project>-commit-patch.json`` and ``<project>-patch-commit.json``.
    A failure in one project is reported and does not stop the others.

    :param projects: iterable of project names
    """
    output_dir = 'data_commit_patch_map'
    # Create the output folder up front so the first write cannot fail on it.
    os.makedirs(output_dir, exist_ok=True)

    for project in projects:
        try:
            commit_patch_map, patch_commit_map = identify_duplicate_patch(project)
            with open(f'{output_dir}/{project}-commit-patch.json', 'w') as fout1, \
                open(f'{output_dir}/{project}-patch-commit.json', 'w') as fout2:
                json.dump(commit_patch_map, fout1, indent=4)
                json.dump(patch_commit_map, fout2, indent=4)
        except Exception as e:
            # Keep going: the remaining projects are independent of this one.
            print(project, e)
        # The previous unconditional `break` here limited the "batch" to the
        # first project only; every project is now processed.
   
if __name__ == '__main__':
    # Generate per-file patch hashes for the commits of each project.
    # batch_duplicate_detection requires the project list: take project names
    # from the command line when given, otherwise fall back to all C projects.
    batch_duplicate_detection(sys.argv[1:] or C_PROJECTS)
   


In [None]:
%%writefile setting.py
import sys
import os

# Configure your working folder and the corresponding sub-folders here.
# Root of the replication package; matches the location the notebook clones into.
WORK_DIR = '/content/V-SZZ/ICSE2022ReplicationPackage/'

# Folder that holds the git repositories to analyse.
# NOTE(review): this absolute path lies outside WORK_DIR and looks specific to
# the original author's machine — confirm it exists (or can be created) in the
# environment where the notebook runs.
REPOS_DIR = '/data1/baolingfeng/repos'

# Input data of the replication package (e.g. inputs.json is read from here).
DATA_FOLDER = os.path.join(WORK_DIR, 'data')

# Location of the bundled SZZ tooling; main.py adds its tools/pyszz folder to sys.path.
SZZ_FOLDER = os.path.join(WORK_DIR, 'icse2021-szz-replication-package')

# Default for run_szz's max_change_size; presumably large enough to disable the limit.
DEFAULT_MAX_CHANGE_SIZE = sys.maxsize

# Folder handed to MySZZ as ast_map_path (working directory for ASTMapEval.jar).
AST_MAP_PATH = os.path.join(WORK_DIR, 'ASTMapEval_jar')

# Folder containing the "<project>-meta.log" git log files.
LOG_DIR = os.path.join(WORK_DIR, 'GitLogs')

In [None]:
%%writefile main.py
import os
import sys
import json
import logging as log
from numba import jit, cuda
from setting import *

sys.path.append(os.path.join(SZZ_FOLDER, 'tools/pyszz/'))

from szz.ag_szz import AGSZZ
from szz.b_szz import BaseSZZ
from szz.l_szz import LSZZ
from szz.ma_szz import MASZZ, DetectLineMoved
from szz.r_szz import RSZZ
from szz.ra_szz import RASZZ
from szz.pd_szz import PyDrillerSZZ
from szz.my_szz import MySZZ
from tempfile import mkdtemp
from data_loader import JAVA_CVE_FIX_COMMITS, C_CVE_FIX_COMMITS, JAVA_PROJECTS, C_PROJECTS, read_cve_commits
from multiprocessing import Pool
import multiprocessing
from functools import partial
from collections import ChainMap
import numpy as np
def find_vul(commits, project, repo_url, REPOS_DIR, use_temp_dir, AST_MAP_PATH, EXT_TO_PARSE):
    """Trace the history of the lines deleted by each fixing commit in one chunk.

    Intended as the per-process worker of ``run_szz``'s pool: it builds its
    own ``MySZZ`` instance with a fresh scratch directory created under
    ``AST_MAP_PATH`` and processes the commits one by one. A commit that
    raises is reported and skipped; progress is printed after every commit.

    :param commits: sequence of fixing-commit hashes handled by this worker
    :param str project: repository full name
    :param str repo_url: URL of the repository
    :param str REPOS_DIR: folder holding the repositories
    :param bool use_temp_dir: forwarded to ``MySZZ``
    :param str AST_MAP_PATH: parent folder for this worker's scratch directory
    :param EXT_TO_PARSE: file extensions to analyse
    :returns: dict mapping each successfully processed commit to the result
              of ``MySZZ.find_bic``
    """
    szz = MySZZ(
        repo_full_name=project,
        repo_url=repo_url,
        repos_dir=REPOS_DIR,
        use_temp_dir=use_temp_dir,
        ast_map_path=mkdtemp(dir=AST_MAP_PATH),
    )
    total = len(commits)
    results = {}
    worker_pid = os.getpid()

    for done, commit in enumerate(commits, start=1):
        try:
            print('Fixing Commit:', commit)
            impacted = szz.get_impacted_files(fix_commit_hash=commit,
                                              file_ext_to_parse=EXT_TO_PARSE,
                                              only_deleted_lines=True)
            results[commit] = szz.find_bic(fix_commit_hash=commit,
                                           impacted_files=impacted,
                                           ignore_revs_file_path=None)
        except Exception as e:
            print(e)
        # Bold green progress line, tagged with the worker's PID.
        print('\033[1m\033[92m[%s] progress %s%%\033[0m' % (worker_pid, 100*done/total))

    return results
          

  

def _collect_bic_hashes(szz, commits, ext_to_parse, **find_bic_kwargs):
    """Run one blame-based SZZ variant sequentially; map commit -> list of BIC hashes."""
    output = {}
    for commit in commits:
        print('Fixing Commit:', commit)
        imp_files = szz.get_impacted_files(fix_commit_hash=commit,
                                           file_ext_to_parse=ext_to_parse,
                                           only_deleted_lines=True)
        bug_introducing_commits = szz.find_bic(fix_commit_hash=commit,
                                               impacted_files=imp_files,
                                               ignore_revs_file_path=None,
                                               **find_bic_kwargs)
        output[commit] = [bic.hexsha for bic in bug_introducing_commits]
    return output


def run_szz(project, commits, method, repo_url=None, max_change_size=DEFAULT_MAX_CHANGE_SIZE, languages=[]):
    """Run one SZZ variant on a project's fixing commits and save the result.

    The result is written to ``results/<method>-<project>.json`` (relative to
    the current directory). If that file already exists nothing is done, so
    finished runs are not repeated.

    :param str project: repository full name
    :param commits: list of fixing-commit hashes
    :param str method: SZZ variant: ``b``, ``ag``, ``ma``, ``ra`` or ``my``
    :param str repo_url: URL of the repository
    :param int max_change_size: forwarded to ``find_bic`` of ag/ma/ra
    :param languages: extra file extensions to analyse; ``None`` is accepted
                      and treated as "no extra extensions"
    :raises ValueError: if ``method`` is not one of the supported variants
    """
    supported_methods = ("b", "ag", "ma", "my", "ra")
    if method not in supported_methods:
        # Previously an unknown method silently wrote an empty result file,
        # which then made every later run for that name a no-op.
        raise ValueError("unknown SZZ method %r (expected one of %s)"
                         % (method, ", ".join(supported_methods)))

    output_file = "results/{method}-{project}.json".format(method=method, project=project)

    if os.path.exists(output_file):
        return
    # Make sure the result folder exists before starting the expensive analysis.
    os.makedirs(os.path.dirname(output_file) or '.', exist_ok=True)

    use_temp_dir = False

    # inputs.json may carry null for the languages field; `list + None` would raise.
    extra_extensions = list(languages) if languages else []
    EXT_TO_PARSE = list(set(['c', 'java', 'cpp', 'h', 'hpp', 'js', 'ts'] + extra_extensions))

    global max_progress
    max_progress=len(commits)

    if method == "my":
        # Instantiate once in the parent so the repository is already cloned
        # before the workers start.
        MySZZ(repo_full_name=project, repo_url=repo_url, repos_dir=REPOS_DIR, use_temp_dir=use_temp_dir, ast_map_path=AST_MAP_PATH)
        cpu_count = multiprocessing.cpu_count()+1
        print(cpu_count)

        worker = partial(find_vul, project=project, repo_url=repo_url, REPOS_DIR=REPOS_DIR,
                         use_temp_dir=use_temp_dir, AST_MAP_PATH=AST_MAP_PATH, EXT_TO_PARSE=EXT_TO_PARSE)
        # array_split yields empty chunks when there are fewer commits than
        # chunks; skip them so no worker sets up an SZZ instance for nothing.
        chunks = [chunk for chunk in np.array_split(commits, multiprocessing.cpu_count()*2) if len(chunk)]
        # The context manager releases the worker processes once map() returns.
        with Pool(cpu_count) as pp:
            partial_outputs = pp.map(worker, chunks)
        output = dict(ChainMap(*partial_outputs))
    else:
        if method == "b":
            szz = BaseSZZ(repo_full_name=project, repo_url=repo_url, repos_dir=REPOS_DIR)
            find_bic_kwargs = {}
        elif method == "ag":
            szz = AGSZZ(repo_full_name=project, repo_url=repo_url, repos_dir=REPOS_DIR, use_temp_dir=use_temp_dir)
            find_bic_kwargs = {'max_change_size': max_change_size}
        elif method == "ma":
            szz = MASZZ(repo_full_name=project, repo_url=repo_url, repos_dir=REPOS_DIR, use_temp_dir=use_temp_dir)
            find_bic_kwargs = {'max_change_size': max_change_size}
        else:  # method == "ra"
            szz = RASZZ(repo_full_name=project, repo_url=repo_url, repos_dir=REPOS_DIR, use_temp_dir=use_temp_dir)
            find_bic_kwargs = {'max_change_size': max_change_size}

        output = _collect_bic_hashes(szz, commits, EXT_TO_PARSE, **find_bic_kwargs)

    with open(output_file, 'w') as fout:
        json.dump(output, fout, indent=4)

def load_annotated_commits(target_projects=None):
    """Load the project entries of ``inputs.json`` from the data folder.

    Every entry must unpack into four fields:
    ``(project_name, project_url, fixing_commits, languages)``.

    :param target_projects: optional collection of project names; when given,
                            only entries whose name is in it are returned
    :returns: list of the selected entries, in file order
    """
    inputs_path = os.path.join(DATA_FOLDER, 'inputs.json')
    with open(inputs_path) as fin:
        annotation = json.load(fin)

    selected = []
    for entry in annotation:
        # Unpacking doubles as a shape check: exactly four fields per entry.
        project_name, _url, _commits, _languages = entry
        if target_projects is None or project_name in target_projects:
            selected.append(entry)

    return selected
import sys
if __name__ == "__main__":
    # Usage: python main.py <method> <project_index>
    #   method         SZZ variant understood by run_szz (b, ag, ma, ra, my)
    #   project_index  position of the project inside data/inputs.json
    import traceback

    if len(sys.argv) < 3:
        sys.exit("usage: python main.py <method> <project_index>")

    # fixing_commits = JAVA_CVE_FIX_COMMITS
    # fixing_commits = C_CVE_FIX_COMMITS

    project_commits = load_annotated_commits()
    try:
        project_name, project_url, fixing_commits, languages = project_commits[int(sys.argv[2])]
    except (ValueError, IndexError):
        sys.exit("project_index must be an integer selecting one of the %d projects in inputs.json, got %r"
                 % (len(project_commits), sys.argv[2]))

    print("Project:", project_name, sys.argv[2])
    try:
        #assuming all are github repo
        run_szz(project_name, fixing_commits, sys.argv[1], repo_url='https://github.com/%s' % project_url, languages=languages)
    except Exception as e:
        print(e)
        # The message alone rarely tells where the run failed; show the stack too.
        traceback.print_exc()
        print('can\'t run szz on %s' % project_name)


In [None]:
%%writefile icse2021-szz-replication-package/tools/pyszz/szz/my_szz.py
import os
import sys
import logging as log
import traceback
from typing import List, Set
import subprocess
import json

from git import Commit

from szz.core.abstract_szz import AbstractSZZ, ImpactedFile

from pydriller import ModificationType, GitRepository as PyDrillerGitRepo
import Levenshtein


def remove_whitespace(line_str):
    """Return ``line_str`` with every whitespace character removed."""
    # split() without arguments already ignores leading/trailing whitespace
    # and collapses inner runs, so joining the pieces drops all of it.
    pieces = line_str.split()
    return ''.join(pieces)

def compute_line_ratio(line_str1, line_str2):
    """Similarity of two source lines, ignoring all whitespace.

    :returns: ``Levenshtein.ratio`` of the two whitespace-stripped lines
    """
    first, second = (remove_whitespace(text) for text in (line_str1, line_str2))
    return Levenshtein.ratio(first, second)

MAXSIZE = sys.maxsize

class MySZZ(AbstractSZZ):
    """
    My SZZ implementation.

    For every line touched by a fix commit, the line is blamed and then mapped
    back to its counterpart before the blamed commit, over and over, so that
    the whole chain of commits that shaped the line is recorded.

    Supported **kwargs:

    * ignore_revs_file_path

    """

    # A deleted line counts as the predecessor of a blamed line only when their
    # whitespace-insensitive Levenshtein ratio exceeds this value.
    LINE_SIMILARITY_THRESHOLD = 0.75

    def __init__(self, repo_full_name: str, repo_url: str, repos_dir: str = None, use_temp_dir: bool = True, ast_map_path = None):
        """
        :param str repo_full_name: repository full name
        :param str repo_url: URL of the repository
        :param str repos_dir: folder holding the repositories
        :param bool use_temp_dir: accepted but not used by this class
        :param str ast_map_path: folder in which ASTMapEval.jar is run; its
            ``temp`` sub-folder stores the mapping results (Java files only)
        """
        super().__init__(repo_full_name, repo_url, repos_dir)
        self.ast_map_path = ast_map_path
        self.repo_full_name = repo_full_name

    def find_bic(self, fix_commit_hash: str, impacted_files: List['ImpactedFile'], **kwargs) -> List[dict]:
        """
        Find bug introducing commits candidates.

        :param str fix_commit_hash: hash of fix commit to scan for buggy commits
        :param List[ImpactedFile] impacted_files: list of impacted files in fix commit
        :key ignore_revs_file_path (str): specify ignore revs file for git blame to ignore specific commits.
        :returns List[dict]: one dict per blamed line with the keys ``line_num``,
            ``line_str``, ``file_path`` and ``previous_commits``. The latter is the
            chain of commits that touched the line, newest first, as tuples
            ``(hexsha, line_num, line_str)`` (plus the change type for Java files).
        """

        log.info(f"find_bic() kwargs: {kwargs}")

        ignore_revs_file_path = kwargs.get('ignore_revs_file_path', None)
        # self._set_working_tree_to_commit(fix_commit_hash)

        bug_introd_commits = []
        for imp_file in impacted_files:
            try:
                blame_data = self._blame(
                    rev='{commit_id}^'.format(commit_id=fix_commit_hash),
                    file_path=imp_file.file_path,
                    modified_lines=imp_file.modified_lines,
                    ignore_revs_file_path=ignore_revs_file_path,
                    ignore_whitespaces=False,
                    skip_comments=True
                )

                for entry in blame_data:
                    print(entry.commit, entry.line_num, entry.line_str)
                    previous_commits = self._trace_line_history(entry, imp_file.file_path, ignore_revs_file_path)
                    bug_introd_commits.append({'line_num':entry.line_num, 'line_str': entry.line_str, 'file_path': entry.file_path, 'previous_commits': previous_commits})
            except Exception:
                # Best effort per file: report and continue with the next file.
                # (Exception, not a bare except, so Ctrl-C still stops the run.)
                print(traceback.format_exc())

        return bug_introd_commits

    def _trace_line_history(self, entry, file_path, ignore_revs_file_path):
        """Follow one blamed line back through history; return the visited commits, newest first."""
        is_java = file_path.endswith(".java")
        previous_commits = []

        blame_result = entry
        while True:
            if is_java:
                mapped_line_num, change_type = self.map_modified_line_java(blame_result, file_path)
                previous_commits.append((blame_result.commit.hexsha, blame_result.line_num, blame_result.line_str, change_type))
            else:
                mapped_line_num = self.map_modified_line(blame_result, file_path)
                previous_commits.append((blame_result.commit.hexsha, blame_result.line_num, blame_result.line_str))

            if mapped_line_num == -1:
                # The line was introduced by this commit: end of the chain.
                break

            older_blame = list(self._blame(
                rev='{commit_id}^'.format(commit_id=blame_result.commit.hexsha),
                file_path=file_path,
                modified_lines=[mapped_line_num],
                ignore_revs_file_path=ignore_revs_file_path,
                ignore_whitespaces=False,
                skip_comments=True
            ))
            if not older_blame:
                # Nothing to blame for the mapped line (presumably filtered out,
                # e.g. by skip_comments). Stop here and keep the chain found so
                # far instead of failing with IndexError and losing the rest of
                # the file's lines.
                break
            blame_result = older_blame[0]

        return previous_commits

    def _run_ast_mapping(self, commit_id, file_path, ast_map_temp):
        """Run ASTMapEval.jar for one file of one commit and return its parsed JSON output."""
        output_path = os.path.join(ast_map_temp, "tmp.json")
        # Argument list instead of a shell string: file paths come from the
        # repository and may contain spaces or shell metacharacters.
        subprocess.check_output(
            ["java", "-jar", "ASTMapEval.jar",
             "-p", self.repo_full_name,
             "-c", commit_id,
             "-o", output_path,
             "-f", file_path],
            cwd=self.ast_map_path
        )
        with open(output_path) as fin:
            return json.load(fin)

    def map_modified_line_java(self, blame_entry, blame_file_path):
        """
        Map a blamed Java line to the matching line before the blamed commit.

        Statement mappings come from ASTMapEval.jar and are cached per commit
        and file in ``<ast_map_path>/temp/<project>.json``.

        :returns: tuple ``(line_number, change_type)``; the line number is -1
            when no matching statement exists ("New File") or the statement
            was inserted by the commit.
        """
        ast_map_temp = os.path.join(self.ast_map_path, 'temp')
        os.makedirs(ast_map_temp, exist_ok=True)

        commit_id = blame_entry.commit.hexsha
        file_path = blame_file_path.replace('\\', '/')
        line_num = int(blame_entry.line_num)

        mapping_db_file = os.path.join(ast_map_temp, "{project}.json".format(project=self.repo_full_name))
        mapping_db = {}
        if os.path.exists(mapping_db_file):
            with open(mapping_db_file) as fin:
                mapping_db = json.load(fin)

        commit_mappings = mapping_db.setdefault(commit_id, {})
        if file_path not in commit_mappings:
            # Cache miss: run the mapper once and persist the enlarged cache.
            commit_mappings[file_path] = self._run_ast_mapping(commit_id, file_path, ast_map_temp)
            with open(mapping_db_file, 'w') as fout:
                json.dump(mapping_db, fout, indent=4)
        mapping_results = commit_mappings[file_path]

        target_stmt = None
        for result in mapping_results:
            if file_path != result['src']:
                continue

            for stmt in result['stmt']:
                if 'dstStmtStartLine' in stmt and stmt['dstStmtStartLine'] == line_num:
                    target_stmt = stmt
                    break

            if target_stmt is not None:
                break

        if target_stmt is None:
            # "New File"
            return -1, "New File"

        if target_stmt['stmtChangeType'] == "Insert":
            return -1, target_stmt['stmtChangeType']

        return target_stmt['srcStmtStartLine'], target_stmt['stmtChangeType']

    def map_modified_line(self, blame_entry, blame_file_path):
        """
        Map a blamed line to the most similar line deleted by the blamed commit.

        Among the lines the commit deleted from the file, the one with the
        highest whitespace-insensitive similarity to the blamed line is chosen
        (ties go to the line closest in position). It is accepted only when
        the similarity exceeds ``LINE_SIMILARITY_THRESHOLD``.

        :returns: line number of the predecessor line, or -1 when the file was
            newly added, the commit deleted nothing from it, or no deleted
            line is similar enough.
        """
        #TODO: rename type
        blame_commit = PyDrillerGitRepo(self.repository_path).get_commit(blame_entry.commit.hexsha)

        for mod in blame_commit.modifications:
            file_path = mod.new_path
            if mod.change_type == ModificationType.DELETE or mod.change_type == ModificationType.RENAME:
                file_path = mod.old_path

            if file_path != blame_file_path:
                continue

            if not mod.old_path:
                # "newly added"
                return -1

            lines_added = mod.diff_parsed['added']
            lines_deleted = mod.diff_parsed['deleted']

            if len(lines_deleted) == 0:
                return -1

            print('line added/deleted', len(lines_added), len(lines_deleted))

            if blame_entry.line_str:
                # (line number, similarity, distance to the blamed line)
                candidates = [(line[0],
                               compute_line_ratio(blame_entry.line_str, line[1]),
                               abs(blame_entry.line_num - line[0]))
                              for line in lines_deleted]
                # Highest similarity wins; among equals prefer the nearest line.
                best_line, best_ratio, _ = max(candidates, key=lambda c: (c[1], -c[2]))

                if best_ratio > self.LINE_SIMILARITY_THRESHOLD:
                    return best_line

        return -1
                
                    

### Convert the verified vulnerability-inducing dataset (small dataset) into `inputs.json`

In [None]:
import json

# Converts the verified C CVE dataset into the inputs.json format that main.py
# reads: a list of [project_name, github "owner/repo", fixing_commits, languages].

# Dataset project name -> GitHub "owner/repo".
repomap = {'FFmpeg':'FFmpeg/FFmpeg', 'ImageMagick':'ImageMagick/ImageMagick', 'linux-kernel':'torvalds/linux',
           'OpenSSL': 'openssl/openssl', 'php-src': 'php/php-src'}

with open('/content/V-SZZ/ICSE2022ReplicationPackage/data/verified_cve_with_versions_C.json', 'r') as f:
    data = json.load(f)

outputs = {}
for cve in data:
    name = cve['project']
    if name not in outputs:
        # The languages field must be a list: main.py concatenates it with
        # its default extension list, which fails for null/None.
        outputs[name] = [name, repomap[name], [], []]
    outputs[name][2] += [fix['fixing_commit'] for fix in cve['fixing_details']]

# Several CVEs can share a fixing commit; keep the first occurrence only so
# the same commit is not analysed twice.
for entry in outputs.values():
    entry[2] = list(dict.fromkeys(entry[2]))

# Absolute path, so the file lands where main.py looks for it regardless of
# the notebook's current directory.
with open('/content/V-SZZ/ICSE2022ReplicationPackage/data/inputs.json', 'w') as out:
    json.dump(list(outputs.values()), out, indent=4)


### Setup

In [None]:
# -p: create missing parents and do not fail when the folder already exists (safe to re-run).
!mkdir -p /content/V-SZZ/ICSE2022ReplicationPackage/ASTMapEval_jar/temp

### Run SZZ

In [None]:
%cd /content/V-SZZ/ICSE2022ReplicationPackage/icse2021-szz-replication-package

In [None]:
!python ../main.py my 0