In [12]:
import pandas as pd
import numpy as np


pd.set_option('display.max_colwidth', None)  # Show full content in each cell
pd.set_option('display.max_rows', None)      # Show all rows
pd.set_option('display.max_columns', None)   # Show all columns


# Load the raw event export and trim whitespace around the free-text notes.
sourceDf = pd.read_csv("./data/Europe-Central-Asia_2018-2024_Sep27.csv")
sourceDf["notes"] = sourceDf["notes"].str.strip()

# Sorted list of every sub-event type present in the file.
unique_sub_event = sourceDf['sub_event_type'].unique()
unique_sub_event = np.sort(unique_sub_event)

# Sub-event types treated as unrelated to warfare.
non_war_related_event  = [
    'Agreement',
    'Arrests',
    'Mob violence',
    'Excessive force against protesters',
    'Peaceful protest',
    'Protest with intervention',
    'Sexual violence',
    'Violent demonstration',
    'Looting/property destruction'
]

# Everything that is not in the list above.
war_related_event =  [event for event in unique_sub_event if event not in non_war_related_event]

# Sub-event types that the rest of this notebook extracts features from.
project_related_event = [
    'Armed clash',
    'Shelling/artillery/missile attack',
    'Disrupted weapons use',
    'Air/drone strike',
    'Government regains territory'
]

ukraine_russia_events = sourceDf[sourceDf["country"].isin(["Ukraine", "Russia"])]
# Take an explicit copy: later cells add columns to this frame, and assigning
# into a slice of a slice is what triggers pandas' SettingWithCopyWarning.
ukraine_war_events = ukraine_russia_events[
    ukraine_russia_events["sub_event_type"].isin(project_related_event)
].copy()

count_by_event_type = ukraine_war_events.groupby("sub_event_type").size()
print(count_by_event_type)

sub_event_type
Air/drone strike                      25235
Armed clash                           49467
Disrupted weapons use                  9187
Government regains territory            350
Shelling/artillery/missile attack    102879
dtype: int64


In [7]:
import warnings
# Optional cell: run it only if you want warning output silenced.

# Suppress all warnings
# NOTE(review): this hides every warning category, including pandas ones that
# may point at real problems — confirm that is intended.
warnings.filterwarnings("ignore")

## Actor Nationality

In [13]:
def get_actor_ntnlty(ukraine_war_events):
    """Label each event's two actors as RUSSIA, UKRAINE, OTHER or NO_ACTOR.

    The label is derived from the actor name: a case-insensitive match on
    'RUSSIA' wins over one on 'UKRAINE'; a missing actor becomes 'NO_ACTOR';
    anything else is 'OTHER'. A few named groups are then overridden by hand.
    `event_date` is also converted to datetime.

    Parameters
    ----------
    ukraine_war_events : pandas.DataFrame
        Must contain the columns 'event_date', 'actor1' and 'actor2'.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with 'actor1_ntnlty' and 'actor2_ntnlty' added.
        The input frame itself is left untouched, so the function is safe to
        call on a filtered slice and safe to re-run.

    Side effects
    ------------
    Prints the number and names of the columns that were added.
    """
    events = ukraine_war_events.copy()
    old_col = set(events.columns)
    events['event_date'] = pd.to_datetime(events['event_date'])

    # Hand-maintained overrides for groups whose name does not carry the
    # country. Matching is exact, so the trailing spaces in the militia name
    # are significant.
    # NOTE(review): the two lists are not symmetric ('Atesh' only for actor1,
    # the militia only for actor2) — confirm that is intended.
    overrides = {
        'actor1': {
            'Atesh': 'UKRAINE',
            'NAF: United Armed Forces of Novorossiya': 'RUSSIA',
            'Wagner Group': 'RUSSIA',
            'Right Sector': 'UKRAINE',
        },
        'actor2': {
            'NAF: United Armed Forces of Novorossiya': 'RUSSIA',
            'Militia (Kharachinsky)  ': 'RUSSIA',
            'Wagner Group': 'RUSSIA',
            'Right Sector': 'UKRAINE',
        },
    }

    for actor_col, named_groups in overrides.items():
        ntnlty_col = f'{actor_col}_ntnlty'
        names = events[actor_col]
        upper_names = names.str.upper()
        # na=False keeps the masks strictly boolean instead of relying on NaN
        # being truthy inside np.where.
        is_russia = upper_names.str.contains('RUSSIA', regex=False, na=False).to_numpy(dtype=bool)
        is_ukraine = upper_names.str.contains('UKRAINE', regex=False, na=False).to_numpy(dtype=bool)
        is_missing = names.isna().to_numpy(dtype=bool)

        events[ntnlty_col] = np.where(
            is_missing, 'NO_ACTOR',
            np.where(is_russia, 'RUSSIA', np.where(is_ukraine, 'UKRAINE', 'OTHER')),
        )
        for group_name, nationality in named_groups.items():
            events.loc[names == group_name, ntnlty_col] = nationality

    new_col = set(events.columns)
    add_col = new_col-old_col
    print(f'added {len(add_col)} columns: {add_col}')
    return events

