# gdelt: Does Everyone Really Hate Belgium?
# Dr. Nick Elias
# 2018 April 24

# -----

## Description

### In this notebook, we investigate the avgTone quantity from a small subset of gdelt data during January 2017.  We perform a pyspark SQL query to our geomesa/HBase data store, calculate derived quantities, and display some results using bubble and choropleth maps.

# -----

### Helpful imports for data manipulation and visualization
### NB: The first four imports come with the standard python installation.  The remaining ones may need to be installed manually.

In [1]:
import csv
import json
import math
import os

from branca.colormap import linear
import folium
import matplotlib.pyplot as plotter

import numpy

import pandas
import geopandas

### Spark and SQL initialization via pyspark

In [2]:
spark_home = "/usr/lib/spark/"
os.environ["SPARK_HOME"] = spark_home
os.environ["GEOPYSPARK_JARS_PATH"] = "/opt/geomesa/geopyspark/jars/:/usr/lib/spark/jars/:/usr/lib/hadoop/:/usr/share/aws/emr/emrfs/lib/"

import geopyspark as gps
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

conf = gps.geopyspark_conf( appName="test", master="yarn" )
sc = SparkContext( conf=conf )
sqlc = SQLContext( sc )

### Create a persisted spark DataFrame of gdelt features in data store

In [3]:
#params = { "hbase.catalog": "catalog" } 
params = { "hbase.catalog": "catalogSmall" } # for testing

feature = "gdelt"
df = sqlc.read.format("geomesa").options(**params).option("geomesa.feature", feature).load().persist()

### List fields in DataFrame, insuring that there are no access issues

In [4]:
df.dtypes

[('__fid__', 'string'),
 ('globalEventId', 'string'),
 ('eventCode', 'string'),
 ('eventBaseCode', 'string'),
 ('eventRootCode', 'string'),
 ('isRootEvent', 'int'),
 ('actor1Name', 'string'),
 ('actor1Code', 'string'),
 ('actor1CountryCode', 'string'),
 ('actor1GroupCode', 'string'),
 ('actor1EthnicCode', 'string'),
 ('actor1Religion1Code', 'string'),
 ('actor1Religion2Code', 'string'),
 ('actor2Name', 'string'),
 ('actor2Code', 'string'),
 ('actor2CountryCode', 'string'),
 ('actor2GroupCode', 'string'),
 ('actor2EthnicCode', 'string'),
 ('actor2Religion1Code', 'string'),
 ('actor2Religion2Code', 'string'),
 ('quadClass', 'int'),
 ('goldsteinScale', 'double'),
 ('numMentions', 'int'),
 ('numSources', 'int'),
 ('numArticles', 'int'),
 ('avgTone', 'double'),
 ('dtg', 'timestamp'),
 ('geom', 'udt')]

### Create a view called "gdelt" (treat it as a virtual SQL database)

In [5]:
df.createOrReplaceTempView( "gdelt" )

### Main SQL query

### An actor is a person, organization, political entity, etc.  Actor 1 acts upon actor 2.  actor1Name and actor2Name are the names of actor 1 and actor2, respectively.  Similarly, actor1CountryCode and actor2CountryCode are the corresponding countries of actor 1 and actor2 (ISO 3166-1 alpha-3 format).  avgTone is a quantity which quantifies the goodness/badness of the interaction from actor1 to actor2.

### The query steps are as follows.  First, reject all records where actor1Name, actor1CountryCode, actor2Name, and actor2CountryCode are null because they cannot be classified.  Also, reject all records where the actor1CountryCode are the same as actor2CountryCode to insure that we are using only "exterior" avgTone.  Second, for each actor2CountryCode, calculate the minimum, first quartile, median, third quartile, and maximum of avgTone.  Also, save the number of records that went into those calculations.  Last, order the results by number of records since we are most interested in "lively" actor1 and actor2 interactions.

### We display the top query results (arranged by number of records in descending order).  Changing the numCountry variable will change the number of top query results.

In [6]:
numCountry = 20

qString = "SELECT actor2CountryCode AS country, "
qString += "ROUND(MIN(avgTone),3) AS minTone, "
qString += "ROUND(PERCENTILE_APPROX(avgTone,0.25),3) AS q1Tone, "
qString += "ROUND(PERCENTILE_APPROX(avgTone,0.50),3) AS medianTone, "
qString += "ROUND(PERCENTILE_APPROX(avgTone,0.75),3) AS q3Tone, "
qString += "ROUND(MAX(avgTone),3) AS maxTone, "
qString += "COUNT(*) as number "
qString += "FROM gdelt "
qString += "WHERE actor1Name!='' AND actor1CountryCode!='' AND actor2Name!='' AND actor2CountryCode!='' "
qString += "AND actor1Name!=actor2Name AND actor1CountryCode!=actor2CountryCode "
qString += "GROUP BY country "
qString += "ORDER BY number DESC"

