In [27]:
# File Management and Archiving
import os
import glob
import shutil
import zipfile

# Data Processing with PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, FloatType, TimestampType,
                               LongType)

<div style="background-color:#000000 ; padding: 10px 0;">
    <center><h1 style="color: white; font-weight:bold">FILE EXTRACTION</h1></center>
</div> 

Only certain files in the directory will be utilized. We will first extract these files then unzip them later.

In [16]:
def copy_files(destination_dir, search_pattern, start_date, end_date):
    """
    Copy files from a source directory to a target directory based on a date
    range.

    Parameters
    ----------
    destination_dir : str
        The path to the directory where the files should be copied.
    search_pattern : str
        The glob pattern to locate the source files (full path).
    start_date : str
        The earliest date to include in the filtering (format: YYYYMMDD).
    end_date : str
        The latest date to include in the filtering (format: YYYYMMDD).
    """
    os.makedirs(destination_dir, exist_ok=True)
    source_dir = "/mnt/data/public/gdeltv2/gkg"
    target_dir = destination_dir

    # Find all files matching the search_pattern
    all_files = glob.glob(search_pattern)

    # Initialize an empty list to collect matching files
    filtered_files = []

    # Iterate through the matched files and filter by the desired date range
    for file in all_files:
        # Extract the base file name without path
        base_name = os.path.basename(file)
        
        # Extract the date portion (first 8 characters) of the base name
        file_date = base_name[:8]
        
        # Check if the file date falls within the specified range
        if start_date <= file_date <= end_date:
            filtered_files.append(file)
    
    # Copy each filtered file to the target directory
    for file in filtered_files:
        shutil.copy(file, target_dir)
        print(f"Copied: {file} to {destination_dir}")

# Copy files 
# 2016-10-02 to 2016-10-08
copy_files('/home/msds2024/xx/cptxx_shared/data_2016',
           '/mnt/data/public/gdeltv2/gkg/201510*.gkg.csv.zip',
           '20161002', '20161008')

# 2015-10-02 to 2015-10-08
copy_files('/home/msds2024/xx/cptx_shared/data_2015',
           '/mnt/data/public/gdeltv2/gkg/201510*.gkg.csv.zip',
           '20151002', '20151008')

Copied: /mnt/data/public/gdeltv2/gkg/20151002130000.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151005004500.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151004104500.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151008024500.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151006184500.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151005140000.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151003213000.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151007040000.gkg.csv.zip to /home/msds2024/pmartinez/cpt1_shared/data_2015
Copied: /mnt/data/public/gdeltv2/gkg/20151004094500.gkg.csv.zip to /home/msds2024/pmarti

<div style="background-color:#000000 ; padding: 10px 0;">
    <center><h1 style="color: white; font-weight:bold">UNZIP THE FILES</h1></center>
</div> 

In [18]:
def unzip_files(source_dir, destination_dir, pattern):
    """
    Extract ZIP files from a source directory to a destination directory.

    Parameters
    ----------
    source_dir : str
        The path to the directory containing the ZIP files.
    destination_dir : str
        The path to the directory where the extracted files should be saved.
    pattern : str
        The glob pattern to locate the ZIP files (e.g., "*.zip").
    """
    # Create the output directory if it doesn't exist
    os.makedirs(destination_dir, exist_ok=True)

    # Get a list of all files matching the pattern
    zip_files = glob.glob(os.path.join(source_dir, pattern))

    # Extract each zip file to the destination directory
    for zip_filepath in zip_files:
        with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
            zip_ref.extractall(destination_dir)
    print(f"Extraction completed. Files were extracted to: {destination_dir}")

# Unzip the 2015 and 2016 zip files
unzip_files('/home/msds2024/xx/cptx_shared/data_2015',
            '/home/msds2024/xx/cptx_shared/slt1a/bdcc/201510',
            '*.gkg.csv.zip')

unzip_files('/home/msds2024/xx/cptx_shared/data_2016',
            '/home/msds2024/xx/cptx_shared/slt1x/bdcc/201610',
            '*.gkg.csv.zip')

Extraction completed. Files were extracted to: /home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201510


<div style="background-color:#000000 ; padding: 10px 0;">
    <center><h1 style="color: white; font-weight:bold">CHECK TOTAL FILE SIZE</h1></center>
</div> 

