# Start up the project

In [None]:
# Library and code setup: install Java 8, PySpark and the gdelt package,
# then set the environment variables PySpark reads.
# Refresh the apt package index before installing.
!apt-get update
# Install the headless Java 8 JDK quietly; stdout is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark
import pyspark, os
from pyspark import SparkConf, SparkContext
# Python interpreter and Java installation PySpark should use.
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"

# The gdelt package is imported in a later cell to download the event data.
!pip install gdelt

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [2,182 kB]

In [None]:
#start spark local server
import sys, os
from operator import add
import time

os.environ["PYSPARK_PYTHON"]="python3"

import pyspark
from pyspark import SparkConf, SparkContext

# Connect the Python driver to a local, single-threaded Spark JVM with 1 GB of
# executor memory.
conf = SparkConf().setMaster("local[1]").set("spark.executor.memory", "1g")
# getOrCreate returns the already-running context when this cell is re-run
# (instead of raising ValueError), so `sc` is always bound afterwards.
sc = SparkContext.getOrCreate(conf=conf)

def dbg(x):
  """
  Print debugging information about `x`.

  For an RDD, prints a list built from its first 100 elements. Tuple elements
  (of any length) are printed as plain tuples, with every lazily grouped
  ResultIterable field materialized as a list so its contents are visible.
  Anything that is not an RDD is printed as-is.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def materialize(field):
    # Grouped values (e.g. from groupByKey) print as an opaque object unless
    # they are turned into a list first.
    if isinstance(field, pyspark.resultiterable.ResultIterable):
      return list(field)
    return field

  # Handle every tuple field rather than assuming exactly two, so 1-tuples and
  # empty tuples do not raise IndexError and longer tuples are not truncated.
  print([tuple(materialize(field) for field in t) if isinstance(t, tuple) else t
         for t in x.take(100)])

In [None]:
from concurrent.futures import ProcessPoolExecutor
from datetime import date, timedelta
import pandas as pd
import gdelt
import os

# gdeltPyR client configured for GDELT version 2; used by the download helper.
gd = gdelt.gdelt(version=2)

# Process pool (default worker count) used to run the per-day downloads in
# parallel via e.map.
e = ProcessPoolExecutor()

# The program itself

## Obtaining the data

In [None]:
# generic functions to pull and write data to disk based on date
def get_filename(x):
  """Return the CSV filename for day `x`: its YYYYMMDD stamp followed by a fixed suffix."""
  return "{}_gdeltdataEvent.csv".format(x.strftime('%Y%m%d'))

def intofile(filename):
    """
    Download one day of GDELT events into `filename` unless it already exists.

    The day is taken from the part of the file's base name before the first
    underscore (the YYYYMMDD stamp produced by get_filename). The download is
    best-effort: any failure is reported on stdout together with the file name
    and the cause, and the function still returns None so a parallel map over
    many days keeps going.
    """
    # Written first to a side file and renamed on success, so an interrupted
    # download never leaves a truncated CSV that later runs mistake for complete.
    partial = "{}.part".format(filename)
    try:
        if os.path.exists(filename):
            return
        # Use the base name so underscores in a directory path cannot break
        # the date extraction.
        day = os.path.basename(filename).split("_")[0]
        # coverage=True: per the original note this is meant to fetch the whole
        # day rather than a single 15-minute update -- confirm in gdeltPyR docs.
        d = gd.Search(day, table='events', coverage=True)
        d.to_csv(partial, encoding='utf-8', index=False)
        os.replace(partial, filename)
    except Exception as exc:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
        print("Error occurred while fetching {}: {!r}".format(filename, exc))
        if os.path.exists(partial):
            os.remove(partial)

# Pull the data from GDELT into one CSV file per day; this may take a long time.
# Mohammed Bin Salman announced that the ban would be lifted on 26 Sep 2017
# (Arabian Standard Time). The range is widened to account for time zones and
# delayed articles.
# NOTE: the name is misspelled ("announcement") but later cells use it as is.
annoncement = [get_filename(x) for x in pd.date_range('2017 Sep 24','2017 Oct 09')]
# The ban was lifted on 24 June 2018 (Arabian Standard Time). The range is
# widened to account for time zones and delayed articles.
liftBanPeriod = [get_filename(x) for x in pd.date_range('2018 June 23','2018 July 07')]

# Download both periods through the process pool. intofile returns None for
# every file, so `results` is just a list of None, one entry per day.
results = list(e.map(intofile,annoncement+liftBanPeriod))
print(results)

[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]


In [None]:
from pyspark.sql import SQLContext
# SQL entry point built on the existing SparkContext.
sqlContext = SQLContext(sc)

# Read each period's per-day CSV files (first line is the header) into one
# DataFrame per period. No schema is given here; AvgTone is converted with
# float() when the rows are mapped later.
announcementData = sqlContext.read.option("header", "true").csv(annoncement)
liftBanPeriodData = sqlContext.read.option("header", "true").csv(liftBanPeriod)

# Lower-case three-letter country codes grouped as "ARB" by
# filterByInterestRegions; Saudi Arabia is handled separately and is not listed.
ARAB_COUNTRY_CODES = ["egy", "jor", "yem", "pse", "omn", "are", "qat", "bhr", "kwt", "irq", "syr", "lbn", "sdn", "lby", "dza", "tun", "mar", "mrt"]


## The needed functions

In [None]:
import numpy as np

def filterRelevant(rawData):
  """
  Keep only the articles about Saudi women driving and count them per key.

  `rawData` holds ((country, tone), url) pairs. A row is relevant when its URL
  (case-insensitively) contains "women" or "woman", "drive" or "driving", and
  "saud". Rows without a URL are dropped. Each relevant URL is then replaced by
  1 and the ones are summed, giving ((country, tone), article_count) pairs.
  """
  def is_relevant(row):
    url = row[1]
    if url is None:
      # A missing URL cannot match any keyword; skip it instead of crashing
      # on None.lower().
      return False
    url = url.lower()  # lower-case once rather than once per keyword test
    return (("women" in url or "woman" in url)
            and ("drive" in url or "driving" in url)
            and "saud" in url)

  relevant = rawData.filter(is_relevant)
  # The articles themselves are no longer needed, only how many there are.
  return relevant.map(lambda row: ((row[0][0], row[0][1]), 1)).reduceByKey(lambda a, b: a + b)

def filterByInterestRegions(rawData):
  """
  Aggregate per-country counts into the three regions of interest.

  `rawData` holds ((country_code, tone), count) pairs. Every row is assigned to
  exactly one region in a single pass:
    * "SAU" - the code contains "sau" (case-insensitive),
    * "ARB" - the lower-cased code is in ARAB_COUNTRY_CODES,
    * "INT" - everything else, including rows with no country code.
  Counts are summed per (region, tone) and returned as a dict keyed by
  (region, tone).
  """
  def to_region(row):
    code, tone = row[0][0], row[0][1]
    if code is None:
      region = 'INT'
    elif 'sau' in code.lower():
      # Normalized so the key always matches the "SAU" lookup, whatever the
      # casing of the incoming code.
      region = 'SAU'
    elif code.lower() in ARAB_COUNTRY_CODES:
      region = 'ARB'
    else:
      region = 'INT'
    return ((region, tone), row[1])

  # One map + one reduceByKey instead of three filters over the same RDD, so
  # the upstream lineage is evaluated only once.
  result = rawData.map(to_region).reduceByKey(lambda a, b: a + b)
  return dict(result.collect())

def get_vector(result, country):
  """
  Build the reaction vector for one region.

  `result` maps (region, tone) to an article count. Returns a NumPy array
  [positive, negative, neutral] for `country`; a tone with no articles
  contributes 0 instead of raising KeyError.
  """
  return np.array([result.get((country, tone), 0)
                   for tone in ('Positive', 'Negative', 'Neutral')])
  
def event_tone(tone):
  """Label a numeric tone: "Positive" above zero, "Neutral" at exactly zero, otherwise "Negative"."""
  if tone > 0:
    return "Positive"
  if tone == 0:
    return "Neutral"
  return "Negative"
  
def cos_similarity(v1, v2):
  """Return the cosine of the angle between vectors v1 and v2: their dot product over the product of their norms."""
  dot_product = v1.dot(v2)
  norm_product = np.linalg.norm(v1) * np.linalg.norm(v2)
  return dot_product / norm_product


## Calls to filter the raw data down to the data of interest

In [None]:
def _keyByCountryAndTone(row):
  """Turn one event row into ((Actor1CountryCode, tone label), SOURCEURL)."""
  tone_label = event_tone(float(row['AvgTone']))
  return ((row['Actor1CountryCode'], tone_label), row['SOURCEURL'])

# Both periods go through the same row mapping.
announcementRawData = announcementData.rdd.map(_keyByCountryAndTone)
liftBanPeriodRawData = liftBanPeriodData.rdd.map(_keyByCountryAndTone)

In [None]:
# For each period: keep the relevant articles, count them per (country, tone),
# then aggregate the counts into a dict keyed by (region, tone).
announcementReactions = filterByInterestRegions(filterRelevant(announcementRawData))
liftBanReactions = filterByInterestRegions(filterRelevant(liftBanPeriodRawData))

## Answer to the research question

In [None]:
# Reaction vectors for the announcement period, one per region. get_vector
# orders each vector as [positive, negative, neutral].
saudiAnnounceReactions = get_vector(announcementReactions, "SAU")
arabAnnounceReactions = get_vector(announcementReactions, "ARB")
interAnnounceReactions = get_vector(announcementReactions, "INT")
print(f"Vector of Saudi positive, negative, and neutral articles about the announcement {saudiAnnounceReactions}")
print(f"Vector of Arab positive, negative, and neutral articles about the announcement {arabAnnounceReactions}")
print(f"Vector of International positive, negative, and neutral articles about the announcement {interAnnounceReactions}")

Vector of Saudi positive, negative, and neutral articles about the announcement [416 547  17]
Vector of Arab positive, negative, and neutral articles about the announcement [21 45  2]
Vector of International positive, negative, and neutral articles about the announcement [415 574  23]


In [None]:
# Reaction vectors for the period in which the ban was lifted, one per region.
# get_vector orders each vector as [positive, negative, neutral].
saudiLiftBanReactions = get_vector(liftBanReactions, "SAU")
arabLiftBanReactions = get_vector(liftBanReactions, "ARB")
interLiftBanReactions = get_vector(liftBanReactions, "INT")
print(f"Vector of Saudi positive, negative, and neutral articles about the ban lift {saudiLiftBanReactions}")
print(f"Vector of Arab positive, negative, and neutral articles about the ban lift {arabLiftBanReactions}")
print(f"Vector of International positive, negative, and neutral articles about the ban lift {interLiftBanReactions}")

Vector of Saudi positive, negative, and neutral articles about the ban lift [174 454  13]
Vector of Arab positive, negative, and neutral articles about the ban lift [ 6 10  3]
Vector of International positive, negative, and neutral articles about the ban lift [151 574  13]


In [None]:
# Cosine similarity of the Saudi announcement vector against each region.
# The first line compares the Saudi vector with itself as a sanity check
# (expected to be 1 up to floating-point rounding).
print(f"Saudi reaction similarity to announcement compared to Saudi Arabia {cos_similarity(saudiAnnounceReactions,saudiAnnounceReactions)}")
print(f"Saudi reaction similarity to announcement compared to the rest of the Arab world {cos_similarity(saudiAnnounceReactions,arabAnnounceReactions)}")
print(f"Saudi reaction similarity to announcement compared to the rest of the world {cos_similarity(saudiAnnounceReactions,interAnnounceReactions)}")


Saudi reaction similarity to announcement compared to Saudi Arabia 1.0000000000000002
Saudi reaction similarity to announcement compared to the rest of the Arab world 0.9771858781250261
Saudi reaction similarity to announcement compared to the rest of the world 0.9996775512955165


In [None]:
# Cosine similarity of the Saudi ban-lift vector against each region.
# The first line compares the Saudi vector with itself as a sanity check
# (expected to be 1 up to floating-point rounding).
print(f"Saudi reaction similarity to ban lift compared to Saudi Arabia {cos_similarity(saudiLiftBanReactions,saudiLiftBanReactions)}")
print(f"Saudi reaction similarity to ban lift compared to the rest of the Arab world {cos_similarity(saudiLiftBanReactions,arabLiftBanReactions)}")
print(f"Saudi reaction similarity to ban lift compared to the rest of the world {cos_similarity(saudiLiftBanReactions,interLiftBanReactions)}")

Saudi reaction similarity to ban lift compared to Saudi Arabia 1.0000000000000002
Saudi reaction similarity to ban lift compared to the rest of the Arab world 0.9600912434220401
Saudi reaction similarity to ban lift compared to the rest of the world 0.9940839035469081
