In [1]:
import numpy as np
import pandas as pd
import os, requests, zipfile, warnings, time, random
from io import BytesIO
import datetime
import tqdm
import concurrent.futures
from threading import Lock
warnings.filterwarnings('ignore')

In [2]:
# Root of the GDELT v2 file server; generate_url_list() prefixes every archive name with it.
base_url = 'http://data.gdeltproject.org/gdeltv2/'
# A single GKG archive URL kept around for trying the download/parse helpers by hand.
test_url = 'http://data.gdeltproject.org/gdeltv2/20250219000000.gkg.csv.zip'

In [3]:
# GCAM variable codebook: one row per GCAM variable code, used later to turn
# the raw codes into readable column labels.
# The path is assembled with os.path.join so it resolves on any OS (a literal
# "..\Data\..." only works on Windows).
codebook_path = os.path.join('..', 'Data', 'GCAM-MASTER-CODEBOOK.TXT')
doc = pd.read_csv(
    codebook_path,
    delimiter='\t',
    encoding='latin',
    low_memory=False,
    usecols=['Variable', 'Type', 'DimensionHumanName'],
)
# Readable label of the form "<Variable>; <Type>; <DimensionHumanName>".
doc['new_name'] = doc['Variable'] + '; ' + doc['Type'] + '; ' + doc['DimensionHumanName']
# Column-oriented dict: {'Variable': [...], 'Type': [...], 'DimensionHumanName': [...], 'new_name': [...]}
doc_dict = doc.to_dict(orient='list')

In [4]:
def parse_data(data):
    """
    Read a headerless, tab-separated GKG file into a DataFrame, keeping a fixed
    subset of its columns.

    Args:
        data: A path or file-like object; it is handed straight to pd.read_csv.

    Returns:
        pandas.DataFrame: One row per input line with the twelve named columns below.
    """
    # (position in the raw file, name given to that column)
    selected_fields = (
        (0, 'GKGRECORDID'),
        (1, 'V2.1DATE'),
        (2, 'V2SOURCECOLLECTIONIDENTIFIER'),
        (3, 'V2SOURCECOMMONNAME'),
        (4, 'V2DOCUMENTIDENTIFIER'),
        (7, 'V1THEMES'),
        (9, 'V1LOCATIONS'),
        (13, 'V1ORGANIZATIONS'),
        (15, 'V1.5TONE'),
        (17, 'V2GCAM'),
        (25, 'V2.1TRANSLATIONINFO'),
        (26, 'V2EXTRASXML'),
    )
    positions = [position for position, _ in selected_fields]
    labels = [label for _, label in selected_fields]

    frame = pd.read_csv(
        data,
        sep='\t',
        encoding='latin',
        low_memory=False,
        header=None,
        usecols=positions,
        names=labels,
    )
    return frame

In [5]:
def download_unzip(url, timeout=60):
    """
    Downloads a zipped CSV file from the given URL, extracts the first file in the archive,
    and parses it into a pandas DataFrame using the parse_data function.

    Args:
        url (str): The URL to the .zip file containing the CSV.
        timeout (float or tuple, optional): Seconds to wait for the server, passed to
            requests.get. Without it a stalled connection would block the calling
            thread indefinitely. Defaults to 60.

    Returns:
        tuple: (DataFrame, status_code)
            DataFrame: The parsed data as a pandas DataFrame, or None when the
                status code is not 200.
            status_code: The HTTP status code from the download request.

    Raises:
        ValueError: If the downloaded archive contains no files.
        Exception: Network, timeout, zip and parsing errors from requests, zipfile
            and pandas are not caught here and propagate to the caller.
    """
    response = requests.get(url, timeout=timeout)
    status_code = response.status_code

    # Nothing to parse unless the request succeeded.
    if status_code != 200:
        return None, status_code

    # Open the archive straight from memory; nothing is written to disk.
    with zipfile.ZipFile(BytesIO(response.content), 'r') as zip_file:
        member_names = zip_file.namelist()
        if not member_names:
            raise ValueError(f"Zip archive at {url} contains no files")

        # Only the first member of the archive is read.
        with zip_file.open(member_names[0]) as file:
            return parse_data(file), status_code

