# Extract (Depuis AWS S3) - Transform - Load (Dans MongoDB) STEP

In [1]:
# IMPORTS

import findspark

# findspark has to add the local Spark installation to sys.path *before* any
# pyspark module is imported; calling it after the imports is too late.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# SPARK CONTEXT
# Reuse the context injected by a PySpark kernel when there is one, otherwise
# create it, so that this cell also works on a fresh plain-Python kernel.
sc = SparkContext.getOrCreate()

# AWS - HADOOP CONFIGURATION
# S3A settings for the eu-west-3 region (V4 request signing enabled).
# NOTE(review): SimpleAWSCredentialsProvider presumably needs the S3A access /
# secret keys, which are not set in this cell -- provide them through the
# environment or the cluster configuration, never hard-coded in the notebook.

hadoopConf = sc._jsc.hadoopConfiguration()
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoopConf.set("fs.s3a.region",  "eu-west-3")
hadoopConf.set("fs.s3a.endpoint", "s3.eu-west-3.amazonaws.com")
hadoopConf.set("com.amazonaws.services.s3a.enableV4", "true")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.store.audit.AuditSpanSource", "org.apache.hadoop.fs.store.audit.AuditSpanSource")
hadoopConf.set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')

# Spark Session (named `sql`; every later cell reads data through it)
sql = SparkSession(sc)

In [3]:
# Read every CSV file found under the export / mentions / gkg prefixes.
# A glob path makes Spark load all matching files into a single DataFrame per
# dataset, so no manual concatenation is needed.
# dfe = export (events), dfm = mentions, dfg = gkg.

dfe, dfm, dfg = (
    sql.read.csv(csv_glob, sep=',')
    for csv_glob in (
        's3a://gdelt-nosql/export/*.CSV',
        's3a://gdelt-nosql/mentions/*.CSV',
        's3a://gdelt-nosql/gkg/*.csv',
    )
)

                                                                                

## Preprocessing

### Preprocessing dataframe mentions 

In [4]:
# Cleaned "mentions" table.
# - keeps three positional columns of the raw CSV and names them,
# - discards rows whose GLOBALEVENTID is the string '0',
# - parses both 14-digit timestamps,
# - derives day / month / year from the *event* timestamp,
# - drops every row that still contains a null.
mentions = (
    dfm
    .selectExpr("_c1 as GLOBALEVENTID", "_c2 as EventTimeDate", "_c3 as MentionTimeDate")
    .filter(F.col('GLOBALEVENTID') != '0')
    # Four-letter year ('yyyy'), consistent with the "yyyyMMdd" pattern used for
    # the export table; the previous 'yyy' pattern was a typo.
    .withColumn('EventTimeDate', F.to_timestamp(F.col('EventTimeDate'), 'yyyyMMddHHmmss'))
    .withColumn('MentionTimeDate', F.to_timestamp(F.col('MentionTimeDate'), 'yyyyMMddHHmmss'))
    .withColumn('Day_mentions', F.dayofmonth(F.col('EventTimeDate')))
    .withColumn('Month_mentions', F.month(F.col('EventTimeDate')))
    .withColumn('Year_mentions', F.year(F.col('EventTimeDate')))
    .na.drop()
)

mentions.show(10)

+-------------+-------------------+-------------------+------------+--------------+-------------+
|GLOBALEVENTID|      EventTimeDate|    MentionTimeDate|Day_mentions|Month_mentions|Year_mentions|
+-------------+-------------------+-------------------+------------+--------------+-------------+
|    410514364|2015-02-19 09:30:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410514365|2015-02-19 09:30:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410476968|2015-02-19 03:15:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410514366|2015-02-19 09:30:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410514367|2015-02-19 09:30:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410476969|2015-02-19 03:15:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410514368|2015-02-19 09:30:00|2015-02-19 09:30:00|          19|             2|         2015|
|    410514369|2015-

                                                                                

