In [None]:
# GitHub API Extraction using Python

## Introduction

In this notebook, we will interact with the GitHub API to fulfill the client's needs:

- Search Repositories (public)
- Retrieve Commits
- Access Repository Contents

We will implement authentication, handle pagination, manage rate limits, and include error handling as per best practices.

## Setup

### Import Libraries

```python
import requests
import time
import os
from requests.exceptions import HTTPError, ConnectionError, Timeout


## Authentication
We authenticate every request with a GitHub Personal Access Token (PAT).

In [None]:
# Read the personal access token from the process environment so that it is
# never written into the notebook itself.
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')

# Fail fast: the request headers below cannot be built without a token.
if not GITHUB_TOKEN:
    raise ValueError("GitHub token not found. Please set the GITHUB_TOKEN environment variable.")

# Headers shared by every API call in this notebook: token authentication
# plus the GitHub v3 JSON media type.
headers = dict(
    Authorization=f'token {GITHUB_TOKEN}',
    Accept='application/vnd.github.v3+json',
)


**Note:** The `GITHUB_TOKEN` environment variable must be set before the authentication cell above is run. In Colab you can set it with the cell below (run it first, then re-run the authentication cell):

In [None]:
# Supply the token without hard-coding it in the notebook: prompt for it only
# when the environment does not already provide one, so an existing token is
# never overwritten and the secret is not typed into a saved cell.
import os
from getpass import getpass

if not os.environ.get('GITHUB_TOKEN'):
    os.environ['GITHUB_TOKEN'] = getpass('GitHub personal access token: ')

## Helper Functions
### Rate Limit Handling

In [None]:
def check_rate_limit(response):
    """
    Sleeps until the rate-limit window resets when the quota is exhausted.

    Reads the 'X-RateLimit-Remaining' and 'X-RateLimit-Reset' headers of the
    response. If no requests remain and the reset time lies in the future,
    the function sleeps until that time plus a 5-second buffer. In every
    other case it returns immediately.

    Args:
        response (requests.Response): The response object from a previous request.

    Returns:
        None
    """
    remaining_header = response.headers.get('X-RateLimit-Remaining')
    if remaining_header is None:
        # The response carries no rate-limit information; treating that as
        # "zero remaining" would abort perfectly healthy requests.
        return

    try:
        remaining = int(remaining_header)
        reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
    except (TypeError, ValueError):
        # Malformed header values: nothing reliable to act on.
        return

    if remaining > 0:
        return

    sleep_time = reset_time - int(time.time()) + 5  # Add buffer
    if sleep_time > 0:
        print(f"Rate limit exceeded. Sleeping for {sleep_time} seconds.")
        time.sleep(sleep_time)
    # Otherwise the reset time has already passed, so the next request can
    # proceed without waiting.


### Error Handling

In [None]:
def handle_errors(response):
    """
    Handles HTTP errors and raises exceptions.

    Calls `response.raise_for_status()`. On an HTTP error it prints a
    diagnostic (including the 'message' field of the error body when the body
    is a JSON object) and re-raises the original error.

    Args:
        response (requests.Response): The response object to check.

    Raises:
        HTTPError: If an HTTP error occurred.
    """
    try:
        response.raise_for_status()
    except HTTPError as http_err:
        status_code = response.status_code
        # Error bodies are not guaranteed to be JSON (for example an HTML
        # gateway error page). A parse failure here must not replace the
        # HTTPError that callers expect.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        error_message = payload.get('message', '') if isinstance(payload, dict) else ''
        print(f"HTTP error occurred: {http_err} - {error_message}")
        if status_code == 401:
            print("Unauthorized access. Check your authentication token.")
        elif status_code == 403:
            print("Forbidden access. You might have hit the rate limit.")
        elif status_code == 404:
            print("Resource not found. Check the endpoint and parameters.")
        else:
            print(f"An error occurred: {response.content}")
        raise
    except Exception as err:
        print(f"An error occurred: {err}")
        raise


### Pagination Handling

In [None]:
def get_next_page(response):
    """
    Parses the 'Link' header to get the URL for the next page.

    The header is a comma-separated list of entries shaped like
    `<url>; rel="name"`. Entries are located by their angle brackets rather
    than by splitting on commas, so a URL that itself contains a comma is
    returned intact.

    Args:
        response (requests.Response): The response object containing pagination links.

    Returns:
        str or None: The URL for the next page, or None if there is no next page.
    """
    link_header = response.headers.get('Link', '')
    # Everything before the first '<' is not part of any entry, hence [1:].
    for entry in link_header.split('<')[1:]:
        url, closing, link_params = entry.partition('>')
        if closing and 'rel="next"' in link_params:
            return url
    return None


## Functions for API Endpoints

### Search Repositories

In [None]:
def search_repositories(query, per_page=30, max_pages=5, max_retries=3, timeout=10):
    """
    Searches public repositories based on a query.

    Follows the pagination links of the search endpoint until there is no
    next page or `max_pages` pages have been fetched. On a non-network error
    the loop stops and the items collected so far are returned.

    Args:
        query (str): The search query.
        per_page (int, optional): Number of results per page. Defaults to 30.
        max_pages (int, optional): Maximum number of pages to retrieve. Defaults to 5.
        max_retries (int, optional): Maximum number of consecutive retries of a
            page after a network error before giving up. Defaults to 3.
        timeout (float, optional): Seconds to wait for the server before a
            request is treated as timed out. Defaults to 10.

    Returns:
        list: A list of repository items.
    """
    url = 'https://api.github.com/search/repositories'
    params = {'q': query, 'per_page': per_page}
    all_items = []
    page = 1
    failures = 0  # consecutive network failures for the current page

    while url and page <= max_pages:
        try:
            # Without a timeout a stalled connection would block forever and
            # the Timeout handler below could never fire.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            handle_errors(response)
            check_rate_limit(response)
            data = response.json()
            items = data.get('items', [])
            all_items.extend(items)
            print(f"Fetched {len(items)} repositories from page {page}.")
            url = get_next_page(response)
            params = None  # The next-page URL already carries the query string
            page += 1
            failures = 0
        except (ConnectionError, Timeout) as e:
            failures += 1
            if failures > max_retries:
                # Bound the retries so a persistent outage cannot loop forever.
                print(f"Network error occurred: {e}. Giving up after {max_retries} retries.")
                break
            print(f"Network error occurred: {e}. Retrying after 5 seconds.")
            time.sleep(5)
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            break
    return all_items


### Example Usage

In [None]:
# Example: Search for repositories related to 'data analysis'
repositories = search_repositories('data analysis', per_page=30, max_pages=2)

print(f"Total repositories fetched: {len(repositories)}")
# The loop variable is deliberately not named `repo`: other example cells use
# `repo` for the repository-name string, and leaking a dict under that name
# would break them when cells are run out of order.
for repo_item in repositories[:5]:
    print(f"{repo_item['full_name']}: {repo_item['html_url']}")


### Retrieve Commits

In [None]:
def get_commits(owner, repo, per_page=30, max_pages=5, max_retries=3, timeout=10):
    """
    Retrieves commits from a repository.

    Follows the pagination links of the commits endpoint until there is no
    next page or `max_pages` pages have been fetched. On a non-network error
    the loop stops and the commits collected so far are returned.

    Args:
        owner (str): Owner of the repository.
        repo (str): Repository name.
        per_page (int, optional): Number of results per page. Defaults to 30.
        max_pages (int, optional): Maximum number of pages to retrieve. Defaults to 5.
        max_retries (int, optional): Maximum number of consecutive retries of a
            page after a network error before giving up. Defaults to 3.
        timeout (float, optional): Seconds to wait for the server before a
            request is treated as timed out. Defaults to 10.

    Returns:
        list: A list of commits.
    """
    url = f'https://api.github.com/repos/{owner}/{repo}/commits'
    params = {'per_page': per_page}
    all_commits = []
    page = 1
    failures = 0  # consecutive network failures for the current page

    while url and page <= max_pages:
        try:
            # Without a timeout a stalled connection would block forever and
            # the Timeout handler below could never fire.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            handle_errors(response)
            check_rate_limit(response)
            commits = response.json()
            all_commits.extend(commits)
            print(f"Fetched {len(commits)} commits from page {page}.")
            url = get_next_page(response)
            params = None  # The next-page URL already carries the query string
            page += 1
            failures = 0
        except (ConnectionError, Timeout) as e:
            failures += 1
            if failures > max_retries:
                # Bound the retries so a persistent outage cannot loop forever.
                print(f"Network error occurred: {e}. Giving up after {max_retries} retries.")
                break
            print(f"Network error occurred: {e}. Retrying after 5 seconds.")
            time.sleep(5)
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            break
    return all_commits


### Example Usage

In [None]:
# Example: Get commits from a specific repository
owner = 'xartd0'
repo = 'Hello-World'
commits = get_commits(owner, repo, per_page=30, max_pages=2)

print(f"Total commits fetched: {len(commits)}")
for commit in commits[:5]:
    message = commit['commit']['message']
    sha = commit['sha']
    # Show only the first line of the message. An empty message splits into
    # no lines at all, so fall back to '' instead of indexing [0].
    subject = next(iter(message.splitlines()), '')
    print(f"Commit {sha[:7]}: {subject}")


### Access Repository Contents

In [None]:
def get_repo_contents(owner, repo, path='', ref=None, max_retries=3, timeout=10):
    """
    Retrieves the contents of a repository or directory.

    Network errors are retried up to `max_retries` times with a 5-second
    pause between attempts. Any other error is reported and ends the call.

    Args:
        owner (str): Owner of the repository.
        repo (str): Repository name.
        path (str, optional): Path within the repository. Defaults to '' (root).
        ref (str, optional): The name of the commit/branch/tag. Defaults to default branch.
        max_retries (int, optional): Maximum number of retries after a network
            error. Defaults to 3.
        timeout (float, optional): Seconds to wait for the server before a
            request is treated as timed out. Defaults to 10.

    Returns:
        list or dict or None: The contents of the repository at the given
        path, or None if the request failed.
    """
    url = f'https://api.github.com/repos/{owner}/{repo}/contents/{path}'
    params = {}
    if ref:
        params['ref'] = ref

    for attempt in range(max_retries + 1):
        try:
            # Without a timeout a stalled connection would block forever and
            # the Timeout handler below could never fire.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            handle_errors(response)
            check_rate_limit(response)
            return response.json()
        except (ConnectionError, Timeout) as e:
            if attempt == max_retries:
                print(f"Network error occurred: {e}. Giving up after {max_retries} retries.")
                break
            # Actually retry: the loop issues the request again after the pause.
            print(f"Network error occurred: {e}. Retrying after 5 seconds.")
            time.sleep(5)
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None
    return None


### Example Usage

In [None]:
# Example: list the entries in the root directory of a repository
owner, repo = 'xartd0', 'Hello-World'
contents = get_repo_contents(owner, repo)

# Print nothing when the lookup failed (None) or returned no entries.
if contents:
    print("Repository Contents:")
    # One line per entry, in the order the API returned them.
    for item in contents:
        print(f"{item['type']}: {item['name']}")
