**Initial Imports**

In [None]:
# Library and runtime setup for a fresh notebook VM (lines starting with `!` run in the shell).
# Refresh the apt package index, then install a headless Java 8 JDK quietly
# (-qq, stdout discarded); JAVA_HOME below points at this install.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Install PySpark before importing it.
!pip install -q pyspark
import pyspark, os
from pyspark import SparkConf, SparkContext
# Interpreter name exported to PySpark through PYSPARK_PYTHON.
os.environ["PYSPARK_PYTHON"]="python3"
# Location of the JDK installed above.
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"
# gdelt package; the data-import cell imports it as `gdelt`.
!pip install gdelt

**Initialise pyspark**

In [None]:
#start spark local server
import sys, os
from operator import add
import time

os.environ["PYSPARK_PYTHON"]="python3"

import pyspark
from pyspark import SparkConf, SparkContext

# Connect the Python driver to a local Spark JVM running on the notebook host
# (e.g. the Google Colab virtual machine), using every local core.
# getOrCreate() hands back the already-running context when this cell is
# re-run, so `sc` is always bound afterwards and genuine configuration errors
# are no longer hidden behind a blanket `except ValueError: pass`.
conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g")
sc = SparkContext.getOrCreate(conf=conf)

def dbg(x):
  """Print debugging information about `x`.

  For an RDD, prints up to the first 100 elements as a list. Tuple elements
  are printed with every grouped-value field (a ResultIterable, as produced
  by groupByKey) expanded into a plain list so its contents are visible.
  Tuples of any length are handled; nothing is truncated and short tuples do
  not raise. Any other object is printed as-is.

  Returns None.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def _expand(field):
    # ResultIterable has an unhelpful repr; materialise it for display.
    if isinstance(field, pyspark.resultiterable.ResultIterable):
      return list(field)
    return field

  print([tuple(_expand(field) for field in item) if isinstance(item, tuple) else item
         for item in x.take(100)])

**Import Data**

In [None]:
from concurrent.futures import ProcessPoolExecutor
from datetime import date, timedelta
import datetime
import pandas as pd
import gdelt
import os
from pyspark.sql import SQLContext

# gdeltPyR client configured for GDELT version 2; intofile() below uses it to
# query the events table.
gd = gdelt.gdelt(version=2)

# Process pool (default settings) used further down to run intofile() over the
# per-day file names in parallel.
e = ProcessPoolExecutor()


# generic functions to pull and write data to disk based on date
def get_filename(x):
  """Return the per-day CSV file name for date-like `x` (YYYYMMDD_gdeltdata.csv)."""
  stamp = x.strftime('%Y%m%d')
  return "%s_gdeltdata.csv" % stamp

def intofile(filename):
    """Download one day of GDELT events into `filename` unless it already exists.

    The day to query is the part of `filename` before the first underscore
    (the layout produced by get_filename). The CSV is written to a temporary
    sibling file and renamed into place, so an interrupted write cannot leave
    a truncated file that later runs would mistake for a finished download.

    Failures are reported on stdout, with the file name and the exception,
    instead of being raised, so one bad day does not abort a pool-wide map.
    Only Exception subclasses are caught; KeyboardInterrupt and SystemExit
    still propagate.

    Returns None.
    """
    partial = "{}.part".format(filename)
    try:
        if os.path.exists(filename):
            return  # already downloaded
        day = filename.split("_")[0]
        # NOTE(review): coverage=True presumably requests every 15-minute
        # update for the day rather than a single snapshot -- confirm against
        # the gdeltPyR documentation.
        frame = gd.Search(day, table='events', coverage=True)
        frame.to_csv(partial, encoding='utf-8', index=False)
        os.replace(partial, filename)
    except Exception as err:
        print("Error occurred while fetching {}: {!r}".format(filename, err))
        # Best-effort cleanup of a half-written temporary file.
        if os.path.exists(partial):
            os.remove(partial)

# Pull the data from GDELT into one CSV file per day; this may take a long time.
# Change the date range here.
dates = [get_filename(x) for x in pd.date_range('2022 01 01', '2022 01 31')]

# Run the downloads in the process pool, then release its worker processes.
results = list(e.map(intofile, dates))
e.shutdown()

# intofile() reports failures instead of raising, so some days may be missing.
# Load only the files that actually exist and say which ones were skipped,
# rather than letting the CSV reader fail on a non-existent path.
available_files = [name for name in dates if os.path.exists(name)]
missing_files = [name for name in dates if name not in available_files]
if missing_files:
    print("Skipping {} missing file(s): {}".format(len(missing_files), missing_files))
if not available_files:
    raise FileNotFoundError("No GDELT CSV files were downloaded for the requested date range")

sqlContext = SQLContext(sc)

# One DataFrame over all downloaded days; the first line of each file is the header.
data = sqlContext.read.option("header", "true").csv(available_files)

# Download a file from Google Drive by id using the gdown command-line tool.
# NOTE(review): presumably this is the translation.txt code/description lookup
# that the analysis cell reads from /content -- confirm.
!gdown 'https://drive.google.com/u/0/uc?id=1AlyqTzz8HR1qoeFkaxTUJ_IUU5UUMNVH&export=download'

**Full Code** - For events where Actor1CountryCode = 'USA', outputs the 10 event-type pairs with the highest interest scores and the 10 with the lowest.

In [None]:
import time

# Wall-clock start of the analysis; the end of this cell subtracts it from
# time_end to report the elapsed time.
time_start = time.time()

def is_good(result):
  """Return True when the row's Actor1CountryCode is 'USA', otherwise False.

  `result` is any mapping-like row supporting result['Actor1CountryCode'].
  Always returns a bool (a missing/None country code yields False) instead of
  falling off the end and returning None for non-matching rows.
  """
  return result['Actor1CountryCode'] == 'USA'

# Turn the loaded rows into an RDD keyed by (Actor1CountryCode, Actor2CountryCode)
# -- Actor1 is always 'USA' after the filter -- whose value is the list of
# distinct EventCodes seen for that pair of countries.
# The RDD is cached because several separate actions below reuse it; without
# the cache each one would re-read and re-group the CSV data.
events_by_country = (
    data.rdd
    .filter(is_good)
    .map(lambda row: ((row['Actor1CountryCode'], row['Actor2CountryCode']), row['EventCode']))
    .groupByKey()
    .mapValues(lambda codes: list(set(codes)))
    .cache()
)

# Event types of special interest: Filtered_Pairs keeps only pairs in which at
# least one member is in this list. For example, to study only pairs involving
# the "provide aid" category, list those codes here.
filteredgroup = ['070','071','072','073','074','075']

# An event type must occur for at least this many country pairs to be analysed.
MIN_COUNTRY_PAIRS = 5

# Count, for every event type, the number of country pairs it occurs in, keep
# the types that reach the threshold, and broadcast the result as a dict.
eventfreq = (
    events_by_country
    .flatMap(lambda kv: kv[1])
    .map(lambda code: (code, 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda kv: kv[1] >= MIN_COUNTRY_PAIRS)
)
eventfreq_b = sc.broadcast(eventfreq.collectAsMap())

# The same counts restricted to filteredgroup, which Filtered_Pairs looks up.
# Derived from the already-collected dict, so it costs no extra Spark job.
eventfreq2_b = sc.broadcast(
    {code: count for code, count in eventfreq_b.value.items() if code in filteredgroup}
)

def srtTup(a,b):
  """Return the two values as a 2-tuple with the smaller one first."""
  if a < b:
    return (a, b)
  return (b, a)

def Pairs(listy):
  """Return ((code_a, code_b), 1) for every unordered pair of frequent codes.

  Only codes present in the broadcast frequency map `eventfreq_b` are paired.
  Each pair is ordered with srtTup and appears once, in the order in which
  its members first occur in `listy`. The trailing 1 is the count that
  reduceByKey sums later.

  Codes are de-duplicated up front and paired with a triangular loop, so the
  work is quadratic in the number of codes instead of scanning the growing
  result list for every candidate.
  """
  known = eventfreq_b.value  # one broadcast lookup per call
  # dict.fromkeys removes repeats while preserving first-seen order.
  kept = list(dict.fromkeys(code for code in listy if code in known))
  ret = []
  for i, a in enumerate(kept):
    for b in kept[i + 1:]:
      ret.append((srtTup(a, b), 1))
  return ret

def Filtered_Pairs(listy):
  """Like Pairs, but keep only pairs with at least one code of special interest.

  Both codes must be in the broadcast frequency map `eventfreq_b`, and at
  least one of them must also be in `eventfreq2_b`. A broadcast named
  `eventfreq2_b` therefore has to exist before this function is used.

  Returns a list of ((code_a, code_b), 1) tuples, each pair ordered with
  srtTup and listed once, in the order in which its members first occur in
  `listy`. Codes are de-duplicated up front and paired with a triangular
  loop, so the work is quadratic in the number of codes instead of scanning
  the growing result list for every candidate.
  """
  known = eventfreq_b.value
  focus = eventfreq2_b.value
  # dict.fromkeys removes repeats while preserving first-seen order.
  kept = list(dict.fromkeys(code for code in listy if code in known))
  ret = []
  for i, a in enumerate(kept):
    for b in kept[i + 1:]:
      if a in focus or b in focus:
        ret.append((srtTup(a, b), 1))
  return ret

# NOTE: to restrict the analysis to specific event types, use the
# Filtered_Pairs line instead of the Pairs line below (Filtered_Pairs needs
# the eventfreq2_b broadcast to be defined).
# Build ((code_a, code_b), 1) tuples for every country pair and sum them, so
# `final` holds the number of country pairs in which both event types occur.
EF2 = events_by_country.values()
pairs = EF2.flatMap(Pairs)
#pairs = EF2.flatMap(Filtered_Pairs)
final = pairs.reduceByKey(lambda a, b: a + b)

# Number of distinct country pairs. count() is computed on the executors and
# avoids shipping every record to the driver just to take its length.
lenfile = events_by_country.count()

# Support, confidence and interest for every ordered pair (A, B) of event types.

# Support: share of country pairs in which both event types occur.
suppscore = final.map(lambda kv: (kv[0], kv[1] / lenfile))
# The same supports keyed by the reversed pair, so both directions get scored.
suppscore2 = final.map(lambda kv: (tuple(sorted(kv[0], reverse = True)), kv[1] / lenfile))

def _rule_confidence(scored_pair):
  """Divide the pair's support by the support of its first event type."""
  pair, support = scored_pair
  return (pair, support / (eventfreq_b.value[pair[0]] / lenfile))