In [6]:
def generate_url_list(start_dt, end_dt, increment_minutes=15):
    """
    Generates a list of GDELT GKG .csv.zip file URLs between two datetimes.

    Args:
        start_dt (datetime.datetime): The starting datetime (included).
        end_dt (datetime.datetime): The ending datetime (included when a step lands on it).
        increment_minutes (int, optional): The interval in minutes between URLs.
            Must be positive. Defaults to 15.

    Returns:
        list: List of URLs as strings for each time increment between start_dt and end_dt.
            Empty when start_dt is later than end_dt.

    Raises:
        ValueError: If increment_minutes is zero or negative; such a step would
            never move past end_dt and the loop would not terminate.
    """
    if increment_minutes <= 0:
        raise ValueError("increment_minutes must be a positive number of minutes")

    step = datetime.timedelta(minutes=increment_minutes)
    url_list = []
    current_dt = start_dt

    while current_dt <= end_dt:
        # Archive names are the timestamp formatted as YYYYMMDDHHMMSS
        datetime_str = current_dt.strftime('%Y%m%d%H%M%S')
        url_list.append(f'{base_url}{datetime_str}.gkg.csv.zip')
        current_dt += step

    return url_list

In [7]:
# Split the URL list into consecutive groups (100 per group by default)
def chunk_datetime_strings(urls, chunk_size=100):
    """
    Split a sequence into consecutive slices of at most chunk_size items.

    Args:
        urls (list): The items to split; order is preserved.
        chunk_size (int, optional): Maximum length of each slice. Defaults to 100.

    Returns:
        list: A list of slices of urls; the last one may be shorter than chunk_size.
    """
    chunks = []
    for offset in range(0, len(urls), chunk_size):
        chunks.append(urls[offset:offset + chunk_size])
    return chunks

In [8]:
def download_chunk_parallel(chunk, max_workers=10, max_consecutive_failures=10):
    """
    Downloads and processes multiple GDELT CSV files in parallel using threading.

    Every URL is submitted to a thread pool that runs download_unzip(); the results
    are collected as they complete and concatenated into a single DataFrame.

    Args:
        chunk (list): List of URLs pointing to .zip files containing CSV data.
        max_workers (int, optional): Maximum number of concurrent threads for downloading.
            Defaults to 10.
        max_consecutive_failures (int, optional): Stop early once this many downloads
            in a row (in completion order) have failed. Defaults to 10.

    Returns:
        pandas.DataFrame or None: Combined DataFrame containing data from all successful
            downloads, or None if no downloads succeeded.

    Error Handling:
        - A non-200 status code or an exception raised by download_unzip() counts as a
          failure and is appended to '../logs/failed_downloads_YYYYMMDD_HHMMSS.txt'
          (the directory is created if it is missing).
        - Exceptions are additionally printed.
        - After max_consecutive_failures failures in a row, tasks that have not
          started yet are cancelled and collection stops. Downloads already running
          are still waited for when the pool shuts down; their results are discarded.

    Note:
        All bookkeeping (failure counting, log writing) happens in the collecting
        thread, so the worker threads share no mutable state.
    """
    df_list = []
    consecutive_failures = 0
    current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = f"../logs/failed_downloads_{current_time}.txt"

    def log_failure(message):
        # Create the log directory on first use so a missing folder cannot abort the run.
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        with open(log_file, "a") as f:
            f.write(f"{message}\n")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all download tasks
        future_to_url = {executor.submit(download_unzip, url): url for url in chunk}

        # Collect results with progress bar
        for future in tqdm.tqdm(concurrent.futures.as_completed(future_to_url),
                               total=len(chunk), desc="Downloading", unit="file"):
            url = future_to_url[future]
            try:
                df, code = future.result()
            except Exception as e:
                log_failure(f"{url} - Error: {str(e)}")
                print(f"Failed to download or parse {url}: {str(e)}")
                consecutive_failures += 1
            else:
                if code == 200:
                    df_list.append(df)
                    consecutive_failures = 0
                else:
                    log_failure(f"{url} - Status Code: {code}")
                    consecutive_failures += 1

            if consecutive_failures >= max_consecutive_failures:
                print(f"Error: Last {max_consecutive_failures} downloads failed. Stopping.")
                # cancel() only affects tasks that have not started running yet.
                for pending in future_to_url:
                    pending.cancel()
                break

    if df_list:
        return pd.concat(df_list, ignore_index=True)
    else:
        print("None of the URLs worked.")
        return None

