Import this app's dependencies.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline 

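From the Spark Streaming Programming Guide at https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations. This is the guide's recommended way to get a SparkSession inside streaming code: a lazily created singleton that is built from the streaming SparkContext's configuration and can be re-created if the driver restarts.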

In [None]:
def getSparkSessionInstance(sparkConf):
    """Return the shared SparkSession, building it on the first call.

    This is the Spark Streaming Programming Guide's recommended pattern.
    The session is cached in this module's globals under the key
    'sparkSessionSingletonInstance', so every later call (e.g., once per
    micro-batch) gets the same object back. sparkConf is only consulted
    when the session has not been created yet.
    """
    module_globals = globals()
    cache_key = "sparkSessionSingletonInstance"
    if cache_key not in module_globals:
        builder = SparkSession.builder.config(conf=sparkConf)
        module_globals[cache_key] = builder.getOrCreate()
    return module_globals[cache_key]

Function `display_barplot` converts the Spark DataFrame it receives to pandas and displays it as a Seaborn barplot, replacing the previously displayed plot.

In [None]:
def display_barplot(spark_df, x, y, time, scale=2.0, size=(16, 9)):
    """Render a Spark DataFrame as a Seaborn bar plot in the notebook.

    The DataFrame is collected to the driver with toPandas(). The previous
    cell output is then cleared, the batch time is printed, and a new
    figure is drawn with vertical x-axis labels.

    Args:
        spark_df: Spark DataFrame containing the columns named by x and y.
        x: name of the column plotted along the x-axis.
        y: name of the column giving the bar heights.
        time: value printed above the plot as 'TIME: ...'.
        scale: Seaborn font scale.
        size: figure size in inches, as (width, height).
    """
    pandas_df = spark_df.toPandas()

    # wait=True keeps the old plot on screen until the new output
    # arrives to replace it
    display.clear_output(wait=True)
    print(f'TIME: {time}')

    plt.figure(figsize=size)
    sns.set(font_scale=scale)
    colors = sns.color_palette('cool', 20)
    axes = sns.barplot(data=pandas_df, x=x, y=y, palette=colors)

    # vertical tick labels keep long category names from overlapping
    plt.setp(axes.get_xticklabels(), rotation=90)

    plt.tight_layout()
    plt.show()

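Function count_tags is called for every RDD of running hashtag totals (the totals themselves are maintained by `updateStateByKey` further below). It loads the (hashtag, total) pairs into a DataFrame, uses Spark SQL to select the top 20, and displays an updated barplot.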

In [None]:
def count_tags(time, rdd):
    """Display a bar plot of the 20 hashtags with the highest totals.

    Used as a foreachRDD callback, so it receives the batch time and the
    batch's RDD. Each RDD element is a (hashtag, total) pair. The pairs are
    loaded into a DataFrame, ranked with Spark SQL (total descending, ties
    broken by hashtag ascending), and passed to display_barplot.

    Empty batches are skipped silently. Any other failure is printed as
    'Exception: ...' rather than raised out of the callback.
    """
    try:
        # Before any data arrives the RDD is empty. createDataFrame cannot
        # infer a schema from an empty RDD and would raise, which used to
        # print an error on every batch interval, so skip the batch.
        if rdd.isEmpty():
            return

        # get (or lazily create) the shared SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # map hashtag string-count tuples to Rows
        rows = rdd.map(
            lambda tag: Row(hashtag=tag[0], total=tag[1]))

        # create a DataFrame from the Row objects
        hashtags_df = spark.createDataFrame(rows)

        # create a temporary table view for use with Spark SQL
        hashtags_df.createOrReplaceTempView('hashtags')

        # use Spark SQL to get the top 20 hashtags in descending order
        top20_df = spark.sql(
            """select hashtag, total 
               from hashtags 
               order by total desc, hashtag asc 
               limit 20""")
        display_barplot(top20_df, x='hashtag', y='total', time=time)
    except Exception as e:
        print(f'Exception: {e}')


The main application code sets up Spark streaming to read text from the `starttweetstream.py` script via localhost port 9876 and specifies how to process the tweets.

In [None]:
# getOrCreate reuses an already-running SparkContext, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [None]:
# group the incoming stream into 10-second micro-batches
ssc = StreamingContext(sc, batchDuration=10)

In [None]:
# stateful updateStateByKey (used below) requires a checkpoint directory
ssc.checkpoint(directory='hashtagsummarizer_checkpoint')

In [None]:
# text lines arriving on localhost:9876 become a DStream of strings
stream = ssc.socketTextStream(hostname='localhost', port=9876)

In [None]:
# split every received line on whitespace into individual tokens (hashtags)
tokenized = stream.flatMap(lambda text: text.split())

In [None]:
# pair each token with a count of 1 so occurrences can be summed per key
mapped = tokenized.map(lambda tag: (tag, 1))

In [None]:
# Keep a running total per hashtag across batches: add this batch's
# counts to the previous total, which is None for a hashtag seen for
# the first time.
hashtag_counts = mapped.updateStateByKey(
    lambda new_counts, running_total: (running_total or 0) + sum(new_counts))

In [None]:
# for every batch, hand (time, rdd) of running totals to count_tags
hashtag_counts.foreachRDD(func=count_tags)

In [None]:
# Start the streaming computation defined above. This call does not
# block; ssc.awaitTermination() would wait for the stream to finish.
ssc.start()

In [None]:
#ssc.awaitTermination()  # wait for the streaming to finish

In [None]:
##########################################################################
# (C) Copyright 2019 by Deitel & Associates, Inc. and                    #
# Pearson Education, Inc. All Rights Reserved.                           #
#                                                                        #
# DISCLAIMER: The authors and publisher of this book have used their     #
# best efforts in preparing the book. These efforts include the          #
# development, research, and testing of the theories and programs        #
# to determine their effectiveness. The authors and publisher make       #
# no warranty of any kind, expressed or implied, with regard to these    #
# programs or to the documentation contained in these books. The authors #
# and publisher shall not be liable in any event for incidental or       #
# consequential damages in connection with, or arising out of, the       #
# furnishing, performance, or use of these programs.                     #
##########################################################################