def _rule_interest(scored_rule):
  """Subtract the support of the pair's second event type from its confidence."""
  pair, confidence = scored_rule
  return (pair, confidence - (eventfreq_b.value[pair[1]] / lenfile))

confscore = suppscore.map(_rule_confidence)
confscore2 = suppscore2.map(_rule_confidence)
# All rules in both directions, highest confidence first.
conf_fin = confscore.union(confscore2).sortBy(lambda kv: -kv[1])

# Highest interest first, and the same rules lowest interest first.
interest_fin = conf_fin.map(_rule_interest).sortBy(lambda kv: -kv[1])
interest_fin2 = interest_fin.sortBy(lambda kv: kv[1])

# Build the {event type code: description} lookup from the tab-separated
# translation file. Lines without a tab (e.g. blank lines) are skipped and
# only the first tab splits, so a stray line cannot break the dict conversion.
transdict = (
    sc.textFile('/content/translation.txt')
    .map(lambda line: line.split('\t', 1))
    .filter(lambda fields: len(fields) == 2)
    .collectAsMap()
)

def _describe(scored_rule):
  """Replace both codes of a scored pair with their descriptions.

  A code that has no entry in the translation file is shown as the raw code
  instead of raising KeyError inside a Spark task.
  """
  (first, second), score = scored_rule
  return ((transdict.get(first, first), transdict.get(second, second)), score)

# Top 10 event type pairings by interest: highest first, then lowest first.
interesting = sc.parallelize(interest_fin.take(10)).map(_describe)
interesting2 = sc.parallelize(interest_fin2.take(10)).map(_describe)

print(interesting.collect())
print(interesting2.collect())

# Report how long the analysis cell took, in seconds of wall-clock time.
time_end = time.time()
elapsed = time_end - time_start
print("elapsed time is %s" % str(elapsed))
