<a href="https://colab.research.google.com/github/mrncstt/githubevents_pypsark/blob/main/github_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark

In [None]:
!python -m pip install --upgrade pip

In [7]:
import os
import requests
import calendar
from zipfile import ZipFile
from datetime import datetime
from pathlib import Path
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from tqdm import tqdm
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, to_date, when
from IPython.display import FileLink, display
import gzip
import json
import time
import py4j
import logging
from concurrent.futures import ThreadPoolExecutor


In [8]:
# force=True (Python 3.8+) removes handlers already attached to the root logger before
# configuring it; without it basicConfig silently does nothing when some handler is
# already present (notebook kernels may install one), hiding every logging.info below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

In [9]:
def create_directory(path):
    """
    Creates a directory (and any missing parents) if it doesn't exist.

    Parameters:
    path (str | os.PathLike): The path of the directory to be created.

    Returns:
    bool: True if the path is a directory when the call returns, False if it could
    not be created.

    Logs:
    - Info: If the directory is created successfully or already exists.
    - Error: If the directory cannot be created, including when a regular file
      already occupies the path.
    """
    path_obj = Path(path)
    # is_dir() rather than exists(): a regular file at this path is not a usable directory.
    if path_obj.is_dir():
        logging.info(f"Directory '{path}' already exists.")
        return True
    try:
        # exist_ok=True tolerates another thread creating the directory concurrently;
        # mkdir still raises FileExistsError if a non-directory occupies the path.
        path_obj.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logging.error(f"An error occurred while creating the directory '{path}': {e}")
        return False
    logging.info(f"Directory '{path}' created successfully.")
    return True

In [10]:
def download_file(url, output_file, timeout=10, chunk_size=1024):
    """
    Downloads a file from a URL to the specified output path.

    The body is streamed into a temporary '<output_file>.part' file, which is moved
    to output_file only after the whole body has been received. A failed or
    truncated download therefore never leaves a file at output_file, so callers
    that skip already-existing files will retry it on the next run.

    Parameters:
    url (str): The URL of the file to be downloaded.
    output_file (str): The path where the downloaded file will be saved.
    timeout (float): Timeout in seconds passed to requests (default 10).
    chunk_size (int): Number of bytes per streamed chunk (default 1024).

    Returns:
    bool: True if the file was downloaded completely, False otherwise.

    Logs:
    - Info: If the file is downloaded successfully.
    - Error: If an error occurs during the download process, including HTTP errors,
      connection errors, timeouts and bodies shorter than the announced Content-Length.
    """
    part_file = f"{output_file}.part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            written = 0

            with tqdm(total=total_size, unit='iB', unit_scale=True) as progress:
                with open(part_file, 'wb') as f:
                    for data in response.iter_content(chunk_size):
                        f.write(data)
                        written += len(data)
                        progress.update(len(data))

            # Content-Length counts bytes on the wire, while requests decodes any
            # Content-Encoding before yielding chunks; only compare when unencoded.
            if total_size and 'content-encoding' not in response.headers and written != total_size:
                logging.error(f"Incomplete download of {url}: received {written} of {total_size} bytes")
                return False

        # Publish the file only once it is complete.
        os.replace(part_file, output_file)
    except (requests.HTTPError, requests.ConnectionError, requests.Timeout) as e:
        logging.error(f"Error occurred while downloading {url}: {e}")
        return False
    except requests.RequestException as e:
        logging.error(f"Request error occurred while downloading {url}: {e}")
        return False
    finally:
        # Remove any leftover partial file; after a successful os.replace there is none.
        try:
            os.remove(part_file)
        except FileNotFoundError:
            pass

    logging.info(f"Downloaded - {url}\nPath - {output_file}")
    return True