In [14]:
# Add the actor1_ntnlty / actor2_ntnlty columns; the result is bound back to the same name.
ukraine_war_events = get_actor_ntnlty(ukraine_war_events)

added 2 columns: {'actor2_ntnlty', 'actor1_ntnlty'}


In [17]:
# Spot-check the derived nationality labels next to the raw actor names (first 10 rows).
ukraine_war_events[['actor1', 'actor1_ntnlty', 'actor2', 'actor2_ntnlty']].head(10)

Unnamed: 0,actor1,actor1_ntnlty,actor2,actor2_ntnlty
70,Military Forces of Russia (2000-) Chechen Battalion of Ramzan Kadyrov,RUSSIA,Military Forces of Ukraine (2019-),UKRAINE
71,Military Forces of Russia (2000-),RUSSIA,Military Forces of Ukraine (2019-) Air Force,UKRAINE
72,Military Forces of Russia (2000-),RUSSIA,Military Forces of Ukraine (2019-) Air Force,UKRAINE
73,Military Forces of Ukraine (2019-) Air Force,UKRAINE,,NO_ACTOR
74,Military Forces of Ukraine (2019-) Air Force,UKRAINE,Civilians (Russia),RUSSIA
76,Military Forces of Ukraine (2019-) Air Force,UKRAINE,Civilians (Russia),RUSSIA
77,Military Forces of Ukraine (2019-) Air Force,UKRAINE,Civilians (Russia),RUSSIA
78,Military Forces of Ukraine (2019-) Air Force,UKRAINE,Civilians (Russia),RUSSIA
79,Military Forces of Ukraine (2019-) Air Force,UKRAINE,,NO_ACTOR
80,Military Forces of Ukraine (2019-) Air Force,UKRAINE,Civilians (Russia),RUSSIA


## Armed Clash

In [25]:
# Keep only the events whose sub_event_type is 'Armed clash'.
armed_clash_df = ukraine_war_events[ukraine_war_events['sub_event_type'] == 'Armed clash']

In [26]:
def _equipment_losses(tokens, keywords):
    """Count equipment mentions in one tokenised note, keyed by keyword."""
    counts = dict.fromkeys(keywords, 0)
    for i, token in enumerate(tokens):
        for keyword in keywords:
            if keyword not in token:  # substring match, so 'vehicles' hits 'vehicle'
                continue
            # "armored vehicle(s)" is booked under its own column.
            if i >= 1 and tokens[i - 1] == 'armored' and 'vehicle' in token:
                item = 'armored_vehicle'
            else:
                item = keyword
            n_loss = 0
            # Look back at most 3 words for a quantity. The range is clipped at
            # the start of the note so a negative index can never wrap around
            # and pick up a number from the END of the note.
            for j in range(1, min(3, i) + 1):
                previous = tokens[i - j]
                if previous.isdigit():
                    n_loss = int(previous)
                    break
                if previous in ('a', 'an'):
                    n_loss = 1
                    break
            counts[item] += n_loss
    return counts