### Preprocessing dataframe export

In [5]:
# Cleaned "export" (events) table: names five positional CSV columns, parses
# SQLDATE as a date, adds the day of month and drops rows containing a null.
export = (
    dfe
    .selectExpr(
        "_c1 as GLOBALEVENTID",
        "_c2 as SQLDATE",
        "_c4 as Actor1Code",
        "_c5 as Actor2Code",
        "_c6 as ActionGeo_CountryCode",
    )
    .withColumn('SQLDATE', F.to_date('SQLDATE', "yyyyMMdd"))
    .withColumn('Day_export', F.dayofmonth('SQLDATE'))
    .na.drop()
)

export.show(10)

+-------------+----------+----------+----------+---------------------+----------+
|GLOBALEVENTID|   SQLDATE|Actor1Code|Actor2Code|ActionGeo_CountryCode|Day_export|
+-------------+----------+----------+----------+---------------------+----------+
|    410514394|2014-02-19|       AFG|       IRN|                   AF|        19|
|    410514395|2014-02-19|       AFG|       IRN|                   IR|        19|
|    410514396|2014-02-19|       AFG|       IRN|                   IR|        19|
|    410514397|2014-02-19|       AFR|       ARE|                   SF|        19|
|    410514399|2014-02-19|       ARE|    AREGOV|                   AE|        19|
|    410514400|2014-02-19|       ARE|       GOV|                   AE|        19|
|    410514401|2014-02-19|       ARE|       GOV|                   BA|        19|
|    410514402|2014-02-19|       ARE|       GOV|                   AE|        19|
|    410514403|2014-02-19|       ARE|       GOV|                   BA|        19|
|    410514404|2

### Preprocessing dataframe GKG

In [6]:
# Cleaned "gkg" table.
# - names seven positional columns of the raw CSV,
# - drops rows containing a null and rows whose GKGRECORDID is the string '0',
# - parses the 14-digit DATE and derives day / month / year from it,
# - turns the ';'-separated Themes and Persons strings into arrays.
gkg = (
    dfg
    .selectExpr("_c1 as GKGRECORDID", "_c2 as DATE", "_c3 as SourceCommonName", "_c4 as Themes",
                "_c5 as Locations", "_c6 as Persons", '_c7 as V2Tone')
    .na.drop()
    .filter(F.col('GKGRECORDID') != '0')
    # Four-letter year ('yyyy'), consistent with the "yyyyMMdd" pattern used for
    # the export table; the previous 'yyy' pattern was a typo.
    .withColumn('DATE', F.to_timestamp(F.col('DATE'), 'yyyyMMddHHmmss'))
    .withColumn('Day_gkg', F.dayofmonth(F.col('DATE')))
    .withColumn('Month_gkg', F.month(F.col('DATE')))
    .withColumn('Year_gkg', F.year(F.col('DATE')))
    .withColumn('Themes', F.split(F.col('Themes'), ';'))
    .withColumn('Persons', F.split(F.col('Persons'), ';'))
)

# Disabled experiment (Locations is kept as the raw string):
# .withColumn('Locations', F.regexp_replace(F.split(F.col('Locations'), '#').getItem(3), r"\d", ''))

gkg.show(10)

[Stage 12:>                                                         (0 + 1) / 1]

