In [0]:
import sys.process._
import java.net.{URL, HttpURLConnection}
import java.nio.file.{Files,StandardCopyOption}
import java.io.File

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext
import org.apache.spark.input.PortableDataStream
import java.util.zip.ZipInputStream
import java.io.BufferedReader
import java.io.InputStreamReader

In [1]:
// MENTIONS: English + translingual data.
// Every zip archive matching the glob is read entry by entry; each text line becomes one RDD element.
// An Iterator (not a Stream) is used so already-read lines are not memoised, and the
// ZipInputStream is closed as soon as its last entry has been consumed so archive handles do not leak.
val Mentions_RDD = sc.binaryFiles("/Volumes/ARMAND/gdelt_project_data/2021*.mentions.CSV.zip")
  .flatMap { case (_, content) =>
    val zis = new ZipInputStream(content.open)
    Iterator.continually(zis.getNextEntry)
      .takeWhile { entry =>
        val hasEntry = entry != null
        if (!hasEntry) zis.close() // no more entries: release the underlying stream
        hasEntry
      }
      .flatMap { _ =>
        // Decode explicitly as UTF-8 rather than relying on the JVM's default charset.
        val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
        Iterator.continually(br.readLine()).takeWhile(_ != null)
      }
  }
// NOTE: String.split drops trailing empty fields, so rows ending in empty columns produce a
// shorter array; getItem on a missing index then yields null (the language preprocessing relies on this).
val Mentions_tmp = Mentions_RDD.map(x => x.split("\t")).toDF()


// EVENTS: English + translingual data.
// Every zip archive matching the glob is read entry by entry; each text line becomes one RDD element.
// An Iterator (not a Stream) is used so already-read lines are not memoised, and the
// ZipInputStream is closed as soon as its last entry has been consumed so archive handles do not leak.
val Events_RDD = sc.binaryFiles("/Volumes/ARMAND/gdelt_project_data/2021*.export.CSV.zip")
  .flatMap { case (_, content) =>
    val zis = new ZipInputStream(content.open)
    Iterator.continually(zis.getNextEntry)
      .takeWhile { entry =>
        val hasEntry = entry != null
        if (!hasEntry) zis.close() // no more entries: release the underlying stream
        hasEntry
      }
      .flatMap { _ =>
        // Decode explicitly as UTF-8 rather than relying on the JVM's default charset.
        val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
        Iterator.continually(br.readLine()).takeWhile(_ != null)
      }
  }
val Events_tmp = Events_RDD.map(x => x.split("\t")).toDF()


// GKG: English data.
// Every zip archive matching the glob is decompressed entry by entry; each text line becomes one RDD element.
// An Iterator (not a Stream) is used so already-read lines are not memoised; the ZipInputStream
// is closed once its last entry has been consumed.
val Gkg_RDD = sc.binaryFiles("/Volumes/ARMAND/gdelt_project_data/2021*.gkg.csv.zip")
  .flatMap { case (_, content) =>
    val zis = new ZipInputStream(content.open)
    Iterator.continually(zis.getNextEntry)
      .takeWhile { entry =>
        val hasEntry = entry != null
        if (!hasEntry) zis.close() // no more entries: release the underlying stream
        hasEntry
      }
      .flatMap { _ =>
        // Decode explicitly as UTF-8 rather than relying on the JVM's default charset.
        val br = new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
        Iterator.continually(br.readLine()).takeWhile(_ != null)
      }
  }
val Gkg_tmp = Gkg_RDD.map(_.split("\t")).toDF()

In [2]:
// Fields kept from the Events table, listed as (index in the tab-split `value` array -> output column name).
// The output columns appear in exactly the order listed.
val Events_DF_bis = Events_tmp.select(
  Seq(
    0  -> "Globaleventid",
    1  -> "YearMonthDay",
    2  -> "YearMonth",
    3  -> "Year",
    31 -> "NumMention",
    53 -> "Country",
    60 -> "SourceUrl",
    30 -> "GoldsteinScore"
  ).map { case (idx, alias) => col("value").getItem(idx).as(alias) }: _*
)

// Fields kept from the Mentions table, same (index -> name) convention.
val Mentions_DF_bis = Mentions_tmp.select(
  Seq(
    0  -> "Globaleventid",
    1  -> "EventTimeDate",
    2  -> "MentionTimeDate",
    5  -> "MentionIdentifier",
    14 -> "ArticleLanguage"
  ).map { case (idx, alias) => col("value").getItem(idx).as(alias) }: _*
)

// Fields kept from the Gkg table, same (index -> name) convention.
// (Index 1 -> "DATE" is intentionally not selected.)
val Gkg_DF_bis = Gkg_tmp.select(
  Seq(
    0  -> "GkgRecordId",
    3  -> "SourceCommonName",
    4  -> "DocumentIdentifier",
    7  -> "Themes",
    9  -> "Locations",
    11 -> "Persons",
    15 -> "Tone"
  ).map { case (idx, alias) => col("value").getItem(idx).as(alias) }: _*
)
   

   