def _support_flags(tokens, support_types):
    """Detect which support types are named shortly after the word 'supported'."""
    flags = dict.fromkeys(support_types, False)
    n_tokens = len(tokens)
    for i, token in enumerate(tokens):
        if token != 'supported':
            continue
        quota = min(5, n_tokens - (i + 1))  # scan the next 5 words
        pointer = i + 1
        while quota > 0:
            word = tokens[pointer]
            # The two-word patterns need the following token; use '' at the end
            # of the note instead of indexing past the list.
            following = tokens[pointer + 1] if pointer + 1 < n_tokens else ''
            if 'artillery' in word:
                matched = 'artillery'
            elif 'aviation' in word:
                matched = 'aviation'
            elif 'air' in word and 'force' in following:
                matched = 'air_forces'
            elif 'air' in word and 'unit' in following:
                matched = 'air_units'
            else:
                matched = None

            if matched is None:
                quota -= 1
            else:
                flags[matched] = True
                # a hit restarts the 5-word window from the current position
                quota = min(5, n_tokens - (pointer + 1))
            pointer += 1
    return flags


def armed_clash_extraction(armed_clash_df):
    """Derive keyword-based features from the notes of 'Armed clash' events.

    Added columns:
      * is_supported / is_shelling – the note contains that word.
      * <keyword>_losses – a quantity found up to 3 words before an equipment
        keyword (a digit string, or 1 for 'a'/'an'). The text does not say
        which side suffered the loss.
      * <type>_supported – a support type named within 5 words after the token
        'supported' (the window restarts after every hit).

    Parameters
    ----------
    armed_clash_df : pandas.DataFrame
        Must contain a text column 'notes'. Missing notes yield 0 / False.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with the columns above appended; the input frame
        is not modified.

    Side effects
    ------------
    Prints the number and names of the columns that were added.
    """
    armed_clash_df = armed_clash_df.copy()
    old_col = set(armed_clash_df.columns)

    notes_lower = armed_clash_df['notes'].str.lower()
    armed_clash_df['is_supported'] = notes_lower.str.contains('supported', regex=False, na=False)
    armed_clash_df['is_shelling'] = notes_lower.str.contains('shelling', regex=False, na=False)

    # Same whitespace tokenisation as before; a missing note becomes an empty token list.
    token_lists = [note.split(' ') if isinstance(note, str) else [] for note in notes_lower]

    # Equipment losses. These figures also exist in the spreadsheet; kept here as a back-up.
    keywords = ['vehicle', 'armored_vehicle', 'car', 'boat', 'drone', 'cannon', 'uav', 'mortar']
    losses = [_equipment_losses(tokens, keywords) for tokens in token_lists]
    for keyword in keywords:
        armed_clash_df[f'{keyword}_losses'] = np.array(
            [row_counts[keyword] for row_counts in losses], dtype='int64'
        )

    # Support types. Only a literal 'supported' token opens a scan window, so
    # rows without it stay all-False.
    support_types = ['air_forces', 'artillery', 'aviation', 'air_units']
    support = [_support_flags(tokens, support_types) for tokens in token_lists]
    for support_type in support_types:
        armed_clash_df[f'{support_type}_supported'] = np.array(
            [row_flags[support_type] for row_flags in support], dtype=bool
        )

    new_col = set(armed_clash_df.columns)
    add_col = new_col-old_col
    print(f'added {len(add_col)} columns: {add_col}')
    return armed_clash_df


In [27]:
# Append the loss-count and support-flag columns to the armed-clash subset.
armed_clash_df = armed_clash_extraction(armed_clash_df)