In [21]:
def check_file_size(directory):
    """
    Calculate and print the total size of all files (in GB) in a specified
    directory.

    Parameters
    ----------
    directory : str
        The path to the directory whose total file size is to be calculated.
    """
    total_size = 0

    # List all items in the specified directory (only the top level)
    for item in os.listdir(directory):
        # Get the full path of the item
        item_path = os.path.join(directory, item)
        
        # Check if the item is a file, and add its size if it is
        if os.path.isfile(item_path):
            total_size += os.path.getsize(item_path)

    # Convert the size to gigabytes (GB)
    total_size_gb = total_size / (1024 * 1024 * 1024)
    
    # Print out the total size in GB
    print(f"Total size of files in '{directory}': {total_size_gb:.2f} GB")

check_file_size('/home/msds2024/xx/cptx_shared/sltxx/bdcc/201510')
check_file_size('/home/msds2024/xx/cptx_shared/sltxx/bdcc/201610')

Total size of files in '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201510': 17.80 GB
Total size of files in '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610': 23.99 GB


<div style="background-color:#000000 ; padding: 10px 0;">
    <center><h1 style="color: white; font-weight:bold">CONVERT CSV TO PARQUET</h1></center>
</div> 

In [25]:
spark = (SparkSession
     .builder
     .master('local[*]') # Master URL;
     .getOrCreate())

In [26]:
def convert_to_parquet(csv_filepath, output_path, file_pattern):
    """
    Convert a set of CSV files to Parquet format based on a given schema and
    pattern.

    Parameters
    ----------
    csv_filepath : str
        The path to the directory containing the source CSV files.
    output_path : str
        The directory where the Parquet files should be written.
    file_pattern : str
        The glob pattern used to locate the relevant CSV files.
    """
    os.makedirs(output_path, exist_ok=True)
    schema = StructType([
        StructField("GKGRECORDID", StringType(), True),
        StructField("V2.1DATE", LongType(), True),
        StructField("V2SOURCECOLLECTIONIDENTIFIER", IntegerType(), True),
        StructField("V2SOURCECOMMONNAME", StringType(), True),
        StructField("V2DOCUMENTIDENTIFIER", StringType(), True),
        StructField("V1COUNTS", StringType(), True),
        StructField("V2.1COUNTS", StringType(), True),
        StructField("V1THEMES", StringType(), True),
        StructField("V2ENHANCEDTHEMES", StringType(), True),
        StructField("V1LOCATIONS", StringType(), True),
        StructField("V2ENHANCEDLOCATIONS", StringType(), True),
        StructField("V1PERSONS", StringType(), True),
        StructField("V2ENHANCEDPERSONS", StringType(), True),
        StructField("V1ORGANIZATIONS", StringType(), True),
        StructField("V2ENHANCEDORGANIZATIONS", StringType(), True),
        StructField("V1.5TONE", StringType(), True),
        StructField("V2.1ENHANCEDDATES", StringType(), True),
        StructField("V2GCAM", StringType(), True),
        StructField("V2.1SHARINGIMAGE", StringType(), True),
        StructField("V2.1RELATEDIMAGES", StringType(), True),
        StructField("V2.1SOCIALIMAGEEMBEDS", StringType(), True),
        StructField("V2.1SOCIALVIDEOEMBEDS", StringType(), True),
        StructField("V2.1QUOTATIONS", StringType(), True),
        StructField("V2.1ALLNAMES", StringType(), True),
        StructField("V2.1AMOUNTS", StringType(), True),
        StructField("V2.1TRANSLATIONINFO", StringType(), True),
        StructField("V2EXTRASXML", StringType(), True)
    ])
    search_path = os.path.join(csv_filepath, file_pattern)
    
    # List all files matching the pattern
    files = glob.glob(search_path)
    df = spark.read.csv(files, sep='\t', schema=schema)

    # Write the DataFrame to Parquet format
    df.write.parquet(output_path, mode='overwrite')

# Convert 2015 CSVs to parquet
convert_to_parquet("/home/msds2024/xx/cptx_shared/sltxx/bdcc/201510",
                   "/home/msds2024/xx/cptx_shared/sltxx/bdcc/"\
                   "201510/parquets",
                   "201510*.gkg.csv")

# Convert 2016 CSVs to parquet
convert_to_parquet("/home/msds2024/xx/cptx_shared/sltxx/bdcc/201610",
                   "/home/msds2024/xx/cptx_shared/sltxx/bdcc/"\
                   "201610/parquets",
                   "201610*.gkg.csv")