# LT3_BDCC Final Project_Data Preprocessing


The purpose of this notebook is to preprocess the GDELT data; specifically, it computes the per-day average of the news and event tone scores for a particular year. Note that this notebook was run on, and downloaded from, an EMR cluster created in AWS.

In [1]:
# Evaluating `sc` is what starts the Spark application for this notebook
# session (see the session table printed below); the cell result is the
# SparkContext itself.
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1610003585605_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-1>

In [2]:
# Read all GDELT CSV files found under the bucket prefix, treating the first
# line of each file as the header. No schema is supplied or inferred, so the
# columns come back as strings.
root = "s3://bdcc2021-aids/bdcc_gdelt_v2/"
df = spark.read.option("header", True).csv(root)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Preview the first five raw rows to check the column layout.
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+-------------------+--------+--------------------+--------------------+--------------------+--------------------+
|               id|          timestamp|sourceID|          sourceName|              themes|            location|                tone|
+-----------------+-------------------+--------+--------------------+--------------------+--------------------+--------------------+
| 20180327004500-2|2018-03-27 00:45:00|       1|https://www.genea...|TAX_FNCACT;TAX_FN...|1#Australia#AS#AS...|1.73800774654881,...|
| 20180327004500-3|2018-03-27 00:45:00|       1|https://www.oilan...|ENV_BIOFUEL;WB_53...|2#New York, Unite...|0,1.8454440599769...|
| 20180327004500-5|2018-03-27 00:45:00|       1|https://www.reute...|TAX_FNCACT;TAX_FN...|3#Washington, Was...|0,3.3898305084745...|
| 20180327004500-7|2018-03-27 00:45:00|       1|https://durangohe...|URBAN;EPU_ECONOMY...|2#Arizona, United...|-1.51975683890577...|
|20180327004500-11|2018-03-27 00:45:00|       1|https://www.genea...|

In [3]:
# Only the record id (which starts with the date) and the tone string are
# needed for the daily averages; project away everything else.
df = df.select("id", "tone")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Total number of input rows (this is an action, so Spark reads the data here).
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

39858339

In [4]:
# %spark -o dfm
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType, ArrayType, FloatType
import numpy as np
import re

# get per day
pattern = '^([0-9]{8})'


def _extract_day(record_id):
    """Return the leading 8-digit day key of ``record_id``.

    The ids start with a timestamp such as ``20180327004500-2``; the first
    eight digits are the calendar day. Returns None when the id does not
    start with eight digits (including a null id), so the row can be removed
    by a later null-drop.
    """
    # Match once and reuse the result instead of running the regex twice.
    found = re.match(pattern, str(record_id))
    return found.group(0) if found else None


def _parse_tones(tone):
    """Turn a comma-separated tone string into a list of floats.

    Returns None (instead of raising) when the value is null, empty, or holds
    a non-numeric piece. A raised exception inside a UDF aborts the whole
    Spark job, whereas a None result is simply discarded by the null-drop
    step that follows this cell.
    """
    if tone is None:
        return None
    try:
        return [float(piece) for piece in str(tone).split(",")]
    except ValueError:
        return None


get_date = UserDefinedFunction(_extract_day, returnType=StringType())
# Replace "id" with its day key, passing every other column through untouched.
dfm = df.select(*[get_date("id").alias("id") if column == "id" else column for column in df.columns])

# split tone values
split_tones = UserDefinedFunction(_parse_tones, returnType=ArrayType(FloatType()))
# Replace "tone" with the parsed array column "tone_list".
dfm = dfm.select(*[split_tones("tone").alias("tone_list") if column == "tone" else column for column in dfm.columns])

# dfm.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Discard any row that has a null in one of its columns, e.g. ids that did not
# start with an 8-digit date.
dfm = dfm.dropna()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# dfm.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

39858339

