# Fetching and processing Gdelt data

- 2 years of data: use an m5.24xlarge instance
- 3 months of data: use an m5.12xlarge instance

In [None]:
# Reload edited modules (e.g. the project's generic_utils helpers) automatically before each
# cell runs, so library changes are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import os
import re
import sys
import nltk
import zipfile
import warnings
import requests
import itertools
import numpy as np
import pandas as pd
import nest_asyncio
import plotly_express as px

from bs4 import BeautifulSoup
from IPython.core.display import display, HTML
# Widen the notebook's main container to the full browser width.
display(HTML("<style>.container { width:100% !important; }</style>"))

# Tokenizer models for NLTK (downloaded on first use; later runs report them as up to date).
nltk.download('punkt')
# Allow re-entering the already-running asyncio event loop inside Jupyter.
nest_asyncio.apply()

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 500)

# pd.set_option('display.max_colwidth', -1)
# Silence SettingWithCopyWarning for the whole session.
# NOTE(review): this also hides genuine chained-assignment mistakes.
pd.options.mode.chained_assignment = None

# Precompiled regex helpers.
SPACE_REGEX = re.compile(r"\s+")        # runs of whitespace
REGEX_TOKENIZER = re.compile(r'\w+')    # word-character tokens
LAT_LONG_REGEX = re.compile(r"[\#,]")   # '#' and ',' characters, stripped by clean_lat_long

# NOTE(review): suppresses *all* warnings raised after this point, including pandas
# parser/deprecation warnings.
warnings.filterwarnings("ignore")
# Render matplotlib figures inline in the notebook.
%matplotlib inline

In [None]:
# ==================================
# Path to data, models, results
# ==================================
# Project root: $HOME_PROJECT_X when it is set, otherwise a Windows default location.
HOME_DIR = os.environ.get('HOME_PROJECT_X', r'C:\ProjectX')

# Everything else lives under <HOME_DIR>/workspace.
WORKSPACE_ROOT = os.path.join(HOME_DIR, 'workspace')
PATH_TO_PROJECT_X_REPO, PATH_TO_DATA_ROOT_DIR = (
    os.path.join(WORKSPACE_ROOT, sub_dir) for sub_dir in ('project_x', 'data')
)

In [None]:
# Add path to library to sys path
# Directory holding the project's shared `generic_utils` helpers imported below.
generic_utils_lib_dir = os.path.join(PATH_TO_PROJECT_X_REPO, 'common')

# Guarded so that re-running this cell does not keep appending the same entry to sys.path.
if generic_utils_lib_dir not in sys.path:
    sys.path.append(generic_utils_lib_dir)

from generic_utils import (downcast_datatypes, timing, create_output_dir, parallelize)

In [None]:
# Presumably creates the data root directory when it is missing (generic_utils helper — confirm).
create_output_dir(PATH_TO_DATA_ROOT_DIR)

### Auxiliary methods

In [None]:
def get_links_for_time_range(start: str, end: str, multilingua_data: bool = False,
                             master_link: "str | None" = None) -> pd.DataFrame:
    """
    Return the GDELT v2 master-file-list entries whose timestamp lies in [start, end].

    Both bounds are inclusive (``Series.between`` default).

    Parameters
    ----------
    start, end :
        Range bounds; anything comparable with a datetime64 Series (``pd.Timestamp``,
        date strings, ...).
    multilingua_data :
        If True, read the translation (multilingual) master list instead of the English one.
        This also selects which part of the URL holds the timestamp.
    master_link :
        Optional location (URL or local path) of the master list, e.g. a cached copy.
        Defaults to the official GDELT URL for the selected feed.

    Returns
    -------
    pd.DataFrame
        Columns ``id_1``, ``id_2``, ``url``, ``feed_type`` (e.g. ``export`` / ``mentions`` /
        ``gkg``) and ``datetime``, restricted to the requested range.
    """
    if master_link is None:
        master_link = (
            'http://data.gdeltproject.org/gdeltv2/masterfilelist-translation.txt'  # multilingual
            if multilingua_data
            else 'http://data.gdeltproject.org/gdeltv2/masterfilelist.txt'         # English
        )

    # Lines are "<id_1> <id_2> <url>". The raw string avoids the invalid "\s" escape; a regex
    # separator always runs on the python engine, so request it explicitly (no ParserWarning).
    links = pd.read_csv(master_link, header=None, sep=r'\s', engine='python',
                        names=['id_1', 'id_2', 'url'])

    # URLs end in <YYYYMMDDHHMMSS>[.translation].<feed>.<ext>.zip: the feed is the third part
    # from the end, and the timestamp part sits one further left for the multilingual list.
    url_parts = links['url'].str.split('.')
    links['feed_type'] = url_parts.str[-3]

    datetime_idx = -5 if multilingua_data else -4
    links['datetime'] = pd.to_datetime(url_parts.str[datetime_idx].str.split('/').str[-1],
                                       format='%Y%m%d%H%M%S')

    print(f"All Gdelt links (3 feeds: events, mentions, knowledge graph): "
          f"{links.shape[0]}")
    print(f"- How many records per each feed: "
          f"{links['feed_type'].value_counts().to_dict()}")

    sample = links[links['datetime'].between(start, end)]
    print(f"- Number of Gdelt links for time range [{start}: {end}]: {sample.shape[0]}\n")

    return sample


def fetch_events_data(gdelt_v02_events_link: str) -> "tuple[pd.DataFrame, str]":
    """
    Download and clean one GDELT v2 events ("export") file.

    Parameters
    ----------
    gdelt_v02_events_link :
        URL (or path) of a tab-separated, header-less events file whose columns are
        ``event_export_header``.

    Returns
    -------
    (DataFrame, str)
        On success, the cleaned events (lower-case names, ``cols_order`` order) and ``''``.
        On any failure, an empty DataFrame and the offending link, so batch callers can
        collect broken links instead of aborting.

    Notes
    -----
    Uses the module-level ``event_export_header``, ``map_fips_to_iso2``,
    ``map_iso2_to_iso3`` and ``clean_lat_long`` defined elsewhere in this notebook.
    """
    use_cols_export = [
        'GLOBALEVENTID', 'SQLDATE', 'EventCode', 'EventBaseCode', 'EventRootCode', 'GoldsteinScale',
        'ActionGeo_FullName', 'ActionGeo_CountryCode', 'ActionGeo_Lat', 'ActionGeo_Long',
        'DATEADDED', 'SOURCEURL'
    ]

    cols_order = [
        'globaleventid', 'sqldate', 'dateadded', 'eventrootcode', 'eventbasecode', 'eventcode', 'goldsteinscale',
        'actiongeo_fullname', 'actiongeo_countrycode', 'actiongeo_countrycode_iso2', 'actiongeo_countrycode_iso3',
        'actiongeo_lat', 'actiongeo_long', 'sourceurl'
    ]

    try:
        # Parse only the columns we keep; every other column would be discarded right away.
        events_v02_export_df = pd.read_csv(gdelt_v02_events_link, sep='\t', encoding="ISO-8859-1",
                                           header=None, names=event_export_header,
                                           usecols=use_cols_export)

        events_v02_export_df = events_v02_export_df[use_cols_export]
        events_v02_export_df.columns = events_v02_export_df.columns.str.lower()
        events_v02_export_df['sqldate'] = pd.to_datetime(events_v02_export_df['sqldate'], format='%Y%m%d')
        events_v02_export_df['dateadded'] = pd.to_datetime(events_v02_export_df['dateadded'], format='%Y%m%d%H%M%S')

        # GDELT country codes are FIPS 10-4: translate to ISO2, then ISO2 -> ISO3.
        events_v02_export_df['actiongeo_countrycode_iso2'] = \
            events_v02_export_df['actiongeo_countrycode'].map(map_fips_to_iso2)
        events_v02_export_df['actiongeo_countrycode_iso3'] = \
            events_v02_export_df['actiongeo_countrycode_iso2'].map(map_iso2_to_iso3)

        events_v02_export_df = events_v02_export_df[cols_order]

        # String coordinates get stray '#'/',' characters removed (see clean_lat_long).
        for c in ['actiongeo_lat', 'actiongeo_long']:
            events_v02_export_df[c] = events_v02_export_df[c].apply(clean_lat_long)

        # Anything still non-numeric becomes NaN rather than failing the whole file.
        for c in ['eventrootcode', 'eventbasecode', 'eventcode',
                  'actiongeo_lat', 'actiongeo_long', 'goldsteinscale']:
            events_v02_export_df[c] = pd.to_numeric(events_v02_export_df[c], errors='coerce')

        # Drop records with any NULL entries in the following columns
        mask_drop_null_records = \
            events_v02_export_df[['eventrootcode', 'eventbasecode', 'eventcode', 'sourceurl']].isnull().any(axis=1)
        events_v02_export_df = events_v02_export_df[~mask_drop_null_records]

        # Downcast to compact dtypes to save memory (safe now that NULL codes are gone).
        for c in ['eventrootcode', 'eventbasecode', 'eventcode']:
            events_v02_export_df[c] = events_v02_export_df[c].astype(np.int16)
        for c in ['actiongeo_lat', 'actiongeo_long', 'goldsteinscale']:
            events_v02_export_df[c] = events_v02_export_df[c].astype(np.float32)

        return events_v02_export_df, ''

    except Exception:
        # Best effort per file: network, parsing and format errors mark the link as broken.
        # `Exception` (not a bare except) still lets KeyboardInterrupt/SystemExit through.
        return pd.DataFrame(), gdelt_v02_events_link
    

def __parallel_fetch_events(gdelt_v02_events_link_list):
    """
    Fetch a batch of events files one after another; used as the ``func`` given to ``parallelize``.

    Returns
    -------
    (DataFrame, list[str])
        The successfully fetched events concatenated (an empty DataFrame when none succeeded)
        and the links that failed.
    """
    events_v02_export_df_list = []
    links_with_errors_list = []

    for link in gdelt_v02_events_link_list:
        df_, link_with_error = fetch_events_data(gdelt_v02_events_link=link)

        if not df_.empty:
            events_v02_export_df_list.append(df_)

        if link_with_error:
            links_with_errors_list.append(link_with_error)

    # pd.concat([]) raises "No objects to concatenate"; a batch in which every link failed
    # must still hand back its error list instead of crashing the worker.
    if not events_v02_export_df_list:
        return pd.DataFrame(), links_with_errors_list

    events_v02_export_df = pd.concat(events_v02_export_df_list, ignore_index=True)
    return events_v02_export_df, links_with_errors_list


def clean_lat_long(x):
    """
    Normalize a latitude/longitude value read from a GDELT events file.

    For a string, every '#' and ',' (``LAT_LONG_REGEX``) is removed and the rest is converted
    to float. A string that is still not a number after cleaning becomes NaN. Non-string
    values (floats, NaN) are returned unchanged.
    """
    if not isinstance(x, str):
        return x
    try:
        return float(LAT_LONG_REGEX.sub("", x))
    except ValueError:
        # One malformed coordinate must not make fetch_events_data drop the whole file (it
        # reports any exception as a broken link); the later to_numeric(errors='coerce')
        # step treats NaN the same way.
        return np.nan
    

def line_contains_only_digits(x):
    """Return True if *x* is a non-empty string of decimal digits; non-string values count as True."""
    if not isinstance(x, str):
        return True
    return x.isdecimal()

---
# FIPS country codes to ISO2 (Gdelt uses old standard - FIPS 10-4)
---

In [None]:
# FIPS 10-4 <-> ISO2/ISO3 lookup table. Only the literal 'NULL' counts as missing on read
# (presumably so codes such as 'NA' are not parsed as NaN); '-' placeholders become NaN after.
fips_to_iso2 = (
    pd.read_csv(
        os.path.join(PATH_TO_PROJECT_X_REPO, "auxiliary_data", "countries_fips_to_iso2.csv"),
        keep_default_na=False,
        na_values='NULL',
    )
    .replace({'-': np.nan})
)

# Report, then drop, the countries that have ISO codes but no FIPS 10-4 code.
mask_countries_in_iso2_not_in_fips = fips_to_iso2['fips_10_4'].isna()
print(f"\nCountries in ISO2/3 but not in FIPS 10-4:\n\n{fips_to_iso2[mask_countries_in_iso2_not_in_fips]}")
fips_to_iso2 = fips_to_iso2.loc[~mask_countries_in_iso2_not_in_fips]

# Translation dictionaries (a repeated key keeps its last row).
map_fips_to_iso2 = dict(zip(fips_to_iso2['fips_10_4'], fips_to_iso2['iso2']))
map_iso2_to_iso3 = dict(zip(fips_to_iso2['iso2'], fips_to_iso2['iso3']))
map_iso2_to_country_name = dict(zip(fips_to_iso2['iso2'], fips_to_iso2['country_name']))

print(fips_to_iso2.shape)
fips_to_iso2.head()

In [None]:
# Remaining lookup rows that have a missing value in at least one column.
fips_to_iso2.loc[fips_to_iso2.isna().any(axis=1)]

---
# 1. GDELT v2 EVENTS (downloading 2020-2022)
---

In [None]:
# Column names of the GDELT 2.0 events ("export") files. The files have no header row and
# fetch_events_data passes this list as `names=` with `header=None`, so the order must match
# the files' column order.
event_export_header = [
    'GLOBALEVENTID', 'SQLDATE', 'MonthYear', 'Year', 'FractionDate', 'Actor1Code',
    'Actor1Name', 'Actor1CountryCode', 'Actor1KnownGroupCode', 'Actor1EthnicCode',
    'Actor1Religion1Code', 'Actor1Religion2Code', 'Actor1Type1Code', 'Actor1Type2Code',
    'Actor1Type3Code', 'Actor2Code', 'Actor2Name', 'Actor2CountryCode',
    'Actor2KnownGroupCode', 'Actor2EthnicCode', 'Actor2Religion1Code',
    'Actor2Religion2Code', 'Actor2Type1Code', 'Actor2Type2Code', 'Actor2Type3Code',
    'IsRootEvent', 'EventCode', 'EventBaseCode', 'EventRootCode', 'QuadClass',
    'GoldsteinScale', 'NumMentions', 'NumSources', 'NumArticles', 'AvgTone',
    'Actor1Geo_Type', 'Actor1Geo_FullName', 'Actor1Geo_CountryCode', 'Actor1Geo_ADM1Code',
    'Actor1Geo_ADM2Code', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor1Geo_FeatureID',
    'Actor2Geo_Type', 'Actor2Geo_FullName', 'Actor2Geo_CountryCode', 'Actor2Geo_ADM1Code',
    'Actor2Geo_ADM2Code', 'Actor2Geo_Lat', 'Actor2Geo_Long', 'Actor2Geo_FeatureID',
    'ActionGeo_Type', 'ActionGeo_FullName', 'ActionGeo_CountryCode', 'ActionGeo_ADM1Code',
    'ActionGeo_ADM2Code', 'ActionGeo_Lat', 'ActionGeo_Long', 'ActionGeo_FeatureID',
    'DATEADDED', 'SOURCEURL'
]

# Column names of the GDELT 2.0 GKG files, passed as `names=` (with `header=None`) by
# fetch_gkg_data, so the order must match the files' column order.
gkg_header = [
    'GKGRECORDID', 'DATE', 'SourceCollectionIdentifier', 'SourceCommonName',
    'DocumentIdentifier', 'Counts', 'V2Counts', 'Themes', 'V2Themes', 'Locations',
    'V2Locations', 'Persons', 'V2Persons', 'Organizations', 'V2Organizations', 'V2Tone',
    'Dates', 'GCAM', 'SharingImage', 'RelatedImages', 'SocialImageEmbeds',
    'SocialVideoEmbeds', 'Quotations', 'AllNames', 'Amounts', 'TranslationInfo',
    'Extras'
]

# Labels for the CAMEO root codes selected for analysis. Keys are strings so they match
# `eventrootcode.astype(str)` in the CAMEO selection cell.
cameo_event_root_code = {
    '14': 'PROTESTS',
    '18': 'ASSAULT',
    '19': 'FIGHT',
    '20': 'USE UNCONVENTIONAL MASS VIOLENCE'
}

# Descriptions of the 3-digit CAMEO base codes under the selected roots (14, 18, 19, 20).
# Keys are strings to match `eventbasecode.astype(str)`.
cameo_event_base_code = {
    '140': 'Engage in political dissent, not specified below',
    '141': 'Demonstrate or rally, not specified below',
    '142': 'Conduct hunger strike, not specified below',
    '143': 'Conduct strike or boycott, not specified below',
    '144': 'Obstruct passage, block, not specified below',
    '145': 'Protest violently, riot, not specified below',

    '180': 'Use unconventional violence, not specified below',
    '181': 'Abduct, hijack, or take hostage',
    '182': 'Physically assault, not specified below',
    '183': 'Conduct suicide, car, or other non-military bombing, not specified below',
    '184': 'Use as human shield',
    '185': 'Attempt to assassinate',
    '186': 'Assassinate',

    '190': 'Use conventional military force, not specified below',
    '191': 'Impose blockade, restrict movement',
    '192': 'Occupy territory',
    '193': 'Fight with small arms and light weapons',
    '194': 'Fight with artillery and tanks',
    '195': 'Employ aerial weapons, not specified below',
    '196': 'Violate ceasefire',

    '200': 'Use unconventional mass violence, not specified below',
    '201': 'Engage in mass expulsion',
    '202': 'Engage in mass killings',
    '203': 'Engage in ethnic cleansing',
    '204': 'Use weapons of mass destruction, not specified below'
}

# Descriptions of the full (3- and 4-digit) CAMEO event codes under the selected roots.
# Keys are strings to match `eventcode.astype(str)`; codes missing here map to NaN.
cameo_event_code = {
    '140': 'Engage in political dissent, not specified below',
    '141': 'Demonstrate or rally, not specified below',
    '1411': 'Demonstrate for leadership change',
    '1412': 'Demonstrate for policy change',
    '1413': 'Demonstrate for rights',
    '1414': 'Demonstrate for change in institutions, regime',
    '142': 'Conduct hunger strike, not specified below',
    '1421': 'Conduct hunger strike for leadership change',
    '1422': 'Conduct hunger strike for policy change',
    '1423': 'Conduct hunger strike for rights',
    '1424': 'Conduct hunger strike for change in institutions, regime',

    '143': 'Conduct strike or boycott, not specified below',
    '1431': 'Conduct strike or boycott for leadership change',
    '1432': 'Conduct strike or boycott for policy change',
    '1433': 'Conduct strike or boycott for rights',
    '1434': 'Conduct strike or boycott for change in institutions, regime',

    '144': 'Obstruct passage, block, not specified below',
    '1441': 'Obstruct passage to demand leadership change',
    '1442': 'Obstruct passage to demand policy change',
    '1443': 'Obstruct passage to demand rights',
    '1444': 'Obstruct passage to demand change in institutions, regime',

    '145': 'Protest violently, riot, not specified below',
    '1451': 'Engage in violent protest for leadership change',
    '1452': 'Engage in violent protest for policy change',
    '1453': 'Engage in violent protest for rights',
    '1454': 'Engage in violent protest for change in institutions, regime',

    '180': 'Use unconventional violence, not specified below',
    '181': 'Abduct, hijack, or take hostage',
    '182': 'Physically assault, not specified below',
    '1821': 'Sexually assault',
    '1822': 'Torture',
    '1823': 'Kill by physical assault',

    '183': 'Conduct suicide, car, or other non-military bombing, not specified below',
    '1831': 'Carry out suicide bombing',
    '1832': 'Carry out vehicular bombing',
    '1833': 'Carry out roadside bombing',
    '1834': 'Carry out location bombing',

    '184': 'Use as human shield',
    '185': 'Attempt to assassinate',
    '186': 'Assassinate',

    '190': 'Use conventional military force, not specified below',
    '191': 'Impose blockade, restrict movement',
    '192': 'Occupy territory',
    '193': 'Fight with small arms and light weapons',
    '194': 'Fight with artillery and tanks',
    '195': 'Employ aerial weapons, not specified below',
    '1951': 'Employ precision-guided aerial munitions',
    '1952': 'Employ remotely piloted aerial munitions',
    '196': 'Violate ceasefire',

    '200': 'Use unconventional mass violence, not specified below',
    '201': 'Engage in mass expulsion',
    '202': 'Engage in mass killings',
    '203': 'Engage in ethnic cleansing',
    '204': 'Use weapons of mass destruction, not specified below',
    '2041': 'Use chemical, biological, or radiological weapons',
    '2042': 'Detonate nuclear weapons'
}

### Select period we are going to fetch

In [None]:
# Inclusive [start, end] window used by all fetches below.
start, end = (pd.to_datetime(ts) for ts in ('2022-07-01 00:00:00', '2022-10-01 00:00:00'))

# Earlier, longer window (~2 years):
# start = pd.to_datetime('2020-01-01 00:00:00')
# end = pd.to_datetime('2022-01-20 09:00:00')

# Local cache directory for the GDELT parquet files.
path_to_gdelt = os.path.join(PATH_TO_DATA_ROOT_DIR, 'data_providers/gdelt')
create_output_dir(path_to_gdelt)

---
# 1.1 GDELT v2 English
---

**Each timestamp has one file per feed (the multilingual feed inserts `.translation` before the feed name):**
- xxx.export.CSV.zip
- xxx.mentions.CSV.zip
- xxx.gkg.csv.zip

**Get all links to be fetched from Gdelt v2 for a given period of time**

In [None]:
%%time

# Master-list rows (events, mentions and GKG feeds) of the English data within [start, end].
links_covering_period_en_gdelt_v2 = get_links_for_time_range(start=start, end=end, multilingua_data=False)

> - All Gdelt links (3 feeds: events, mentions, knowledge graph): 711474
    - How many records per each feed:{'gkg': 237143, 'export': 237138, 'mentions': 237138}

**Download a single events file (one timestamp) as a sanity check**

In [None]:
# Smoke test: fetch one reproducibly chosen English events file.
mask_events = links_covering_period_en_gdelt_v2['feed_type'].eq('export')
events_v02_en_export_df_1, errors = fetch_events_data(
    gdelt_v02_events_link=(
        links_covering_period_en_gdelt_v2
        .loc[mask_events, 'url']
        .sample(n=1, random_state=13378)
        .iloc[0]
    )
)
events_v02_en_export_df_1

### Parallel processing

In [None]:
%%time

path_to_gdelt = os.path.join(PATH_TO_DATA_ROOT_DIR, 'data_providers/gdelt')
fn = f'events_v02_en_export_df_{str(start.date())}_{str(end.date())}.parquet'
create_output_dir(path_to_gdelt)
    
re_fetch_data = False

if not os.path.exists(os.path.join(path_to_gdelt, fn)) or re_fetch_data:
    print(f"Fetching GDELT data")
    mask_events = links_covering_period_en_gdelt_v2['feed_type'] == 'export'
    temp = parallelize(data=links_covering_period_en_gdelt_v2.loc[mask_events, 'url'].to_list(),
                       func=__parallel_fetch_events)

    events_v02_en_export_df_list = []
    links_with_errors_list = []

    for l in temp:
        events_v02_en_export_df_list.append(l[0])
        links_with_errors_list.append(l[1])

    events_v02_en_export_df = pd.concat(events_v02_en_export_df_list, ignore_index=True)   
    links_with_errors_list = list(itertools.chain(*links_with_errors_list))

    print(f"Number of Gdelt events for time range [{start}: {end}]: {events_v02_en_export_df.shape[0]}")
    print(f"Number of broken links: {len(links_with_errors_list)}")

    assert events_v02_en_export_df['globaleventid'].nunique() == events_v02_en_export_df.shape[0],\
        "globaleventid is not unique"
    
    # Saving to parquet    
    print(f"\nSaving fetched GDELT data to {os.path.join(path_to_gdelt, fn)}")
    events_v02_en_export_df.to_parquet(path=os.path.join(path_to_gdelt, fn), engine='auto', compression='snappy')
else:
    print(f"Reading GDELT data from {os.path.join(path_to_gdelt, fn)}")
    events_v02_en_export_df = pd.read_parquet(os.path.join(path_to_gdelt, fn),
                                              engine='auto',
                                              columns=None)

- [15:47:35] INFO generic-utils: Splitting input data into 96 batches
- [15:47:35] INFO generic-utils: Starting parallel processing using 96 CPU
- Number of Gdelt events for time range [2020-01-01 00:00:00: 2022-01-20 09:00:00]: 86160966
- Number of broken links: 1
- CPU times: user 42.2 s, sys: 32.1 s, total: 1min 14s
- Wall time: 2min 31s

**Check for duplicates**

In [None]:
%%time

duplicates_sourceurl = events_v02_en_export_df.groupby('sourceurl').agg(
    {'globaleventid': ['count'],
     'actiongeo_countrycode': [lambda x: x.value_counts().to_dict(), set],
     'eventcode': [lambda x: x.value_counts().to_dict(), set]}).reset_index()

duplicates_sourceurl.columns = ["_".join(filter(lambda col: col, col)) for col in duplicates_sourceurl.columns.ravel()]
duplicates_sourceurl.rename(columns={'globaleventid_count': 'count',
                                     'actiongeo_countrycode_<lambda_0>': 'actiongeo_countrycode_top5',
                                     'eventcode_<lambda_0>': 'eventcode_top5'}, inplace=True)

duplicates_sourceurl = duplicates_sourceurl[duplicates_sourceurl['count'] > 1]
duplicates_sourceurl = duplicates_sourceurl.sort_values('count', ascending=False).reset_index(drop=True)
duplicates_sourceurl.head()

- Wall time: 7min 59s

In [None]:
# All events that share the most frequently repeated source URL.
events_v02_en_export_df.loc[
    events_v02_en_export_df['sourceurl'].eq(duplicates_sourceurl['sourceurl'].iloc[0])
].sort_values(['sourceurl', 'eventcode'])

**Global coverage of Gdelt events**

In [None]:
# Events per ISO3 country. rename_axis + reset_index(name=...) yield the columns
# ['actiongeo_countrycode_iso3', 'n_records'] on both pandas 1.x and 2.x; pandas 2.0 changed
# value_counts().reset_index() to [<name>, 'count'], which broke the former rename.
count = (
    events_v02_en_export_df['actiongeo_countrycode_iso3']
    .value_counts()
    .rename_axis('actiongeo_countrycode_iso3')
    .reset_index(name='n_records')
)

fig = px.choropleth(count,
                    locations='actiongeo_countrycode_iso3',
                    color='n_records',
                    hover_name='n_records',
                    title=f'Countries covered in Gdelt v2 events table ({start.date()} - {end.date()})',
                    projection="natural earth",  # "natural earth", "mercator", "orthographic"
                    # Clip the colour scale to the 25th-99.5th percentile so outliers don't wash it out.
                    range_color=[count['n_records'].quantile(0.25), count['n_records'].quantile(0.995)])

fig.show()

### Selecting CAMEO codes for Base Ops

In [None]:
# Keep the CAMEO roots of interest (14 protests, 18 assault, 19 fight, 20 mass violence) and
# attach text descriptions for each code level (codes missing from the dicts become NaN).
selected_cameo_codes = events_v02_en_export_df.loc[
    events_v02_en_export_df['eventrootcode'].isin([14, 18, 19, 20])
].assign(
    eventcode_str=lambda df: df['eventcode'].astype(str).map(cameo_event_code),
    eventbasecode_str=lambda df: df['eventbasecode'].astype(str).map(cameo_event_base_code),
    eventrootcode_str=lambda df: df['eventrootcode'].astype(str).map(cameo_event_root_code),
)

print(selected_cameo_codes.shape)
selected_cameo_codes.head(2)

In [None]:
%%time

selected_cameo_codes_duplicates_sourceurl = selected_cameo_codes.groupby('sourceurl').agg(
    {'globaleventid': ['count'],
     'dateadded': ['min', 'max', set],
     'actiongeo_countrycode': [lambda x: x.value_counts().to_dict(), set],
     'eventbasecode_str': [lambda x: " | ".join(set(x))],
     'eventrootcode_str': [lambda x: " | ".join(set(x))],
     'eventcode': [lambda x: x.value_counts().to_dict(), set]}).reset_index()

selected_cameo_codes_duplicates_sourceurl.columns = \
    ["_".join(filter(lambda col: col, col)) for col in selected_cameo_codes_duplicates_sourceurl.columns.ravel()]

selected_cameo_codes_duplicates_sourceurl.rename(
    columns={'globaleventid_count': 'n_records_with_same_url',
             'actiongeo_countrycode_<lambda_0>': 'actiongeo_countrycode_top5',
             'eventbasecode_str_<lambda>': 'unique_eventbasecode_str_for_same_url',
             'eventrootcode_str_<lambda>': 'unique_eventrootcode_str_for_same_url',
             'eventcode_<lambda_0>': 'eventcode_top5'}, inplace=True)

selected_cameo_codes_duplicates_sourceurl = selected_cameo_codes_duplicates_sourceurl.sort_values(
    'n_records_with_same_url', ascending=False).reset_index(drop=True)

selected_cameo_codes_duplicates_sourceurl.head()

In [None]:
# Share of each CAMEO root description among the selected events (NaN = unmapped code).
selected_cameo_codes.loc[:, 'eventrootcode_str'].value_counts(dropna=False, normalize=True)

In [None]:
# Share of each CAMEO base-code description among the selected events (NaN = unmapped code).
selected_cameo_codes.loc[:, 'eventbasecode_str'].value_counts(dropna=False, normalize=True)

In [None]:
# Share of each CAMEO event-code description among the selected events (NaN = unmapped code).
selected_cameo_codes.loc[:, 'eventcode_str'].value_counts(dropna=False, normalize=True)

### Preparing dataset for Quality Check

In [None]:
# Source URLs whose selected events carry more than one distinct CAMEO event code.
selected_cameo_codes.groupby('sourceurl')['eventcode'].nunique().pipe(lambda s: s[s > 1]).sort_values(ascending=False)

In [None]:
cols_of_interest = [
    'globaleventid', 'dateadded', 'actiongeo_country_name', 'eventrootcode', 'eventbasecode',
    'eventcode', 'eventrootcode_str', 'eventbasecode_str', 'eventcode_str', 'sourceurl',
    # 'actiongeo_countrycode_iso2',
]

# QC sample: one event per URL, then 20 draws per CAMEO event code. replace=True allows codes
# with fewer than 20 rows; the repeated draws are then collapsed by keeping one row per
# (event code, country) pair.
selected_cameo_codes_for_check = (
    selected_cameo_codes
    .drop_duplicates('sourceurl')
    .groupby(['eventcode'])
    .sample(n=20, replace=True, random_state=1337)
    .drop_duplicates(subset=['eventcode', 'actiongeo_countrycode_iso2'])
    .sort_values(['eventrootcode', 'dateadded'])
    .reset_index(drop=True)
    .assign(actiongeo_country_name=lambda df: df['actiongeo_countrycode_iso2'].map(map_iso2_to_country_name))
    .loc[:, cols_of_interest]
)

print(selected_cameo_codes_for_check.shape[0])
selected_cameo_codes_for_check.head()

In [None]:
# Distribution of event-code descriptions within the QC sample.
selected_cameo_codes_for_check.loc[:, 'eventcode_str'].value_counts(dropna=False, normalize=True)

In [None]:
# Distribution of base-code descriptions within the QC sample.
selected_cameo_codes_for_check.loc[:, 'eventbasecode_str'].value_counts(dropna=False, normalize=True)

In [None]:
# Attach the per-URL summary (events sharing the URL, their countries and codes) to each QC row.
selected_cameo_codes_for_check_final = selected_cameo_codes_for_check.merge(
    selected_cameo_codes_duplicates_sourceurl[['sourceurl', 'n_records_with_same_url', 'actiongeo_countrycode_set',
                                               'eventcode_set', 'unique_eventrootcode_str_for_same_url',
                                               'unique_eventbasecode_str_for_same_url']],
    how='left', on='sourceurl')

print(selected_cameo_codes_for_check_final.shape[0])
# Jupyter only renders a cell's *last* expression, and this one is followed by the save logic,
# so render the preview explicitly.
display(selected_cameo_codes_for_check_final.head())

save_file = False

if save_file:
    # NOTE(review): written to the current working directory, not to path_to_gdelt.
    selected_cameo_codes_for_check_final.to_csv('selected_cameo_codes_for_check_v0.csv', index=False)

In [None]:
# QC rows whose source URL carries the most selected events come first.
selected_cameo_codes_for_check_final.sort_values(by='n_records_with_same_url', ascending=False)

---
# 1.2 GDELT v2 Multilingual
---

**Get all links to be fetched from Gdelt v2 for a given period of time**

In [None]:
%%time

# Master-list rows (events, mentions and GKG feeds) of the multilingual (translated) data within [start, end].
links_covering_period_ml_gdelt_v2 = get_links_for_time_range(start=start, end=end, multilingua_data=True)

**Download a single events file (one timestamp) as a sanity check**

In [None]:
# Smoke test: fetch one reproducibly chosen multilingual events file.
mask_events = links_covering_period_ml_gdelt_v2['feed_type'].eq('export')
events_v02_ml_export_df_1, errors = fetch_events_data(
    gdelt_v02_events_link=(
        links_covering_period_ml_gdelt_v2
        .loc[mask_events, 'url']
        .sample(n=1, random_state=13378)
        .iloc[0]
    )
)
events_v02_ml_export_df_1

### Parallel processing

In [None]:
%%time

path_to_gdelt = os.path.join(PATH_TO_DATA_ROOT_DIR, 'data_providers/gdelt')
fn = f'events_v02_ml_export_df_{str(start.date())}_{str(end.date())}.parquet'
create_output_dir(path_to_gdelt)
    
re_fetch_data = False

if not os.path.exists(os.path.join(path_to_gdelt, fn)) or re_fetch_data:
    print(f"Fetching GDELT data")
    mask_events = links_covering_period_ml_gdelt_v2['feed_type'] == 'export'
    temp = parallelize(data=links_covering_period_ml_gdelt_v2.loc[mask_events, 'url'].to_list(),
                       func=__parallel_fetch_events)

    events_v02_ml_export_df_list = []
    links_with_errors_list = []

    for l in temp:
        events_v02_ml_export_df_list.append(l[0])
        links_with_errors_list.append(l[1])

    events_v02_ml_export_df = pd.concat(events_v02_ml_export_df_list, ignore_index=True)   
    links_with_errors_list = list(itertools.chain(*links_with_errors_list))

    print(f"Number of Gdelt events for time range [{start}: {end}]: {events_v02_ml_export_df.shape[0]}")
    print(f"Number of broken links: {len(links_with_errors_list)}")

    assert events_v02_ml_export_df['globaleventid'].nunique() == events_v02_ml_export_df.shape[0], \
        "globaleventid is not unique"
    
    # Saving to parquet    
    print(f"\nSaving fetched GDELT data to {os.path.join(path_to_gdelt, fn)}")
    events_v02_ml_export_df.to_parquet(path=os.path.join(path_to_gdelt, fn), engine='auto', compression='snappy')
else:
    print(f"Reading GDELT data from {os.path.join(path_to_gdelt, fn)}")
    events_v02_ml_export_df = pd.read_parquet(os.path.join(path_to_gdelt, fn),
                                              engine='auto',
                                              columns=None)

- Fetching GDELT data
- [21:55:12] INFO generic-utils: Splitting input data into 96 batches
- [21:55:12] INFO generic-utils: Starting parallel processing using 96 CPU
- Number of Gdelt events for time range [2020-01-01 00:00:00: 2022-01-20 09:00:00]: 41909854
- Number of broken links: 0
- Saving fetched GDELT data to /home/sergii/workspace/data/data_providers/gdelt/events_v02_ml_export_df_2020-01-01_2022-01-20.parquet
- CPU times: user 54.4 s, sys: 21.9 s, total: 1min 16s
- Wall time: 2min 43s

### Loading data to RDS

**Check for duplicates**

In [None]:
%%time

duplicates_sourceurl_ml = events_v02_ml_export_df.groupby('sourceurl').agg(
    {'globaleventid': ['count'],
     'actiongeo_countrycode': [lambda x: x.value_counts().to_dict(), set],
     'eventcode': [lambda x: x.value_counts().to_dict(), set]}).reset_index()

duplicates_sourceurl_ml.columns = \
    ["_".join(filter(lambda col: col, col)) for col in duplicates_sourceurl_ml.columns.ravel()]

duplicates_sourceurl_ml.rename(columns={'globaleventid_count': 'count',
                                        'actiongeo_countrycode_<lambda_0>': 'actiongeo_countrycode_top5',
                                        'eventcode_<lambda_0>': 'eventcode_top5'}, inplace=True)

duplicates_sourceurl_ml = duplicates_sourceurl_ml[duplicates_sourceurl_ml['count'] > 1]
duplicates_sourceurl_ml = duplicates_sourceurl_ml.sort_values('count', ascending=False).reset_index(drop=True)
duplicates_sourceurl_ml.head()

---
# 2. GDELT v2 GKG (downloading 2020-2022)
---

The Global Knowledge Graph (GKG) is updated every 15 minutes and is built from global news reporting.

In [None]:
def extract_themes_categories(x):
    """
    Return the sorted, de-duplicated theme names in a GKG ``V2Themes`` string.

    Entries are separated by ';' and the name is the text before the first ','. Empty names
    are dropped. Non-strings and the empty string give ``np.nan``.
    """
    if not isinstance(x, str) or not x:
        return np.nan
    theme_names = {entry.split(',')[0] for entry in x.split(';')}
    return sorted(name for name in theme_names if name)


def extract_categories_from_count(x):
    """
    Return the sorted, de-duplicated count types in a GKG ``V2Counts`` string.

    Entries are separated by ';' and the type is the text before the first '#'. Empty types
    are dropped, so ``""`` gives ``[]``. Non-strings give ``np.nan``.
    """
    if not isinstance(x, str):
        return np.nan
    count_types = {entry.partition('#')[0] for entry in x.split(';')}
    return sorted(filter(None, count_types))


def fetch_gkg_data(gdelt_v02_gkg_link: str) -> "tuple[pd.DataFrame, str]":
    """
    Download one GDELT v2 GKG file and keep a slim subset of its columns.

    Parameters
    ----------
    gdelt_v02_gkg_link :
        URL (or path) of a tab-separated, header-less GKG file whose columns are ``gkg_header``.

    Returns
    -------
    (DataFrame, str)
        On success, the selected columns (lower-case names, ``cols_order`` order) plus the
        extracted theme/count-type name lists, and ``''``. On any failure, an empty DataFrame
        and the offending link.

    Notes
    -----
    Uses the module-level ``gkg_header``, ``extract_themes_categories`` and
    ``extract_categories_from_count`` defined elsewhere in this notebook.
    """
    use_cols_gkg = [
        'GKGRECORDID', 'DATE', 'SourceCommonName', 'DocumentIdentifier', 'V2Counts', 'V2Themes',
        'V2Locations', 'Dates',
        # Trailing comma: without it, uncommenting an entry below would silently concatenate
        # it onto 'TranslationInfo'.
        'TranslationInfo',

        # These we may want to use when training NER
        # 'V2Persons',
        # 'V2Organizations',
        # 'SourceCollectionIdentifier',
        # 'Quotations',
        # 'AllNames',
        # 'Amounts',
    ]

    cols_order = [
        'gkgrecordid', 'date', 'sourcecommonname', 'documentidentifier', 'v2locations',
        'v2themes_names', 'v2counts_names', 'dates', 'translationinfo'
    ]

    try:
        # Parse only the columns we keep; the others would be discarded immediately.
        gkg_v02_df = pd.read_csv(gdelt_v02_gkg_link, sep='\t', encoding="ISO-8859-1", header=None,
                                 names=gkg_header, usecols=use_cols_gkg)
        gkg_v02_df = gkg_v02_df[use_cols_gkg]
        gkg_v02_df.columns = gkg_v02_df.columns.str.lower()
        gkg_v02_df['date'] = pd.to_datetime(gkg_v02_df['date'], format='%Y%m%d%H%M%S')
        # Keep only the names: theme text before ',' and count type before '#'.
        gkg_v02_df['v2themes_names'] = gkg_v02_df['v2themes'].apply(extract_themes_categories)
        gkg_v02_df['v2counts_names'] = gkg_v02_df['v2counts'].apply(extract_categories_from_count)
        gkg_v02_df = gkg_v02_df[cols_order]
        return gkg_v02_df, ''

    except Exception:
        # Best effort per file: network, parsing and format errors mark the link as broken.
        # `Exception` (not a bare except) still lets KeyboardInterrupt/SystemExit through.
        return pd.DataFrame(), gdelt_v02_gkg_link
    
    
def __parallel_fetch_gkg(gdelt_v02_gkg_link_list):
    """
    Fetch a batch of GKG files one after another; used as the ``func`` given to ``parallelize``.

    Returns
    -------
    (DataFrame, list[str])
        The successfully fetched GKG records concatenated (an empty DataFrame when none
        succeeded) and the links that failed.
    """
    gkg_v02_df_list = []
    links_with_errors_list = []

    for link in gdelt_v02_gkg_link_list:
        df_, link_with_error = fetch_gkg_data(gdelt_v02_gkg_link=link)

        if not df_.empty:
            gkg_v02_df_list.append(df_)

        if link_with_error:
            links_with_errors_list.append(link_with_error)

    # pd.concat([]) raises "No objects to concatenate"; a batch in which every link failed
    # must still hand back its error list instead of crashing the worker.
    if not gkg_v02_df_list:
        return pd.DataFrame(), links_with_errors_list

    gkg_v02_df = pd.concat(gkg_v02_df_list, ignore_index=True)
    return gkg_v02_df, links_with_errors_list

### 2.2 GDELT v2 GKG English

In [None]:
# Re-read the English master list for the same window; only its GKG rows are used below.
links_covering_period_en_gdelt_v2 = get_links_for_time_range(start, end, multilingua_data=False)

In [None]:
# Smoke test: fetch one reproducibly chosen English GKG file.
mask_gkg = links_covering_period_en_gdelt_v2['feed_type'].eq('gkg')
gkg_v02_en_gkg_df_1, errors = fetch_gkg_data(
    gdelt_v02_gkg_link=(
        links_covering_period_en_gdelt_v2
        .loc[mask_gkg, 'url']
        .sample(n=1, random_state=3378)
        .iloc[0]
    )
)
gkg_v02_en_gkg_df_1

In [None]:
%%time

path_to_gdelt = os.path.join(PATH_TO_DATA_ROOT_DIR, 'data_providers/gdelt')
fn = f'gkg_v02_en_df_{str(start.date())}_{str(end.date())}.parquet'
create_output_dir(path_to_gdelt)
    
re_fetch_data = False

if not os.path.exists(os.path.join(path_to_gdelt, fn)) or re_fetch_data:
    print(f"Fetching GDELT data")
    mask_gkg = links_covering_period_en_gdelt_v2['feed_type'] == 'gkg'
    temp = parallelize(data=links_covering_period_en_gdelt_v2.loc[mask_gkg, 'url'].to_list(),
                       func=__parallel_fetch_gkg)
    
    print("Extracting pairs from processed data")
    gkg_v02_en_df_list = []
    links_with_errors_list = []

    for l in temp:
        gkg_v02_en_df_list.append(l[0])
        links_with_errors_list.append(l[1])

    print("Concatenating results")
    gkg_v02_en_df = pd.concat(gkg_v02_en_df_list, ignore_index=True)   
    links_with_errors_list = list(itertools.chain(*links_with_errors_list))

    print(f"Number of Gdelt gkg records for time range [{start}: {end}]: {gkg_v02_en_df.shape[0]}")
    print(f"Number of broken links: {len(links_with_errors_list)}")

    # assert gkg_v02_en_df['gkgrecordid'].nunique() == gkg_v02_en_df.shape[0], "gkgrecordid is not unique"
    
    # Saving to parquet    
    print(f"\nSaving fetched GDELT GKG EN data to {os.path.join(path_to_gdelt, fn)}")
    gkg_v02_en_df.to_parquet(path=os.path.join(path_to_gdelt, fn), engine='auto', compression='snappy')
else:
    print(f"Reading GDELT GKG EN data from {os.path.join(path_to_gdelt, fn)}")
    gkg_v02_en_df = pd.read_parquet(os.path.join(path_to_gdelt, fn),
                                    engine='auto',
                                    columns=None)

- Fetching GDELT data
- [01:12:17] INFO generic-utils: Splitting input data into 96 batches
- [01:12:17] INFO generic-utils: Starting parallel processing using 96 CPU
- Extracting pairs from processed data
- Concatenating results
- Number of Gdelt gkg records for time range [2020-01-01 00:00:00: 2022-01-20 09:00:00]: 101022990
- Number of broken links: 1
- 
- Saving fetched GDELT GKG EN data to /home/sergii/workspace/data/data_providers/gdelt/gkg_v02_en_df_2020-01-01_2022-01-20.parquet
- CPU times: user 27min 56s, sys: 14min 48s, total: 42min 45s
- Wall time: 49min 32s

In [None]:
# First rows of the combined EN GKG frame
gkg_v02_en_df.head(n=10)

In [None]:
# Distinct document URLs in the EN GKG data
gkg_v02_en_df.documentidentifier.nunique()

In [None]:
# Distinct news sources in the EN GKG data
gkg_v02_en_df.sourcecommonname.nunique()

In [None]:
# gkg_v02_en_df.head(100).explode(['v2themes_names']).value_counts(normalize=True).head(10).round(3)*100.

In [None]:
# Share (in %) of EN GKG records contributed by each news source, largest first.
# rename_axis + reset_index(name=...) set both column names explicitly, so the frame is
# identical on pandas 1.x and 2.x (their value_counts().reset_index() column names differ).
largest_sources_gkg_v02_en = (
    (gkg_v02_en_df['sourcecommonname'].value_counts(normalize=True, dropna=False) * 100.)
    .rename_axis('sourcecommonname')
    .reset_index(name='pct_url_covered')
)

largest_sources_gkg_v02_en['pct_url_covered_cumulative'] = largest_sources_gkg_v02_en['pct_url_covered'].cumsum()

n_sources_total = largest_sources_gkg_v02_en.shape[0]
for coverage_pct in (90.0, 95.0, 99.0):
    # Smallest number of top sources whose cumulative share reaches the threshold:
    # every source strictly below it plus the one that crosses it (capped at the total).
    n_sources_needed = min(
        int((largest_sources_gkg_v02_en['pct_url_covered_cumulative'] < coverage_pct).sum()) + 1,
        n_sources_total,
    )
    print(f"Number of News sources covering {coverage_pct:.0f}% of all URLs (EN): "
          f"{n_sources_needed} / {n_sources_total}")
print()

largest_sources_gkg_v02_en.head(100)

### 2.3 GDELT v2 GKG Multilingual

In [None]:
# Master-file-list entries (translingual feed) for the inclusive [start, end] range
links_covering_period_ml_gdelt_v2 = get_links_for_time_range(start, end, multilingua_data=True)

In [None]:
# Smoke test: fetch and process one reproducibly sampled translingual GKG file
mask_gkg = links_covering_period_ml_gdelt_v2['feed_type'] == 'gkg'
gkg_v02_ml_gkg_df_1, errors = fetch_gkg_data(
    gdelt_v02_gkg_link=(
        links_covering_period_ml_gdelt_v2.loc[mask_gkg, 'url']
        .sample(n=1, random_state=13378)
        .iloc[0]
    )
)
gkg_v02_ml_gkg_df_1

In [None]:
%%time

path_to_gdelt = os.path.join(PATH_TO_DATA_ROOT_DIR, 'data_providers/gdelt')
fn = f'gkg_v02_ml_df_{str(start.date())}_{str(end.date())}.parquet'
create_output_dir(path_to_gdelt)
    
re_fetch_data = True

if not os.path.exists(os.path.join(path_to_gdelt, fn)) or re_fetch_data:
    print(f"Fetching GDELT data")
    mask_gkg = links_covering_period_ml_gdelt_v2['feed_type'] == 'gkg'
    temp = parallelize(data=links_covering_period_ml_gdelt_v2.loc[mask_gkg, 'url'].to_list(),
                       func=__parallel_fetch_gkg)

    gkg_v02_ml_df_list = []
    links_with_errors_list = []

    for l in temp:
        gkg_v02_ml_df_list.append(l[0])
        links_with_errors_list.append(l[1])

    gkg_v02_ml_df = pd.concat(gkg_v02_ml_df_list, ignore_index=True)   
    links_with_errors_list = list(itertools.chain(*links_with_errors_list))

    print(f"Number of Gdelt gkg records for time range [{start}: {end}]: {gkg_v02_ml_df.shape[0]}")
    print(f"Number of broken links: {len(links_with_errors_list)}")

    # assert gkg_v02_ml_df['gkgrecordid'].nunique() == gkg_v02_ml_df.shape[0], "gkgrecordid is not unique"
    
    # Saving to parquet    
    print(f"\nSaving fetched GDELT GKG ML data to {os.path.join(path_to_gdelt, fn)}")
    gkg_v02_ml_df.to_parquet(path=os.path.join(path_to_gdelt, fn), engine='auto', compression='snappy')
else:
    print(f"Reading GDELT GKG ML data from {os.path.join(path_to_gdelt, fn)}")
    gkg_v02_ml_df = pd.read_parquet(os.path.join(path_to_gdelt, fn),
                                    engine='auto',
                                    columns=None)

- Fetching GDELT data
- [02:07:30] INFO generic-utils: Splitting input data into 96 batches
- [02:07:31] INFO generic-utils: Starting parallel processing using 96 CPU
- Number of Gdelt gkg records for time range [2020-01-01 00:00:00: 2022-01-20 09:00:00]: 164772020
- Number of broken links: 0
- 
- Saving fetched GDELT GKG ML data to /home/sergii/workspace/data/data_providers/gdelt/gkg_v02_ml_df_2020-01-01_2022-01-20.parquet
- CPU times: user 41min 21s, sys: 23min 50s, total: 1h 5min 11s
- Wall time: 1h 10min 43s

---
# 3 GDELT v2 - 15-minute updates
---

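**Each 15-minute update consists of the following files** (shown for the translingual feed; the English feed uses the same names without the `.translation` infix):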
- xxx.translation.export.CSV.zip
- xxx.translation.mentions.CSV.zip
- xxx.translation.gkg.csv.zip

In [None]:
def get_links_to_fetch_gdelt_v2_data(base_url, timeout=60):
    """
    Return the file URLs listed in a GDELT v2 "last update" index file.

    The index is plain text with one entry per line; the URL is the last
    whitespace-separated token of each line.

    :param base_url: URL of the index, e.g. ``http://data.gdeltproject.org/gdeltv2/lastupdate.txt``.
    :param timeout: seconds to wait for the HTTP response.
    :return: list of ``http://`` / ``https://`` URLs, in file order.
    :raises requests.HTTPError: if the index cannot be downloaded.
    """
    response = requests.get(base_url, timeout=timeout)
    # Fail loudly rather than scraping links out of an HTTP error page
    response.raise_for_status()

    links = []
    for line in response.text.splitlines():
        tokens = line.split()
        if tokens and tokens[-1].startswith(('http://', 'https://')):
            links.append(tokens[-1])
    return links

### 3.1. English

In [None]:
# Index of the most recent 15-minute update of the English feed
base_url = "http://data.gdeltproject.org/gdeltv2/lastupdate.txt"
en_links = get_links_to_fetch_gdelt_v2_data(base_url)
en_links

In [None]:
# Split the latest EN update's links by file type; the substring test must be applied
# to each link, not to the list itself (which never contains these bare strings).
en_export_links = [link for link in en_links if 'export' in link]
mentions = [link for link in en_links if 'mentions' in link]
gkg = [link for link in en_links if 'gkg' in link]

**3.1.1 Events**

In [None]:
# One 15-minute English events (export) file, reduced to the columns of interest
events_v02_en_export_15min = 'http://data.gdeltproject.org/gdeltv2/20220118204500.export.CSV.zip'

use_cols_en_export = [
    'GLOBALEVENTID', 'SQLDATE', 'EventCode', 'EventBaseCode', 'EventRootCode',
    'GoldsteinScale', 'ActionGeo_FullName', 'ActionGeo_CountryCode',
    'ActionGeo_Lat', 'ActionGeo_Long', 'DATEADDED', 'SOURCEURL'
]

events_v02_en_export_df_15min = (
    pd.read_csv(events_v02_en_export_15min, sep='\t', encoding="ISO-8859-1",
                header=None, names=event_export_header)
    .loc[:, use_cols_en_export]
    .rename(columns=str.lower)
)

print(events_v02_en_export_df_15min.shape)
events_v02_en_export_df_15min.head(2)

In [None]:
# Events whose root code is 14, 18, 19 or 20
events_v02_en_export_df_15min.loc[
    events_v02_en_export_df_15min['eventrootcode'].isin([14, 18, 19, 20])
]

**3.1.2 GKG**

In [None]:
# One 15-minute English GKG file; all columns are kept for exploration
events_v02_en_gkg_15min = 'http://data.gdeltproject.org/gdeltv2/20220118204500.gkg.csv.zip'

use_cols_en_gkg = [
    'GKGRECORDID', 'DATE', 'SourceCommonName', 'SourceCollectionIdentifier', 'DocumentIdentifier', 'V2Counts', 'V2Themes',
    'V2Locations', 'V2Persons', 'V2Organizations', 'Dates', 'Quotations', 'AllNames', 'Amounts',
    'TranslationInfo'
]

events_v02_en_gkg_df_15min = pd.read_csv(events_v02_en_gkg_15min, sep='\t', encoding = "ISO-8859-1",
                                         header=None, names=gkg_header)

# events_v02_en_gkg_df_15min = events_v02_en_gkg_df_15min[use_cols_en_gkg]
# Lower-case the column names like the events frame: the "Events vs GKG" cells look up
# and merge on `documentidentifier`, which does not exist under the header's casing.
events_v02_en_gkg_df_15min.columns = events_v02_en_gkg_df_15min.columns.str.lower()

print(events_v02_en_gkg_df_15min.shape)
events_v02_en_gkg_df_15min.head(2)

In [None]:
# events_v02_en_gkg_df_15min[['Counts', 'V2Counts']].head(1).to_dict()

In [None]:
# events_v02_en_gkg_df_15min[['Themes', 'V2Themes']].head(1).to_dict()

In [None]:
# events_v02_en_gkg_df_15min[['Persons', 'V2Persons']].head(1).to_dict()

In [None]:
# events_v02_en_gkg_df_15min[['Organizations', 'V2Organizations']].head(1).to_dict()

**3.1.3 Events vs GKG**

In [None]:
# Number of URLs present both in the EN GKG file and in the EN events file
len(set(events_v02_en_gkg_df_15min['documentidentifier']) & set(events_v02_en_export_df_15min['sourceurl']))

In [None]:
# Left-join GKG rows onto events by URL (every event row is kept)
merged_df = pd.merge(
    events_v02_en_export_df_15min,
    events_v02_en_gkg_df_15min,
    how='left',
    left_on='sourceurl',
    right_on='documentidentifier',
)
merged_df

In [None]:
# Merged rows whose event root code is 18
merged_df.loc[merged_df['eventrootcode'].isin([18])]

### 3.2 Translingual [60+ languages]

In [None]:
# Index of the most recent 15-minute update of the translingual feed
base_url = "http://data.gdeltproject.org/gdeltv2/lastupdate-translation.txt"
ml_links = get_links_to_fetch_gdelt_v2_data(base_url)
ml_links

In [None]:
# One 15-minute translingual events (export) file, all columns, lower-cased names
events_v02_multilingua_export = 'http://data.gdeltproject.org/gdeltv2/20220118181500.translation.export.CSV.zip'

events_v02_multilingua_export_df = (
    pd.read_csv(events_v02_multilingua_export, sep='\t', encoding="ISO-8859-1",
                header=None, names=event_export_header)
    .rename(columns=str.lower)
)

print(events_v02_multilingua_export_df.shape)
events_v02_multilingua_export_df.head(2)

In [None]:
# events_v02_multilingua_export_df[events_v02_multilingua_export_df['eventrootcode'].isin([14, 18, 19, 20])]
# Translingual events whose root code is 19
events_v02_multilingua_export_df.loc[
    events_v02_multilingua_export_df['eventrootcode'].isin([19])
]

In [None]:
# One 15-minute translingual GKG file, original column names
events_v02_multilingua_gkg = 'http://data.gdeltproject.org/gdeltv2/20220118181500.translation.gkg.csv.zip'

events_v02_multilingua_gkg_df = pd.read_csv(
    events_v02_multilingua_gkg,
    sep='\t',
    encoding="ISO-8859-1",
    header=None,
    names=gkg_header,
)

print(events_v02_multilingua_gkg_df.shape)
events_v02_multilingua_gkg_df.head(2)

In [None]:
# Raw read (no header names) of a GKG CSV previously extracted to /tmp,
# presumably by `down()` below — the file name is from an earlier run
filename = '/tmp/20220118180000.gkg.csv'
gdelt = pd.read_csv(filepath_or_buffer=filename, sep='\t', encoding="ISO-8859-1", header=None)
gdelt

### 3.3 How it was implemented by Jad (Optional cell)

In [None]:
def down():
    """
    Download the latest 15-minute GDELT v2 (English) GKG archive and unzip it into /tmp.

    The download URL is taken from the third line of ``lastupdate.txt`` (last
    space-separated token). The archive is written to ``/tmp/gdelt_15_min.zip`` and
    its contents are extracted into ``/tmp/``.

    :raises requests.HTTPError: if the index or the archive download fails.
    """
    base_url = "http://data.gdeltproject.org/gdeltv2/lastupdate.txt"
    html = requests.get(base_url, timeout=60)
    html.raise_for_status()
    htmlParse = BeautifulSoup(html.text, 'lxml')

    # Index 2 is assumed to be the GKG entry (export, mentions, gkg order) — confirm if the index format changes
    gkg_download_link = htmlParse.find_all("p")[0].get_text().split('\n')[2].split(' ')[-1]
    place = '/tmp/gdelt_15_min.zip'

    # Context managers close the connection and the file even if the download is interrupted
    with requests.get(gkg_download_link, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(place, "wb") as f:
            for chunk in response.iter_content(chunk_size=64 * 1024):
                if chunk:
                    f.write(chunk)

    with zipfile.ZipFile(place, 'r') as zip_ref:
        zip_ref.extractall('/tmp/')

In [None]:
# a = down()