<a href="https://colab.research.google.com/github/yacine715/GITHUB_TOKEN/blob/main/dataset_script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import token

import requests
from datetime import datetime
import csv
import os
import time
import logging
import shutil
from datetime import timezone

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Read the GitHub token from the environment so the secret never lives in the
# source file or in version control. Export it before running, e.g.
#   export GITHUB_TOKEN=<personal access token>
# NOTE: any token that was ever committed in this file must be treated as
# leaked and revoked in the GitHub settings.
github_token = os.environ.get('GITHUB_TOKEN', '')
if not github_token:
    logging.warning("GITHUB_TOKEN environment variable is not set; GitHub API requests will fail or be heavily rate limited.")

# CSV file that collected build rows are appended to.
output_csv = 'builds-final-10.csv'


import time
import math

def _rate_limit_delay(response):
    """Return how many seconds to wait if ``response`` signals a rate limit, else None."""
    if response.status_code not in (403, 429):
        return None
    # Secondary rate limits announce the wait directly via Retry-After.
    retry_after = response.headers.get('Retry-After', '')
    if retry_after.isdigit():
        return int(retry_after)
    # Primary rate limit: the quota is exhausted until the reset timestamp.
    if response.headers.get('X-RateLimit-Remaining') == '0' and 'X-RateLimit-Reset' in response.headers:
        reset_time = datetime.fromtimestamp(int(response.headers['X-RateLimit-Reset']), timezone.utc)
        wait = (reset_time - datetime.now(timezone.utc)).total_seconds() + 10
        # The reset moment may already be in the past; time.sleep() rejects
        # negative values, so clamp to zero.
        return max(wait, 0)
    return None


def get_request(url, token):
    """Fetch ``url`` from the GitHub API and return the decoded JSON body.

    The request is attempted up to five times. A rate-limit response makes
    the function sleep for the period announced by the response headers; any
    other failure (non-200 status or network error) is retried with
    exponential backoff (10s, 20s, 40s, ...).

    Parameters:
    - url: Full API URL to request.
    - token: GitHub token sent in the ``Authorization`` header.

    Returns the parsed JSON on a 200 response, or None when every attempt
    failed.
    """
    headers = {'Authorization': f'token {token}'}
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            # A timeout keeps a stalled connection from hanging the whole run.
            response = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException as exc:
            logging.error(f"Request failed: {exc}, URL: {url}")
            response = None

        if response is not None:
            if response.status_code == 200:
                return response.json()
            sleep_time = _rate_limit_delay(response)
            if sleep_time is not None:
                logging.error(f"Rate limit exceeded, sleeping for {sleep_time} seconds. URL: {url}")
                time.sleep(sleep_time)
                continue
            logging.error(f"Failed to fetch data, status code: {response.status_code}, URL: {url}, Response: {response.text}")

        # No point in backing off after the final attempt: nothing follows it.
        if attempt < max_attempts - 1:
            time.sleep(math.pow(2, attempt) * 10)  # Exponential backoff
    return None




def chercher_projets_avec_build_yml(token, query='filename:build.yml path:.github/workflows', per_page=100):
    """
    Search GitHub code for repositories that contain a build.yml workflow file.

    Parameters:
    - token: GitHub token sent in the ``Authorization`` header.
    - query: Code-search query string.
    - per_page: Number of results requested per page.

    Returns a list of repository full names ("owner/name") in the order they
    were first seen, without duplicates. Pagination stops at the last page,
    on the first non-200/non-403 response, or after too many consecutive
    403 responses.
    """
    url = "https://api.github.com/search/code"
    headers = {'Authorization': f'token {token}'}
    projets = []
    seen = set()
    page = 1
    # Bound the number of consecutive 403 waits so a permanently forbidden
    # request cannot loop forever.
    max_forbidden_retries = 5
    forbidden_retries = 0

    while True:
        params = {'q': query, 'per_page': per_page, 'page': page}
        response = requests.get(url, headers=headers, params=params)
        if response.status_code == 403:
            forbidden_retries += 1
            if forbidden_retries > max_forbidden_retries:
                print("Limite de taux toujours dépassée après plusieurs tentatives. Abandon.")
                break
            print("Limite de taux dépassée. En attente pour réessayer...")
            time.sleep(200)  # Wait 200 seconds before retrying the same page
            continue
        elif response.status_code != 200:
            print(f"Erreur lors de la récupération des projets : {response.status_code}")
            break
        forbidden_retries = 0

        data = response.json()
        # Code search returns one hit per matching file, so the same
        # repository can appear several times; keep only the first.
        for item in data.get('items', []):
            full_name = item['repository']['full_name']
            if full_name not in seen:
                seen.add(full_name)
                projets.append(full_name)

        if 'next' not in response.links:
            break
        page += 1

    return projets