added 14 columns: {'artillery_supported', 'drone_losses', 'armored_vehicle_losses', 'air_units_supported', 'cannon_losses', 'mortar_losses', 'car_losses', 'uav_losses', 'boat_losses', 'vehicle_losses', 'is_shelling', 'air_forces_supported', 'is_supported', 'aviation_supported'}


In [34]:
# to_show = ['artillery_supported', 'drone_losses', 'armored_vehicle_losses', 'air_units_supported', 'cannon_losses', 'mortar_losses', 'car_losses', 'uav_losses', 'boat_losses', 'vehicle_losses', 'is_shelling', 'air_forces_supported', 'is_supported', 'aviation_supported']
# to_show += ['notes']
# for c in to_show:
#     if 'supported' in c:
#         print(c)
#         tmp_df = armed_clash_df[armed_clash_df[c] == True]['notes'].head(3)
#         print(tmp_df)
#         print()


## Shelling/artillery/missile attack

In [32]:
# Keep only the events whose sub_event_type is 'Shelling/artillery/missile attack'.
attack_df = ukraine_war_events[ukraine_war_events['sub_event_type'] == 'Shelling/artillery/missile attack']

In [33]:
def _missile_model(tokens, position, stoppers, articles, n_lookback=5):
    """Collect the words directly before tokens[position] that describe the missile."""
    model = ''
    # Clip the look-back at the start of the note so a negative index never
    # wraps around and pulls in words from the END of the note.
    for j in range(1, min(n_lookback, position) + 1):
        curr_word = tokens[position - j]
        hits_stopper = any(stp in curr_word for stp in stoppers)
        if hits_stopper or curr_word in articles or curr_word.isdigit():
            break
        model = curr_word + ' ' + model
    if model[:4] == 'and ':
        model = model[4:]
    return model.replace("'", '').strip()


def shelling_extraction(attack_df):
    """Derive keyword flags and a missile description from shelling-event notes.

    Added columns:
      * shelling_flag, artillery_flag, missile_flag, mortar_flag – the keyword
        occurs inside some whitespace-separated token of the lower-cased note.
      * missile_type – up to 5 words preceding a 'missile' token, cut at the
        first stopper word, article or number. When a note has several
        'missile' tokens the last one wins. The literal token
        'shelling/artillery/missile' is skipped.
      * one boolean column per missile category and per well-known system
        name, set when that text occurs in missile_type.

    Parameters
    ----------
    attack_df : pandas.DataFrame
        Must contain a text column 'notes'. Missing notes yield False / ''.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with the columns above appended; the input frame
        is not modified.
    """
    attack_df = attack_df.copy()

    # does {word} exist?
    attack_keywords = ['shelling', 'artillery', 'missile', 'mortar']

    # words that end the backwards search for the missile type
    stoppers = ['launched', 'out', 'fired', 'ukrain', 'russia', 'likely', 'with', 'mortar', 'artillery', 'suspected', 'intercepted', 'conducted']
    articles = ['a', 'an', 'the', 'one', 'two', 'three', 'some']

    keyword_flags = {keyword: [] for keyword in attack_keywords}
    missile_types = []

    for notes in attack_df['notes']:
        tokens = notes.lower().split(' ') if isinstance(notes, str) else []
        row_flags = dict.fromkeys(attack_keywords, False)
        missile_type = ''
        for i, token in enumerate(tokens):
            for keyword in attack_keywords:
                if keyword not in token:
                    continue
                row_flags[keyword] = True
                if keyword == 'missile' and token != 'shelling/artillery/missile':
                    # try to find the model in the preceding words
                    missile_type = _missile_model(tokens, i, stoppers, articles)
        for keyword in attack_keywords:
            keyword_flags[keyword].append(row_flags[keyword])
        missile_types.append(missile_type)

    for keyword in attack_keywords:
        attack_df[f'{keyword}_flag'] = np.array(keyword_flags[keyword], dtype=bool)
    attack_df['missile_type'] = np.array(missile_types, dtype=object)

    missile_catgs = ['anti-tank', 'anti-aircraft', 'anti-ship', 'anti-air', 'anti-radar']
    for catg in missile_catgs:
        # accept both the hyphenated and the joined spelling
        attack_df[catg] = attack_df['missile_type'].str.contains(catg) | attack_df['missile_type'].str.contains(catg.replace('-', ''))

    # well-known system names; 'grad' is listed twice, which only re-assigns the same column
    missile_topkey = ['grad', 'himars', 's-300', 'grad', 'uragan', 'iskander','c-300', 'mlrs', 'ballistic', 'tochka', 'smerch', 'kalibr', 'high-precision', 'storm shadow']
    for name in missile_topkey:
        attack_df[name.replace(' ', '-')] = attack_df['missile_type'].str.contains(name)

    return attack_df

