In [14]:
import os
import zipfile
import glob
from pyspark.sql.types import (StructType, StructField, StringType,
IntegerType, FloatType, TimestampType, LongType)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# **Check for missing files**
---

<b style='color:red'>TODO: Insert Jed's code for detecting the missing files here.</b>
- Ideally, the missing files are collected in a list so they can all be checked below.

In [2]:
# Directory to scan, plus the filename prefix and suffix an entry must have to be kept
target_directory = '/mnt/data/public/gdeltv2/gkg'
prefix = '201610'
suffix = '.gkg.csv.zip'

# Keep only the directory entries whose names begin with the prefix and end with the suffix
filtered_files = list(filter(
    lambda entry_name: entry_name.startswith(prefix) and entry_name.endswith(suffix),
    os.listdir(target_directory),
))

In [3]:
# Spot-check that a filename flagged as missing is really absent from the directory listing

# TODO: extend this check to cover every missing file, not just this one
candidate_file = '20161002174500.gkg.csv.zip'
candidate_file in filtered_files

False

# **Unzip the files**
---

In [6]:
# Source folder holding the zip archives and the glob pattern used to select them
zip_directory = '/home/msds2024/pmartinez/cpt1_shared/data'
pattern = "*.gkg.csv.zip"

# Destination folder that receives the extracted contents
output_directory = '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610'

# Make sure the destination exists before anything is extracted into it
os.makedirs(output_directory, exist_ok=True)

# Collect every archive in the source folder whose name matches the pattern
zip_files = glob.glob(os.path.join(zip_directory, pattern))

# Unpack the archives one at a time into the destination folder
for archive_path in zip_files:
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(path=output_directory)

print(f"Extraction completed. Files were extracted to: {output_directory}")

Extraction completed. Files were extracted to: /home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610


# **Check the total file size of the CSV files**
---

In [7]:
# Folder whose contents are measured
target_directory = '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610'

# Add up the byte size of every regular file found anywhere under the folder
# (os.walk descends into subdirectories; entries that are not files are skipped)
total_size = sum(
    os.path.getsize(entry_path)
    for root, _subdirs, names in os.walk(target_directory)
    for entry_path in (os.path.join(root, name) for name in names)
    if os.path.isfile(entry_path)
)

# Express the byte count in gigabytes, using 1 GB = 1024**3 bytes
total_size_gb = total_size / (1024 ** 3)

# Report the total, rounded to two decimals
print(f"Total size of files in '{target_directory}': {total_size_gb:.2f} GB")

Total size of files in '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610': 23.99 GB


# **Convert the CSV files to Parquet for faster execution**

In [15]:
# Configure a builder with the 'local[*]' master URL, then create the session
# or reuse one that already exists
spark_builder = SparkSession.builder.master('local[*]')
spark = spark_builder.getOrCreate()

In [16]:
# Explicit schema for the tab-separated GKG files: 27 columns, all nullable.
# Everything is read as a string except the source-collection identifier (integer)
# and V2.1DATE, which is a LongType because values such as 20161004163000
# do not fit in a 32-bit integer.
schema = StructType([
    StructField("GKGRECORDID", StringType(), True),
    StructField("V2.1DATE", LongType(), True),
    StructField("V2SOURCECOLLECTIONIDENTIFIER", IntegerType(), True),
    StructField("V2SOURCECOMMONNAME", StringType(), True),
    StructField("V2DOCUMENTIDENTIFIER", StringType(), True),
    StructField("V1COUNTS", StringType(), True),
    StructField("V2.1COUNTS", StringType(), True),
    StructField("V1THEMES", StringType(), True),
    StructField("V2ENHANCEDTHEMES", StringType(), True),
    StructField("V1LOCATIONS", StringType(), True),
    StructField("V2ENHANCEDLOCATIONS", StringType(), True),
    StructField("V1PERSONS", StringType(), True),
    StructField("V2ENHANCEDPERSONS", StringType(), True),
    StructField("V1ORGANIZATIONS", StringType(), True),
    StructField("V2ENHANCEDORGANIZATIONS", StringType(), True),
    StructField("V1.5TONE", StringType(), True),
    StructField("V2.1ENHANCEDDATES", StringType(), True),
    StructField("V2GCAM", StringType(), True),
    StructField("V2.1SHARINGIMAGE", StringType(), True),
    StructField("V2.1RELATEDIMAGES", StringType(), True),
    StructField("V2.1SOCIALIMAGEEMBEDS", StringType(), True),
    StructField("V2.1SOCIALVIDEOEMBEDS", StringType(), True),
    StructField("V2.1QUOTATIONS", StringType(), True),
    StructField("V2.1ALLNAMES", StringType(), True),
    StructField("V2.1AMOUNTS", StringType(), True),
    StructField("V2.1TRANSLATIONINFO", StringType(), True),
    StructField("V2EXTRASXML", StringType(), True)
])