In [9]:
def first_pass_clean(df):
    """
    Keep only rows that mention the United States and at least one of the listed
    airlines, and that have all required fields present.

    Args:
        df (pandas.DataFrame): Frame with the columns produced by parse_data.

    Returns:
        pandas.DataFrame: The filtered rows with a fresh 0..n-1 index.
    """
    # Rows whose locations mention the United States (case-insensitive)
    mentions_us = df['V1LOCATIONS'].str.contains('united states', case=False, na=False)
    us_rows = df.loc[mentions_us]

    # Of those, rows whose organizations mention one of the airlines
    mentions_airline = us_rows['V1ORGANIZATIONS'].str.contains(
        "Alaska Airlines|American Airlines|Delta Air Lines|Frontier Airlines|Hawaiian Airlines|JetBlue|Southwest Airlines|Spirit Airlines|Sun Country Airlines|United Airlines|Allegiant Air"
        , case=False, na=False, regex=True)
    airline_rows = us_rows.loc[mentions_airline]

    # Fields that must be non-missing for a row to be kept
    required_fields = [
        'GKGRECORDID', 'V2.1DATE', 'V2SOURCECOLLECTIONIDENTIFIER',
        'V2DOCUMENTIDENTIFIER', 'V1LOCATIONS',
        'V1ORGANIZATIONS', 'V1.5TONE', 'V2GCAM',
    ]
    return airline_rows.dropna(subset=required_fields).reset_index(drop=True)

In [10]:
def second_pass_clean(df, gcam_codebook=None):
    """
    Derive analysis columns from a filtered GKG frame and expand the GCAM field
    into one numeric column per codebook variable.

    Steps: cast the source-collection id to int8, parse the timestamp, add 0/1
    indicator columns for a list of keywords found in V1ORGANIZATIONS, pull the
    page title out of V2EXTRASXML, split V1.5TONE into seven numeric columns,
    keep the earliest row per V2DOCUMENTIDENTIFIER, and add the GCAM columns.

    Args:
        df (pandas.DataFrame): Frame with the columns produced by parse_data
            (normally the output of first_pass_clean). It is not modified.
        gcam_codebook (dict, optional): Mapping with parallel lists under the keys
            'Variable' (GCAM code) and 'new_name' (output column name). Defaults
            to the notebook-level doc_dict.

    Returns:
        pandas.DataFrame: The cleaned frame. The raw columns V2.1DATE, V1LOCATIONS,
            V2EXTRASXML, V1.5TONE, V1ORGANIZATIONS and V2GCAM are dropped. Missing
            tone parts and GCAM values are filled with 0. An empty input yields an
            empty frame with the same set of output columns.
    """
    if gcam_codebook is None:
        gcam_codebook = doc_dict

    # Work on a copy: the caller's frame stays intact, so the cell can be re-run.
    df = df.copy()

    df['V2SOURCECOLLECTIONIDENTIFIER'] = df['V2SOURCECOLLECTIONIDENTIFIER'].astype(np.int8)

    # dates
    df['datetime'] = pd.to_datetime(df['V2.1DATE'], format='%Y%m%d%H%M%S', errors='coerce')
    df['date'] = df['datetime'].dt.date

    # company Dummies
    to_check = ["airplane","airline","airport","Alaska Airlines","American Airlines","Delta Air Lines","Frontier Airlines","Hawaiian Airlines","JetBlue","Southwest Airlines","Spirit Airlines","Sun Country Airlines","United Airlines","Allegiant Air"]
    for word in to_check:
        df[word] = df['V1ORGANIZATIONS'].str.contains(word, case=False, na=False).astype(np.int8)

    # Extract the article title from the V2EXTRASXML column, which is between <PAGE_TITLE> and </PAGE_TITLE>
    df['article_title'] = df['V2EXTRASXML'].str.extract(r'<PAGE_TITLE>(.*?)</PAGE_TITLE>', expand=False)

    # Split V1.5TONE on "," into seven numeric columns.
    tone_columns = ['Tone','Positive Score','Negative Score',
                    'Polarity','Activity Reference Density',
                    'Self/Group Reference Density','Word Count']
    # reindex pins the split to exactly seven columns; an empty frame or a short
    # tone string would otherwise produce fewer columns and break the assignment.
    tone_parts = df['V1.5TONE'].str.split(',', expand=True).reindex(columns=range(len(tone_columns)))
    for position, column_name in enumerate(tone_columns):
        # Unparseable or missing parts become 0
        df[column_name] = pd.to_numeric(tone_parts[position], downcast="integer", errors='coerce').fillna(0)

    # Keep the first time that a V2DOCUMENTIDENTIFIER value appears
    df = df.sort_values(by=['V2DOCUMENTIDENTIFIER', 'datetime']).drop_duplicates(subset='V2DOCUMENTIDENTIFIER', keep='first')

    def gcam_to_dict(raw_value):
        # "key:value,key:value,..." -> {key: value}; items without ":" are ignored.
        if not isinstance(raw_value, str):
            return {}
        parsed = {}
        for item in raw_value.split(','):
            if ':' in item:
                pieces = item.split(':')
                parsed[pieces[0]] = pieces[1]
        return parsed

    gcam_records = [gcam_to_dict(raw_value) for raw_value in df['V2GCAM']]

    # One numeric Series per codebook variable; a variable absent from a row is 0.
    gcam_columns = {}
    for key, column_name in zip(gcam_codebook['Variable'], gcam_codebook['new_name']):
        values = pd.Series([record.get(key, 0) for record in gcam_records],
                           index=df.index, dtype=object)
        gcam_columns[column_name] = pd.to_numeric(values, downcast="integer", errors='coerce').fillna(0)

    df = df.drop(columns=['V2.1DATE','V1LOCATIONS','V2EXTRASXML','V1.5TONE','V1ORGANIZATIONS','V2GCAM'])

    # Attach all GCAM columns in one concat instead of inserting them one by one,
    # which would fragment the frame when the codebook is large.
    if gcam_columns:
        df = pd.concat([df, pd.DataFrame(gcam_columns, index=df.index)], axis=1)

    return df

