In [20]:
// Load the proceedings sample, keep only the three columns used below, and drop every row
// where any of them is null/NaN.
val df = spark.read
  .option("header", true)
  .csv("/home/panos/Downloads/Greek_Parliament_Proceedings_1989_2020_DataSample.csv")
  .select("member_name", "sitting_date", "speech")
  .na.drop()

df: org.apache.spark.sql.DataFrame = [member_name: string, sitting_date: string ... 1 more field]


In [21]:
// Inspect the column types of the loaded CSV (all strings at this point).
df.printSchema()

root
 |-- member_name: string (nullable = true)
 |-- sitting_date: string (nullable = true)
 |-- speech: string (nullable = true)



In [22]:
// parse sitting_date into a DateType column (date_y) so rows can be filtered by year later
import org.apache.spark.sql.functions.{to_date, to_timestamp}

// Replace the raw sitting_date string with a DateType column `date_y` parsed as dd/MM/yyyy.
// NOTE(review): dates that do not match this pattern are expected to come out null and would
// then be silently excluded by the year filter -- confirm the CSV really uses dd/MM/yyyy.
val df_date = df.select(
  $"member_name",
  $"speech",
  to_date($"sitting_date", "dd/MM/yyyy").as("date_y")
)

import org.apache.spark.sql.functions.{to_date, to_timestamp}
df_date: org.apache.spark.sql.DataFrame = [member_name: string, speech: string ... 1 more field]


In [23]:
// Confirm date_y was produced as a DateType column.
df_date.printSchema()

root
 |-- member_name: string (nullable = true)
 |-- speech: string (nullable = true)
 |-- date_y: date (nullable = true)



In [60]:
val year = 2015 // args(0).toInt

// One row per member: every speech that member gave in `year`, concatenated into one string.
// Speeches are joined with a space so a speech boundary is always a word boundary, however
// punctuation such as ',' is cleaned afterwards. A "," separator is not whitespace: once commas
// are deleted, the last word of one speech fuses with the first word of the next.
val speechesDF = df_date
  .where(s"year(date_y) == ${year}")
  .groupBy("member_name")
  .agg(concat_ws(" ", collect_list("speech")).as("speeches"))

year: Int = 2015
speechesDF: org.apache.spark.sql.DataFrame = [member_name: string, speeches: string]


In [61]:
// Blank out the symbols _ , * $ # @ &. They are replaced by a space rather than deleted, so
// text such as "λόγο,κύριε" still splits into two words instead of being glued into one token.
// (The previous class "[\\_,\\*,\\$,...]" listed ',' repeatedly as a separator; this is the
// same character set written once.)
val cleanSpeechesDF = speechesDF.withColumn("speechesClean", regexp_replace($"speeches", "[_,*$#@&]", " "))

cleanSpeechesDF: org.apache.spark.sql.DataFrame = [member_name: string, speeches: string ... 1 more field]


In [62]:
// Preview raw vs. cleaned speeches (first 20 rows, long values truncated).
cleanSpeechesDF.show(numRows = 20, truncate = true)

+-------------------+--------------------+--------------------+
|        member_name|            speeches|       speechesClean|
+-------------------+--------------------+--------------------+
|σακοραφα ηλια σοφια| Κλείστε, κύριε σ...| Κλείστε κύριε συ...|
+-------------------+--------------------+--------------------+



In [63]:
import org.apache.spark.ml.feature.RegexTokenizer

// Split the cleaned text into lower-cased word tokens of at least 4 characters.
// Gaps are runs of characters that are neither letters (\p{L}, any script) nor combining
// marks (\p{M}, which keeps decomposed Greek accents attached to their letter); digits and all
// punctuation, ASCII or not, act as separators.
// The previous pattern "[\\s.,!-~...]" contained the range `!-~`, i.e. every printable ASCII
// character. Latin letters and digits were therefore separators, so Latin-script words vanished,
// while non-ASCII punctuation such as « » · – was left inside tokens.
val speechesDF_tok = new RegexTokenizer()
  .setInputCol("speechesClean")
  .setOutputCol("speechesTok")
  .setMinTokenLength(4)
  .setToLowercase(true)
  .setPattern("[^\\p{L}\\p{M}]+")
  .transform(cleanSpeechesDF)