In [11]:
def download_github_events(year, month, output_dir, max_workers=2):
    """
    Downloads GitHub event data for the specified year and month to the given output directory.

    Parameters:
    year (int): The year of the event data to download.
    month (int): The month of the event data to download.
    output_dir (str): The directory where the downloaded data will be saved.
    max_workers (int): Number of parallel download threads (default 2).

    Files are stored as '<output_dir>/YYYY-MM-DD/YYYY-MM-DD-H.json.gz'. One directory
    per day is created up front, hours whose file already exists are skipped, and
    the remaining hours are downloaded in parallel. An exception raised by any
    download task is logged rather than silently dropped by the thread pool.
    """
    base_url = "https://data.gharchive.org/"
    create_directory(output_dir)

    num_days = calendar.monthrange(year, month)[1]

    def day_stamp(day):
        """Return the 'YYYY-MM-DD' name used for a day's directory and files."""
        return f"{year}-{month:02d}-{day:02d}"

    def download_day_hour(day, hour):
        """Downloads the GitHub event data for a specific day and hour."""
        day_dir = os.path.join(output_dir, day_stamp(day))
        # GH Archive hour suffixes are not zero-padded (0..23).
        url = f"{base_url}{day_stamp(day)}-{hour}.json.gz"
        output_file = os.path.join(day_dir, f"{day_stamp(day)}-{hour}.json.gz")
        if not os.path.exists(output_file):
            logging.info(f"Downloading {url} to {output_file}")
            download_file(url, output_file)
        else:
            logging.info(f"File already exists: {output_file}")

    # Create every day directory once, before any worker thread writes into it.
    for day in range(1, num_days + 1):
        create_directory(os.path.join(output_dir, day_stamp(day)))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(download_day_hour, day, hour): (day, hour)
            for day in range(1, num_days + 1)
            for hour in range(24)
        }
        # submit() stores a task's exception on its Future; without result() it is lost.
        for future, (day, hour) in futures.items():
            try:
                future.result()
            except Exception as e:
                logging.error(f"Download task for {day_stamp(day)} hour {hour} failed: {e}")

In [12]:
def verify_downloaded_files(year, month, output_dir):
    """
    Verifies that all expected files for a given month and year have been downloaded.

    This function checks that a gzip-compressed JSON file (.json.gz) exists for every
    hour of every day in the specified month and year. It assumes the layout
    '<output_dir>/YYYY-MM-DD/YYYY-MM-DD-H.json.gz' (hour not zero-padded). Only the
    presence of each file is checked, not its contents.

    Parameters:
    year (int): The year for which to verify the files.
    month (int): The month for which to verify the files (1-12).
    output_dir (str): The base directory where the files are expected to be located.

    Returns:
    list[str]: The expected file paths that were not found (empty if all are present).

    Prints:
    A list of missing files if any are not found, otherwise a confirmation message that all files are present.
    """
    num_days = calendar.monthrange(year, month)[1]

    missing_files = []
    for day in range(1, num_days + 1):
        day_name = f"{year}-{month:02d}-{day:02d}"
        for hour in range(24):
            expected_file = os.path.join(output_dir, day_name, f"{day_name}-{hour}.json.gz")
            if not os.path.exists(expected_file):
                missing_files.append(expected_file)

    if missing_files:
        print(f"Some files are missing ({len(missing_files)} files):")
        for file in missing_files:
            print(file)
    else:
        print("All files have been downloaded successfully.")

    return missing_files


In [None]:
# Month of GH Archive data to fetch and process.
year, month = 2024, 5

# Placeholder locations: point these at real storage before running.
output_dir = '/path/to/github_events/downloaded'      # raw hourly .json.gz downloads
output_directory = '/path/to/github_events/processed'  # Spark CSV/Parquet aggregates
zip_directory = '/path/to/github_events/zipped'         # zip archives of the aggregates

In [None]:
# Fetch every hourly GH Archive dump for the configured month.
download_github_events(year=year, month=month, output_dir=output_dir)