In [None]:
def gdelt_wrapper(output_path = '../data/processed',
        start = datetime.datetime(2018, 1, 1, 0, 0, 0),
        end = datetime.datetime(2025, 5, 1, 0, 0, 0),
        chunk_size = 100,
        num_chunks = 1,
        max_workers = 10,
        seed = 1234
        ):
    """
    Download, clean and save a random sample of GDELT GKG files.

    The 15-minute archive URLs between start and end are shuffled with the given
    seed, split into chunks of chunk_size, and the first num_chunks chunks are
    downloaded, cleaned (first_pass_clean then second_pass_clean) and written to
    CSV. The cleaned chunks are then combined and written once more.

    Args:
        output_path (str): Directory for the CSV files; created if missing.
        start (datetime.datetime): First archive timestamp to consider.
        end (datetime.datetime): Last archive timestamp to consider.
        chunk_size (int): Number of URLs per chunk.
        num_chunks (int): Number of chunks to process. If fewer chunks exist,
            all of them are processed.
        max_workers (int): Thread count passed to download_chunk_parallel.
        seed (int): Seed for the shuffle; the same seed gives the same URL order.

    Returns:
        pandas.DataFrame: All cleaned chunks concatenated with a fresh 0..n-1
            index. An empty DataFrame when no chunk produced any rows, in which
            case no combined file is written.
    """
    urls = generate_url_list(start, end, 15)
    # Shuffle with a private generator: same order as seeding the global one,
    # but without resetting the random state of the rest of the notebook.
    random.Random(seed).shuffle(urls)

    chunks = chunk_datetime_strings(urls, chunk_size)[:max(num_chunks, 0)]
    total_chunks = len(chunks)
    if total_chunks < num_chunks:
        print(f"Only {total_chunks} chunk(s) available; requested {num_chunks}.")

    os.makedirs(output_path, exist_ok=True)

    cleaned_chunks = []
    for chunk_number, chunk in enumerate(chunks, start=1):
        df = download_chunk_parallel(chunk, max_workers)
        if df is None:
            # download_chunk_parallel returns None when no URL in the chunk succeeded
            print(f"Skipping chunk {chunk_number} of {total_chunks}: nothing was downloaded.")
            continue

        print(f"Cleaning chunk {chunk_number} of {total_chunks}...")
        df = first_pass_clean(df)
        if df.empty:
            print(f"Skipping chunk {chunk_number} of {total_chunks}: no matching records.")
            continue
        df = second_pass_clean(df)

        print(f"Saving chunk {chunk_number} of {total_chunks}...")
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"{output_path}/gdelt_cleaned_chunk{chunk_number}_of_{total_chunks}_{timestamp}.csv"
        df.to_csv(output_file, index=False)

        cleaned_chunks.append(df)

    if not cleaned_chunks:
        print("No data was produced; nothing to combine.")
        return pd.DataFrame()

    # Concatenate once at the end rather than growing a frame inside the loop.
    combined_df = pd.concat(cleaned_chunks, ignore_index=True)

    print("Saving combined data...")
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    combined_output_file = f"{output_path}/gdelt_cleaned_combined_{timestamp}.csv"
    combined_df.to_csv(combined_output_file, index=False)

    print(f"Done! Combined data saved to {combined_output_file}")
    return combined_df

In [14]:
# Test run: process two chunks of five archive URLs each, using the default
# date range, seed and output directory of gdelt_wrapper.
# Note: this downloads from the GDELT server and writes CSV files to disk.
df = gdelt_wrapper(chunk_size=5,num_chunks=2)