def fetch_file_contents(repo_full_name, path, sha, token):
    """Return the total number of lines across all files of a repository at a commit.

    The recursive git tree for ``sha`` is listed, then every blob is fetched
    through the contents API and its lines are counted.

    Parameters:
    - repo_full_name: Repository in "owner/name" form.
    - path: Unused; kept so existing callers keep working.
    - sha: Commit (or tree) SHA to inspect.
    - token: GitHub token sent in the ``Authorization`` header.

    Returns the summed line count, or 0 when the tree cannot be fetched.
    Files whose content request fails are skipped.
    """
    import base64  # local import: only this function needs it

    url = f"https://api.github.com/repos/{repo_full_name}/git/trees/{sha}?recursive=1"

    headers = {'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json'}
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        logging.error(f"Failed to fetch file contents, status code: {response.status_code}")
        return 0

    tree = response.json().get('tree', [])
    file_paths = [item['path'] for item in tree if item['type'] == 'blob']

    total_lines = 0
    for file_path in file_paths:
        # The repository name must be interpolated; a literal
        # "repo_full_name" path segment would never match a real repository.
        content_url = f"https://api.github.com/repos/{repo_full_name}/contents/{file_path}?ref={sha}"
        content_response = requests.get(content_url, headers=headers)
        if content_response.status_code != 200:
            continue
        payload = content_response.json()
        encoded = payload.get('content', '')
        if payload.get('encoding') == 'base64':
            # Decode first: counting lines of the base64 text would measure
            # the encoding's line wrapping, not the file itself.
            raw = base64.b64decode(encoded)
        else:
            raw = encoded.encode('utf-8')
        total_lines += len(raw.splitlines())

    return total_lines


def get_repository_languages(repo_full_name, token):
    """Fetch the programming languages used in a given repository.

    Parameters:
    - repo_full_name: Repository in "owner/name" form.
    - token: GitHub token passed on to ``get_request``.

    Returns the language names joined with ", " (an empty string when the
    API reports no languages), or "Failed to fetch languages" when the
    request itself failed.
    """
    api_url = f'https://api.github.com/repos/{repo_full_name}/languages'
    languages_data = get_request(api_url, token)
    # Compare against None explicitly: an empty mapping is a successful
    # answer for a repository without detected languages, not a failure.
    if languages_data is None:
        return "Failed to fetch languages"
    return ', '.join(languages_data.keys())

def calculate_sloc_via_github_api(repo_full_name, commit_sha, token):
    """Count the blob entries in the recursive git tree of a commit.

    NOTE(review): despite the name, the value returned is the number of
    'blob' items in the tree listing, not a count of source lines.

    Parameters:
    - repo_full_name: Repository in "owner/name" form.
    - commit_sha: SHA whose tree is listed.
    - token: GitHub token sent in the ``Authorization`` header.

    Returns the number of blob entries, or 0 when the request does not
    answer with status 200.
    """
    tree_url = f"https://api.github.com/repos/{repo_full_name}/git/trees/{commit_sha}?recursive=true"
    response = requests.get(tree_url, headers={'Authorization': f'token {token}'})

    if response.status_code != 200:
        logging.error(f"Failed to fetch tree data, status code: {response.status_code}")
        return 0

    entries = response.json().get('tree', [])
    blobs = [entry for entry in entries if entry['type'] == 'blob']
    return len(blobs)


