## Création des RDD puis des dataframes à partir des fichiers stockés dans S3

In [2]:
import spark.implicits._
import sys.process._
import java.net.URL
import java.io.File
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.net.HttpURLConnection 
import org.apache.spark.sql.functions._

// Import connexion à S3
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs.s3.S3FileSystem
import com.amazonaws.services.s3.transfer.TransferManager

// Imports connexion Cassandra
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import org.apache.spark.sql.types.IntegerType

// Imports chargement dataframe
import org.apache.spark.input.PortableDataStream
import java.util.zip.ZipInputStream
import java.io.BufferedReader
import java.io.InputStreamReader

import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.FloatType

In [3]:
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.ReadConf

In [4]:
// Cassandra connector built from the Spark context's configuration; used below to run CQL statements.
val connector = {
    val sparkConf = sc.getConf
    CassandraConnector(sparkConf)
}

In [5]:
// Event DataFrame (English exports).
//
// Reads every zip archive whose S3 key matches the glob as a binary stream, walks the archive
// entries and yields their text lines. Each line is then split on tabs, producing a DataFrame
// with a single array column named "value".
val RDD_event = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.export.CSV.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                // Close the zip stream once the last entry has been consumed (same handling as the gkg loaders).
                takeWhile {
                    case null => zis.close(); false
                    case _    => true
                }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the exports are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }

// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_event = RDD_event.map(x=>x.split("\t")).toDF()

In [6]:
// Event DataFrame (translated exports).
//
// Same loading scheme as the English event files: read the matching zip archives as binary
// streams, yield the text lines of every entry, then split each line on tabs into an array
// column named "value".
val RDD_event_trans = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.translation.export.CSV.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                // Close the zip stream once the last entry has been consumed (same handling as the gkg loaders).
                takeWhile {
                    case null => zis.close(); false
                    case _    => true
                }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the exports are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }

// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_event_trans = RDD_event_trans.map(x=>x.split("\t")).toDF()

In [7]:
// GKG DataFrame (English files).
//
// Reads the matching zip archives as binary streams, yields the text lines of every archive
// entry and closes the zip stream after the last entry. Each line is then split on tabs into
// an array column named "value".
val RDD_gkg = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.gkg.csv.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                case _  => true }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the files are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }

// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_gkg = RDD_gkg.map(x=>x.split("\t")).toDF()

In [8]:
// GKG DataFrame (translated files).
//
// Reads the matching zip archives as binary streams, yields the text lines of every archive
// entry and closes the zip stream after the last entry. Each line is then split on tabs into
// an array column named "value".
val RDD_gkg_trans = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.translation.gkg.csv.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                case _  => true }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the files are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }

// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_gkg_trans = RDD_gkg_trans.map(x=>x.split("\t")).toDF()

In [9]:
// Mentions DataFrame (English exports).
//
// Reads the matching zip archives as binary streams, yields the text lines of every archive
// entry, then splits each line on tabs into an array column named "value".
val RDD_mention = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.mentions.CSV.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                // Close the zip stream once the last entry has been consumed (same handling as the gkg loaders).
                takeWhile {
                    case null => zis.close(); false
                    case _    => true
                }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the exports are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }
// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_mention = RDD_mention.map(x=>x.split("\t")).toDF()