import org.apache.spark.ml.feature.RegexTokenizer
speechesDF_tok: org.apache.spark.sql.DataFrame = [member_name: string, speeches: string ... 2 more fields]


In [64]:
// Preview the token arrays produced by the tokenizer.
speechesDF_tok.show(numRows = 20, truncate = true)

+-------------------+--------------------+--------------------+--------------------+
|        member_name|            speeches|       speechesClean|         speechesTok|
+-------------------+--------------------+--------------------+--------------------+
|σακοραφα ηλια σοφια| Κλείστε, κύριε σ...| Κλείστε κύριε συ...|[κλείστε, κύριε, ...|
+-------------------+--------------------+--------------------+--------------------+



#### Remove stop words

In [65]:
// Stop-word list from stopwords.txt, one word per line. Entries are trimmed and lower-cased,
// and blank lines are dropped. The tokens they are compared against were lower-cased by the
// tokenizer, so stray whitespace or capitals in the file would otherwise produce entries that
// never match.
val stopwords : Set[String] = sc.textFile("stopwords.txt")
  .map(_.trim.toLowerCase)
  .filter(_.nonEmpty)
  .collect
  .toSet[String]

import spark.implicits._

// Removes stop words from a token array. The parameter is declared as Seq[String], the type
// Spark documents for array<string> UDF inputs, rather than the Scala-2.12-specific WrappedArray.
val filter_stopwords_udf = udf((tokens: Seq[String]) => tokens.filterNot(stopwords.contains))

val speechesFilteredDF = speechesDF_tok.withColumn("speechesTok1", filter_stopwords_udf(speechesDF_tok("speechesTok")))

stopwords: Set[String] = Set(πολιτική, όλα, κάνουν, οποίο, πριν, στην, μέσω, ελληνική, ελλάδα, λίγο, ίδιο, εδώ, όλους, πως, ζωή, μου, όταν, για, ώστε, πολλές, θέμα, αποτέλεσμα, πάνω, χωρίς, νέα, υπάρχει, απόφαση, τότε, γιατί, αυτή, του, ήδη, περισσότερο, επειδή, άλλο, ίδια, έχει, ακόμα, εάν, χρόνια, δηλαδή, λοιπόν, τώρα, στις, είπε, περιοχή, ώρα, πρώτο, χώρα, ούτε, παρά, είτε, βάση, χώρας, σύστημα, πάντα, απο, σου, τον, στον, ένα, την, τόσο, όλο, καθώς, πολλά, μεγάλο, γίνει, πώς, φορές, κάτω, οποία, μεταξύ, είχε, γίνεται, σχέση, σχετικά, της, ενός, όχι, όπως, έχουμε, έτσι, εις, δύο, εκεί, κάποια, περίπου, γεγονός, κόσμο, κάθε, αφορά, εργασίας, πρόκειται, χρήση, τέλος, δεν, έχω, μόνο, επίσης, ένας, ενώ, πολύ, έγινε, όσο, κάτι, κάνει, ναι, τελευταία, είχαν, μετά, λόγω, μία, μέσα, τρόπο, α...

In [66]:
// Compare token arrays before (speechesTok) and after (speechesTok1) stop-word removal.
speechesFilteredDF.show(numRows = 20, truncate = true)

+-------------------+--------------------+--------------------+--------------------+--------------------+
|        member_name|            speeches|       speechesClean|         speechesTok|        speechesTok1|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|σακοραφα ηλια σοφια| Κλείστε, κύριε σ...| Κλείστε κύριε συ...|[κλείστε, κύριε, ...|[κλείστε, κύριε, ...|
+-------------------+--------------------+--------------------+--------------------+--------------------+



In [67]:
import org.apache.spark.ml.feature.{CountVectorizerModel, CountVectorizer}

// Bag-of-words model over the stop-word-filtered tokens (one document per member row):
//   vocabSize 10 -> keep only the 10 terms with the highest corpus-wide counts;
//   maxDF 10     -> ignore terms that occur in more than 10 documents;
//   minTF 2      -> at transform time, drop terms seen fewer than 2 times in a document.
// NOTE(review): with vocabSize 10, every member's "most frequent words" can only be drawn from
// these 10 corpus-wide terms, and with more members maxDF 10 removes widely used words. Confirm
// this is the intended notion of per-member frequency.
val cvModel : CountVectorizerModel = {
  val vectorizer = new CountVectorizer()
    .setInputCol("speechesTok1")
    .setOutputCol("features")
    .setVocabSize(10)
    .setMaxDF(10)
    .setMinTF(2)
  vectorizer.fit(speechesFilteredDF)
}


import org.apache.spark.ml.feature.{CountVectorizerModel, CountVectorizer}
cvModel: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_64888e5900be


In [68]:
// Attach the count vectors and keep only the member, their filtered tokens and the vector.
val cvDF = cvModel
  .transform(speechesFilteredDF)
  .select("member_name", "speechesTok1", "features")

cvDF: org.apache.spark.sql.DataFrame = [member_name: string, speechesTok1: array<string> ... 1 more field]


In [69]:
// Preview the sparse count vectors per member.
cvDF.show(numRows = 20, truncate = true)

+-------------------+--------------------+--------------------+
|        member_name|        speechesTok1|            features|
+-------------------+--------------------+--------------------+
|σακοραφα ηλια σοφια|[κλείστε, κύριε, ...|(10,[0,1,2,3,4],[...|
+-------------------+--------------------+--------------------+



In [70]:
import org.apache.spark.ml.linalg.Vector 
import org.apache.spark.rdd.RDD

val n_most_freq = 5

// (word, vocabulary index) pairs, kept for inspection.
val zippedVoc = cvModel.vocabulary.zipWithIndex

// For each row of cvDF: the n_most_freq words with the highest counts in that row's vector,
// most frequent first (ties keep vocabulary order, since the sort is stable).
// The previous version sorted counts ascending, so take() picked the LEAST frequent terms. It
// also emitted the words in vocabulary order via an Array[Any]-plus-Unit-filter detour.
val mostFreq_rdd : RDD[Array[String]] = {
  val vocab = cvModel.vocabulary // local copy so the closure ships only the word array
  cvDF.select("features")
    .rdd
    .map(_.getAs[Vector](0).toSparse)
    .map { counts =>
      counts.indices.zip(counts.values)
        .sortBy { case (_, cnt) => -cnt }
        .take(n_most_freq)
        .map { case (idx, _) => vocab(idx) }
    }
}
                

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD
n_most_freq: Int = 5
zippedVoc: Array[(String, Int)] = Array((κύριε,0), (συνάδελφε,1), (ευχαριστώ,2), (λόγο,3), (δημοκρατία,4), (αμέσως,5), (κλείστε,6), (κοινοβουλευτικός,7), (εκπρόσωπος,8), (συριζα,9))
mostFreq_rdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[280] at map at <console>:83


In [71]:
// Peek at the word lists of the first five rows.
mostFreq_rdd.take(num = 5)

res46: Array[Array[String]] = Array(Array(κύριε, συνάδελφε, ευχαριστώ, λόγο, δημοκρατία))


In [72]:
import org.apache.spark.sql.expressions.Window 

// One row per member, with the name capitalised for display and a 1-based id assigned in name
// order. The name is read as a typed String rather than recovered from Row.toString by stripping
// '[' and ']', which would also have removed any literal brackets inside a name.
// NOTE: Window.orderBy without partitionBy moves all rows into one partition. That is fine for a
// member list but would not scale to large data.
val members = speechesDF.select("member_name")
  .as[String]
  .map(_.capitalize)
  .toDF("name")
  .withColumn("id", row_number().over(Window.orderBy("name")))
  .cache()

import org.apache.spark.sql.expressions.Window
members: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, id: int]


In [73]:
// Turn the per-row word lists into a one-column DataFrame named after the analysed year.
val df2 = {
  val freqColName = s"Most_Frequent_${year}"
  mostFreq_rdd.toDF(freqColName)
}

df2.show(numRows = 30)

+--------------------+
|  Most_Frequent_2015|
+--------------------+
|[κύριε, συνάδελφε...|
+--------------------+



df2: org.apache.spark.sql.DataFrame = [Most_Frequent_2015: array<string>]


In [74]:
// Number the word-list rows 1..n.
// NOTE(review): the ids follow the lexicographic order of the word arrays, not the order of
// `members` (numbered by name), so an id here does not identify the same member as the same
// id in `members` once there is more than one row.
val mostFreqDF = {
  val byWords = Window.orderBy(s"Most_Frequent_${year}")
  df2.withColumn("id", row_number().over(byWords))
}

mostFreqDF.show(numRows = 20, truncate = true)

+--------------------+---+
|  Most_Frequent_2015| id|
+--------------------+---+
|[κύριε, συνάδελφε...|  1|
+--------------------+---+



mostFreqDF: org.apache.spark.sql.DataFrame = [Most_Frequent_2015: array<string>, id: int]


In [75]:
// Preview the member list and its ids.
members.show(numRows = 20, truncate = true)

+-------------------+---+
|               name| id|
+-------------------+---+
|Σακοραφα ηλια σοφια|  1|
+-------------------+---+



In [76]:
// Pair each member with their own top n_most_freq words, most frequent first.
// Name and count vector come from the same row of cvDF, so no synthetic key is needed. Joining
// `members` and `mostFreqDF` on their row_number ids mis-pairs rows: one side is numbered in name
// order, the other in lexicographic order of the word arrays.
// The output schema is unchanged: [name: string, Most_Frequent_<year>: array<string>].
val finalDF = {
  val vocab = cvModel.vocabulary
  val topN = n_most_freq
  cvDF.select("member_name", "features")
    .rdd
    .map { r =>
      val counts = r.getAs[org.apache.spark.ml.linalg.Vector](1).toSparse
      val topWords = counts.indices.zip(counts.values)
        .sortBy { case (_, cnt) => -cnt }
        .take(topN)
        .map { case (idx, _) => vocab(idx) }
      (r.getString(0).capitalize, topWords)
    }
    .toDF("name", s"Most_Frequent_${year}")
    .orderBy("name")
}

finalDF.show(numRows = 10, truncate = false)

+-------------------+-----------------------------------------------+
|name               |Most_Frequent_2015                             |
+-------------------+-----------------------------------------------+
|Σακοραφα ηλια σοφια|[κύριε, συνάδελφε, ευχαριστώ, λόγο, δημοκρατία]|
+-------------------+-----------------------------------------------+



finalDF: org.apache.spark.sql.DataFrame = [name: string, Most_Frequent_2015: array<string>]


In [171]:
import scala.collection.mutable.WrappedArray


// Write one text line per member, formatted as "(name,(year,(w1,...,wn)))".
// The previous payload string opened two parentheses but closed only one, so every saved line
// was unbalanced, e.g. "(name,(2015,(w1,w2))".
// Spark refuses to overwrite an existing output directory: remove results_<year> before re-running.
finalDF.rdd
  .map { r: org.apache.spark.sql.Row =>
    val words = r.getSeq[String](1).mkString(",")
    (r.getAs[String](0), s"(${year},(${words}))")
  }
  .saveAsTextFile(s"results_${year}")


/*

finalDF.rdd.
    map { r : org.apache.spark.sql.Row => 
        ((r.getAs[String](0), Array((year : Int, Array( 
            r.getAs[WrappedArray[String]](1).toArray.mkString(","))))))}.take(1)//.saveAsTextFile("test3")
*/

import scala.collection.mutable.WrappedArray


In [170]:
// Read back what the save cell wrote for `year`. Passing the directory makes textFile read every
// part-* file (not only part-00000), and the path follows `year` instead of a hard-coded 2015.
// Keeps the second comma-separated field of each line. Lines with no comma are skipped instead
// of throwing ArrayIndexOutOfBoundsException.
val x = sc.textFile(s"results_${year}")
  .flatMap(line => line.split(",").lift(1).toList)
  .collect
                    

x: Array[String] = Array(Array[2015)