def remove_readonly_dir(target_dir):
    """Delete ``target_dir`` and everything below it, lifting read-only permissions first.

    Parameters:
    - target_dir: Path of the directory to remove.

    Prints a message and returns without error when the path does not exist.
    """
    # Check if the target directory exists
    if not os.path.exists(target_dir):
        print(f"The directory {target_dir} does not exist.")
        return

    # Walk bottom-up so children are made writable before their parents
    for root, dirs, files in os.walk(target_dir, topdown=False):
        for name in files:
            # Change the file permission to writable
            os.chmod(os.path.join(root, name), 0o666)
        for name in dirs:
            # Directories also need the execute bit: without it their
            # entries cannot be unlinked and rmtree fails on POSIX systems.
            os.chmod(os.path.join(root, name), 0o777)

    # os.walk only reports sub-directories, so fix the root itself as well
    os.chmod(target_dir, 0o777)

    # Once all permissions are changed, remove the directory
    shutil.rmtree(target_dir)
    print(f"Successfully removed {target_dir}")

def get_pr_details(pr_url, token):
    """Fetch a pull request's description complexity and comment count.

    NOTE: a function with the same name is defined again further down in
    this file; that later definition is the one bound at runtime.

    Parameters:
    - pr_url: API URL of the pull request.
    - token: GitHub token passed on to ``get_request``.

    Returns a tuple ``(description_complexity, comments_count)`` where the
    complexity is the number of words in the title plus the number of words
    in the body. Returns ``(0, 0)`` when the request yields no data.
    """
    pr_data = get_request(pr_url, token)
    if not pr_data:
        return 0, 0

    # The API may report a null title/body; normalise to '' BEFORE splitting.
    # Writing `x or ''.split()` would apply split() to the empty literal only
    # and end up counting characters instead of words.
    title_words = len((pr_data.get('title') or '').split())
    body_words = len((pr_data.get('body') or '').split())
    comments_count = pr_data.get('comments', 0)
    description_complexity = title_words + body_words
    return description_complexity, comments_count

def get_commit_data(commit_sha, repo_full_name, token):
    """Fetch commit data for a given commit SHA, focusing on production and test code churn.

    Parameters:
    - commit_sha: SHA of the commit to inspect.
    - repo_full_name: Repository in "owner/name" form.
    - token: GitHub token sent in the ``Authorization`` header.

    Returns None when the request fails or the response body is empty,
    otherwise a dict with these keys:
    - 'added_lines' / 'deleted_lines': number of FILES whose status is
      'added' / 'removed' (the key names are kept for compatibility with
      the CSV columns).
    - 'modified_files': number of files whose status is 'modified'.
    - 'tests_added' / 'tests_deleted': lines added / deleted in test files.
    - 'total_files': number of files in the commit.
    - 'additions_files_commit' / 'deletions_files_commit': commit-wide
      totals taken from the 'stats' section of the response.
    - 'file_types': sorted, comma-separated file extensions in the commit.
    - 'src_churn' / 'test_churn': values returned by ``calculate_churn``.
    """
    url = f"https://api.github.com/repos/{repo_full_name}/commits/{commit_sha}"
    headers = {'Authorization': f'token {token}'}
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        logging.error(f"Failed to fetch data for commit {commit_sha}, status code: {response.status_code}, URL: {url}, Response: {response.text if response.status_code != 422 else response.json()}")
        return None

    commit_data = response.json()
    if not commit_data:
        return None

    files = commit_data.get('files', [])
    # Sorted so the joined string is deterministic across runs.
    file_types = sorted({os.path.splitext(file.get('filename', ''))[1] for file in files})
    src_churn, test_churn = calculate_churn(files)

    # GitHub reports per-file status as 'added', 'removed', 'modified', ...
    statuses = [file.get('status') for file in files]
    added_files_count = statuses.count('added')
    deleted_files_count = statuses.count('removed')
    modified_files_count = statuses.count('modified')

    test_files = [file for file in files if is_test_file(file.get('filename', ''))]
    stats = commit_data.get('stats', {})

    return {
        'added_lines': added_files_count,
        'deleted_lines': deleted_files_count,
        'modified_files': modified_files_count,
        'tests_added': sum(file.get('additions', 0) for file in test_files),
        'tests_deleted': sum(file.get('deletions', 0) for file in test_files),
        'total_files': len(files),
        'additions_files_commit': stats.get('additions', 0),
        'deletions_files_commit': stats.get('deletions', 0),
        'file_types': ', '.join(file_types),
        'src_churn': src_churn,
        'test_churn': test_churn,
    }