In [35]:
# Append the keyword flags and missile-type columns to the shelling subset.
attack_df = shelling_extraction(attack_df)

In [37]:
# attack_df.head(50)

In [38]:
# attack_df[attack_df['missile_flag'] == True].head(50)

## Disrupted weapons use

In [41]:
# Keep only the events whose sub_event_type is 'Disrupted weapons use'.
disrupt_df = ukraine_war_events[ukraine_war_events['sub_event_type'] == 'Disrupted weapons use']

In [39]:
def _disrupted_object(tokens, i, objects):
    """Return the object column name for tokens[i], or '' when it names no object."""
    curr_col = ''
    for obj in objects:
        if obj in tokens[i]:
            curr_col = obj  # the LAST matching entry wins, so list order matters
    # Two-word forms such as "artillery shell" take precedence over the
    # single-token match above.
    if i + 1 <= len(tokens) - 1:
        following = tokens[i + 1]
        if 'artillery' in tokens[i] and 'shell' in following:
            curr_col = 'artillery-shell'
        elif 'aviation' in tokens[i] and 'bomb' in following:
            curr_col = 'aviation-bomb'
        elif 'glider' in tokens[i] and 'bomb' in following:
            curr_col = 'glider-bomb'
        elif 'air' in tokens[i] and 'bomb' in following:
            curr_col = 'air-bomb'
    return curr_col


