## GDELT Databuilder

### Initial data extraction and filtering (GKG)

- We filter out our desired time period using `spark.gdelt` in `scala`. 
- We stack a number of searchstrings that relate to our topic of interest, and get all news items where any of the keywords are in the url, which often reflects the article heading.
- Then drop the unwanted columns.
- Write to parquet.

In [0]:
%scala
import com.aamend.spark.gdelt._ 

// OUR TIMEFRAME ----------------------
val start_date = "2021-01-01 00:00:00"
val end_date = "2021-06-30 00:00:00"
// ------------------------------------

val raw_gkg = spark.read.format("delta").load("s3a://osint-gdelt-reado/GDELT/delta/bronze/v1/gkg/").as[GKGEventV1]
val filtered_gkg = raw_gkg.filter($"publishDate">start_date && $"publishDate"<end_date)

// OUR SEARCHSTRINGS ------------------
val topics_gkg = filtered_gkg.filter("sourceUrls[0] like '%covid%' OR sourceUrls[0] like '%vaccine%' OR sourceUrls[0] like '%vaxx%'")
// ------------------------------------

val topics_gkg_clean = topics_gkg.drop("numArticles").drop("counts").drop("hash").drop("errors").drop("date")
topics_gkg_clean.write.format("parquet").mode("overwrite").save("dbfs:/FileStore/temp.parquet")

### Dig deeper into the data (GKG)
- Read the parquet with `koalas` in Python.
- Get the _top themes_ from this initial dataset.
- Perform a manual inspection of this list and construct a search string for another round of data extraction.

In [0]:
%python
import databricks.koalas as ks
dataDF = ks.read_parquet("dbfs:/FileStore/temp.parquet")
topthemesDF = ks.DataFrame(dataDF['themes'].explode().value_counts())
topthemesDF.reset_index(level=0, inplace=True)
topthemes = topthemesDF['index'][:19].to_numpy()

In [0]:
%python
## Edit this print output manually for pasting into the next cell
## For example: delete any themes deemed too generic or irrelevant
print(list(topthemes))

In [0]:
%python
# more_themes = your edited list
more_themes = ['TAX_DISEASE_CORONAVIRUS', 'WB_2165_HEALTH_EMERGENCIES', 'WB_2166_HEALTH_EMERGENCY_PREPAREDNESS_AND_DISASTER_RESPONSE', 'WB_2167_PANDEMICS', 'HEALTH_PANDEMIC', 'KILL']

themestring = "filtered_gkg.filter(c =>"
for i in more_themes:
    themestring = themestring + ' c.themes.contains("' + i + '"' + ") ||"
themestring = themestring[:-4] + '))'

print(themestring)

### Expand the dataset (GKG) — more rows

**First**: Paste the printed search string above into the appropriate place in the cell below. <br><br>
- Get the themes based data.
- Drop unwanted columns.
- Merge with the previous data for a new larger dataframe (`union()`).
- Remove duplicates (`.distinct()`).

In [0]:
%scala
// val more_themes = the themestring
val gkg_themes = filtered_gkg.filter(c => c.themes.contains("TAX_DISEASE_CORONAVIRUS") || c.themes.contains("WB_2165_HEALTH_EMERGENCIES") || c.themes.contains("WB_2166_HEALTH_EMERGENCY_PREPAREDNESS_AND_DISASTER_RESPONSE") || c.themes.contains("WB_2167_PANDEMICS") || c.themes.contains("HEALTH_PANDEMIC") || c.themes.contains("KILL"))

val gkg_themes_clean = gkg_themes.drop("numArticles").drop("counts").drop("hash").drop("errors").drop("date")
val all_gkg = topics_gkg_clean.union(gkg_themes_clean).distinct()

### Enrich our data (EVENTS) – more columns
- Harvest `eventIds` – that were identified in the GKG dataset – from the EVENTS data.
- Explode the GKG data on `eventIds` to get one eventId per row and thus prepare for inner join.
- Join the dataframes into one, and drop clean up columns.

In [0]:
%scala
// Load the GDELT `events` dataset for the same time period as for the GKG above
val raw_events = spark.read.format("delta").load("s3a://osint-gdelt-reado/GDELT/delta/bronze/v1/events").as[EventV1] // now we read the events table, rather than gkg
val filtered_events = raw_events.filter($"date">start_date && $"date"<end_date).distinct()
val filtered_events_clean = filtered_events.drop("actor1Code").drop("actor2Code").drop("isRoot").drop("cameoEventBaseCode").drop("cameoEventRootCode").drop("quadClass").drop("actor1Geo").drop("actor2Geo").drop("dateAdded").drop("hash").drop("errors").drop("date")

// Explode the gkg on eventIds to create an eventId column for inner join
import org.apache.spark.sql.functions.explode
val all_gkg_prep = all_gkg.withColumn("eventId", explode($"eventIds")).drop("eventIds")

// Perform the inner join
val joinedDF = all_gkg_prep.join(filtered_events_clean, Seq("eventId")) // This method avoids duplicate columns after join

// Column clean-up
val finalDF = joinedDF.withColumn("geoName",$"eventGeo.geoName").withColumn("latitude",$"eventGeo.geoPoint.latitude").withColumn("longitude",$"eventGeo.geoPoint.longitude").withColumn("countryCode",$"eventGeo.countryCode").withColumn("ttone",$"tone.tone").withColumn("positiveScore",$"tone.positiveScore").withColumn("negativeScore",$"tone.negativeScore").withColumn("polarity",$"tone.polarity").withColumn("activityReferenceDensity",$"tone.activityReferenceDensity").withColumn("selfGroupReferenceDensity",$"tone.selfGroupReferenceDensity").drop("locations").drop("eventGeo").drop("tone").drop("avgTone").withColumn("tone", $"ttone").drop("ttone").distinct()

In [0]:
%scala
finalDF.count

### Final dataset
- Write to parquet file.

In [0]:
%scala
finalDF.write.format("parquet").mode("overwrite").save("dbfs:/FileStore/dataset.parquet")