# Exploratory Analysis of GDELT Dataset using Spark

In [1]:
%matplotlib inline
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SQLContext




In [2]:
sc

<pyspark.context.SparkContext at 0xbd62e80>

In [None]:
#import findspark
#findspark.init() 

import pyspark
sc = pyspark.SparkContext()

sqlContext = SQLContext(sc)

In [6]:
rawEvents = sc.textFile('gdelt/20151105.export.CSV').map(lambda x: x.split('\t'))

In [None]:
#Schema info from: https://bigquery.cloud.google.com/table/gdelt-bq:full.events?pli=1
eventsRDDWithDataTypes = rawEvents.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), int(p[3]), float(p[4]) , p[5], p[6], p[7], p[8], p[9], p[10],p[11],p[12], p[13], p[14], p[15], p[16],p[17],p[18],p[19],p[20],p[21],p[22],p[23],p[24],int(p[25]),p[26],p[27],p[28],int(p[29]),p[30],int(p[31]),int(p[32]),int(p[33]),float(p[34]),int(p[35]),p[36],p[37],p[38],p[39],p[40],p[41],int(p[42]),p[43],p[44],p[45],p[46],p[47],p[48],p[49],p[50],p[51],p[52],p[53],p[54],p[55],p[56],p[57]) ) #.toDF(schema)
#note: some of the lat/long fields are null - so the bove code is not casting it into Float. In the final code we need to perform type conversion properly
#eventsDF.cache()
print "Event Count: ", eventsRDDWithDataTypes.count()

## File Format
* The Historical Backfile collection, which runs January 1, 1979 through March 31, 2013 contains 57 fields for each record. 
* The Daily Updates collection, which begins April 1, 2013 and runs through present, contains an additional field at the end of each record, for a total of 58 fields for each record. 
* Recordsare stored one per line, separated by a newline (\n) and are tab-delimited (note that files have a “.csv”extension, but are actually tab-delimited).


## Data Model
* capturing two actors and the action performed by Actor1 upon Actor2
* Goldstien score: this score is based on the type of event, not the specifics of the actual event record being recorded – thus two riots, one with 10 people and one with 10,000, will both receive the same Goldstein score
* AvgTone. (numeric) This is the average “tone” of all documents containing one or more mentions of this event. The score ranges from -100 (extremely negative) to +100 (extremely positive). Common values range between -10 and +10, with 0 indicating neutral.

## Ideas
* Goldstien score can be aggregated to various levels of time resolution to yield an approximation of the stability of a location over time.
* NumMentions: This is the total number of mentions of this event across all source documents. Multiple references to an event within a single document also contribute to this count. This can be used as a method of assessing the “importance” of an event: the more discussion of that event, the more likely it is to be significant.
    * Normalize NumMentions, NumArticles & NumSources: The total universe of source documents and the density of events within them vary over time, so it is recommended that this field be normalized by the average or other measure of the universe of events during the timeperiod of interest. 
* Remove duplicate events: NOTE from NumMentions/NumSources/NumArticles definition: this field is updated over time if news articles published later discuss this event (for example, in the weeks after a major bombing there will likely be numerous news articles published mentioning the original bombing as context to new developments, while on the one-year anniversary there will likely be further coverage). At this time the daily event stream only includes new event records found each day and does not include these updates; a special “updates” stream will be released in Fall 2013 that will include these.

## Data Exploration

* Total events: 360,142,405
* Total root events: 232,115,641
* Total root events where ActionGeo_CountryCode is null: 7,307,887
* Total root events where ActionGeo_CountryCode is null: 27,775,769
* Total root events were EventRootCode is not valud (not in "01", "02".."20) : 58
* First event date: 1979-01-01
* Last event date: 2016-01-31

In [None]:
from pyspark.sql.types import *
schemaString =  ["GLOBALEVENTID", "SQLDATE", "MonthYear", "Year", "FractionDate", "Actor1Code", "Actor1Name", "Actor1CountryCode", "Actor1KnownGroupCode", 
                 "Actor1EthnicCode", "Actor1Religion1Code", "Actor1Religion2Code", "Actor1Type1Code", "Actor1Type2Code", "Actor1Type3Code", "Actor2Code", 
                 "Actor2Name", "Actor2CountryCode", "Actor2KnownGroupCode", "Actor2EthnicCode", "Actor2Religion1Code", "Actor2Religion2Code", "Actor2Type1Code", "Actor2Type2Code", "Actor2Type3Code", "IsRootEvent", "EventCode", "EventBaseCode", "EventRootCode", "QuadClass", "GoldsteinScale", "NumMentions", "NumSources", "NumArticles", "AvgTone", "Actor1Geo_Type", "Actor1Geo_FullName", "Actor1Geo_CountryCode", "Actor1Geo_ADM1Code", "Actor1Geo_Lat", "Actor1Geo_Long", "Actor1Geo_FeatureID", "Actor2Geo_Type", "Actor2Geo_FullName", "Actor2Geo_CountryCode", "Actor2Geo_ADM1Code", "Actor2Geo_Lat", "Actor2Geo_Long", "Actor2Geo_FeatureID", "ActionGeo_Type", "ActionGeo_FullName", "ActionGeo_CountryCode", "ActionGeo_ADM1Code", "ActionGeo_Lat", "ActionGeo_Long", "ActionGeo_FeatureID", "DATEADDED", "SOURCEURL"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
fields[0].dataType =IntegerType()
fields[1].dataType =IntegerType()
fields[2].dataType =IntegerType()
fields[3].dataType =IntegerType()
fields[4].dataType =FloatType()
fields[25].dataType =IntegerType()
fields[29].dataType =IntegerType()
fields[31].dataType =IntegerType()
fields[32].dataType =IntegerType()
fields[34].dataType =FloatType()
fields[42].dataType =IntegerType()
schema = StructType(fields)

In [None]:

#pyspark.sql.types.DateConverter

In [None]:
from pyspark.sql import SQLContext
eventsDF = sqlContext.createDataFrame(eventsRDDWithDataTypes, schema)

In [7]:
rawEvents.count()

236093