def calculate_churn(files):
    """Sum added plus deleted lines separately for test files and for all other files.

    Parameters:
    - files: Iterable of dicts that may carry 'additions', 'deletions' and
      'filename' entries (missing values count as 0 / '').

    Returns a tuple ``(src_churn, test_churn)``.
    """
    # Indexed by the result of is_test_file(): True -> test, False -> source.
    totals = {True: 0, False: 0}
    for entry in files:
        changed = entry.get('additions', 0) + entry.get('deletions', 0)
        totals[is_test_file(entry.get('filename', ''))] += changed
    return totals[False], totals[True]

def is_test_file(file_path):
    """Tell whether a path looks like a test file.

    A path qualifies when it contains 'test' in any letter case, or when it
    starts with 'tests/'.
    """
    lowered = file_path.lower()
    if 'test' in lowered:
        return True
    return file_path.startswith('tests/')

def is_specific_file_type(file_path, file_type='production'):
    """
    Classify a path as production code or documentation by extension.

    Parameters:
    - file_path: The path of the file to evaluate.
    - file_type: Either 'production' or 'documentation'.

    Returns True when the extension of ``file_path`` belongs to the chosen
    type and the path contains none of that type's excluded fragments.
    Returns False for any other ``file_type``.
    """
    # (type name, accepted extensions, path fragments that disqualify)
    rules = (
        ('production', ['.py', '.js', '.java', '.cpp', '.cs'], ['tests', 'test']),
        ('documentation', ['.md', '.rst', '.txt', '.docx'], []),
    )

    for kind, accepted_suffixes, banned_fragments in rules:
        if file_type == kind:
            break
    else:
        return False

    _, suffix = os.path.splitext(file_path)
    if suffix not in accepted_suffixes:
        return False
    for fragment in banned_fragments:
        if fragment in file_path:
            return False
    return True
def get_pr_details(pr_url, token):
    """Fetch a pull request's description complexity and comment count.

    NOTE: this redefines a function of the same name that appears earlier
    in this file; being the later definition, it is the one bound at runtime.

    Parameters:
    - pr_url: API URL of the pull request.
    - token: GitHub token passed on to ``get_request``.

    Returns a tuple ``(description_complexity, comments_count)`` where the
    complexity is the number of words in the title plus the number of words
    in the body. Returns ``(0, 0)`` when the request yields no data.
    """
    pr_data = get_request(pr_url, token)
    if not pr_data:
        return 0, 0

    # The API may report a null title/body; normalise to '' BEFORE splitting.
    # Writing `x or ''.split()` would apply split() to the empty literal only
    # and end up counting characters instead of words.
    title_words = len((pr_data.get('title') or '').split())
    body_words = len((pr_data.get('body') or '').split())
    comments_count = pr_data.get('comments', 0)
    description_complexity = title_words + body_words
    return description_complexity, comments_count

