# PEC 3: Trending Topics

## 1. Basic Trending Topics

- Use Structured Streaming to implement a simple version of "trending topics".

- The input is a set of CSV files in `data/tweets`. Each CSV line corresponds to one tweet.

- Write a Spark Structured Streaming job that processes the files one by one and maintains an in-memory table with the 20 most frequently mentioned terms.

- Assume that the tweets arrive in the correct order, i.e., it is not necessary to use the timestamp in the data.

- Solve this exercise with Structured Streaming, not with a static Spark job.

- Store the result of the streaming computation in an in-memory table called `trending`.

- The following query should show the final result. Its output will change over time, as new files are processed.
```python
spark.sql("select * from trending").show()
```
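
As a point of reference for what the streaming job should compute, the sketch below imitates the same logic in plain Python, without Spark. Each "file" is a list of tweet texts, files are consumed one at a time, and a running counter plays the role of the `trending` table. The helper names `update_counts` and `top_terms` and the sample batches are made up for illustration, and splitting on whitespace is an assumption about how terms are delimited.

```python
from collections import Counter

def update_counts(counts, tweets):
    """Add the words of one batch of tweet texts to the running counter."""
    for text in tweets:
        counts.update(text.split())  # split on any run of whitespace
    return counts

def top_terms(counts, n=20):
    """Return the n most frequent (word, count) pairs, most frequent first."""
    return counts.most_common(n)

# Two made-up batches standing in for two CSV files.
batches = [
    ["spark streaming is fun", "spark is fast"],
    ["streaming with spark"],
]

counts = Counter()
for batch in batches:
    update_counts(counts, batch)
    # The "table" changes after every batch, like the memory sink in complete mode.
    print(top_terms(counts, n=3))
```

The counter is never reset between batches, which mirrors the way a streaming aggregation in `complete` output mode keeps the totals for everything seen so far.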


In [16]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [17]:
# I tried really hard to make it NOT static, like in the instructions
# I followed a step-by-step guide to have this running: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Did not work at ALL. I had to make a static DF.

# nc -lk 9999
# lines = spark \
#     .readStream \
#     .format("socket") \
#     .option("host", "localhost") \
#     .option("port", 9999) \
#     .load()
# lines.printSchema()

# Read the directory once as a static DataFrame (CSV with header, ";" as separator)
    # its schema is reused when the stream is defined
static = spark.read.option("header","true").option("sep",";").csv("data/tweets/")

In [18]:
# Print static Schema
static.printSchema()

root
 |-- username: string (nullable = true)
 |-- date: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- text: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- id: string (nullable = true)
 |-- permalink: string (nullable = true)



In [19]:
static.take(1)

[Row(username='librarian2277', date='2017-10-14 00:54', retweets='0', favorites='0', text='How to write blog headlines that drive search traffic https:// searchenginewatch.com/2017/10/07/how -to-write-blog-headlines-that-drive-search-traffic/ …', geo=None, mentions=None, hashtags=None, id='919063962876104704', permalink='https://twitter.com/librarian2277/status/919063962876104704')]

In [20]:
# Stream the data
    # `readStream`: monitor the tweets directory for files
    # `maxFilesPerTrigger`: feed the files into the stream one per micro-batch
    # `header`: skip the header line of each file, as in the static read
    # the schema comes from the static DataFrame

streaming = spark.readStream.option("sep",";")\
    .option("header", "true")\
    .option("maxFilesPerTrigger", 1)\
    .schema(static.schema)\
    .csv("data/tweets")

# Confirm that it is indeed streaming
streaming.isStreaming

True

In [21]:
streaming.printSchema()

root
 |-- username: string (nullable = true)
 |-- date: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- text: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- id: string (nullable = true)
 |-- permalink: string (nullable = true)



In [22]:
# SPLIT /separate the words
from pyspark.sql.functions import split
from pyspark.sql.functions import col

# EXPLODE each word into a new DataFrame row
from pyspark.sql.functions import explode

# how to do it all in one line: spark.read.csv("data/twitterBios").select(explode(split("_c0", "\s+")).alias("word")).groupBy("word").count().orderBy("count", ascending=False).limit(10).show()

In [23]:
# ex: words = streaming.select(explode(split(lines.value, " ")).alias("word"))
words = streaming.withColumn("split", split(col("text"), " ")).withColumn("word", explode(col("split")))
words.printSchema()

root
 |-- username: string (nullable = true)
 |-- date: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- text: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- word: string (nullable = true)



In [24]:
# WORDCOUNT: groupBy words, then count groups

wordCounts = words.groupBy("word").count()
wordCounts.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [25]:
# STREAMQUERY:
# ex: activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
    # OUTPUTMODE: complete (the whole result table is rewritten on every trigger)
    # FORMAT: memory (the result is kept as an in-memory table)
    # NAME: a unique query name, used as the table name when querying later: `trending`
    # START the streaming process

query = wordCounts\
    .writeStream\
    .outputMode("complete")\
    .queryName("trending")\
    .format("memory")\
    .start()

In [26]:
# ACTIVE: Confirm that the streaming is active
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7fd9eaa7ab00>]

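In [28]:
# Query the results in memory
for x in range(5):
    spark.sql("SELECT * FROM trending").show()
    
# The five calls above run back to back without waiting and all came back empty.
# Unfortunately this single query never returned any rows either, probably because
# it ran before the first micro-batch had finished:
# spark.sql("select * from trending").show()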

+----+-----+
|word|count|
+----+-----+
+----+-----+

+----+-----+
|word|count|
+----+-----+
+----+-----+

+----+-----+
|word|count|
+----+-----+
+----+-----+

+----+-----+
|word|count|
+----+-----+
+----+-----+

+----+-----+
|word|count|
+----+-----+
+----+-----+



In [13]:
# to STOP the streaming
query.stop()

## 2. Removing "stopwords"


- Find the trending topics again, this time excluding the "stopwords" listed in the folder `data/stopwords`.
- Stopwords are common words that carry little meaning on their own.

- The calculation must again run over the stream of tweets, processing the files one by one.
- This time, the stopwords must not appear among the trending topics.
- Keep in mind that no capitalization variant of a stopword should appear either.
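
The capitalization requirement can be read as "compare words and stopwords in lower case". The plain-Python sketch below shows that reading on a handful of made-up words. The function `remove_stopwords` and the sample lists are hypothetical and only illustrate the logic that the streaming job has to reproduce.

```python
def remove_stopwords(words, stopwords):
    """Lower-case every word and drop those found in the stop-word list."""
    stop = {s.strip().lower() for s in stopwords}
    return [w.lower() for w in words if w.lower() not in stop]

stopwords = ["a", "about", "the"]  # made-up excerpt of a stop-word list
words = ["The", "THE", "Spark", "about", "spark"]
print(remove_stopwords(words, stopwords))
```

Lower-casing the surviving words as well means that "Spark" and "spark" end up in the same group when they are counted afterwards.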

In [29]:
# For Spark Joins, we need a static DF.
#  Read the data as a dataframe
myStaticDF = spark.read.csv("data/stopwords/stopwords.txt").toDF("stopWords")

In [30]:
myStaticDF.show()

+-----------+
|  stopWords|
+-----------+
|          a|
|       able|
|      about|
|      above|
|  according|
|accordingly|
|     across|
|   actually|
|      after|
| afterwards|
|      again|
|    against|
|        all|
|      allow|
|     allows|
|     almost|
|      alone|
|      along|
|    already|
|       also|
+-----------+
only showing top 20 rows



In [None]:
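# ATTEMPT TO JOIN THE DATAFRAMES (on the words, before counting):
    # lower-case each word so that no capitalization variant slips through
    # the left outer join adds a column, "stopWords", holding the matching stop word or null
    # if the value in the new column is null, the word is not a stop word. Keep it!
    # if the value is NOT null, it is a stop word. Drop it!
from pyspark.sql.functions import lower

noStopWordCounts = words\
    .withColumn("word", lower(col("word")))\
    .join(myStaticDF, col("word") == lower(col("stopWords")), "left_outer")\
    .where("stopWords is null")\
    .groupBy("word").count()

# Keep the DataFrame and the started query in separate variables
streamingJoin = noStopWordCounts\
    .writeStream\
    .queryName("stopWords_join").format("memory")\
    .outputMode("complete")\
    .start()

#spark.sql("select * from stopWords_join").show()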


## 3. Avoiding punctuation marks


- The stop words have now been removed.
- The trending topics should also exclude words that consist solely of punctuation marks.

In [31]:
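# REGEX PATTERNS (Java syntax, as used by rlike)
    # ^ at the start of a pattern: the match must start at the beginning of the string or line
    # ^ as the first character inside [...]: negates the character class
    # \A  the match must occur at the start of the string
    # \p{name} matches any single character that IS in the named class
    # \P{name} matches any single character that is NOT in the named class
    # \p{Punct} punctuation: one of !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~

# Matches a word that contains at least one non-punctuation character,
# so rlike(punctuationWord) is False only for punctuation-only (or empty) words
punctuationWord = r"[^\p{Punct}]"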
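
To check what the pattern `[^\p{Punct}]` accepts and rejects, the behaviour can be imitated in plain Python. Python's `re` module does not support `\p{Punct}`, so this sketch builds an equivalent character class from `string.punctuation`, which holds the same 32 ASCII characters. The function name `has_non_punctuation` is hypothetical, and the sketch assumes the ASCII-only meaning that `\p{Punct}` has in Java regular expressions by default.

```python
import re
import string

# ASCII punctuation: the same 32 characters that POSIX \p{Punct} covers.
NON_PUNCT = re.compile("[^" + re.escape(string.punctuation) + "]")

def has_non_punctuation(word):
    """True if the word contains at least one non-punctuation character."""
    return NON_PUNCT.search(word) is not None

for token in ["spark", "#spark", "!!!", "...", ""]:
    print(repr(token), has_non_punctuation(token))
```

Under this ASCII-only class, a non-ASCII symbol such as the ellipsis character `…` counts as a non-punctuation character, so a token consisting only of that symbol would still pass the filter.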

In [None]:
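# Build the counts again, this time also dropping punctuation-only words
    # rlike(punctuationWord) keeps words with at least one non-punctuation character
    # the filter is applied to the DataFrame, and the "word" column is kept
from pyspark.sql.functions import lower

noPunctuationCounts = words\
    .withColumn("word", lower(col("word")))\
    .where(col("word").rlike(punctuationWord))\
    .join(myStaticDF, col("word") == lower(col("stopWords")), "left_outer")\
    .where("stopWords is null")\
    .groupBy("word").count()

noPunctuationWords = noPunctuationCounts\
    .writeStream.queryName("punctuation_join")\
    .format("memory")\
    .outputMode("complete")\
    .start()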

In [None]:
# Another attempt: drop words that consist only of punctuation (negated, anchored match)
punctuationOnly = r"^\p{Punct}+$"
wordCounts.where(~col("word").rlike(punctuationOnly))