Downloading: 100%|██████████| 5/5 [00:02<00:00,  2.13file/s]


Cleaning chunk 1 of 2...
Saving chunk 1 of 2...


Downloading: 100%|██████████| 5/5 [00:01<00:00,  3.18file/s]


Cleaning chunk 2 of 2...
Saving chunk 2 of 2...
Saving combined data...


In [15]:
# Display the combined DataFrame returned by the test run.
df

Unnamed: 0,GKGRECORDID,V2SOURCECOLLECTIONIDENTIFIER,V2SOURCECOMMONNAME,V2DOCUMENTIDENTIFIER,V1THEMES,V2.1TRANSLATIONINFO,datetime,date,airplane,airline,...,v42.2; SCOREDVALUE; care_p,v42.3; SCOREDVALUE; fairness_p,v42.4; SCOREDVALUE; loyalty_p,v42.5; SCOREDVALUE; authority_p,v42.6; SCOREDVALUE; sanctity_p,v42.7; SCOREDVALUE; care_sent,v42.8; SCOREDVALUE; fairness_sent,v42.9; SCOREDVALUE; loyalty_sent,v42.10; SCOREDVALUE; authority_sent,v42.11; SCOREDVALUE; sanctity_sent
0,20181105004500-1323,1,kxxv.com,http://www.kxxv.com/story/39414478/american-ai...,TAX_ETHNICITY;TAX_ETHNICITY_AMERICAN;CRISISLEX...,,2018-11-05 00:45:00,2018-11-05,0,1,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,20181105004500-1392,1,aecnewstoday.com,https://aecnewstoday.com/2018/singapore-mornin...,EPU_ECONOMY_HISTORIC;ECON_IPO;WB_135_TRANSPORT...,,2018-11-05 00:45:00,2018-11-05,0,1,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,20210729070000-375,1,gazette.com,https://gazette.com/ap/lifestyles/biden-to-lau...,TAX_FNCACT;TAX_FNCACT_EMPLOYERS;WB_2690_CATEGO...,,2021-07-29 07:00:00,2021-07-29,0,1,...,0.098804,0.107081,0.086125,0.098837,0.075707,-0.068714,-0.015434,0.000758,-0.029164,-0.028657
3,20210729070000-889,1,lancasteronline.com,https://lancasteronline.com/business/nation/bi...,TAX_FNCACT;TAX_FNCACT_EMPLOYERS;WB_2690_CATEGO...,,2021-07-29 07:00:00,2021-07-29,0,1,...,0.098643,0.106932,0.085988,0.098627,0.075666,-0.067213,-0.015217,0.000614,-0.028143,-0.02879
4,20210729070000-971,1,mynorthwest.com,https://mynorthwest.com/3059912/biden-to-launc...,LEADER;TAX_FNCACT;TAX_FNCACT_PRESIDENT;USPEC_P...,,2021-07-29 07:00:00,2021-07-29,0,1,...,0.098716,0.106009,0.085518,0.098265,0.075773,-0.071774,-0.012579,-0.001504,-0.027363,-0.029119
5,20210729070000-689,1,inquirer.net,https://newsinfo.inquirer.net/1466378/the-day-...,WB_698_TRADE;MANMADE_DISASTER_IMPLIED;WB_165_A...,,2021-07-29 07:00:00,2021-07-29,0,1,...,0.103529,0.083451,0.083486,0.091003,0.07277,-0.142955,-0.071575,-0.039296,-0.040589,-0.066856
6,20181105004500-723,1,rbtereport.com,https://rbtereport.com/airline-ancillary-servi...,,,2018-11-05 00:45:00,2018-11-05,0,1,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,20181105004500-430,1,trib.com,https://trib.com/news/state-and-regional/unite...,MANMADE_DISASTER_IMPLIED;CRISISLEX_C07_SAFETY;...,,2018-11-05 00:45:00,2018-11-05,0,1,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,20240916090000-388,1,969hitsfm.com,https://www.969hitsfm.com/trip/,,,2024-09-16 09:00:00,2024-09-16,0,1,...,0.106756,0.127524,0.086507,0.103459,0.087559,-0.04465,0.00195,-0.006138,-0.016564,-0.048873
9,20210729070000-65,1,birminghamstar.com,https://www.birminghamstar.com/news/270490708/...,TAX_ECON_PRICE;EDUCATION;SOC_POINTSOFINTEREST;...,,2021-07-29 07:00:00,2021-07-29,0,1,...,0.099892,0.082915,0.076934,0.075348,0.059729,-0.110396,-0.047216,-0.025282,0.005746,-0.014049
