## The GDELT dataset v1.0

The GDELT 1.0 event dataset comprises over 3.5 billion mentions of over 364 million distinct events from almost every corner of the earth, spanning January 1979 to the present and updated daily. GDELT 1.0 includes only hand-translated foreign-language content; if you need the full-volume machine-translation feed, use GDELT 2.0 instead. You can download the complete collection as a series of CSV files (which are actually tab-delimited) or query it through Google BigQuery. The file format documentation describes the fields and their structure. You can also download the column headers for the 1979 to March 2013 Back File and the April 1, 2013 to present Front File, as well as the Column IDs.

The GDELT Event Database uses the CAMEO event taxonomy, a hierarchy of more than 300 event types, each recorded in the files as a numeric code. The schema itself is described in the CAMEO Code Reference. Tab-delimited lookup files map each code to a human-readable label, which makes the data easier to work with for anyone new to CAMEO; lookups are available for both Event Codes and the Goldstein Scale. Characteristics of the actors involved in each event are stored as sequences of 3-character codes, with lookups available for Event Country Codes, Type Codes, Known Group Codes, Ethnic Codes and Religion Codes.
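A minimal sketch of using one of these lookups, assuming a two-column layout (code, label) preceded by a header row. CAMEO root codes are two digits (`01` to `20`), so the sketch keeps codes as strings and re-pads values that were read back as integers; reading the event files with `inferSchema` may turn code columns into integers and drop those leading zeros. The function names and the two sample rows are illustrative, not the real lookup file.

```python
import csv
import io


def load_lookup(text):
    """Parse a tab-delimited lookup (header row, then code<TAB>label) into a dict.

    Codes stay strings so that leading zeros such as "01" survive.
    """
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    next(reader)  # skip the header row (assumed to be present)
    return {row[0]: row[1] for row in reader if len(row) >= 2}


def root_code_key(value):
    """Re-pad a CAMEO root code (always two digits) that may have been read as an int."""
    return str(value).zfill(2)


# Illustrative excerpt only, not the actual lookup file.
SAMPLE = "CODE\tLABEL\n01\tMake public statement\n14\tProtest\n"
labels = load_lookup(SAMPLE)
print(labels[root_code_key(1)])  # EventRootCode inferred as the integer 1
```

On the DataFrame side, `F.lpad(F.col("EventRootCode").cast("string"), 2, "0")` would restore the same padding before joining against a lookup.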

A number of normalization files are also available. These comma-delimited (CSV) files are updated daily and record the total number of events in the GDELT 1.0 Event Database across all event types, broken down by time and country. They are essential for normalization, since they let you compensate for the exponential increase in the availability of global news material over time. Available normalization files include Daily, Daily by Country, Monthly, Monthly by Country, Yearly and Yearly by Country.
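Normalizing means dividing a raw count by the total number of events recorded in the same period, so that counts become shares comparable across years. A minimal sketch, assuming the daily file holds one `YYYYMMDD,count` row per day with no header (check the actual layout before relying on it); the function names and all numbers are made up.

```python
import csv
import io


def load_daily_totals(text):
    """Read 'YYYYMMDD,count' rows (assumed layout) into {date: total}."""
    return {row[0]: int(row[1]) for row in csv.reader(io.StringIO(text)) if row}


def normalize(counts, totals):
    """Share of all events per day; days with no known (or zero) total are skipped."""
    return {day: n / totals[day] for day, n in counts.items() if totals.get(day)}


totals = load_daily_totals("20200101,200000\n20200102,250000\n")  # made-up totals
protests = {"20200101": 1000, "20200102": 1000}                    # made-up counts
shares = normalize(protests, totals)
print(shares)
```

The same raw count becomes a smaller share on the day with more total coverage, which is exactly the bias normalization is meant to remove.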

Event codes: http://data.gdeltproject.org/documentation/ISA.2013.GDELT.pdf

To download some events: `wget -r --no-parent -A zip http://data.gdeltproject.org/events/`
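The downloaded archives are `.zip` files, and Spark's CSV reader does not decompress zip archives out of the box, so they presumably need to be extracted before reading. A minimal standard-library sketch; the function name is hypothetical, and since `wget -r` mirrors the site layout under the current directory, adjust the source path to wherever the files actually landed.

```python
import zipfile
from pathlib import Path


def extract_all(src, dst):
    """Extract every .zip archive found under src into the flat directory dst."""
    dst = Path(dst)
    dst.mkdir(parents=True, exist_ok=True)
    extracted = []
    for archive in sorted(Path(src).rglob("*.zip")):
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(dst)
            extracted.extend(zf.namelist())
    return extracted


# Hypothetical usage, with the target directory read later in this notebook:
# extract_all("data.gdeltproject.org/events", "../../data/excluded/events-dataset")
```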

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "4g").\
        getOrCreate()

21/08/07 19:35:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read every tab-delimited file in the directory; inferSchema costs an extra pass over the data
events = spark.read.option("inferSchema","true")\
.option("delimiter", "\t")\
.csv("../../data/excluded/events-dataset/")


In [4]:
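# The files have no header row, so Spark names the columns _c0 ... _c57; give them their GDELT names
events = events.withColumnRenamed("_c0","GLOBALEVENTID")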
events = events.withColumnRenamed("_c1","SQLDATE")
events = events.withColumnRenamed("_c2","MonthYear")
events = events.withColumnRenamed("_c3","Year")
events = events.withColumnRenamed("_c4","FractionDate")
events = events.withColumnRenamed("_c5","Actor1Code")
events = events.withColumnRenamed("_c6","Actor1Name")
events = events.withColumnRenamed("_c7","Actor1CountryCode")
events = events.withColumnRenamed("_c8","Actor1KnownGroupCode")
events = events.withColumnRenamed("_c9","Actor1EthnicCode")
events = events.withColumnRenamed("_c10","Actor1Religion1Code")
events = events.withColumnRenamed("_c11","Actor1Religion2Code")
events = events.withColumnRenamed("_c12","Actor1Type1Code")
events = events.withColumnRenamed("_c13","Actor1Type2Code")
events = events.withColumnRenamed("_c14","Actor1Type3Code")
events = events.withColumnRenamed("_c15","Actor2Code")
events = events.withColumnRenamed("_c16","Actor2Name")
events = events.withColumnRenamed("_c17","Actor2CountryCode")
events = events.withColumnRenamed("_c18","Actor2KnownGroupCode")
events = events.withColumnRenamed("_c19","Actor2EthnicCode")
events = events.withColumnRenamed("_c20","Actor2Religion1Code")
events = events.withColumnRenamed("_c21","Actor2Religion2Code")
events = events.withColumnRenamed("_c22","Actor2Type1Code")
events = events.withColumnRenamed("_c23","Actor2Type2Code")
events = events.withColumnRenamed("_c24","Actor2Type3Code")
events = events.withColumnRenamed("_c25","IsRootEvent")
events = events.withColumnRenamed("_c26","EventCode")
events = events.withColumnRenamed("_c27","EventBaseCode")
events = events.withColumnRenamed("_c28","EventRootCode")
events = events.withColumnRenamed("_c29","QuadClass")
events = events.withColumnRenamed("_c30","GoldsteinScale")
events = events.withColumnRenamed("_c31","NumMentions")
events = events.withColumnRenamed("_c32","NumSources")
events = events.withColumnRenamed("_c33","NumArticles")
events = events.withColumnRenamed("_c34","AvgTone")
events = events.withColumnRenamed("_c35","Actor1Geo_Type")
events = events.withColumnRenamed("_c36","Actor1Geo_FullName")
events = events.withColumnRenamed("_c37","Actor1Geo_CountryCode")
events = events.withColumnRenamed("_c38","Actor1Geo_ADM1Code")
events = events.withColumnRenamed("_c39","Actor1Geo_Lat")
events = events.withColumnRenamed("_c40","Actor1Geo_Long")
events = events.withColumnRenamed("_c41","Actor1Geo_FeatureID")
events = events.withColumnRenamed("_c42","Actor2Geo_Type")
events = events.withColumnRenamed("_c43","Actor2Geo_FullName")
events = events.withColumnRenamed("_c44","Actor2Geo_CountryCode")
events = events.withColumnRenamed("_c45","Actor2Geo_ADM1Code")
events = events.withColumnRenamed("_c46","Actor2Geo_Lat")
events = events.withColumnRenamed("_c47","Actor2Geo_Long")
events = events.withColumnRenamed("_c48","Actor2Geo_FeatureID")
events = events.withColumnRenamed("_c49","ActionGeo_Type")
events = events.withColumnRenamed("_c50","ActionGeo_FullName")
events = events.withColumnRenamed("_c51","ActionGeo_CountryCode")
events = events.withColumnRenamed("_c52","ActionGeo_ADM1Code")
events = events.withColumnRenamed("_c53","ActionGeo_Lat")
events = events.withColumnRenamed("_c54","ActionGeo_Long")
events = events.withColumnRenamed("_c55","ActionGeo_FeatureID")
events = events.withColumnRenamed("_c56","DATEADDED")
events = events.withColumnRenamed("_c57","SOURCEURL")
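The 58 renames above can also be kept as a single list, which makes the positional mapping easier to audit. A sketch assuming the 58-column layout read here (`_c0` to `_c57`); the names `GDELT_COLUMNS` and `RENAME_MAP` are hypothetical.

```python
GDELT_COLUMNS = [
    "GLOBALEVENTID", "SQLDATE", "MonthYear", "Year", "FractionDate",
    # Actor 1 attributes
    "Actor1Code", "Actor1Name", "Actor1CountryCode", "Actor1KnownGroupCode",
    "Actor1EthnicCode", "Actor1Religion1Code", "Actor1Religion2Code",
    "Actor1Type1Code", "Actor1Type2Code", "Actor1Type3Code",
    # Actor 2 attributes
    "Actor2Code", "Actor2Name", "Actor2CountryCode", "Actor2KnownGroupCode",
    "Actor2EthnicCode", "Actor2Religion1Code", "Actor2Religion2Code",
    "Actor2Type1Code", "Actor2Type2Code", "Actor2Type3Code",
    # Event attributes
    "IsRootEvent", "EventCode", "EventBaseCode", "EventRootCode", "QuadClass",
    "GoldsteinScale", "NumMentions", "NumSources", "NumArticles", "AvgTone",
    # Geography: the three blocks share the same seven fields, in this order
    *[f"{prefix}_{field}"
      for prefix in ("Actor1Geo", "Actor2Geo", "ActionGeo")
      for field in ("Type", "FullName", "CountryCode", "ADM1Code",
                    "Lat", "Long", "FeatureID")],
    # Bookkeeping
    "DATEADDED", "SOURCEURL",
]

# Positional names that spark.read.csv assigns when there is no header row.
RENAME_MAP = {f"_c{i}": name for i, name in enumerate(GDELT_COLUMNS)}
```

With PySpark, `events.toDF(*GDELT_COLUMNS)` would rename everything in one call. Unlike `withColumnRenamed`, which silently ignores missing columns, `toDF` fails when the column count differs, for instance on files that lack `SOURCEURL`.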

In [5]:
events.printSchema()

root
 |-- GLOBALEVENTID: integer (nullable = true)
 |-- SQLDATE: integer (nullable = true)
 |-- MonthYear: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- FractionDate: double (nullable = true)
 |-- Actor1Code: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1CountryCode: string (nullable = true)
 |-- Actor1KnownGroupCode: string (nullable = true)
 |-- Actor1EthnicCode: string (nullable = true)
 |-- Actor1Religion1Code: string (nullable = true)
 |-- Actor1Religion2Code: string (nullable = true)
 |-- Actor1Type1Code: string (nullable = true)
 |-- Actor1Type2Code: string (nullable = true)
 |-- Actor1Type3Code: string (nullable = true)
 |-- Actor2Code: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- Actor2KnownGroupCode: string (nullable = true)
 |-- Actor2EthnicCode: string (nullable = true)
 |-- Actor2Religion1Code: string (nullable = true)
 |-- Actor2Religion2Code: 

We will repartition the DataFrame by year (we have 5 years) to demonstrate what data skew looks like. Two of the years have far more articles than the others, which should make Spark distribute the tasks unevenly.
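Why repartitioning by year skews the work: hash partitioning sends every row with the same key to the same partition, so no partition can be smaller than its largest key. A plain-Python model of this; Spark actually uses a Murmur3-based hash, so `year % n` is only a stand-in and the real assignment of years to partitions will differ. The function name and row counts are made up.

```python
from collections import Counter


def partition_sizes(rows_per_key, num_partitions):
    """Rows per partition when each key is hashed to exactly one partition.

    key % num_partitions stands in for Spark's Murmur3-based hash partitioning.
    """
    sizes = Counter()
    for key, rows in rows_per_key.items():
        sizes[key % num_partitions] += rows
    return [sizes[p] for p in range(num_partitions)]


# Made-up row counts: two "hot" years dominate.
rows = {2019: 10, 2020: 400, 2021: 300, 2010: 1, 2011: 1}
sizes = partition_sizes(rows, 12)
print(max(sizes), sum(sizes) / len(sizes))  # largest partition vs. mean
```

Raising `num_partitions` only adds empty partitions: the partition holding the hot year keeps all of its rows, which is the basic limitation of hash partitioning on a skewed key.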

In [6]:
# TODO:
# 1) read the data
# 2) save it partitioned by another field (global_id)

In [8]:
events.select("year").distinct().show()

21/08/07 19:41:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+----+
|year|
+----+
|2007|
|2018|
|2019|
|2020|
|2009|
|2016|
|1920|
|2010|
|2011|
|2008|
|2017|
|2021|
+----+



In [9]:
# Hash-partition into 12 partitions on year: all rows of a given year land in the same partition
bad_partitioned = events.repartition(12, "year")

In [11]:
bad_partitioned.groupby("year").agg(F.sum("NumArticles").alias("cantidad")).show()



+----+---------+
|year| cantidad|
+----+---------+
|2019| 10448993|
|2021|310180246|
|2010|    52727|
|2009|     2511|
|2007|     2874|
|2016|   212436|
|2017| 31049903|
|2020|393183795|
|2018| 12254364|
|1920|      851|
|2011|    49013|
|2008|     1897|
+----+---------+




![unbalanced](images/unbalanced.png)
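The table above can be turned into a rough skew measure. This sketch copies the `cantidad` values, which are sums of `NumArticles` and serve here only as a proxy for rows per year (the notebook does not show row counts), and computes the largest year relative to the average year plus the share held by the two largest years. The function names are hypothetical.

```python
# Sums of NumArticles per year, copied from the output above.
articles_by_year = {
    2019: 10448993, 2021: 310180246, 2010: 52727, 2009: 2511,
    2007: 2874, 2016: 212436, 2017: 31049903, 2020: 393183795,
    2018: 12254364, 1920: 851, 2011: 49013, 2008: 1897,
}


def skew_ratio(sizes):
    """Largest group divided by the mean group size (1.0 means perfectly even)."""
    sizes = list(sizes)
    return max(sizes) / (sum(sizes) / len(sizes))


def top_share(sizes, k):
    """Fraction of the total held by the k largest values."""
    ordered = sorted(sizes, reverse=True)
    return sum(ordered[:k]) / sum(ordered)


print(f"skew ratio: {skew_ratio(articles_by_year.values()):.2f}")
print(f"share of the two largest years: {top_share(articles_by_year.values(), 2):.1%}")
```

With these figures, 2020 and 2021 together hold about 93% of the articles, which matches the uneven task durations in the screenshot.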

In [12]:
events.write.partitionBy("ActionGeo_CountryCode").option("header", "true").csv("../../data/excluded/partitioned_events")

21/08/08 17:57:08 ERROR TaskSchedulerImpl: Lost executor 2 on 172.19.0.6: worker lost
21/08/08 17:57:08 ERROR TaskSchedulerImpl: Lost executor 1 on 172.19.0.4: worker lost
21/08/08 17:57:08 ERROR TaskSchedulerImpl: Lost executor 0 on 172.19.0.5: worker lost
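`partitionBy` writes a Hive-style directory tree with one `ActionGeo_CountryCode=<value>` subdirectory per country, which is what lets a later read skip whole directories when it filters on that column. A toy model of that layout and of directory-level pruning, using only the standard library; the helper names and country values are illustrative, not Spark internals.

```python
from pathlib import PurePosixPath


def partition_path(base, column, value):
    """Directory that partitionBy(column) uses for rows with this value."""
    return PurePosixPath(base) / f"{column}={value}"


def prune(paths, column, wanted):
    """Keep only the partition directories whose value is in `wanted`."""
    kept = []
    for path in paths:
        key, _, value = path.name.partition("=")
        if key == column and value in wanted:
            kept.append(path)
    return kept


base = "../../data/excluded/partitioned_events"
dirs = [partition_path(base, "ActionGeo_CountryCode", cc) for cc in ("AR", "US", "UK")]
print(prune(dirs, "ActionGeo_CountryCode", {"AR"}))
```

A filter on country can skip directories this way, but a grouping by year cannot: it has to read every country directory.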


In [None]:
## read the data partitioned by country code
## group by year (should be slower)

In [21]:
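# More partitions, same key: each year still maps to a single partition, so the largest year is not split
well_partitioned = events.repartition(1200,"year")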

In [22]:
well_partitioned.groupby("year").agg(F.sum("NumArticles").alias("cantidad")).show()



+----+---------+
|year| cantidad|
+----+---------+
|2020|132018050|
|2021|285954394|
|2011|    46935|
|2019|   798676|
|2010|    14898|
+----+---------+
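Repartitioning by the same key into more partitions does not split a hot key, because each year still lands in a single partition. A common remedy for skewed aggregations, not used in this notebook, is key salting: add a small salt to the key so one year is spread over several groups, aggregate per (year, salt), then combine the partial sums per year. A plain-Python sketch of the two stages; the function name, salt count and rows are made up, and round-robin salting replaces the random salt often used in practice so the result is deterministic.

```python
from collections import defaultdict


def salted_sum(rows, num_salts):
    """Two-stage sum of (key, value) rows using key salting.

    Stage 1 aggregates per (key, salt), splitting one hot key into num_salts
    independent groups (in Spark these could sit in different partitions).
    Stage 2 combines the partial sums back into one total per key.
    """
    partial = defaultdict(int)
    for i, (key, value) in enumerate(rows):
        salt = i % num_salts  # round-robin salt; random salts are also common
        partial[(key, salt)] += value

    totals = defaultdict(int)
    for (key, _salt), value in partial.items():
        totals[key] += value
    return dict(totals), dict(partial)


rows = [(2020, 5)] * 8 + [(2010, 1)] * 2  # made-up skewed input
totals, partial = salted_sum(rows, num_salts=4)
print(totals)
```

In PySpark the same idea would add a salt column such as `(F.rand() * n).cast("int")`, group by `["year", "salt"]`, and then run a second `groupBy("year")` over the partial sums.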