def disrupt_extraction(disrupt_df):
    """Derive launch/interception counts and a drone model from event notes.

    For every object type the function adds n_<obj>_launched,
    flag_<obj>_launched, n_<obj>_intercepted and flag_<obj>_intercepted.
    From each object token it walks backwards (5 words, extended by 10 for
    every 'and' or comma-terminated word), remembers the first quantity it
    sees, and books it when it reaches an interception verb or 'launched'.
    It also adds 'drone_model': up to 5 words preceding the first 'drone'
    token of the note, cut at a stopper word, 'a'/'an' or a number.

    Parameters
    ----------
    disrupt_df : pandas.DataFrame
        Must contain a text column 'notes'. Missing notes yield 0 / False / ''.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with the columns above appended; the input frame
        is not modified.
    """
    disrupt_df = disrupt_df.copy()

    # 'missile' appears twice on purpose: the last matching entry decides the
    # column, and the second occurrence makes 'missile' beat 'ballistic' for
    # tokens that contain both.
    objects = ['drone', 'kamikaze-drone', 'air-bomb', 'missile', 'uxo', 'artillery-shell', 'aviation-bomb', 'ballistic', 'missile', 'bomb', 'explosive', 'glider-bomb']
    int_stoppers = ['down', 'intercepted', 'defused', 'neutralized', 'destroyed']
    lnch_stoppers = ['launched']
    universal_stopper = ['and']
    word_to_num = {
        "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4,
        "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9,
        "ten": 10
    }
    # only drone models are mined; other objects are too irregular
    model_stoppers = int_stoppers+lnch_stoppers+universal_stopper

    # Results are accumulated per row position and written back in one go.
    n_rows = len(disrupt_df)
    counts = {}
    flags = {}
    for obj in objects:
        for action in ('launched', 'intercepted'):
            counts[f'n_{obj}_{action}'] = np.zeros(n_rows, dtype='int64')
            flags[f'flag_{obj}_{action}'] = np.zeros(n_rows, dtype=bool)
    drone_models = [''] * n_rows

    for row, notes in enumerate(disrupt_df['notes']):
        if not isinstance(notes, str):  # missing note: nothing to extract
            continue
        tmp_list = notes.lower().split(' ')

        # launch / interception counts
        for i in range(len(tmp_list)):
            curr_col = _disrupted_object(tmp_list, i, objects)
            if not curr_col:
                continue

            n_limit = 1
            n_max = 5
            num = 0
            while n_limit <= n_max:
                if i-n_limit < 0: break
                curr_word = tmp_list[i-n_limit]
                if curr_word.isdigit() and num == 0: # keep the nearest quantity only
                    num = int(curr_word)
                elif curr_word in word_to_num and num == 0:
                    num = word_to_num[curr_word]
                elif curr_word in ['a', 'an'] and num == 0:
                    num = 1

                # verbs
                elif curr_word in int_stoppers: # interception
                    counts[f'n_{curr_col}_intercepted'][row] += num
                    flags[f'flag_{curr_col}_intercepted'][row] = True
                    break

                elif curr_word in lnch_stoppers: # launch
                    counts[f'n_{curr_col}_launched'][row] += num
                    flags[f'flag_{curr_col}_launched'][row] = True
                    break

                # endswith() is safe for the empty tokens that double spaces
                # produce; indexing [-1] raised IndexError on them.
                elif curr_word in universal_stopper or curr_word.endswith(','):
                    n_max += 10
                n_limit += 1

        # drone model: the words just before the first 'drone' token
        for i in range(len(tmp_list)):
            if 'drone' in tmp_list[i]:
                n_lookback = 5
                model = ''
                # clipped at the start of the note so the index never wraps around
                for j in range(1, min(n_lookback, i) + 1):
                    curr_word = tmp_list[i-j]
                    stop_flag = any(stp in curr_word for stp in model_stoppers)
                    if stop_flag or curr_word in ['a', 'an'] or curr_word.isdigit() or curr_word in word_to_num:
                        break
                    model = curr_word + ' ' + model
                drone_models[row] = model.replace("'", '').strip()
                break # done once one is found

    for obj in objects:
        disrupt_df[f'n_{obj}_launched'] = counts[f'n_{obj}_launched']
        disrupt_df[f'flag_{obj}_launched'] = flags[f'flag_{obj}_launched']
        disrupt_df[f'n_{obj}_intercepted'] = counts[f'n_{obj}_intercepted']
        disrupt_df[f'flag_{obj}_intercepted'] = flags[f'flag_{obj}_intercepted']
    disrupt_df['drone_model'] = np.array(drone_models, dtype=object)

    return disrupt_df


In [42]:
# Append the launch/interception counts and the drone_model column to the disrupted-weapons subset.
disrupt_df = disrupt_extraction(disrupt_df)

In [44]:
# objects = ['drone', 'kamikaze-drone', 'air-bomb', 'missile', 'uxo', 'artillery-shell', 'aviation-bomb', 'ballistic', 'missile', 'bomb', 'explosive', 'glider-bomb']
# for obj in objects:
#     print(obj)
#     tmp_df = disrupt_df[disrupt_df[f'flag_{obj}_intercepted'] == True][['notes', f'flag_{obj}_intercepted', f'n_{obj}_intercepted']].head(2)
#     print(tmp_df)
#     tmp_df = disrupt_df[disrupt_df[f'flag_{obj}_launched'] == True][['notes', f'flag_{obj}_launched', f'n_{obj}_launched']].head(2)
#     print(tmp_df)
#     print(' ')

