## Set up a SparkSession connected to the socket created with Tweepy

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the SparkSession that the streaming
# cells below read from; the app name identifies this job in the Spark UI.
spark = (
    SparkSession.builder
    .appName('streamTwitterTags')
    .getOrCreate()
)

In [155]:
from pyspark.sql.functions import lower, split, explode, substring, count, length

# Read raw tweet text from the local socket that the Tweepy script writes to.
# The socket source yields one row per received line, in a string column 'value'.
tweets = (
    spark.readStream
    .format("socket")
    .option("host", "127.0.0.1")
    .option("port", 5555)
    .load()
)

# Lowercase the text so '#Spark' and '#spark' are counted together, then split
# it into words. split() takes a Java regex: r"\s+" splits on any run of
# whitespace (tabs, repeated spaces), whereas a literal " " would leave a
# hashtag that follows a tab glued to the preceding word.
words_train = tweets.select(split(lower(tweets.value), r"\s+").alias("value"))

# 'explode' the array of words into one row per word.
words_df = words_train.select(explode(words_train.value).alias('word_explode'))

# Keep only words that start with '#' and have at least one character after it,
# so a bare '#' is not reported as a hashtag.
# NOTE: trailing punctuation is kept, so '#spark,' and '#spark' count separately.
hashtags = words_df.filter(
    words_df.word_explode.startswith('#') & (length(words_df.word_explode) > 1)
)
hashtags = hashtags.select(hashtags.word_explode.alias('hashtag'))

# Count occurrences of each hashtag, most frequent first. Sorting a streaming
# DataFrame is only supported after an aggregation and in 'complete' output mode.
count_hashtags = hashtags.groupBy('hashtag').count()
count_hashtags_order = count_hashtags.orderBy(count_hashtags['count'].desc())

In [156]:
# Start the streaming query and print results to the console.
# 'complete' mode re-emits the whole sorted count table on every trigger, which
# the orderBy on the aggregated counts requires. The console sink truncates long
# values by default; disable that so long hashtags are printed in full.
query = (
    count_hashtags_order.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

In [157]:
# Stop the streaming query that was started with writeStream...start().
query.stop()