In [3]:
// Events preprocessing: cast NumMention from its raw string form to an integer.
val Events_DF = Events_DF_bis.withColumn("NumMention", Events_DF_bis("NumMention").cast("Int"))

/*.withColumn("Day", substring(col("Day"), 7 , 2))    .withColumn("MonthYear", substring(col("MonthYear"), 5 , 2))*/

// Mentions preprocessing of ArticleLanguage: keep the text after the last ':' inside the first
// ';'-separated segment (presumably the source-language code of translated articles, e.g.
// "srclc:fra;..." -> "fra" — confirm against the GDELT codebook).
// A missing field (null) OR an empty field is treated as English ("eng"): String.split keeps
// empty fields that are not at the end of the row, so an untranslated article can arrive as "".
val Mentions_DF = Mentions_DF_bis.withColumn(
  "ArticleLanguage",
  when(col("ArticleLanguage").isNull || col("ArticleLanguage") === "", "eng")
    .otherwise(substring_index(substring_index(col("ArticleLanguage"), ";", 1), ":", -1)))

// Gkg preprocessing:
//  - Locations: keep the 2nd '#'-delimited field of the string (empty string when the field is null).
//  - Tone: keep only the text before the first ',' (cast to a number in the next cell).
val Gkg_DF_bis2 = Gkg_DF_bis
  .withColumn("Locations", when(col("Locations").isNull, "").otherwise(substring_index(substring_index(col("Locations"), "#", 2), "#", -1)))
  .withColumn("Tone", substring_index(col("Tone"), ",", 1))

In [4]:
// Gkg Tone preprocessing: convert the extracted tone text into a float column
// (values that do not parse become null).
val Gkg_DF = Gkg_DF_bis2.withColumn("Tone", col("Tone").cast("float"))

In [5]:
Events_DF.show(20) // preview the first 20 rows (Spark's default), long values truncated

In [6]:
Mentions_DF.show(20) // preview the first 20 rows (Spark's default), long values truncated

In [7]:
Gkg_DF.show(20) // preview the first 20 rows (Spark's default), long values truncated

In [8]:
// Number of COVID-related articles and of distinct COVID-related events located in France ('FR').
// GKG records whose Themes contain "CORONAVIRUS" are joined to mentions on
// MentionIdentifier = DocumentIdentifier, then to events on Globaleventid.
// ArticlesNumber counts joined rows, so an article linked to several events is counted once per event.
val Q4_Gkg_int = Gkg_DF.where($"Themes".like("%CORONAVIRUS%"))
val Q4_join = Mentions_DF.join(Q4_Gkg_int, $"MentionIdentifier" === $"DocumentIdentifier")
val Q4_final = Events_DF.join(Q4_join, "Globaleventid")
Q4_final.createOrReplaceTempView("Table4") // also queried by the mean GS / tone cell
val q4CovidCounts = spark.sql(""" SELECT count(Globaleventid) as ArticlesNumber, count(DISTINCT Globaleventid) as EventsNumber  FROM Table4 WHERE Country = 'FR' """)
z.show(q4CovidCounts)

In [9]:
// Total number of distinct events located in France ('FR') over these 2 days of data.
Events_DF.createOrReplaceTempView("Table_Events")
val totalEventsFr = spark.sql(""" SELECT count(DISTINCT Globaleventid) as EventsNumber  FROM Table_Events WHERE Country = 'FR' """)
z.show(totalEventsFr)

In [10]:
// Total number of articles about events located in France ('FR') over these 2 days, all themes included.
// Same join chain as the COVID query, but without the Themes filter.
val Q4_join_1 = Mentions_DF.join(Gkg_DF, $"MentionIdentifier" === $"DocumentIdentifier")
val Q4_final_1 = Events_DF.join(Q4_join_1, "Globaleventid")
Q4_final_1.createOrReplaceTempView("Table5")
// Counts joined rows, i.e. the number of mentions.
val totalArticlesFr = spark.sql(""" SELECT count(Globaleventid) as ArticlesNumber  FROM Table5 WHERE Country = 'FR'""")
z.show(totalArticlesFr)

In [11]:
// Mean Goldstein score and mean tone for France ('FR') over these 2 days.
// Averages are taken over joined mention rows, so each event is weighted by its number of matched articles.
// GoldsteinScore is never cast in this notebook (it is still a string); mean() relies on Spark SQL's implicit cast.
// NOTE(review): this reads Table4, the COVID-filtered view; if the intent is all articles, query Table5 instead.
val meanGsToneFr = spark.sql(""" SELECT mean(GoldsteinScore) as MeanGS, mean(Tone) as MeanTone  FROM Table4 WHERE Country = 'FR'""")
z.show(meanGsToneFr)