q = sqlc.sql( qString )
qData = q.limit( numCountry ).collect()
qDataAll = q.collect()

q.show( numCountry )

+-------+-------+------+----------+------+-------+------+
|country|minTone|q1Tone|medianTone|q3Tone|maxTone|number|
+-------+-------+------+----------+------+-------+------+
|    USA|-12.903|-3.935|    -1.779| 0.279|  9.268|  2727|
|    RUS|-13.462|-4.602|    -3.051|-1.205| 10.027|  1374|
|    TUR|-17.117|-7.534|    -5.197|-3.203|    4.8|  1126|
|    SYR|-14.504|-4.962|    -3.557|-1.468|  7.895|   948|
|    ISR|-15.625|-4.317|    -2.784|-1.408|   6.25|   853|
|    GBR|-21.098|  -3.4|    -0.917|   1.0|  12.06|   740|
|    CHN| -12.54|-2.994|    -1.141| 1.034| 10.027|   636|
|    IRN|-11.881|-4.403|    -2.623|-0.508|   4.67|   582|
|    FRA|  -14.0|-4.135|    -1.191| 1.394|  7.143|   355|
|    EUR|-12.438|-3.735|    -1.837| -0.14|  9.375|   342|
|    AFR| -11.65| -5.95|    -3.152|-0.691|  6.494|   341|
|    IRQ|-17.355|-6.143|     -3.55|-1.031|    4.0|   333|
|    PAK|-12.389|-3.661|    -1.938|-0.423|  7.143|   326|
|    DEU|-12.143|-3.376|    -0.954| 0.691|  6.502|   303|
|    JPN| -6.1

### Create dictionary of countries with their central longitudes and latitudes , which is needed for the bubble map

In [7]:
country = dict()

with open( './country_centers.csv', 'r' ) as handle:
  reader = csv.reader( handle, delimiter=',' )
  for field in reader:
    country[field[2]] = { 'name':field[0], 'longitude':float(field[5]), 'latitude':float(field[4]) }

### Create pandas DataFrame (not the same as a spark DataFrame), then add one-sided acrimony, country name, central longitude, and central latitude columns.  The "one-sided acrimony" is the median tone linearly transformed such that minimum value goes to 100 and maximum value goes to slightly larger than 0.

### NB: Only the records shown in the previous frame are displayed in the bubble map to insure that it does not become too crowded

In [8]:
qMin = 1000.0
qMax = -1000.0
for r in qData:
  d = r.asDict()
  if d['medianTone'] < qMin: qMin = d['medianTone']
  if d['medianTone'] > qMax: qMax = d['medianTone']

acrimony1 = list()
for r in qData:
  d = r.asDict()
  acrimony1.append( -100.0*d['medianTone']/(qMax-qMin) + 100.1*qMax/(qMax-qMin) )

lqNew = list()
for i in range(len(qData)):
  d = qData[i].asDict()
  d['acrimony1'] = acrimony1[i]
  try:
    d['name'] = country[d['country']]['name']
    d['longitude'] = country[d['country']]['longitude']
    d['latitude'] = country[d['country']]['latitude']
    lqNew.append( d )
  except:
    pass

pq = pandas.DataFrame( lqNew )

### Create a bubble map where the most negative total median tones have the largest bubbles.  Clicking on a bubble displays the abbreviated name of the country, the number of records used in the calculations (in parentheses), and the minimum, first quartile, median, third quartile, and maximum aveTone.

In [9]:
map = folium.Map( location=[20.0,0.0], tiles='Mapbox Bright', zoom_start=2 )

for i in range(len(pq)):
  popupText = pq.iloc[i]['country']
  popupText += ' (' + str(pq.iloc[i]['number']) + ')'
  popupText += ': {'
  popupText += str(pq.iloc[i]['minTone']) + ','
  popupText += str(pq.iloc[i]['q1Tone']) + ','
  popupText += str(pq.iloc[i]['medianTone']) + ','
  popupText += str(pq.iloc[i]['q3Tone']) + ','
  popupText += str(pq.iloc[i]['maxTone'])
  popupText += "}"
  radius = 10000 * pq.iloc[i]['acrimony1'] * math.cos(math.pi/180.0*pq.iloc[i]['latitude'])
  folium.Circle( location=[pq.iloc[i]['latitude'], pq.iloc[i]['longitude']], popup=popupText,
      radius=radius, color='crimson', fill=True, fill_color='crimson' ).add_to( map )