def get_builds_info(repo_full_name, token, output_csv):
    """Collect one row per GitHub Actions run of a repository and append them to a CSV file.

    Repositories with fewer than 50 or more than 5000 workflow runs are
    skipped. For every run, commit statistics, a blob count ('sloc') and,
    when the run is attached to a pull request, PR details are gathered.
    Runs whose commit data cannot be retrieved are left out.

    Parameters:
    - repo_full_name: Repository in "owner/name" form.
    - token: GitHub token used for every API request.
    - output_csv: Path of the CSV file to append to. The header row is
      written only when the file does not exist yet or is empty.

    Returns None.
    """
    min_builds = 50
    max_builds = 5000
    builds_info = []
    languages = get_repository_languages(repo_full_name, token)

    initial_url = f"https://api.github.com/repos/{repo_full_name}/actions/runs?page=1&per_page=1"
    initial_response_data = get_request(initial_url, token)

    if initial_response_data is None:
        print(f"Error: Failed to fetch initial data for {repo_full_name}.")
        return
    total_builds = initial_response_data.get('total_count', 0)

    # Continue only if the number of builds is within [min_builds, max_builds]
    if total_builds < min_builds:
        print(f"Repository {repo_full_name} has less than {min_builds} builds. Skipping...")
        return
    if total_builds > max_builds:
        print(f"Repository {repo_full_name} has more than {max_builds} builds. Skipping...")
        return

    # Values copied unchanged from get_commit_data() into each row.
    commit_keys = ('added_lines', 'deleted_lines', 'modified_files', 'tests_added', 'tests_deleted',
                   'total_files', 'additions_files_commit', 'deletions_files_commit',
                   'src_churn', 'test_churn', 'file_types')

    page = 1
    while True:
        # per_page must be an integer; 100 is the largest page size the API accepts.
        api_url = f"https://api.github.com/repos/{repo_full_name}/actions/runs?page={page}&per_page=100"

        response_data = get_request(api_url, token)
        if response_data is None:
            print("Error: Failed to fetch data from GitHub API.")
            break
        if page == 1:
            total_builds = response_data.get('total_count', 0)  # Get the total number of builds only once

        runs = response_data.get('workflow_runs', [])
        if not runs:
            break

        for run in runs:
            commit_sha = run['head_sha']
            commit_data = get_commit_data(commit_sha, repo_full_name, token)
            # Check first: without commit data the row is dropped, so the
            # remaining API calls for this run would be wasted.
            if commit_data is None:
                print(f"Error retrieving commit data for SHA {commit_sha}. Skipping this commit.")
                continue

            sloc = calculate_sloc_via_github_api(repo_full_name, commit_sha, token)
            start_time = datetime.strptime(run['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            end_time = datetime.strptime(run['updated_at'], '%Y-%m-%dT%H:%M:%SZ')
            duration = (end_time - start_time).total_seconds()

            gh_is_pr = len(run.get('pull_requests', [])) > 0
            gh_description_complexity = 0
            gh_num_pr_comments = 0
            if gh_is_pr:
                pr_url = run['pull_requests'][0]['url']
                gh_description_complexity, gh_num_pr_comments = get_pr_details(pr_url, token)

            build_info = {
                'repo': repo_full_name,
                'id_build': run['id'],
                'branch': run['head_branch'],
                'status': run['status'],
                'etat': run['conclusion'],
                'created_at': run['created_at'],
                'updated_at': run['updated_at'],
                'build_duration': duration,
                'sloc': sloc,
                'tests_ran': True,
                'tests_failed': run['conclusion'] != 'success',
                'languages': languages,
                'gh_description_complexity': gh_description_complexity,
                'gh_num_pr_comments': gh_num_pr_comments,
                'gh_is_pr': gh_is_pr,
                'total_builds': total_builds,
            }
            build_info.update({key: commit_data[key] for key in commit_keys})
            builds_info.append(build_info)

        page += 1

    fieldnames = ['repo', 'id_build', 'branch', 'status', 'etat', 'created_at', 'updated_at', 'build_duration', 'total_builds',
                  'added_lines', 'deleted_lines', 'additions_files_commit', 'deletions_files_commit', 'modified_files', 'tests_added', 'tests_deleted',
                  'total_files', 'tests_ran', 'tests_failed', 'src_churn', 'test_churn', 'file_types', 'languages', 'sloc', 'gh_description_complexity',
                  'gh_num_pr_comments', 'gh_is_pr']

    # The file is opened in append mode once per repository; emitting the
    # header every time would scatter header rows through the data.
    write_header = not os.path.exists(output_csv) or os.path.getsize(output_csv) == 0
    with open(output_csv, mode='a', newline='', encoding='utf-8') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(builds_info)

    print(f"Build information saved to {output_csv}")




def main():
    """Discover repositories with a build.yml workflow and export build data for each."""
    # The repository list is fetched dynamically through the search helper.
    for repo_name in chercher_projets_avec_build_yml(github_token):
        logging.info("Processing repository: %s", repo_name)
        get_builds_info(repo_name, github_token, output_csv)


# Run the export only when the file is executed as a script.
if __name__ == "__main__":
    main()