In [7]:
# Preview the transformed rows: day key in `id`, parsed floats in `tone_list`.
dfm.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+
|      id|           tone_list|
+--------+--------------------+
|20180327|[1.7380078, 2.661...|
|20180327|[0.0, 1.8454441, ...|
|20180327|[0.0, 3.3898306, ...|
|20180327|[-1.5197568, 0.30...|
|20180327|[0.16454133, 2.26...|
+--------+--------------------+
only showing top 5 rows

In [6]:
from pyspark.sql import functions as F

# One output row per day key: gather every tone vector of that day into a
# single list. The aggregate column keeps Spark's auto-generated name.
tones_per_day = F.collect_list("tone_list")
dfm_grouped = dfm.groupBy("id").agg(tones_per_day)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Give the grouped columns readable names: the day key becomes `timestamp`
# and the auto-named aggregate becomes `tone_list`.
column_renames = (
    ('id', 'timestamp'),
    ('collect_list(tone_list)', 'tone_list'),
)
for old_name, new_name in column_renames:
    dfm_grouped = dfm_grouped.withColumnRenamed(old_name, new_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Confirm the grouped frame has a string `timestamp` and a nested
# array-of-float-arrays `tone_list`.
dfm_grouped.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- timestamp: string (nullable = true)
 |-- tone_list: array (nullable = true)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: float (containsNull = true)

In [9]:
import numpy as np


def _mean_tone_vector(tone_lists):
    """Average a day's tone vectors position by position.

    ``tone_lists`` is the list of per-record tone lists collected for one day;
    the result is a plain Python list holding the column-wise mean.
    """
    stacked = np.array(tone_lists)
    return stacked.mean(axis=0).tolist()


# get average of tones
tone_avg = UserDefinedFunction(_mean_tone_vector, returnType=ArrayType(FloatType()))
dfm_grouped = dfm_grouped.withColumn('tone_avg', tone_avg('tone_list'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Confirm the new `tone_avg` column was added as a flat array of floats.
dfm_grouped.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- timestamp: string (nullable = true)
 |-- tone_list: array (nullable = true)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: float (containsNull = true)
 |-- tone_avg: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [13]:
# Number of rows after grouping, i.e. the number of distinct day keys.
dfm_grouped.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

471

In [11]:
# Keep just the day key and its averaged tone vector; the bulky per-day list
# of raw tone vectors is no longer needed.
df_tone = dfm_grouped.select('timestamp', 'tone_avg')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def _join_tones(values):
    """Render a sequence of tone averages as one comma-separated string."""
    return ','.join(map(str, values))


# combine tone scores for converting to csv
# (the averages are flattened into a single string column before export)
combine_tones = UserDefinedFunction(_join_tones, returnType=StringType())
df_tone = df_tone.withColumn('tone_avg_str', combine_tones('tone_avg'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Final export layout: the day key plus the stringified averages.
output_columns = ['timestamp', 'tone_avg_str']
df_tone = df_tone.select(output_columns)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Preview the export-ready table (day key and comma-joined tone averages).
df_tone.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+
|timestamp|        tone_avg_str|
+---------+--------------------+
| 20160825|-1.16842446960340...|
| 20161026|-1.02650763139838...|
| 20160427|-0.97052719919314...|
| 20161219|-0.88337425480853...|
| 20160715|-1.32596008945819...|
+---------+--------------------+
only showing top 5 rows

In [14]:
# Persist the daily tone table to S3 as Parquet.
# NOTE(review): this writes to '...tone2018-19', but the read-back cell that
# follows loads '...tone2016' — confirm which dataset/year this run is for.
tone_parquet_path = 's3://bdcc2021-aids/bdcc_gdelt_v2_tone2018-19'
df_tone.write.parquet(tone_parquet_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Load a previously saved daily tone table back from S3.
# NOTE(review): this path ('...tone2016') is not the one written by the
# parquet-write cell above ('...tone2018-19') — confirm it is the intended input.
saved_parquet_path = 's3://bdcc2021-aids/bdcc_gdelt_v2_tone2016'
pq = spark.read.parquet(saved_parquet_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Sanity-check the data read back from Parquet.
pq.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+
|timestamp|        tone_avg_str|
+---------+--------------------+
| 20160304|-1.17667216189584...|
| 20160811|-1.20098561018168...|
| 20160804|-1.30564572411778...|
| 20161011|-1.3193570595416,...|
| 20160709|-2.08602981701622...|
+---------+--------------------+
only showing top 5 rows

In [26]:
# Save the daily tone table as CSV with a header row.
# coalesce(1) reduces the frame to a single partition so the output directory
# holds one part file. The built-in CSV writer is used instead of the legacy
# external "com.databricks.spark.csv" format name, matching the
# `spark.read.csv` call used to load the data.
pq.coalesce(1).write.csv("s3://bdcc2021-aids/bdcc_gdelt_v2_tone2016CSV", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…