In [14]:
def read_files(output_dir):
    """
    Reads all .json.gz files from the specified directory and groups them by day.

    This function traverses the specified directory and its subdirectories, collects
    all .json.gz files, and groups each file under the name of the directory that
    contains it, which is expected to be a 'YYYY-MM-DD' day directory. Files lying
    directly in output_dir have no day directory and are skipped. Directories and
    files are visited in sorted order, so the result is ordered by directory name
    (chronological for 'YYYY-MM-DD' names).

    Parameters:
    output_dir (str): The base directory where the files are expected to be located.

    Returns:
    dict: A dictionary where the keys are dates (str) and the values are lists of
    file paths (str). The paths start with output_dir.

    Prints:
    A message indicating the number of files found per day, a message for each
    skipped file, or an error message if the directory does not exist.
    """
    if not os.path.exists(output_dir):
        print(f"Directory {output_dir} does not exist.")
        return {}

    files_by_day = {}

    for root, dirs, files in os.walk(output_dir):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".json.gz"):
                continue
            file_path = os.path.join(root, file)
            if os.path.relpath(root, output_dir) == os.curdir:
                # Top-level files have no day directory to group them under.
                print(f"Skipping file with unexpected format: {file_path}")
                continue
            # basename() is portable, unlike splitting the path on '/'.
            file_date = os.path.basename(root)
            files_by_day.setdefault(file_date, []).append(file_path)

    for date, day_files in files_by_day.items():
        print(f"Date: {date}, Files processed: {len(day_files)}")

    return files_by_day


Date: 2024-05-01, Files processed: 24
Date: 2024-05-02, Files processed: 23


 41%|████      | 39.2M/96.2M [00:00<00:01, 42.8MiB/s]

In [15]:
# Group the downloaded hourly files by their day directory.
files_by_day = read_files(output_dir=output_dir)

Date: 2024-05-03, Files processed: 7
Date: 2024-05-01, Files processed: 24
Date: 2024-05-02, Files processed: 24


 66%|██████▌   | 59.6M/90.3M [00:01<00:00, 48.0MiB/s]

In [25]:
def is_valid_json(filepath):
    """
    Checks if a gzip-compressed JSON Lines file is valid.

    The file is decompressed and read line by line, and every line must parse as
    JSON. The file is considered invalid if any of the following occurs:
    - a line fails to parse;
    - the file is not gzip data, or its compressed stream is corrupt or truncated;
    - its content is not valid UTF-8;
    - the file cannot be opened.

    Parameters:
    filepath (str): The path to the gzip-compressed JSON file.

    Returns:
    bool: True if the file is valid JSON, False otherwise.
    """
    import zlib  # gzip surfaces corrupt deflate data as zlib.error

    try:
        with gzip.open(filepath, 'rt', encoding='utf-8') as f:
            for line in f:
                json.loads(line)
        return True
    except (ValueError, EOFError, OSError, zlib.error):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers gzip.BadGzipFile and missing/unreadable files;
        # EOFError is raised for a truncated gzip stream.
        return False

In [26]:

