## Twitter Word-Count

#### Installing pyspark and nltk

In [1]:
!pip install pyspark
!pip install nltk

Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Installing collected packages: py4j
Successfully installed py4j-0.10.7
Collecting nltk
  Downloading https://files.pythonhosted.org/packages/f6/1d/d925cfb4f324ede997f6d47bea4d9babba51b49e87a767c170b77005889d/nltk-3.4.5.zip (1.5MB)
Building wheels for collected packages: nltk
  Building wheel for nltk (setup.py) ... done
  Created wheel for nltk: filename=nltk-3.4.5-cp37-none-any.whl size=1449905 sha256=ec306e159f7cf7bae6de23dbdccf7ec398f57ca047a7ef3e3416051b0e3228fd
  Stored in directory: /home/jovyan/.cache/pip/wheels/96/86/f6/68ab24c23f207c0077381a5e3904b2815136b879538a24b483
Successfully built nltk
Installing collected p

#### Configure the environment

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,com.databricks:spark-avro_2.11:3.2.0 pyspark-shell'
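
The `--packages` value is a comma-separated list of Maven coordinates, followed by the `pyspark-shell` token. A minimal sketch of assembling that string from a list, which can make it easier to read and edit. The helper name `build_submit_args` is hypothetical; the coordinates are the ones used in the cell above.

```python
import os


def build_submit_args(packages):
    """Join Maven coordinates into a PYSPARK_SUBMIT_ARGS string (hypothetical helper)."""
    return "--packages " + ",".join(packages) + " pyspark-shell"


packages = [
    "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0",
    "com.databricks:spark-avro_2.11:3.2.0",
]

submit_args = build_submit_args(packages)

# The variable has to be set before the first SparkContext is created,
# because it is read when the JVM is launched.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
```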

#### Imports

In [2]:
import sys
import json
import nltk
import datetime
import re

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### Download nltk packages

In [3]:
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

#### Tweet Schema Definition

In [4]:
schema = StructType([
    StructField("text", StringType(), True)
])

In [5]:
def getSqlContextInstance(sparkContext):
    if ("sqlContextSingletonInstance" not in globals()):
        globals()["sqlContextSingletonInstance"] = SQLContext(sparkContext)
    return globals()["sqlContextSingletonInstance"]
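
The helper above caches a single SQLContext in the global namespace so that every batch reuses it instead of creating a new one. The same lazy-initialization pattern, sketched without Spark so it runs anywhere. `get_instance` and `make_resource` are hypothetical stand-ins for `getSqlContextInstance` and the `SQLContext` constructor.

```python
creation_log = []


def make_resource():
    """Stand-in for an expensive constructor; records each call."""
    creation_log.append("created")
    return {"name": "resource"}


def get_instance():
    """Create the resource on first use, then return the cached object."""
    if "resource_singleton" not in globals():
        globals()["resource_singleton"] = make_resource()
    return globals()["resource_singleton"]


first = get_instance()
second = get_instance()

# Both calls return the same object; the constructor ran only once.
print(first is second, len(creation_log))
```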

In [6]:
def clean_tweet(tweet):
    # Remove Retweet text
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    # Remove Hyperlinks
    tweet = re.sub(r'https?://\S+', '', tweet)
    # Remove Hashtag from the word
    tweet = re.sub(r'#', '', tweet)
    
    return tweet
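
A quick check of the cleaning steps on a sample string. This is a self-contained sketch: the tweet text is invented, the function name `clean_tweet_sketch` is hypothetical, and the link pattern used here is `https?://\S+`, an assumption chosen so that only the URL itself is removed and any text following it is kept.

```python
import re


def clean_tweet_sketch(tweet):
    # Remove a leading retweet marker
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    # Remove hyperlinks (only the URL token, up to the next whitespace)
    tweet = re.sub(r'https?://\S+', '', tweet)
    # Remove the hash sign but keep the tagged word
    tweet = re.sub(r'#', '', tweet)
    return tweet


sample = "RT @user: Breaking #news https://example.com/a1 more text"
cleaned = clean_tweet_sketch(sample)
print(cleaned)
```

The handle `@user` is left in place here; in the notebook it is removed later by the tokenizer's `strip_handles=True` option.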

In [7]:
def extract_noun(text):
    is_noun = lambda pos: pos[:2] == "NN" 

    tweet_tokenizer = nltk.tokenize.TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)
    tokens = tweet_tokenizer.tokenize(text)
    return [word for (word, pos) in nltk.pos_tag(tokens) if is_noun(pos)] 
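
`nltk.pos_tag` returns `(word, tag)` pairs with Penn Treebank tags, and the `pos[:2] == "NN"` test keeps every tag that starts with `NN` (`NN`, `NNS`, `NNP`, `NNPS`). A minimal sketch of the filter alone, so it runs without the NLTK models. The tagged pairs below are written by hand for illustration and are not real tagger output.

```python
def is_noun(pos):
    """True for any Penn Treebank noun tag (NN, NNS, NNP, NNPS)."""
    return pos[:2] == "NN"


# Hand-written (word, tag) pairs, in the shape nltk.pos_tag produces
tagged = [
    ("the", "DT"),
    ("president", "NN"),
    ("slams", "VBZ"),
    ("patriots", "NNS"),
    ("fiona", "NNP"),
    ("quickly", "RB"),
]

nouns = [word for (word, pos) in tagged if is_noun(pos)]
print(nouns)  # ['president', 'patriots', 'fiona']
```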

### Exercise 1

Implement the "processTweets" function. The comments define which action must be implemented at each step.

In [8]:
def processTweets(time, rdd):
    print("========= %s =========" % str(time))
    try:
        sql_context = getSqlContextInstance(rdd.context)

        # Parse the Tweet Json
        tweet_dataframe = sql_context.read.schema(schema).json(rdd)

        # Extract the relevant properties
        extract_dataframe = tweet_dataframe.select("text")

        # Define the user defined function
        clean_text = udf(clean_tweet, StringType())
        text_to_pos = udf(extract_noun, ArrayType(StringType()))

        # Extract the data from the text
        text_noun_dataframe = extract_dataframe.withColumn("nouns", text_to_pos(clean_text("text")))

        # Preparing the output dataset
        output_dataframe = text_noun_dataframe.select(explode("nouns"))
        output_dataframe.show()

        # Collect and print
        for p in output_dataframe.collect():
            print("Persist noun: " + p[0] + " Date: " + datetime.datetime.now().date().isoformat())

    except Exception as e:
        print("Error processing tweet: " + str(e))
        pass
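
`explode("nouns")` turns each row's list of nouns into one row per noun, which is why the output column is named `col`. The same flattening is sketched below in plain Python, followed by the per-word count that the notebook title suggests as a next step. The sample rows are invented, and counting with `collections.Counter` is an assumption about how the results might be aggregated, not part of the original exercise.

```python
from collections import Counter

# One list of nouns per tweet, as in the "nouns" column
rows = [
    ["president", "trump"],
    ["president", "biden"],
    [],  # a tweet without nouns contributes no output rows
]

# Equivalent of explode: one entry per noun
exploded = [noun for nouns in rows for noun in nouns]

# Count how often each noun occurs in this batch
counts = Counter(exploded)
print(counts.most_common(1))  # [('president', 2)]
```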

### Exercise 1

Create a SparkContext and a StreamingContext with a batch interval of 10 seconds.

In [9]:
spark_context = SparkContext(appName="TwitterTrendAnalyses") 
stream_context = StreamingContext(spark_context, 10)

### Exercise 2

Create a Kafka direct stream using KafkaUtils.

In [10]:
kafka_stream = KafkaUtils.createDirectStream(stream_context, ["Tweets"], {"metadata.broker.list": "broker:9093"}) 
kafka_stream.map(lambda rawTweet: rawTweet[1]).foreachRDD(processTweets)
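
Each record from the direct stream arrives as a `(key, value)` tuple, so `rawTweet[1]` selects the message value, which here is the tweet JSON string. A plain-Python sketch of that step follows. The records are invented, and parsing with `json.loads` stands in for the schema-based `read.json` call in `processTweets`.

```python
import json

# Invented Kafka records in (key, value) form
records = [
    (None, '{"text": "RT @a: hello #world", "lang": "en"}'),
    ("k1", '{"text": "second tweet"}'),
]

# Same selection as the lambda: keep only the message value
values = [record[1] for record in records]

# Keep only the "text" property, as the one-field schema does
texts = [json.loads(value).get("text") for value in values]
print(texts)
```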

### Exercise 3

Start streaming.

In [None]:
stream_context.start() 
stream_context.awaitTermination()

+---+
|col|
+---+
+---+

+---+
|col|
+---+
+---+

+---+
|col|
+---+
+---+

+-----------+
|        col|
+-----------+
|  president|
|      trump|
|  president|
|       vice|
|  president|
|        joe|
|      biden|
|    trump's|
|       kind|
|        guy|
|   patriots|
|       bill|
|yovanovitch|
|      fiona|
|       hill|
|     people|
|  testimony|
|          …|
|    opinion|
|     rights|
+-----------+
only showing top 20 rows

Persist noun: president Date: 2019-10-25
Persist noun: trump Date: 2019-10-25
Persist noun: president Date: 2019-10-25
Persist noun: vice Date: 2019-10-25
Persist noun: president Date: 2019-10-25
Persist noun: joe Date: 2019-10-25
Persist noun: biden Date: 2019-10-25
Persist noun: trump's Date: 2019-10-25
Persist noun: kind Date: 2019-10-25
Persist noun: guy Date: 2019-10-25
Persist noun: patriots Date: 2019-10-25
Persist noun: bill Date: 2019-10-25
Persist noun: yovanovitch Date: 2019-10-25
Persist noun: fiona Date: 2019-10-25
Persist noun: hill Date: 2019

+-------------+
|          col|
+-------------+
|            i|
|        radio|
|      station|
|   presenters|
|         john|
|      boehner|
|        slams|
|        party|
|        trump|
|        party|
|         ravi|
|        singh|
|      world's|
|humanitarians|
|       things|
|        trump|
|            …|
|            “|
|         home|
|          don|
+-------------+
only showing top 20 rows

Persist noun: i Date: 2019-10-25
Persist noun: radio Date: 2019-10-25
Persist noun: station Date: 2019-10-25
Persist noun: presenters Date: 2019-10-25
Persist noun: john Date: 2019-10-25
Persist noun: boehner Date: 2019-10-25
Persist noun: slams Date: 2019-10-25
Persist noun: party Date: 2019-10-25
Persist noun: trump Date: 2019-10-25
Persist noun: party Date: 2019-10-25
Persist noun: ravi Date: 2019-10-25
Persist noun: singh Date: 2019-10-25
Persist noun: world's Date: 2019-10-25
Persist noun: humanitarians Date: 2019-10-25
Persist noun: things Date: 2019-10-25
Persist noun: trump D

+---------+
|      col|
+---------+
|     bill|
|     barr|
|character|
|    trump|
|    drama|
|    trump|
|        ’|
|     fury|
|    build|
|  towards|
| sessions|
|     deci|
|        …|
| whenever|
|something|
|   people|
|  replies|
|        "|
|   you're|
|   member|
+---------+
only showing top 20 rows

Persist noun: bill Date: 2019-10-25
Persist noun: barr Date: 2019-10-25
Persist noun: character Date: 2019-10-25
Persist noun: trump Date: 2019-10-25
Persist noun: drama Date: 2019-10-25
Persist noun: trump Date: 2019-10-25
Persist noun: ’ Date: 2019-10-25
Persist noun: fury Date: 2019-10-25
Persist noun: build Date: 2019-10-25
Persist noun: towards Date: 2019-10-25
Persist noun: sessions Date: 2019-10-25
Persist noun: deci Date: 2019-10-25
Persist noun: … Date: 2019-10-25
Persist noun: whenever Date: 2019-10-25
Persist noun: something Date: 2019-10-25
Persist noun: people Date: 2019-10-25
Persist noun: replies Date: 2019-10-25
Persist noun: " Date: 2019-10-25
Persist noun: you