+-----------------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+-------+---------+--------+
|      GKGRECORDID|               DATE|SourceCommonName|              Themes|           Locations|             Persons|              V2Tone|Day_gkg|Month_gkg|Year_gkg|
+-----------------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+-------+---------+--------+
| 20150219093000-2|2015-02-19 09:30:00|  BBC Monitoring|[GENERAL_GOVERNME...|4#Berlin, Berlin,...|    [denys pushylin]|1.91387559808612,...|     19|        2|    2015|
| 20150219093000-3|2015-02-19 09:30:00|  BBC Monitoring|[MEDIA_MSM, LEADE...|3#Washington, Dis...|[barack obama, ba...|-5.40540540540541...|     19|        2|    2015|
| 20150219093000-5|2015-02-19 09:30:00|  BBC Monitoring|[TERROR, ARMEDCON...|1#Russia#AF#RS#60...|  [petro poroshenko]|-7.11462450592885...|     19|        2|  

                                                                                

## Requêtes

### Requête 1)

Afficher le nombre d’articles/évènements qu’il y a eu pour chaque triplet (jour, pays de l’évènement, langue de l’article).

In [7]:
# Query 1: number of events per (event country, event date).
# NOTE(review): the task statement asks for a (day, country, article language)
# triplet; `export` carries no language column, so the language dimension is
# not covered by this aggregation -- confirm whether that is intended.
requete_1 = (
    export
    .groupBy('ActionGeo_CountryCode', 'SQLDATE')
    .agg(F.count('GLOBALEVENTID').alias('count'))
)

requete_1.show(10)



+---------------------+----------+-----+
|ActionGeo_CountryCode|   SQLDATE|count|
+---------------------+----------+-----+
|                   SY|2015-01-31|   19|
|                   MG|2015-02-27|   51|
|                   QA|2015-02-23|  479|
|                   FR|2015-01-22|   13|
|                   DR|2015-02-21|   29|
|                   SW|2015-02-19|  295|
|                   SP|2015-02-26|  914|
|                   VE|2015-03-04|  410|
|                   CH|2015-02-23| 2660|
|                   ET|2015-02-25|  244|
+---------------------+----------+-----+
only showing top 10 rows



                                                                                

### Requête 2)

Pour un pays donné en paramètre, affichez les évènements qui y ont eu lieu, triés par nombre de mentions (tri décroissant) ; permettez une agrégation par jour/mois/année.

In [8]:
# Query 2: number of mentions per (Actor1Code, event), most mentioned first.
# Mentions are joined to events on GLOBALEVENTID; rows with a null in any of
# the selected columns are discarded before counting.
# NOTE(review): the date parts are selected but are not part of the grouping
# key, so they do not appear in the result -- confirm this is intended.
requete_2 = (
    mentions
    .join(export, on='GLOBALEVENTID')
    .select(
        'GLOBALEVENTID',
        'Actor1Code',
        'EventTimeDate',
        'Year_mentions',
        'Month_mentions',
        'Day_mentions',
    )
    .na.drop()
    .groupBy('Actor1Code', 'GLOBALEVENTID')
    .agg(F.count('EventTimeDate').alias('count'))
    .orderBy(F.col('count').desc())
)

requete_2.show(10)



+----------+-------------+-----+
|Actor1Code|GLOBALEVENTID|count|
+----------+-------------+-----+
|       TUR|    412380110|  738|
|       USA|    412375992|  737|
|    USAMED|    412116558|  644|
|       JUD|    414062207|  621|
|    USAMED|    414025995|  598|
|       TUR|    412386134|  593|
|       USA|    412383429|  587|
|       USA|    412383430|  587|
|       JUD|    412383419|  585|
|       OPP|    413009589|  571|
+----------+-------------+-----+
only showing top 10 rows



                                                                                

### Requête 3)

Pour une source de données passée en paramètre (gkg.SourceCommonName), affichez les thèmes, personnes et lieux dont les articles de cette source parlent, ainsi que le nombre d’articles et le ton moyen des articles (pour chaque thème/personne/lieu) ; permettez une agrégation par jour/mois/année.

In [9]:
# Query 3: per-article projection of gkg. V2Tone is reduced to the first field
# of its comma-separated value; no aggregation is performed in this cell.
requete_3 = (
    gkg
    .withColumn('V2Tone', F.split('V2Tone', ',').getItem(0))
    .select(
        'DATE',
        'SourceCommonName',
        'Themes',
        'Locations',
        'Persons',
        'V2Tone',
        'Year_gkg',
        'Month_gkg',
        'Day_gkg',
    )
)

requete_3.show(10)

+-------------------+----------------+--------------------+--------------------+--------------------+-----------------+--------+---------+-------+
|               DATE|SourceCommonName|              Themes|           Locations|             Persons|           V2Tone|Year_gkg|Month_gkg|Day_gkg|
+-------------------+----------------+--------------------+--------------------+--------------------+-----------------+--------+---------+-------+
|2015-02-19 09:30:00|  BBC Monitoring|[GENERAL_GOVERNME...|4#Berlin, Berlin,...|    [denys pushylin]| 1.91387559808612|    2015|        2|     19|
|2015-02-19 09:30:00|  BBC Monitoring|[MEDIA_MSM, LEADE...|3#Washington, Dis...|[barack obama, ba...|-5.40540540540541|    2015|        2|     19|
|2015-02-19 09:30:00|  BBC Monitoring|[TERROR, ARMEDCON...|1#Russia#AF#RS#60...|  [petro poroshenko]|-7.11462450592885|    2015|        2|     19|
|2015-02-19 09:30:00|  BBC Monitoring|        [ELECTION, ]|1#Afghanistan#AF#...|    [zakaria hasani]|  1.0989010989011

### Requête 4) 

Étudiez l’évolution des relations entre deux pays (spécifiés en paramètre) au cours de l’année. Vous pouvez vous baser sur la langue de l’article, le ton moyen des articles, les thèmes les plus souvent cités, les personnalités ou tout élément qui vous semble pertinent.

In [9]:
# Query 4 bis: mean tone per (month, day, pair of location codes).
#
# - V2Tone: first field of the comma-separated value, rounded to 3 decimals.
# - country1 / country2: fields 2 and 3 of the '#'-separated Locations string,
#   with every digit removed.
# - country_pair: both codes as a sorted array, so (A, B) and (B, A) fall into
#   the same group.
# NOTE(review): both codes are taken from the same '#'-separated entry, and the
# sample output shows values such as USNY for country2, so it may be a
# sub-national code rather than a second country -- confirm.
#
# Changes from the previous version: the regex is written as a raw string
# ("\d" in a normal literal is an invalid escape sequence), the dangling line
# continuation after .na.drop() is gone, and the .sort() that preceded the
# groupby was removed because aggregation does not preserve input order.
requete_4_bis = (
    gkg
    .withColumn('V2Tone', F.split(F.col('V2Tone'), ',').getItem(0))
    .withColumn('country2', F.regexp_replace(F.split(F.col('Locations'), '#').getItem(3), r"\d", ''))
    .withColumn('country1', F.regexp_replace(F.split(F.col('Locations'), '#').getItem(2), r"\d", ''))
    .withColumn('V2Tone', F.round(F.col('V2Tone'), 3))
    .select('DATE', 'V2Tone', 'country1', 'country2')
    .filter(F.col('country1') != '')
    .withColumn('country_pair', F.split(F.concat_ws('_', F.col('country1'), F.col('country2')), "_"))
    .withColumn('country_pair', F.sort_array(F.col('country_pair')))
    .withColumn('day', F.dayofmonth(F.col('DATE')))
    .withColumn('month', F.month(F.col('DATE')))
    .groupby('month', 'day', 'country_pair')
    .agg(F.mean('V2Tone').alias('avg_V2tone'))
    .na.drop()
)

requete_4_bis.show()



+-----+---+------------+--------------------+
|month|day|country_pair|          avg_V2tone|
+-----+---+------------+--------------------+
|    3|  2|    [AE, AE]|  0.5487471482889731|
|    2| 22|    [AE, MU]|   3.356333333333333|
|    2| 22|    [AE, MX]|               1.749|
|    2| 23|    [AE, SP]|               1.692|
|    2| 26|    [AF, GM]| -0.2873333333333334|
|    2| 25|    [AF, GM]| -3.9320000000000004|
|    2| 26|   [AF, UKR]|              -4.724|
|    2| 19|   [AG, RPD]|               1.682|
|    2| 23|  [AG, USNY]|              -8.543|
|    2| 19|    [AL, AS]|               2.496|
|    2| 25|    [AL, JA]|              -1.615|
|    2| 24|    [AM, RS]| -1.9733333333333332|
|    2| 21|  [AO, USVT]|               0.262|
|    3|  1|    [AQ, WS]|              -1.145|
|    2| 21|    [AR, CD]|              -4.582|
|    2| 26|    [AR, US]|  1.3786923076923072|
|    2| 23|  [AR, USDC]| -1.9340285714285719|
|    2| 27|    [AC, CA]|              -1.142|
|    2| 26|   [AC, UKH]|          

                                                                                

### Requête4

In [None]:
# Query 4: share of each theme per (Locations, date).
#
# This pipeline used to be wrapped in a triple-quoted string, so `requete_4`
# was never defined although a later cell writes it to MongoDB (NameError on
# "Restart & Run All"). It is executable code again.

# gkg with a 'yyyy-MM-dd' date string, the first V2Tone field and empty theme
# entries removed from the Themes array.
dfg_req4 = (
    gkg.select("DATE", "SourceCommonName", "Themes", "Persons", "Locations", "V2Tone")
    .withColumn('date', F.date_format(F.col('DATE'), 'yyyy-MM-dd'))
    .withColumn('V2Tone', F.split(F.col('V2Tone'), ',').getItem(0))
    .withColumn('Themes', F.array_remove(F.col('Themes'), ""))
)

# Total number of theme occurrences per Locations value (the denominator).
total_freq_per_loca_th = (
    dfg_req4
    .select("Locations", "Themes")
    .withColumn('theme_value', F.explode(F.col('Themes')))
    .groupby("Locations")
    .agg(F.count('theme_value').alias('total_frequence'))
    .orderBy("Locations", 'total_frequence')
)

# Occurrences of each theme per (Locations, date) divided by the total above.
# NOTE(review): perc_frequence_th is a formatted string ('%.5f'), so the final
# sort on it compares text, not numbers -- confirm this is acceptable.
requete_4 = (
    dfg_req4
    .select("Locations", "Themes", "date")
    .withColumn('theme_value', F.explode(F.col('Themes')))
    .groupby("Locations", "date", "theme_value")
    .count()
    .join(total_freq_per_loca_th, on=['Locations'])
    .withColumn('perc_frequence_th', F.format_string('%.5f', F.col('count') / F.col('total_frequence')))
    .select('date', F.col('Locations').alias('locations'), F.col('theme_value').alias('themes'), 'perc_frequence_th')
    .withColumn('date', F.to_date(F.col('date')))
    .sort(F.col('perc_frequence_th').desc(), F.col('date').asc())
    .na.drop()
)

requete_4.show(truncate=False, n=10)

## Push des dataframes (requete1, requete2, requete3 et requete4, requete4bis) sur notre cluster MongoDB

### On obtient par exemple sur MongoDB, dans la base de données gdelt, la collection requete1

In [20]:
### QUERY 1
# Append the rows of requete_1 to the MongoDB collection gdelt.requete1.
# NOTE(review): "append" mode means re-running this cell adds the rows again.
(
    requete_1.write
    .mode("append")
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://192.168.3.99:27019/gdelt.requete1")
    .save()
)

                                                                                

In [24]:
### QUERY 2
# Append the rows of requete_2 to the MongoDB collection gdelt.requete2.
# NOTE(review): "append" mode means re-running this cell adds the rows again.
(
    requete_2.write
    .mode("append")
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://192.168.3.99:27019/gdelt.requete2")
    .save()
)

                                                                                

In [27]:
### QUERY 3
# Append the rows of requete_3 to the MongoDB collection gdelt.requete3.
# NOTE(review): "append" mode means re-running this cell adds the rows again.
(
    requete_3.write
    .mode("append")
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://192.168.3.99:27019/gdelt.requete3")
    .save()
)

                                                                                

In [28]:
### QUERY 4
# Append the rows of requete_4 to the MongoDB collection gdelt.requete4.
# NOTE(review): requete_4 must already exist in the kernel when this cell
# runs; "append" mode means re-running this cell adds the rows again.
(
    requete_4.write
    .mode("append")
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://192.168.3.99:27019/gdelt.requete4")
    .save()
)

                                                                                

In [10]:
### QUERY 4 BIS
# Append the rows of requete_4_bis to the MongoDB collection gdelt.requete4bis.
# NOTE(review): "append" mode means re-running this cell adds the rows again.
(
    requete_4_bis.write
    .mode("append")
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://192.168.3.99:27019/gdelt.requete4bis")
    .save()
)

                                                                                

In [None]:
# Disabled code: the whole pipeline below is a string literal, so none of it
# is executed. It sketches a per-Locations frequency of person names.
# NOTE(review): it reads `dfg_req3`, which is not defined in the visible part
# of the notebook, and the first pipeline names its exploded column
# 'theme_value' but then counts 'Persons' -- fix both before re-enabling.
"""total_freq_per_loca_pe = dfg_req3 \
    .select('Locations', 'Persons') \
    .withColumn('theme_value', F.explode(F.col('Persons'))) \
    .groupby('Locations') \
    .agg(F.count('Persons').alias('total_frequence')) \
    .orderBy('Locations', 'total_frequence')

person_perc_per_loc = dfg_req3 \
    .select('Locations', 'Persons') \
    .withColumn('names', F.explode(F.col('Persons'))) \
    .groupby('Locations', 'names') \
    .count() \
    .join(total_freq_per_loca_pe, on = "Locations") \
    .withColumn('perc_frequence_pe', F.format_string('%.5f', F.col('count') / F.col('total_frequence'))) \
    .select(F.col('Locations').alias('locations'), 'names', 'perc_frequence_pe')
"""



# Disabled code: this string literal is never executed. Its text is the
# theme-frequency pipeline (dfg_req4 -> total_freq_per_loca_th -> requete_4):
# theme occurrences per (Locations, date) divided by the total number of
# theme occurrences for that Locations value, formatted with '%.5f'.
# NOTE(review): this looks like a leftover copy of the "Requête4" cell and is a
# candidate for deletion -- confirm.
"""dfg_req4 = gkg.select("DATE", "SourceCommonName", "Themes", "Persons", "Locations", "V2Tone") \
    .withColumn('date', F.date_format(F.col('DATE'), 'yyyy-MM-dd')) \
    .withColumn('V2Tone', F.split(F.col('V2Tone'), ',').getItem(0)) \
    .withColumn('Themes', F.array_remove(F.col('Themes'), "")) 

total_freq_per_loca_th = dfg_req4 \
    .select("Locations", "Themes") \
    .withColumn('theme_value', F.explode(F.col('Themes'))) \
    .groupby("Locations") \
    .agg(F.count('theme_value').alias('total_frequence')) \
    .orderBy("Locations",'total_frequence')

requete_4 = dfg_req4 \
    .select("Locations", "Themes", "date") \
    .withColumn('theme_value', F.explode(F.col('Themes'))) \
    .groupby("Locations", "date", "theme_value") \
    .count() \
    .join(total_freq_per_loca_th, on = ['Locations']) \
    .withColumn('perc_frequence_th', F.format_string('%.5f', F.col('count') / F.col('total_frequence'))) \
    .select('date', F.col('Locations').alias('locations'), F.col('theme_value').alias('themes'), 'perc_frequence_th') \
    .withColumn('date', F.to_date(F.col('date'))) \
    .sort(F.col('perc_frequence_th').desc(), F.col('date').asc()) \
    .na.drop() 

requete_4.show(truncate = False, n = 10)"""