In [46]:
# disrupt_df[['notes', 'drone_model']].head(20)

## Air/drone strike

In [47]:
# Keep only the events whose sub_event_type is 'Air/drone strike'.
air_df = ukraine_war_events[ukraine_war_events['sub_event_type'] == 'Air/drone strike']

In [48]:
def airstrike_extraction(air_df):
    """Derive strike-type flags and two counts from air/drone-strike notes.

    Added columns:
      * flag_<keyword> for drone, drone-strike, mortar, air-strike,
        kamikaze-drone and artillery – some token of the lower-cased note was
        classified as that keyword. Each token gets exactly one class: the
        last keyword it contains, overridden by 'airstrike' and by the
        two-word forms "drone strike", "drone(s) struck" and "air strike".
      * n_drone-strike and n_artillery – the first quantity found in the
        token itself or the 4 words before it (digits, 'zero'..'ten', or 1
        for 'a'/'an'), summed over all matching tokens of the note.

    Parameters
    ----------
    air_df : pandas.DataFrame
        Must contain a text column 'notes'. Missing notes yield False / 0.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with the columns above appended; the input frame
        is not modified.
    """
    air_df = air_df.copy()

    word_to_num = {
        "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4,
        "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9,
        "ten": 10
    }

    keywords = ['drone', 'drone-strike', 'mortar', 'air-strike', 'kamikaze-drone', 'artillery']
    # only these two are counted
    counted = ['drone-strike', 'artillery']

    # Results are accumulated per row position and written back in one go.
    n_rows = len(air_df)
    flags = {kw: np.zeros(n_rows, dtype=bool) for kw in keywords}
    counts = {kw: np.zeros(n_rows, dtype='int64') for kw in counted}

    for row, notes in enumerate(air_df['notes']):
        if not isinstance(notes, str):  # missing note: nothing to extract
            continue
        tokens = notes.lower().split(' ')
        last = len(tokens) - 1
        for i, token in enumerate(tokens):
            col = ''
            for kw in keywords:
                if kw in token:
                    col = kw  # the last matching keyword wins
            if 'airstrike' in token:
                col = 'air-strike'
            elif i + 1 <= last:
                following = tokens[i + 1]
                if token == 'drone' and 'strike' in following:
                    col = 'drone-strike'
                elif 'drone' in token and 'struck' in following:
                    col = 'drone-strike'
                elif token == 'air' and 'strike' in following:
                    col = 'air-strike'

            if col == '':
                continue
            flags[col][row] = True
            if col not in counted:
                continue
            # look for a quantity in the token itself and the 4 words before it
            for j in range(5):
                if i - j < 0:
                    break
                curr_word = tokens[i - j]
                if curr_word in ['a', 'an']:
                    counts[col][row] += 1
                    break
                elif curr_word.isdigit():
                    counts[col][row] += int(curr_word)
                    break
                elif curr_word in word_to_num:
                    counts[col][row] += word_to_num[curr_word]
                    break

    for kw in keywords:
        air_df[f'flag_{kw}'] = flags[kw]
    air_df['n_drone-strike'] = counts['drone-strike']
    air_df['n_artillery'] = counts['artillery']
    return air_df

In [49]:
# Append the strike-type flags and the two count columns to the air/drone-strike subset.
air_df = airstrike_extraction(air_df)

In [51]:
# air_df.head(50)

Disrupted weapons use
- Mine the number of drones/missiles intercepted (and their model, if any) <----- (only the drone model is extracted)

Air/drone strike
- Air strike or drone strike
- (opt.) equipment <----- (I did not do this part)

Armed Clash
- (opt.) supported by any weapon/support types?

Shelling
- done already; look for aviation support
- type of missile

Government regains territory
- no worries


In [None]:
# test