In [9]:
import os
import json
import asyncio
import aiohttp
import random
import pandas as pd
import base64
from datetime import datetime, timedelta
from dotenv import load_dotenv
from azure.identity import AzureCliCredential

credential = AzureCliCredential()

token = credential.get_token("https://app.vssps.visualstudio.com/.default")

In [10]:
load_dotenv()

ADO_ORG_URL = f'https://dev.azure.com/{os.getenv("ADO_ORG")}'
ADO_PROJECT = os.getenv("ADO_PROJECT")

DEBUG = False

START_DATE = datetime(2024, 1, 1)  # inclusive
END_DATE = datetime(2025, 1, 1)  # exclusive
FILTER_USER = False  # set to False to get commit data for all users
USER = "example@example.com"

BASE_URL = f"https://microsoft.visualstudio.com/{ADO_PROJECT}/_apis/git/repositories"

HEADERS = {
    "Host": "microsoft.visualstudio.com",
    "User-Agent": "python/3.11.8 (Windows-10-10.0.22635-SP0) msrest/0.7.1 azure-devops/5.1.0b4 devOpsCli/1.0.1",
    "Accept-Encoding": "gzip, deflate",
    "Accept": "application/json;api-version=5.0",
    "Connection": "keep-alive",
    "Content-Type": "application/json; charset=utf-8",
    "Authorization": f"Bearer {token.token}",
}


async def fetch_repositories(session: aiohttp.ClientSession) -> dict:
    """
    Fetch all repositories and return a dict mapping repo_id to repo_name.
    """
    async with session.get(BASE_URL, headers=HEADERS) as response:
        if response.status == 200:
            data = await response.json()
            repos = data.get("value", [])
            return {
                repo["id"]: repo["name"]
                for repo in repos
                if repo["isDisabled"] == False
            }
        else:
            print(f"Failed to fetch repositories. Status code: {response.status}")
            return {}


def summarize_commit(commit_json: dict) -> dict:
    """
    Given a single commit JSON structure from Azure DevOps,
    produce a summary of key information.
    """
    commit_id = commit_json.get("commitId", "")
    author_info = commit_json.get("author", {})
    author_name = author_info.get("name", "")
    author_email = author_info.get("email", "")
    commit_date = author_info.get("date", "")
    comment = commit_json.get("comment", "")

    total_changed_files = []
    changes = commit_json.get("changes", []).get("changes", [])
    for change in changes:
        if change.get("item", {}).get("gitObjectType") == "blob":
            filename = change.get("item", {}).get("path", "").split("/")[-1]
            total_changed_files.append(filename)
    summary = {
        "commitId": commit_id,
        "authorName": author_name,
        "authorEmail": author_email,
        "date": commit_date,
        "comment": comment,
        "files": total_changed_files,
        "changedFilesCount": len(total_changed_files),
    }
    return summary


async def fetch_with_retries(
    session, url, headers, retries=3, delay=2, backoff=2, timeout=10000
):
    """
    Perform an HTTP GET request with retries in case of transient failures.
    """
    for attempt in range(retries):
        try:
            async with session.get(url, headers=headers, timeout=timeout) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    print(f"Attempt {attempt + 1} failed with status {response.status}")
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")

        if attempt < retries - 1:
            await asyncio.sleep(delay)
            delay *= backoff  # Exponential backoff
    raise Exception(f"Failed to fetch {url} after {retries} retries")