def create_spark_session(max_retries=3, retry_delay=5, app_name="GitHub"):
    """
    Creates a Spark session with retry logic.

    Calls SparkSession.builder.appName(app_name).getOrCreate() up to max_retries
    times. After each Py4JNetworkError it prints a retry message and waits
    retry_delay seconds, except after the final attempt, where waiting is pointless.

    Parameters:
    max_retries (int): The maximum number of attempts (default is 3).
    retry_delay (int): The delay in seconds between attempts (default is 5 seconds).
    app_name (str): The Spark application name (default is "GitHub").

    Returns:
    SparkSession: A SparkSession object if the session is successfully created.

    Raises:
    RuntimeError: If the Spark session cannot be created after max_retries attempts.
    The last Py4JNetworkError, if any, is chained as the cause. RuntimeError is an
    Exception subclass, so existing `except Exception` callers still work.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return SparkSession.builder.appName(app_name).getOrCreate()
        except py4j.protocol.Py4JNetworkError as e:
            last_error = e
            print(f"Retry {attempt}/{max_retries} - Failed to create Spark session: {e}")
            if attempt < max_retries:
                time.sleep(retry_delay)
    raise RuntimeError("Failed to create Spark session after multiple retries") from last_error

In [27]:
def _aggregate_events(df):
    """Return (repo_agg, user_agg): per-date repository and user aggregates of an events DataFrame."""
    repo_df = df.select(to_date(col("created_at")).alias("date"),
                        col("repo.id").alias("project_id"),
                        col("repo.name").alias("project_name"),
                        col("type"))

    user_df = df.select(to_date(col("created_at")).alias("date"),
                        col("actor.id").alias("user_id"),
                        col("actor.login").alias("user_login"),
                        col("type"))

    # count(when(cond, True)) counts the rows where cond holds (when() yields null otherwise).
    # NOTE(review): issue/PR columns count every IssuesEvent/PullRequestEvent regardless of
    # payload.action (opened, closed, ...). If only creations are wanted, as the user_agg
    # column names suggest, filter on payload.action == "opened" — confirm intent.
    repo_agg = repo_df.groupBy("date", "project_id", "project_name").agg(
        count(when(col("type") == "WatchEvent", True)).alias("stars"),
        count(when(col("type") == "ForkEvent", True)).alias("forks"),
        count(when(col("type") == "IssuesEvent", True)).alias("issues"),
        count(when(col("type") == "PullRequestEvent", True)).alias("prs")
    )

    user_agg = user_df.groupBy("date", "user_id", "user_login").agg(
        count(when(col("type") == "WatchEvent", True)).alias("starred_projects"),
        count(when(col("type") == "IssuesEvent", True)).alias("issues_created"),
        count(when(col("type") == "PullRequestEvent", True)).alias("prs_created")
    )
    return repo_agg, user_agg


def process_files(files_by_day, downloaded_directory, processed_directory, max_retries=3, retry_delay=5):
    """
    Processes JSON files grouped by day, performing aggregations and saving results.

    For each day, all valid files of that day are read together into one DataFrame.
    They are aggregated per repository and per user, and each aggregate is written
    once as CSV and as Parquet under processed_directory:
    - repo_agg_<date>.csv and repo_agg_<date>.parquet
    - user_agg_<date>.csv and user_agg_<date>.parquet

    Days whose four outputs already exist, or that have no valid file, are skipped.
    A single Spark session is created on first need and stopped when processing
    finishes, even if an error escapes.

    Parameters:
    files_by_day (dict): A dictionary where the keys are dates (str) and the values are lists of file paths (str).
    downloaded_directory (str): Directory used to resolve entries that are bare file names rather than existing paths.
    processed_directory (str): The directory where the processed CSV and Parquet files will be saved.
    max_retries (int): The maximum number of attempts for creating the Spark session and processing each day (default is 3).
    retry_delay (int): The delay in seconds between retry attempts (default is 5 seconds).

    Prints:
    Progress and error messages, including retries and skipping of corrupted files.
    """
    os.makedirs(processed_directory, exist_ok=True)

    spark = None
    try:
        for file_date, files in files_by_day.items():
            print(f"Processing files for {file_date}...")

            repo_output_csv = os.path.join(processed_directory, f"repo_agg_{file_date}.csv")
            repo_output_parquet = os.path.join(processed_directory, f"repo_agg_{file_date}.parquet")
            user_output_csv = os.path.join(processed_directory, f"user_agg_{file_date}.csv")
            user_output_parquet = os.path.join(processed_directory, f"user_agg_{file_date}.parquet")

            outputs = (repo_output_csv, repo_output_parquet, user_output_csv, user_output_parquet)
            if all(os.path.exists(p) for p in outputs):
                print(f"Output files for {file_date} already exist. Skipping processing.")
                continue

            valid_paths = []
            for file in files:
                # read_files() already returns full paths; joining them onto
                # downloaded_directory again would double a relative prefix.
                file_path = file if os.path.exists(file) else os.path.join(downloaded_directory, file)
                if not is_valid_json(file_path):
                    print(f"Skipping corrupted file {file_path}")
                    continue
                valid_paths.append(file_path)

            if not valid_paths:
                print(f"No valid files for {file_date}. Skipping processing.")
                continue

            if spark is None:
                spark = create_spark_session(max_retries, retry_delay)

            for attempt in range(1, max_retries + 1):
                try:
                    # Read the whole day at once: aggregating file by file and writing
                    # with mode='overwrite' would keep only the last file's results.
                    df = spark.read.json(valid_paths)
                    repo_agg, user_agg = _aggregate_events(df)

                    repo_agg.write.csv(repo_output_csv, header=True, mode='overwrite')
                    repo_agg.write.parquet(repo_output_parquet, mode='overwrite')
                    user_agg.write.csv(user_output_csv, header=True, mode='overwrite')
                    user_agg.write.parquet(user_output_parquet, mode='overwrite')

                    print(f"Processed and saved data for {file_date} ({len(valid_paths)} files)")
                    break
                except py4j.protocol.Py4JJavaError as e:
                    print(f"Retry {attempt}/{max_retries} - Failed to process files for {file_date} due to Java error: {e}")
                except py4j.protocol.Py4JNetworkError as e:
                    print(f"Retry {attempt}/{max_retries} - Failed to process files for {file_date} due to network error: {e}")
                except Exception as e:
                    print(f"Failed to process files for {file_date}: {e}")
                    break
                # Only reached after a retryable error; don't wait after the last attempt.
                if attempt < max_retries:
                    time.sleep(retry_delay)
    finally:
        if spark is not None:
            spark.stop()


  1%|          | 408k/53.9M [00:00<00:13, 4.05MiB/s]

In [None]:
# Aggregate each day's events and write the CSV/Parquet outputs.
process_files(
    files_by_day=files_by_day,
    downloaded_directory=output_dir,
    processed_directory=output_directory,
)

In [None]:
# Make sure the destination for the zip archives exists (parents included).
Path(zip_directory).mkdir(parents=True, exist_ok=True)

In [None]:
def zip_all_subdirectories(output_directory, zip_directory):
    """
    Zips every directory under output_directory that directly contains files.

    Each such directory becomes one deflate-compressed archive, saved in the
    mirrored subdirectory of zip_directory. The archive is named after the
    directory's path relative to output_directory, with path separators replaced
    by '_'. Files lying directly in output_directory go into
    '<basename of output_directory>.zip' at the top of zip_directory. Entries inside
    each archive are stored relative to output_directory.

    Existing archives are skipped. An archive that fails part-way is deleted, so a
    later run recreates it instead of skipping it.

    Parameters:
    output_directory (str): The base directory containing the subdirectories to be zipped.
    zip_directory (str): The base directory where the zip files will be saved.

    Prints:
    Progress messages, including skipping existing zip files and any errors encountered
    during the zipping process.
    """
    from zipfile import ZIP_DEFLATED  # local import: the file only imports ZipFile

    for root, _dirs, files in os.walk(output_directory):
        if not files:
            continue

        subdirectory_name = os.path.relpath(root, output_directory)
        if subdirectory_name == os.curdir:
            # Top-level files: name the archive after the directory instead of "..zip".
            archive_stem = os.path.basename(os.path.normpath(output_directory))
            date_subdirectory = zip_directory
        else:
            archive_stem = subdirectory_name.replace(os.sep, '_')
            date_subdirectory = os.path.join(zip_directory, subdirectory_name)
        os.makedirs(date_subdirectory, exist_ok=True)

        zip_file_path = os.path.join(date_subdirectory, f"{archive_stem}.zip")

        if os.path.exists(zip_file_path):
            print(f"Zip file {zip_file_path} already exists. Skipping...")
            continue

        try:
            with ZipFile(zip_file_path, 'w', compression=ZIP_DEFLATED) as zipf:
                for file in files:
                    file_path = os.path.join(root, file)
                    zipf.write(file_path, os.path.relpath(file_path, output_directory))
            print(f"Zipped {root} to {zip_file_path}")
        except Exception as e:
            print(f"An error occurred while creating the zip file: {e}")
            # A partial archive would otherwise be skipped as "already exists" next run.
            if os.path.exists(zip_file_path):
                os.remove(zip_file_path)

In [None]:
# Bundle each processed output directory into its own zip archive.
zip_all_subdirectories(output_directory=output_directory, zip_directory=zip_directory)

In [None]:
def display_download_links(zip_directory):
    """
    Show a clickable download link for every .zip file under zip_directory.

    Walks the directory tree and calls display(FileLink(path)) once per archive,
    in os.walk order. A missing directory produces no links.
    """
    zip_paths = (
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(zip_directory)
        for name in names
        if name.endswith('.zip')
    )
    for zip_path in zip_paths:
        display(FileLink(zip_path))

In [None]:
# Show a download link for every archive produced above.
display_download_links(zip_directory=zip_directory)