# Folder holding the extracted CSV files. This must be the same folder the unzip
# step extracts into; a template placeholder path here would match no files.
directory = '/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610'
file_pattern = '201610*.gkg.csv'
search_path = os.path.join(directory, file_pattern)

# List all files matching the pattern
files = glob.glob(search_path)

# Fail loudly when nothing matches. Merely printing a message would leave
# `df_gkg` undefined (or stale from an earlier run), so the cells below would
# either crash with a NameError or silently operate on old data.
if not files:
    raise FileNotFoundError(
        f"No files found matching pattern '{file_pattern}' in '{directory}'"
    )

# Read all matching files into one Spark DataFrame using the explicit schema
df_gkg = spark.read.csv(files, sep='\t', schema=schema)

In [17]:
# Preview the first three rows, converted to a pandas DataFrame for rich notebook display
preview_rows = df_gkg.limit(3)
preview_rows.toPandas()

Unnamed: 0,GKGRECORDID,V2.1DATE,V2SOURCECOLLECTIONIDENTIFIER,V2SOURCECOMMONNAME,V2DOCUMENTIDENTIFIER,V1COUNTS,V2.1COUNTS,V1THEMES,V2ENHANCEDTHEMES,V1LOCATIONS,...,V2GCAM,V2.1SHARINGIMAGE,V2.1RELATEDIMAGES,V2.1SOCIALIMAGEEMBEDS,V2.1SOCIALVIDEOEMBEDS,V2.1QUOTATIONS,V2.1ALLNAMES,V2.1AMOUNTS,V2.1TRANSLATIONINFO,V2EXTRASXML
0,20161004163000-0,20161004163000,2,BBC Monitoring,Tolo TV in Dari /BBC Monitoring/(c) BBC,,,TAX_FNCACT;TAX_FNCACT_LEADERS;TAX_TERROR_GROUP...,"LEADER,2404;LEADER,2739;TAX_TERROR_GROUP_TALIB...","4#Pol-E Khomri, Baghlan, Afghanistan#AF#AF03#3...",...,"wc:592,c1.2:2,c12.1:25,c12.10:46,c12.12:19,c12...",,,,,,"Jawed Salim,797;Kunduz Province,900;National D...",,,
1,20161004163000-1,20161004163000,2,BBC Monitoring,/BBC Monitoring/(c) BBC,,,SOC_EXPRESSREGRET;TAX_WORLDLANGUAGES;TAX_WORLD...,"WB_2467_TERRORISM,1507;WB_2433_CONFLICT_AND_VI...","1#United States#US#US#38#-97#US;4#Moscow, Mosk...",...,"wc:243,c12.1:17,c12.10:30,c12.12:12,c12.13:10,...",,,,,971|32||are continuing and will continue,"Dmitry Peskov,245;United States,562;United Sta...","2,countries,767;",,
2,20161004163000-2,20161004163000,2,BBC Monitoring,ORTM TV in French /BBC Monitoring/(c) BBC,,,LEADER;TAX_FNCACT;TAX_FNCACT_PRESIDENT;USPEC_P...,"EPU_POLICY_GOVERNMENT,1242;TAX_ETHNICITY_FRENC...","4#Valetta, Malta (General), Malta#MT#MT00#35.8...",...,"wc:187,c12.1:5,c12.10:18,c12.12:4,c12.13:4,c12...",,,,,,"Ibrahim Boubacar Keita,35;Francois Hollande,10...","2,heads of state discussed,924;",,


In [21]:
# Source CSV folder and the destination folder for the Parquet output
csv_file_path = "/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610"
out_path = "/home/msds2024/pmartinez/cpt1_shared/slt1a/bdcc/201610/parquets"

# Persist the DataFrame as Parquet, replacing whatever already exists at out_path
df_gkg.write.mode('overwrite').parquet(out_path)