async def fetch_commits_for_day(
    session: aiohttp.ClientSession,
    repo_id: str,
    repo_name: str,
    day_start: datetime,
    day_end: datetime,
    filter_user: bool = False,
) -> pd.DataFrame:
    from_str = day_start.isoformat() + "Z"
    to_str = day_end.isoformat() + "Z"

    email_filter = USER

    if filter_user:
        url = (
            f"{BASE_URL}/{repo_id}/commits?searchCriteria.fromDate={from_str}"
            f"&searchCriteria.toDate={to_str}"
            f"&searchCriteria.$top=1000"
            f"&searchCriteria.author={email_filter}&api-version=7.1"
        )
    else:
        url = (
            f"{BASE_URL}/{repo_id}/commits?searchCriteria.fromDate={from_str}"
            f"&searchCriteria.toDate={to_str}&searchCriteria.$top=1000&api-version=7.1"
        )

    all_commits = []
    while url:
        try:
            data = await fetch_with_retries(session, url, HEADERS)
            batch_commits = data.get("value", [])
            all_commits.extend(batch_commits)

            # Parse 'next' URL from Link header if present
            next_url = None
            link_header = data.get("Link", "")
            if link_header and 'rel="next"' in link_header:
                parts = link_header.split(";")
                if parts:
                    first_part = parts[0].strip()
                    if first_part.startswith("<") and first_part.endswith(">"):
                        next_url = first_part[1:-1]

            url = next_url if next_url and next_url != url else None

        except Exception as e:
            print(
                f"Failed to fetch commits for repository '{repo_name}' (ID: {repo_id}) "
                f"on {day_start.date()}. Error: {e}"
            )
            break

    # Summarize commits
    commit_summaries = []
    for commit in all_commits:
        commit_id = commit.get("commitId")
        commit_url = commit.get("url")

        if commit_url and commit_id:
            try:
                commit_data = await fetch_with_retries(session, commit_url, HEADERS)
                changes_url = (
                    commit_data.get("_links", {}).get("changes", {}).get("href")
                )
                if changes_url:
                    changes_data = await fetch_with_retries(
                        session, changes_url + "?includeDiff=true", HEADERS
                    )
                    commit_data["changes"] = changes_data

                commit_data["summary"] = summarize_commit(commit_data)
                commit_data["summary"]["repoName"] = repo_name
                commit_summaries.append(commit_data["summary"])
            except Exception as e:
                print(f"Failed to fetch commit details for {commit_id}: {e}")

    if commit_summaries:
        return pd.DataFrame(commit_summaries)
    else:
        return pd.DataFrame(
            columns=["commitId", "authorName", "comment", "repoName", "date"]
        )


async def fetch_commits_for_repo(
    session: aiohttp.ClientSession,
    repo_id: str,
    repo_name: str,
    start_date: datetime,
    end_date: datetime,
) -> pd.DataFrame:
    """
    Orchestrate day-by-day async calls to fetch commits for an entire date range.
    Returns a combined DataFrame.
    """

    tasks = []
    current_date = start_date
    while current_date < end_date:
        next_date = current_date + timedelta(days=1)
        task = asyncio.create_task(
            fetch_commits_for_day(
                session,
                repo_id,
                repo_name,
                current_date,
                next_date,
                filter_user=FILTER_USER,
            )
        )
        tasks.append(task)
        current_date = next_date

    daily_dfs = await asyncio.gather(*tasks)

    if daily_dfs:
        return pd.concat(daily_dfs, ignore_index=True)
    else:
        return pd.DataFrame()

In [None]:
async with aiohttp.ClientSession() as session:
    repo_id_name_map = await fetch_repositories(session)
    all_repo_ids = list(repo_id_name_map.keys())
    print(f"Found {len(all_repo_ids)} repositories.")

    tasks = [
        fetch_commits_for_repo(
            session, repo_id, repo_id_name_map[repo_id], START_DATE, END_DATE
        )
        for repo_id in all_repo_ids
    ]

    results = await asyncio.gather(*tasks)
    data = pd.concat(results)
    data.reset_index(drop=True, inplace=True)
    data = data.infer_objects()
    data["changedFilesCount"] = data["changedFilesCount"].astype(int)

In [None]:
data.head(15)

In [None]:
all_files = [file for sublist in data["files"] for file in sublist]

unique_files = set(all_files)

distinct_file_count = len(unique_files)

print("Total distinct files:", distinct_file_count)

In [None]:
data_exploded = data.explode("files")

distinct_files_per_repo = (
    data_exploded.groupby("repoName")
    .agg(distinctFileCount=("files", "nunique"))
    .sort_values(by="distinctFileCount", ascending=False)
)

distinct_files_per_repo

In [None]:
data.groupby("repoName").agg(
    {"commitId": "nunique", "changedFilesCount": "sum"}
).sort_values(by="commitId", ascending=False)

In [None]:
data.groupby(["authorName"]).agg(
    {"changedFilesCount": "sum", "commitId": "nunique", "repoName": "nunique"}
).sort_values("commitId", ascending=False).head(10)