In [10]:
// Mentions DataFrame (translated exports).
//
// Reads the matching zip archives as binary streams, yields the text lines of every archive
// entry, then splits each line on tabs into an array column named "value".
val RDD_mention_trans = sc.binaryFiles("s3://celine-drevet-telecom/202003[0-9]*.translation.mentions.CSV.zip"). // load a subset of the files via a glob pattern
   flatMap {  // decompress the archives
       case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          Stream.continually(zis.getNextEntry).
                // Close the zip stream once the last entry has been consumed (same handling as the gkg loaders).
                takeWhile {
                    case null => zis.close(); false
                    case _    => true
                }.
                flatMap { _ =>
                    // Decode explicitly instead of relying on the JVM default charset
                    // (assumes the exports are UTF-8 encoded — TODO confirm).
                    val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }
// One row per line; the tab-separated fields are kept as an array in the "value" column.
val DF_mention_trans = RDD_mention_trans.map(x=>x.split("\t")).toDF()

In [11]:
// First column selection on the mention, event and gkg DataFrames.

// Mentions table (translated feed): array items 0, 1 and 5 become
// globaleventid, eventtimedate and documentidentifier, all as strings.
// Items 13 (meantone) and 14 (language) are not selected.
val mention_trans = {
    val fields = col("_tmp")
    DF_mention_trans.withColumn("_tmp", $"value").select(
        fields.getItem(0).cast("String").as("globaleventid"),
        fields.getItem(1).cast("String").as("eventtimedate"),
        fields.getItem(5).cast("String").as("documentidentifier")
    )
}
    
// Mentions table (English feed): array items 0, 1 and 5 become
// globaleventid, eventtimedate and documentidentifier, all as strings.
// Items 13 (meantone) and 14 (language) are not selected.
val mention = {
    val fields = col("_tmp")
    DF_mention.withColumn("_tmp", $"value").select(
        fields.getItem(0).cast("String").as("globaleventid"),
        fields.getItem(1).cast("String").as("eventtimedate"),
        fields.getItem(5).cast("String").as("documentidentifier")
    )
}

// Event table (English feed): array items 0, 1, 52 and 53 become
// globaleventid, day, fullnameactioncountry and actioncountry, all as strings.
// Item 33 (numarticles) is not selected.
val event = {
    val fields = col("_tmp")
    DF_event.withColumn("_tmp", $"value").select(
        fields.getItem(0).cast("String").as("globaleventid"),
        fields.getItem(1).cast("String").as("day"),
        fields.getItem(52).cast("String").as("fullnameactioncountry"),
        fields.getItem(53).cast("String").as("actioncountry")
    )
}
    
// Event table (translated feed): array items 0, 1, 52 and 53 become
// globaleventid, day, fullnameactioncountry and actioncountry, all as strings.
// Item 33 (numarticles) is not selected.
val event_trans = {
    val fields = col("_tmp")
    DF_event_trans.withColumn("_tmp", $"value").select(
        fields.getItem(0).cast("String").as("globaleventid"),
        fields.getItem(1).cast("String").as("day"),
        fields.getItem(52).cast("String").as("fullnameactioncountry"),
        fields.getItem(53).cast("String").as("actioncountry")
    )
}
    
// GKG table (English feed): array items 1, 3, 4, 7, 9, 11, 15 and 25 become date,
// sourcecommonname, documentidentifier, themes, locations, persons, tone and translationinfo,
// all as strings. Item 0 (recordid) is not selected.
val gkg = {
    val fields = col("_tmp")
    DF_gkg.withColumn("_tmp", $"value").select(
        fields.getItem(1).cast("String").as("date"),
        fields.getItem(3).cast("String").as("sourcecommonname"),
        fields.getItem(4).cast("String").as("documentidentifier"),
        fields.getItem(7).cast("String").as("themes"),
        fields.getItem(9).cast("String").as("locations"),
        fields.getItem(11).cast("String").as("persons"),
        fields.getItem(15).cast("String").as("tone"),
        fields.getItem(25).cast("String").as("translationinfo")
    )
}
    
// GKG table (translated feed): array items 1, 3, 4, 7, 9, 11, 15 and 25 become date,
// sourcecommonname, documentidentifier, themes, locations, persons, tone and translationinfo,
// all as strings. Item 0 (recordid) is not selected.
val gkg_trans = {
    val fields = col("_tmp")
    DF_gkg_trans.withColumn("_tmp", $"value").select(
        fields.getItem(1).cast("String").as("date"),
        fields.getItem(3).cast("String").as("sourcecommonname"),
        fields.getItem(4).cast("String").as("documentidentifier"),
        fields.getItem(7).cast("String").as("themes"),
        fields.getItem(9).cast("String").as("locations"),
        fields.getItem(11).cast("String").as("persons"),
        fields.getItem(15).cast("String").as("tone"),
        fields.getItem(25).cast("String").as("translationinfo")
    )
}

In [12]:
// Stack each translated DataFrame under its English counterpart.
// Both sides of every union were built with the same column list, in the same order.

val df_event_tot = event union event_trans
val df_mention_tot = mention union mention_trans
val df_gkg_tot = gkg union gkg_trans

## Requête 1

Afficher le nombre d’articles/événements qui parlent de COVID qu’il y a eu pour chaque triplet :
Jour (de publication de l'article)
pays de l’évènement
langue de l’article

In [15]:
// Transformation steps leading to the table that backs query 1:

// Step 1
// Second column selection, taken from df_event_tot.
val event_r1 = {
    val keptColumns = Seq("globaleventid", "day", "fullnameactioncountry", "actioncountry")
    df_event_tot.select(keptColumns.map(col): _*)
}

In [16]:
// Step 2
// Second column selection, taken from df_gkg_tot.
val gkg_r1 = {
    val keptColumns = Seq("documentidentifier", "themes", "translationinfo")
    df_gkg_tot.select(keptColumns.map(col): _*)
}

In [17]:
// Processing of the translationinfo column.
//
// Maps a translationinfo value to a language code:
//   - null or empty  -> "eng"
//   - anything else  -> characters 6 to 8 of the value (slice(6, 9))
// Presumably the field looks like "srclc:<3-letter code>;..." so the slice yields the source
// language — TODO confirm against the data.
//
// A null input is possible: the loaders split lines with split("\t"), which drops trailing
// empty fields, so getItem(25) yields null when the last fields of a row are empty.
// Treating null like the empty string avoids a NullPointerException inside the UDF.
def extract_language(translationinfo:String ) : String = {
    if (translationinfo == null || translationinfo.isEmpty) {
        "eng"
    } else {
        translationinfo.slice(6, 9)
    }
}
// Spark UDF wrapper around extract_language.
val extract_language_udf = udf(extract_language _)
// Overwrites translationinfo in place with the extracted language code.
// The former trailing drop("translationinfo2") was removed: no column of that name is ever
// created, so the call had no effect.
val gkg_r1_clean = gkg_r1.withColumn("translationinfo", extract_language_udf($"translationinfo"))

In [18]:
// Step 3
// Second column selection, taken from df_mention_tot.
val mention_r1 = df_mention_tot.select($"globaleventid", $"documentidentifier")

In [19]:
// Step 4
// Left join of mention_r1 with gkg_r1_clean on the article identifier.
// Resulting columns: documentidentifier, globaleventid, themes, translationinfo.
val joint_r1 = {
    val joinKeys = Seq("documentidentifier")
    mention_r1.join(gkg_r1_clean, joinKeys, "LeftOuter")
}

In [20]:
// Step 5
// Explode the themes column: rows containing a null are dropped, surrounding "[[[" / "]]]"
// markers are stripped, the string is split on ";" and each token gets its own row (themes3).
val joint_r1_clean = {
    val withoutNulls = joint_r1.na.drop()
    val strippedThemes = regexp_replace($"themes", "(^\\[\\[\\[)|(\\]\\]\\]$)", "")
    withoutNulls.withColumn("themes2", split(strippedThemes, ";")).drop("themes")
}
val explode_r1 = {
    val oneThemePerRow = explode(col("themes2"))
    joint_r1_clean.withColumn("themes3", oneThemePerRow).drop("themes2")
}

In [21]:
// Step 6
// Keep only the themes related to COVID: case-insensitive match on "CORONAVIRUS" anywhere in the theme.
// (An alternative matching TAX_DISEASE_CORONAVIRUSES / TAX_DISEASE_CORONAVIRUS_INFECTIONS was left disabled.)
val filtre_r1 = {
    val upperTheme = upper($"themes3")
    explode_r1.filter(upperTheme.like("%CORONAVIRUS%"))
}

In [22]:
// Step 7
// Left join of filtre_r1 with event_r1 on the event id; the theme column is no longer needed afterwards.
// Resulting columns: globaleventid, documentidentifier, translationinfo, day, fullnameactioncountry, actioncountry.
val joint2_r1 = {
    val withEvents = filtre_r1.join(event_r1, Seq("globaleventid"), "LeftOuter")
    withEvents.drop("themes3")
}

In [23]:
// Step 8
// final_r1: from joint2_r1, count the distinct articles per
// (globaleventid, day, actioncountry, translationinfo) into a new column numarticles.
val final_r1 = {
    val perEventDayCountryLanguage = joint2_r1.groupBy("globaleventid", "day", "actioncountry", "translationinfo")
    perEventDayCountryLanguage.agg(countDistinct("documentidentifier").as("numarticles"))
}

In [24]:
//connector.withSessionDo { session => session.execute("CREATE KEYSPACE Gdelt WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};") }

In [25]:
// Remove any previous version of the query-1 table.
connector.withSessionDo { cassandra =>
    val dropStatement = "DROP TABLE IF EXISTS gdelt.table_r1;"
    cassandra.execute(dropStatement)
}

In [26]:
// Create the query-1 table, partitioned by (day, actioncountry, language) and clustered by globaleventid.
connector.withSessionDo { cassandra =>
    val createStatement = "CREATE TABLE gdelt.table_r1 (globaleventid text, day text, actioncountry text, language text, numarticles int, PRIMARY KEY ((day, actioncountry, language),globaleventid));"
    cassandra.execute(createStatement)
}

In [27]:
// Drop rows containing a null and rename translationinfo to match the table's "language" column.
val final_r1_bis = {
    val completeRows = final_r1.na.drop()
    completeRows.withColumnRenamed("translationinfo", "language")
}

In [28]:
// Write the query-1 result into gdelt.table_r1 (overwrite mode, truncation explicitly confirmed).
final_r1_bis.write.
    cassandraFormat("table_r1", "gdelt").
    mode("overwrite").
    option("confirm.truncate","true").
    save()

## Requête 2

Pour un pays donné en paramètre, affichez les évènements qui y ont eu lieu, triés par nombre de mentions (tri décroissant).
Permettez une agrégation par jour/mois/année


In [31]:
// Extracts the first six characters of a date string (presumably YYYYMM of a YYYYMMDD... value
// — TODO confirm the date format).
//
// Returns "" when the input is null or shorter than six characters. The previous guard
// (length > 6) rejected strings of exactly six characters and threw on null.
def compute_month(date:String) : String = {
    if (date != null && date.length >= 6) {
        date.slice(0,6)
    } else {
        ""
    }
}

// Spark UDF wrapper around compute_month.
val compute_month_udf = udf((date: String) => compute_month(date))

// Extracts the first four characters of a date string (presumably the YYYY part of a
// YYYYMMDD... value — TODO confirm the date format).
//
// Returns "" when the input is null or shorter than four characters. The previous guard
// (length > 4) rejected strings of exactly four characters and threw on null.
def compute_year(date:String) : String = {
    if (date != null && date.length >= 4) {
        date.slice(0,4)
    } else {
        ""
    }
}

// Spark UDF wrapper around compute_year.
val compute_year_udf = udf((date: String) => compute_year(date))

In [32]:
// Transformation steps leading to the table that backs query 2:

// Step 1
// Second column selection, taken from df_event_tot.
val event_r2 = {
    val keptColumns = Seq("globaleventid", "day", "fullnameactioncountry", "actioncountry")
    df_event_tot.select(keptColumns.map(col): _*)
}

In [33]:
// Derive the monthyear and year columns from the day column.
val event_r2_bis = {
    val dayColumn = $"day"
    event_r2.
        withColumn("monthyear", compute_month_udf(dayColumn)).
        withColumn("year", compute_year_udf(dayColumn))
}

In [34]:
// Step 2
// mention_r2: keep the globaleventid and documentidentifier columns.
val mention_r2 = df_mention_tot.select($"globaleventid", $"documentidentifier")

In [35]:
// Step 3
// joint_r2: join mention_r2 with event_r2_bis on globaleventid.
// An event can appear several times in the mentions, so every mention row is kept (left join)
// and enriched with the date columns and actioncountry of its event.
val joint_r2 = {
    val joinKeys = Seq("globaleventid")
    mention_r2.join(event_r2_bis, joinKeys, "LeftOuter")
}

In [36]:
// Step 4
// Group by event / country / day / month / year and count the rows of each group, sorted by
// decreasing count. A plain row count is used on purpose (no distinct on documentidentifier):
// several mentions of the same event inside the same article must all be counted.
// The "count" column is renamed to nummentions in a later cell.
val final_r2 = {
    val perEventCountryDate = joint_r2.groupBy("globaleventid", "actioncountry", "day", "monthyear", "year")
    perEventCountryDate.count().orderBy(col("count").desc)
}

In [37]:
// Drop rows containing a null and rename the count column to match the table's "nummentions" column.
val final_r2_bis = {
    val completeRows = final_r2.na.drop()
    completeRows.withColumnRenamed("count", "nummentions")
}

In [38]:
// Remove any previous version of the query-2 table.
connector.withSessionDo { cassandra =>
    val dropStatement = "DROP TABLE IF EXISTS gdelt.table_r2;"
    cassandra.execute(dropStatement)
}

In [39]:
// Create the query-2 table, partitioned by (actioncountry, day, monthyear, year) and clustered by globaleventid.
// NOTE(review): day, monthyear and year are declared int here, while the DataFrame written to
// this table carries them as strings (and the month/year helpers can return "") — confirm the
// connector's conversion covers every value.
connector.withSessionDo { cassandra =>
    val createStatement = "CREATE TABLE gdelt.table_r2 (actioncountry text, day int, monthyear int, year int, nummentions int, globaleventid text, PRIMARY KEY ((actioncountry, day, monthyear, year),globaleventid));"
    cassandra.execute(createStatement)
}

In [40]:
// Write the query-2 result into gdelt.table_r2 (overwrite mode, truncation explicitly confirmed).
final_r2_bis.write.
    cassandraFormat("table_r2", "gdelt").
    mode("overwrite").
    option("confirm.truncate","true").
    save()

## Requête 3

Pour une source de données passée en paramètre, affichez les thèmes, personnes et lieux dont les articles de cette source parlent, ainsi que le nombre d’articles et le ton moyen des articles (pour chaque thème/personne/lieu) ; permettez une agrégation par jour/mois/année.

### Themes

In [44]:
// Transformation steps leading to the tables that back query 3:

// Step 0 (shared by the three sub-queries)
// Column selection ahead of the formatting of the tone and date columns.
val gkg_r3 = {
    val keptColumns = Seq("date", "sourcecommonname", "documentidentifier", "themes", "locations", "persons", "tone")
    df_gkg_tot.select(keptColumns.map(col): _*)
}
    
// Parses the first comma-separated field of the tone string as a Float.
// Throws if that field is missing or is not a valid number.
def split_tone(tone:String ) : Float = {
    val firstField = tone.split(',')(0)
    firstField.toFloat
}
// Spark UDF wrapper around split_tone.
val split_tone_udf = udf((tone: String) => split_tone(tone))

// Extracts the first six characters of a date string (presumably YYYYMM of a YYYYMMDD... value
// — TODO confirm the date format).
//
// Returns "" when the input is null or shorter than six characters. The previous guard
// (length > 6) rejected strings of exactly six characters and threw on null.
def compute_month(date:String) : String = {
    if (date != null && date.length >= 6) {
        date.slice(0,6)
    } else {
        ""
    }
}

// Spark UDF wrapper around compute_month.
val compute_month_udf = udf((date: String) => compute_month(date))

// Extracts the first four characters of a date string (presumably the YYYY part of a
// YYYYMMDD... value — TODO confirm the date format).
//
// Returns "" when the input is null or shorter than four characters. The previous guard
// (length > 4) rejected strings of exactly four characters and threw on null.
def compute_year(date:String) : String = {
    if (date != null && date.length >= 4) {
        date.slice(0,4)
    } else {
        ""
    }
}

// Spark UDF wrapper around compute_year.
val compute_year_udf = udf((date: String) => compute_year(date))

// After dropping rows that contain a null: tone2 holds the first numeric field of tone, date2
// the date as a string, and monthyear / year are derived from the date. The raw tone and date
// columns are then removed.
val gkgDF_R3_clean_tone = {
    val dateColumn = $"date"
    gkg_r3.na.drop().
        withColumn("tone2", split_tone_udf($"tone")).
        withColumn("date2", dateColumn.cast(StringType)).
        withColumn("monthyear", compute_month_udf(dateColumn)).
        withColumn("year", compute_year_udf(dateColumn)).
        drop("tone","date")
}

// tone3 is tone2 cast to float; tone2 is removed.
val gkgDF_R3_clean_tone_2 = {
    val toneAsFloat = $"tone2".cast(FloatType)
    gkgDF_R3_clean_tone.withColumn("tone3", toneAsFloat).drop("tone2")
}

In [45]:
// Themes
// Step 1
// Column selection from the cleaned GKG DataFrame, then one row per theme.
val gkg_r3_theme = gkgDF_R3_clean_tone_2.select("date2","monthyear","year","sourcecommonname","documentidentifier","themes","tone3")
// Drop rows containing a null, strip surrounding "[[[" / "]]]" markers and split the themes on ";".
val gkg_r3_theme_clean = gkg_r3_theme.na.drop().withColumn("themes2",split(regexp_replace($"themes", "(^\\[\\[\\[)|(\\]\\]\\]$)", ""), ";")).drop("themes")
// Explode to one theme per row. Empty tokens are discarded: an empty themes field is not null,
// so it survives na.drop() and would otherwise be aggregated and stored as an empty-string theme.
val gkg_theme_explode_r3 = gkg_r3_theme_clean.withColumn("themes3", explode($"themes2")).drop("themes2").where(length($"themes3") > 0)

In [46]:
// Step 2
// Second column selection, taken from df_mention_tot: only the article identifier is needed.
val mention_r3 = df_mention_tot.select($"documentidentifier")

In [47]:
// Step 3
// Left join of mention_r3 with gkg_theme_explode_r3 on the article identifier.
val joint_r3_theme = {
    val joinKeys = Seq("documentidentifier")
    mention_r3.join(gkg_theme_explode_r3, joinKeys, "LeftOuter")
}

In [48]:
// Step 4
// Aggregate per (source, day, month, year, theme): number of articles and mean tone.
// Articles are counted with countDistinct, as in the persons and locations aggregations:
// the join with the mentions repeats an article once per mention, so a plain count would
// report mention rows rather than articles.
val final_r3_theme = joint_r3_theme.groupBy("sourcecommonname","date2","monthyear","year","themes3").agg(countDistinct($"documentidentifier").as("numarticles"),mean($"tone3").as("meantone"))

In [49]:
// Keep only the rows without any null value.
val final_r3_theme_bis = {
    val nullHandling = final_r3_theme.na
    nullHandling.drop()
}

In [50]:
// Remove any previous version of the query-3 themes table.
connector.withSessionDo { cassandra =>
    val dropStatement = "DROP TABLE IF EXISTS gdelt.table_r3_theme;"
    cassandra.execute(dropStatement)
}

In [51]:
// Create the query-3 themes table, partitioned by (sourcecommonname, date2, monthyear, year) and clustered by themes3.
connector.withSessionDo { cassandra =>
    val createStatement = "CREATE TABLE gdelt.table_r3_theme (sourcecommonname text, date2 text, monthyear text, year text, themes3 text, numarticles int, meantone float, PRIMARY KEY ((sourcecommonname, date2, monthyear, year),themes3));"
    cassandra.execute(createStatement)
}

In [52]:
// Write the themes result into gdelt.table_r3_theme (overwrite mode, truncation explicitly confirmed).
final_r3_theme_bis.write.
    cassandraFormat("table_r3_theme", "gdelt").
    mode("overwrite").
    option("confirm.truncate","true").
    save()

In [53]:
// Preview the first 20 rows of the themes result.
final_r3_theme_bis.show(20)

### Persons

In [55]:
// Persons
// Step 1
// Column selection from the cleaned GKG DataFrame, then one row per person.
val gkg_r3_persons = gkgDF_R3_clean_tone_2.select("date2","monthyear","year","sourcecommonname","documentidentifier","persons","tone3")
// Drop rows containing a null, strip surrounding "[[[" / "]]]" markers and split the persons on ";".
val gkg_r3_persons_bis = gkg_r3_persons.na.drop().withColumn("persons2",split(regexp_replace($"persons", "(^\\[\\[\\[)|(\\]\\]\\]$)", ""), ";")).drop("persons")
// Explode to one person per row. Empty tokens are discarded: an empty persons field is not null,
// so it survives na.drop() and would otherwise be aggregated and stored as an empty-string person.
val gkg_persons_explode_r3 = gkg_r3_persons_bis.withColumn("persons3",explode($"persons2")).drop("persons2").where(length($"persons3") > 0)

In [56]:
// Step 2
// Left join of mention_r3 with gkg_persons_explode_r3 on the article identifier.
val joint_r3_person = {
    val joinKeys = Seq("documentidentifier")
    mention_r3.join(gkg_persons_explode_r3, joinKeys, "LeftOuter")
}

In [57]:
// Step 3
// Aggregate per (source, day, month, year, person): distinct article count and mean tone,
// then drop the rows that contain a null.
val final_r3_persons = {
    val perSourceDatePerson = joint_r3_person.groupBy("sourcecommonname", "date2", "monthyear", "year", "persons3")
    val articleCount = countDistinct($"documentidentifier").as("numarticles")
    val averageTone = mean($"tone3").as("meantone")
    perSourceDatePerson.agg(articleCount, averageTone)
}
val final_r3_persons_bis = {
    val nullHandling = final_r3_persons.na
    nullHandling.drop()
}

In [58]:
// Remove any previous version of the query-3 persons table.
connector.withSessionDo { cassandra =>
    val dropStatement = "DROP TABLE IF EXISTS gdelt.table_r3_persons;"
    cassandra.execute(dropStatement)
}

In [59]:
// Create the query-3 persons table, partitioned by (sourcecommonname, date2, monthyear, year) and clustered by persons3.
connector.withSessionDo { cassandra =>
    val createStatement = "CREATE TABLE gdelt.table_r3_persons (sourcecommonname text, date2 text, monthyear text, year text, persons3 text, numarticles int, meantone float, PRIMARY KEY ((sourcecommonname, date2 , monthyear , year) ,persons3));"
    cassandra.execute(createStatement)
}

In [60]:
// Write the persons result into gdelt.table_r3_persons (overwrite mode, truncation explicitly confirmed).
final_r3_persons_bis.write.
    cassandraFormat("table_r3_persons", "gdelt").
    mode("overwrite").
    option("confirm.truncate","true").
    save()

### Locations

In [62]:
// Locations
// Step 1
// Column selection from the cleaned GKG DataFrame, then one row per location.
val gkg_r3_locations = gkgDF_R3_clean_tone_2.select("date2","monthyear","year","sourcecommonname","documentidentifier","locations","tone3")
// Drop rows containing a null, strip surrounding "[[[" / "]]]" markers and split the locations on ";".
val gkg_r3_locations_clean = gkg_r3_locations.na.drop().withColumn("locations2",split(regexp_replace($"locations", "(^\\[\\[\\[)|(\\]\\]\\]$)", ""), ";")).drop("locations")
// Explode to one location per row. Empty tokens are discarded: an empty locations field is not null,
// so it survives na.drop() and would otherwise be aggregated and stored as an empty-string location.
val gkg_locations_explode_r3 = gkg_r3_locations_clean.withColumn("locations3",explode($"locations2")).drop("locations2").where(length($"locations3") > 0)

// Reduces a location token to a short name.
//
//   - null                        -> ""
//   - token without '#'           -> the token itself
//   - token with '#'-separated    -> the second field, cut at its first comma
//     fields                         (e.g. "3#Paris, France#FR" -> "Paris")
//
// When the token contains '#' but has no second field (e.g. "1#"), or when that field holds
// nothing but commas, "" is returned; the previous implementation threw
// ArrayIndexOutOfBoundsException on such tokens.
def split_location(location:String) : String = {
    if (location == null) {
        ""
    } else if (!location.contains("#")) {
        location
    } else {
        val fields = location.split('#')
        // Second '#'-separated field, or "" when the token stops after the first one.
        val fullName = if (fields.length > 1) fields(1) else ""
        // Keep only what precedes the first comma; headOption covers the empty-array case.
        fullName.split(',').headOption.getOrElse("")
    }
}

// Spark UDF wrapper around split_location.
val split_location_udf = udf((location: String) => split_location(location))

// Drop rows containing a null, then replace the raw location token (locations3) with its short name (locations4).
val gkg_r3_locations_final = {
    val completeRows = gkg_locations_explode_r3.na.drop()
    val shortName = split_location_udf($"locations3")
    completeRows.withColumn("locations4", shortName).drop("locations3")
}

In [63]:
// Step 2
// Left join of mention_r3 with gkg_r3_locations_final on the article identifier.
val joint_r3_locations = {
    val joinKeys = Seq("documentidentifier")
    mention_r3.join(gkg_r3_locations_final, joinKeys, "LeftOuter")
}

In [64]:
// Step 3
// Aggregate per (source, day, month, year, location): distinct article count and mean tone,
// then drop the rows that contain a null.
val final_r3_locations = {
    val perSourceDateLocation = joint_r3_locations.groupBy("sourcecommonname", "date2", "monthyear", "year", "locations4")
    val articleCount = countDistinct($"documentidentifier").as("numarticles")
    val averageTone = mean($"tone3").as("meantone")
    perSourceDateLocation.agg(articleCount, averageTone)
}
val final_r3_locations_bis = {
    val nullHandling = final_r3_locations.na
    nullHandling.drop()
}

In [65]:
// Remove any previous version of the query-3 locations table.
connector.withSessionDo { cassandra =>
    val dropStatement = "DROP TABLE IF EXISTS gdelt.table_r3_locations;"
    cassandra.execute(dropStatement)
}

In [66]:
// Create the query-3 locations table, partitioned by (sourcecommonname, date2, monthyear, year) and clustered by locations4.
connector.withSessionDo { cassandra =>
    val createStatement = "CREATE TABLE gdelt.table_r3_locations (sourcecommonname text, date2 text, monthyear text, year text, locations4 text, numarticles int, meantone float, PRIMARY KEY ((sourcecommonname, date2 , monthyear , year) ,locations4));"
    cassandra.execute(createStatement)
}

In [67]:
// Write the locations result into gdelt.table_r3_locations (overwrite mode, truncation explicitly confirmed).
final_r3_locations_bis.write.
    cassandraFormat("table_r3_locations", "gdelt").
    mode("overwrite").
    option("confirm.truncate","true").
    save()