map

### Create pandas DataFrame (not the same as a spark DataFrame), then add two-sided acrimony, country name, central longitude, and central latitude columns.  The "two-sided acrimony" is the median tone linearly transformed such that minimum value goes to 100 and maximum value goes to -100.

### NB: All records are displayed in the choropleth map

In [10]:
qMin = 1000.0
qMax = -1000.0
for r in qDataAll:
  d = r.asDict()
  if d['medianTone'] < qMin: qMin = d['medianTone']
  if d['medianTone'] > qMax: qMax = d['medianTone']

acrimony2 = list()
for r in qDataAll:
  d = r.asDict()
  acrimony2.append( -200.0*d['medianTone']/(qMax-qMin) + 100.0*(qMax+qMin)/(qMax-qMin) )

lqAll = list()
for i in range(len(qDataAll)):
  d = qDataAll[i].asDict()
  d['acrimony2'] = acrimony2[i]
  try:
    d['name'] = country[d['country']]['name']
    d['longitude'] = country[d['country']]['longitude']
    d['latitude'] = country[d['country']]['latitude']
    lqAll.append( d )
  except:
    pass

pqAll = pandas.DataFrame( lqAll )

### Create the base map for the choropleth map layers.  There are two map layers, two-sided acrimony and number of records.  We add the number of records layer because some countries may indicate a high level of acrimony but only a few records were available to calculate those values.

In [11]:
cmap = folium.Map( [20.0, 0.0], zoom_start=2 )

### Create a pandas DataFrame of country shapes from a geoJSON file, which is needed for the choropleth maps

In [12]:
gdf = geopandas.read_file( './countries.geo.json' )

### Create the two-sided acrimony choropleth layer

In [13]:
acrimony2_dict = pqAll.set_index( 'country' )['acrimony2']

colormapAcrimony2 = linear.YlGn.scale( pqAll.acrimony2.min(), pqAll.acrimony2.max() )
colormapAcrimony2.caption = 'Acrimony Index'

colorAcrimony2_dict = { key:colormapAcrimony2(acrimony2_dict[key]) for key in acrimony2_dict.keys() }

index = list()
for i in range(len(gdf)):
  if gdf['id'].iat[i] not in colorAcrimony2_dict: index.append( i )
gdf = gdf.drop( index )

acrimony2 = folium.GeoJson( gdf,
    style_function=lambda feature:
    {
      'fillColor': colorAcrimony2_dict[feature['properties']['id']],
      'color': 'black',
      'weight': 0.5,
      'fillOpacity': 0.5
    },
    smooth_factor=0.1
)

acrimony2.layer_name = 'Acrimony Index'

cmap.add_child( colormapAcrimony2 )
cmap.add_child( acrimony2 )

dummy = 0

### Create the number of records choropleth layer

In [14]:
numRecords_dict = pqAll.set_index( 'country' )['number']

colormapNumRecords = linear.YlGn.scale( pqAll.number.min(), pqAll.number.max() )
colormapNumRecords.caption = 'Number of Records'

colorNumRecords_dict = { key:colormapNumRecords(numRecords_dict[key]) for key in numRecords_dict.keys() }

numRecords = folium.GeoJson( gdf,
    style_function=lambda feature:
    {
      'fillColor': colorNumRecords_dict[feature['properties']['id']],
      'color': 'black',
      'weight': 0.5,
      'fillOpacity': 1.0
    },
    smooth_factor=0.1
)

numRecords.layer_name = 'Number of Records'

cmap.add_child( colormapNumRecords )
cmap.add_child( numRecords )

dummy = 0

### Enable the layer control and display the choropleth maps

### The best way to interact with this map is to look at the acrimony layer by itself, find the desired country and note its acrimony, then add the number of records layer on top to see if the number of records is significant.

### NB: The number of records layer is completely opaque, so when it is present the other layers cannot be seen.

In [15]:
folium.LayerControl().add_to( cmap )

cmap

### Stop the spark job

### NB: You can comment this statement out before running the notebook if you want to fiddle with it and rerun certain fields.  Before you rerun the entire notebook, however, this statement must be run, otherwise stale spark jobs will hoard computer resources.

In [16]:
sc.stop()

# So, does everyone really hate Belgium?

### Not really.  Granted, it's acrimony index is high, but the number of records used to estimate the acrimony value is small compared to other countries.  It would be more accurate to say that "Some people hate Belgium."