<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" /> 

<img width="500" style="float:center" 
     src="https://images6.alphacoders.com/119/1191374.jpg" />

# Twitter Structured Streaming 🦑
* [Business Case](#0)
* [1. Setup](#1) 
  * [1.1 Start the Kafka service and the producer](#1.1)
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Twitter & SquidGame](#2) 
  * [2.1 Create a Streaming DataFrame](#2.1)
  * [2.2 Transform the DataFrame](#2.2)
  * [2.3 Applying the Streaming Logic per Question](#2.3)
* [3. Questions](#3)
  * [3.1 Conversation: What are the top 10 hashtags in the SquidGame conversation?](#3.1)
  * [3.2 Conversation: Top 10 most common languages on the conversation of SquidGame.](#3.2)
  * [3.3 Conversation: Who are the top 5 chit-chatter in the SquidGame conversation?](#3.3)
  * [3.4 Conversation Background: What are the top 10 locations in the SquidGame conversation?](#3.4)
  * [3.5 Giving context to Location - Checking how many users have their geotag enabled.](#3.5)
  * [3.6 Conversation Background: How are they tweeting? Top 5 technical sources used by Twitter Users.](#3.6)
  * [3.7 Influencers: Who are the top 10 accounts mentioned in the SquidGame Conversation?](#3.7)
  * [3.8 Influencers: Who are the most replied users in the SquidGame conversation?](#3.8)
  * [3.9 Who are the users? The ratio of Verified accounts in the conversation of SquidGame.](#3.9)
  * [3.10 Influencer: Who are the verified accounts tweeting about SquidGame?](#3.10)
  * [3.11 Influencers: Top 5 most followed users in our SquidGame Conversation.](#3.11)
  * [3.12 Given the development of the SquidGame coin in the last season's launch, are there cashtags embedded in the SquidGame conversation? What are the top 5?](#3.12)
* [4. Please, terminate the tools used.](#4)
  * [4.1 Stop the Spark Streaming application](#4.1)
  * [4.2 Stop the Kafka producer](#4.2)
  * [4.3 Stop the Kafka service](#4.3)

<a id='0'></a>
## Business Case 📎
To further understand user's behavior and find valuable insights for the future marketing campaign of the second season of Squid Game, Netflix has asked Team H, as analytics consultants, to set up a structured streaming Pipeline using Kafka of the conversation of Squid Game on Twitter to provide live insights for both marketing and sales team. Hence, to find valuable information, Team H has established 12 key business questions aligned with the Marketing and Sales strategy following three different pillars: Quick insights on the conversation, Conversation Background, and Insights on the Influencers. Hence, this Jupyter Notebook is a compilation of ready-to-query consumer-insights questions that aims to support those strategic teams with live insights of the Squid Game conversations.

</div>

<ul class="roman">
     <li> Quick insights on the Conversation</li>
     <ul class="square">
 <li>1. Conversation: What are the top 10 hashtags in the SquidGame conversation?</li>
 <li>2. Conversation: Top 10 most common languages on the conversation of SquidGame.</li>
 <li>3. Conversation: Who are the top 5 chit-chatter in the SquidGame conversation?
 </ul>
 <li>Conversation Background:
 <ul class="square">
 <li>4. Conversation Background: What are the top 10 locations in the SquidGame conversation?</li>
 <li>5. Conversation Background: Giving context to Location - Checking how many users have their geotag enabled.</li>
 <li>6. Conversation Background: How are they tweeting? Top 5 technical sources used by Twitter Users.</li>
  </ul>
 <li>Influencers:
 <ul class="square">
 <li>7. Influencers: Who are the top 10 accounts mentioned in the SquidGame Conversation?
 <li>8. Influencers: Who are the most replied users in the SquidGame conversation?
 <li>9. Influences: Who are the users? The ratio of Verified accounts in the conversation of SquidGame.
 <li>10. Influencer: Who are the verified accounts tweeting about SquidGame?
 <li> 11. Influencers: Top 5 most followed users in our SquidGame Conversation.
 <li> 12. Coins: Given the development of the SquidGame coin in the last season's launch, are there cashtags embedded in the SquidGame conversation? What are the top 5?

<a id='1'></a>
## 1. Seting Up 🧹

<a id='1.1'></a>
### 1.1 Starting with Kafka service and the producer 
<p>Dear Marketing Team, before starting to use this notebook please be sure that you log into the course environment by:</p>
<ul>
    <li><p><b>Start the Kafka service</b>:
        <br/><em>\$ sudo service kafka start</p></em><br/></li>
    <li>Add your <b>API key</b>, <b>API secret</b>, <b>access token</b> and <b>access secret</b> to the credentials.ini file</li>
    <li><p><b>Start the producer</b> connecting to Twitter and filtering tweets by keywords or hashtags:
        <br/>\$ python3 twitter_producer.py credentials.ini "squidgame,#SquidGame, #sg2, #squidgame2, #season2sg" -b localhost:9092 -t tweets</p></li>
</ul>
Please, also don't forget to pip install tweepy with 
**-m pip install --upgrade pip**

<a id='1.2'></a>
### 1.2 Searching for the Spark Installation 
This step is required just because we are working in the course environment.

In [1]:
import findspark
findspark.init()

<a id='1.3'></a>
### 1.3 Creating SparkSession

In addition to create the Spark Session, we need to set up a variable environment to include extra libraries in our "cluster".<br/>
In this case we're including the Spark package as our job will connect to Kafka.

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" pyspark-shell'

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("Twitter").getOrCreate()

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2f754186-817b-4b95-b56d-aac91246502e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 420ms :: artifacts dl 12ms
	:: modules in use

<a id='2'></a>
## 2. Twitter & SquidGame 🤓

<a id='2.1'></a>
### 2.1 Creating a Streaming DataFrame

Here, we are creating a **streaming** DataFrame called tweets_raw from the Kafka stream. 

- format: **kafka** 
- kafka.bootstrap.servers: **localhost:9092** 
- subscribe: **tweets**
- startingOffsets: **latest**


In [3]:
tweets_raw = (spark.readStream
                   .format("kafka")
                   .option("kafka.bootstrap.servers", "localhost:9092")
                   .option("subscribe", "tweets")
                   .option("startingOffsets", "latest")
                   .load())

tweets_raw

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

<a id='2.2'></a>
### 2.2 Transform the DataFrame

This step is important, since we are translating the tweet in its proper structure.

The contents of the value column are tweets in JSON format. The JSON structure is defined in the official Twitter developer website and is the following:

https://developer.twitter.com/en/docs/twitter-api/v1/data-dictionary/object-model/tweet

Importantly, it is **not mandatory to define all the JSON properties** in the schema definition, however since this notebook is supposed to be ready-to-query we thought it would be important to translate everything to JSON.

In [4]:
tweet_schema="""
created_at string,
id bigint,
id_str string,
text string,
source string,
truncated boolean,
in_reply_to_status_id bigint,
in_reply_to_status_id_str string,
in_reply_to_user_id bigint,
in_reply_to_user_id_str string,
in_reply_to_screen_name string,
`user` struct<
            id:bigint,
            id_str:string,
            name:string,
            screen_name:string,
            location:string,
            url:string,
            description:string,
            protected:boolean,
            verified:boolean,
            followers_count:bigint,
            friends_count:bigint,
            listed_count:bigint,
            favourites_count:bigint,
            statuses_count:bigint,
            created_at:string,
            profile_banner_url:string,
            profile_image_url_https:string,
            default_profile:boolean,
            default_profile_image:boolean,
            withheld_in_countries: array<string>,
            withheld_scope:string,
            geo_enabled:boolean
            >,
coordinates struct <
            coordinates:array<float>,
            type:string
            >,
place struct<
            country:string,
            country_code:string,
            full_name:string,
            place_type:string,
            url:string
            >,
quoted_status_id bigint,
quoted_status_id_str string,
is_quote_status boolean,
quote_count bigint,
reply_count bigint,
retweet_count bigint,
favorite_count bigint,
entities struct<
            user_mentions:array<struct<screen_name:string>>,
            hashtags:array<struct<text:string>>, 
            media:array<struct<expanded_url:string>>, 
            urls:array<struct<expanded_url:string>>, 
            symbols:array<struct<text:string>>
            >,
favorited boolean,
retweeted boolean,
possibly_sensitive boolean,
filter_level string,
lang string
"""

In [5]:
from pyspark.sql.functions import from_json, col

tweets = (tweets_raw.selectExpr("cast(value as string)") 
                    .select(from_json(col("value"), tweet_schema).alias("tweet")))
                      
tweets.printSchema()

root
 |-- tweet: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- id_str: string (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- truncated: boolean (nullable = true)
 |    |-- in_reply_to_status_id: long (nullable = true)
 |    |-- in_reply_to_status_id_str: string (nullable = true)
 |    |-- in_reply_to_user_id: long (nullable = true)
 |    |-- in_reply_to_user_id_str: string (nullable = true)
 |    |-- in_reply_to_screen_name: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- protected: bool

In [6]:
tweets

DataFrame[tweet: struct<created_at:string,id:bigint,id_str:string,text:string,source:string,truncated:boolean,in_reply_to_status_id:bigint,in_reply_to_status_id_str:string,in_reply_to_user_id:bigint,in_reply_to_user_id_str:string,in_reply_to_screen_name:string,user:struct<id:bigint,id_str:string,name:string,screen_name:string,location:string,url:string,description:string,protected:boolean,verified:boolean,followers_count:bigint,friends_count:bigint,listed_count:bigint,favourites_count:bigint,statuses_count:bigint,created_at:string,profile_banner_url:string,profile_image_url_https:string,default_profile:boolean,default_profile_image:boolean,withheld_in_countries:array<string>,withheld_scope:string,geo_enabled:boolean>,coordinates:struct<coordinates:array<float>,type:string>,place:struct<country:string,country_code:string,full_name:string,place_type:string,url:string>,quoted_status_id:bigint,quoted_status_id_str:string,is_quote_status:boolean,quote_count:bigint,reply_count:bigint,retweet

In [7]:
from pyspark.sql.functions import *
tweets_df = tweets.select(col("tweet.user.name").alias("name"), col("tweet.user.location").alias("location"), col("tweet.user.description").alias("description"), col("tweet.user.created_at").alias("twitter_created_at"), col("tweet.user.followers_count").alias("followers_count"), col("tweet.user.friends_count").alias("friends_count"), col("tweet.user.favourites_count").alias("favourites_count"), col("tweet.user.verified").alias("verified"))
tweets_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- twitter_created_at: string (nullable = true)
 |-- followers_count: long (nullable = true)
 |-- friends_count: long (nullable = true)
 |-- favourites_count: long (nullable = true)
 |-- verified: boolean (nullable = true)



<a id='2.3'></a>
### 2.3 Applying the Streaming Logic per Question

In this notebook, we will be answering 12 essential business questions for both the marketing and sales team to better understand the conversation of SquidGame on Twitter. The process of each question uses the exact step-by-step process defined below. 

<li> Step 1: Applying transformations to the dataframe. </li>
<li> Step 2: Normalizing the subject at hand to lower case so they can be grouped more easily and account for different. 
<li> Step 3: Sinking the streaming DataFrame. </li>
<li> Step 4: Action the stream by calling the start method. </li>
<li> Step 5: Checking results since we defined sink to keep the data in memory as a table. </li>


In [8]:
from pyspark.sql.functions import *
import time
from IPython.display import clear_output

<a id='3.1'></a>
**1. Conversation: What are the top 10 hashtags in the SquidGame conversation?**

In [9]:
hashtags = (tweets.select(explode("tweet.entities.hashtags.text").alias("hashtag")))                
hashtags

DataFrame[hashtag: string]

In [10]:
trending_hashtags = (hashtags.withColumn("hashtag",lower("hashtag"))
                    .groupBy("hashtag").count())
trending_hashtags

DataFrame[hashtag: string, count: bigint]

In [11]:
stream_writer_hashtags = (trending_hashtags.writeStream
                         .format("memory")
                         .queryName("trending_hashtags_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_hashtags

<pyspark.sql.streaming.DataStreamWriter at 0x7fab1c04b828>

In [12]:
query = stream_writer_hashtags.start()

In [None]:
import time
from IPython.display import clear_output
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_hashtags_query order by count desc limit 10").show()
    
#Please, interrupet the kernel before jumping to the next question. You should get get KeyboardInterrupt message.

                                                                                

+---------+-----+
|  hashtag|count|
+---------+-----+
|squidgame|    1|
|  binarmy|    1|
+---------+-----+



<a id='3.2'></a>
**2. Conversation: Top 10 most common languages on the conversation of SquidGame.**

In [21]:
languages = (tweets.select(lower("tweet.lang").alias("language")))                
languages

DataFrame[language: string]

In [22]:
trending_languages = (languages.groupBy("language").count())
trending_languages

DataFrame[language: string, count: bigint]

In [23]:
stream_writer_languages = (trending_languages.writeStream
                         .format("memory")
                         .queryName("trending_languages_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_languages

<pyspark.sql.streaming.DataStreamWriter at 0x7fab11d23e48>

In [24]:
query = stream_writer_languages.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_languages_query order by count desc limit 10").show()

+--------+-----+
|language|count|
+--------+-----+
|      en|    1|
+--------+-----+



<a id='3.3'></a>
**3. Conversation: Who are the top 5 chit-chatter in the SquidGame conversation?**

In [9]:
users = (tweets.select("tweet.user.screen_name"))               
users

DataFrame[screen_name: string]

In [10]:
trending_users = (users.withColumn("screen_name",lower("screen_name"))
                    .groupBy("screen_name").count())
trending_users

DataFrame[screen_name: string, count: bigint]

In [11]:
stream_writer_users = (trending_users.writeStream
                         .format("memory")
                         .queryName("trending_users_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_users

<pyspark.sql.streaming.DataStreamWriter at 0x7f92163875f8>

In [12]:
query = stream_writer_users.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_users_query order by count desc limit 10").show()

+---------------+-----+
|    screen_name|count|
+---------------+-----+
|        raart96|    2|
| cemalturkmen15|    1|
|forbeslifelatam|    1|
|         fiztwt|    1|
+---------------+-----+





<a id='3.4'></a>
**4. Conversation Background: What are the top 10 locations in the SquidGame conversation?***

In [14]:
locations = (tweets.select("tweet.place.country_code").alias("user_location"))                
locations

DataFrame[country_code: string]

In [15]:
trending_locations = (locations.withColumn("user_location.country_code",lower("user_location.country_code"))
                    .groupBy("user_location.country_code").count())
trending_locations

DataFrame[country_code: string, count: bigint]

In [16]:
stream_writer_locations = (trending_locations.writeStream
                         .format("memory")
                         .queryName("trending_locations_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_locations

<pyspark.sql.streaming.DataStreamWriter at 0x7f9216367240>

In [17]:
query = stream_writer_locations.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_locations_query order by count desc limit 10").show()

+------------+-----+
|country_code|count|
+------------+-----+
|        null|    2|
+------------+-----+



<a id='3.5'></a>
**5. Conversation Background: Giving context to Location - Checking how many users have their geotag enabled.**

In [27]:
geotag_booleans = (tweets.select("tweet.user.geo_enabled").alias("geo_enabled"))                
geotag_booleans

DataFrame[geo_enabled: boolean]

In [28]:
trending_geotag_booleans = (geotag_booleans.withColumn("geo_enabled",lower("geo_enabled"))
                    .groupBy("geo_enabled").count())
trending_geotag_booleans

DataFrame[geo_enabled: string, count: bigint]

In [29]:
stream_geotag_booleans = (trending_geotag_booleans.writeStream
                         .format("memory")
                         .queryName("trending_geotag_booleans_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_geotag_booleans

<pyspark.sql.streaming.DataStreamWriter at 0x7f921635a048>

In [30]:
query = stream_geotag_booleans.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_geotag_booleans_query order by count desc limit 10").show()

+-----------+-----+
|geo_enabled|count|
+-----------+-----+
|      false|    8|
|       true|    2|
+-----------+-----+



<a id='3.6'></a>
**6. Conversation Background: How are they tweeting? Top 5 technical sources used by Twitter Users.**

In [40]:
sources = (tweets.select("tweet.source").alias("source"))               
sources

DataFrame[source: string]

[Stage 307:(128 + 4) / 200][Stage 308:>  (0 + 0) / 1][Stage 310:>  (0 + 0) / 1]

In [41]:
trending_sources = (sources.withColumn("source",lower("source"))
                    .groupBy("source").count())
trending_sources

[Stage 307:(142 + 4) / 200][Stage 308:>  (0 + 0) / 1][Stage 310:>  (0 + 0) / 1]

DataFrame[source: string, count: bigint]

                                                                                

In [42]:
stream_writer_sources = (trending_sources.writeStream
                         .format("memory")
                         .queryName("trending_sources_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_sources

<pyspark.sql.streaming.DataStreamWriter at 0x7f921635a6d8>

In [43]:
query = stream_writer_sources.start()

[Stage 309:(78 + 4) / 200][Stage 311:>(0 + 0) / 200][Stage 313:>(0 + 0) / 200]0]

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_sources_query order by count desc limit 5").show()

+--------------------+-----+
|              source|count|
+--------------------+-----+
|<a href="http://t...|    4|
|<a href="http://t...|    2|
|<a href="https://...|    1|
|<a href="https://...|    1|
|<a href="https://...|    1|
+--------------------+-----+



<a id='3.7'></a>
**7. Influencers: Who are the top 10 accounts mentioned in the SquidGame Conversation?**

In [45]:
# applying transformations to the dataframe
mentions = (tweets.select(explode("tweet.entities.user_mentions.screen_name").alias("mention")))                 
mentions

DataFrame[mention: string]

In [46]:
#normalizing the mentions to lower case and grouping
trending_mentions = (mentions.withColumn("mention",lower("mention"))
                    .groupBy("mention").count())
trending_mentions

DataFrame[mention: string, count: bigint]

In [47]:
#sinking the streaming DataFrame
#format: memory
#outputMode: complete
#trigger: 2 seconds
stream_writer_mentions = (trending_mentions.writeStream
                         .format("memory")
                         .queryName("trending_mentions_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_mentions

<pyspark.sql.streaming.DataStreamWriter at 0x7f92162e0fd0>

In [48]:
#actioning the stream by calling the start method
query = stream_writer_mentions.start()

In [None]:
#checking results since we defined sink to keep the data in memory as a table
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_mentions_query order by count desc limit 10").show()

                                                                                

+-------+-----+
|mention|count|
+-------+-----+
| yoo_i_|    1|
+-------+-----+



<a id='3.8'></a>
**8. Influencers: Who are the most replied users in the SquidGame conversation?**

In [58]:
replies = (tweets.select("tweet.in_reply_to_screen_name").alias("reply"))                
replies

DataFrame[in_reply_to_screen_name: string]

In [59]:
trending_replies = (replies.withColumn("reply.in_reply_to_screen_name",lower("reply.in_reply_to_screen_name"))
                    .groupBy("reply.in_reply_to_screen_name").count())
trending_replies



DataFrame[in_reply_to_screen_name: string, count: bigint]

In [60]:
stream_writer_replies = (trending_replies.writeStream
                         .format("memory")
                         .queryName("trending_replies_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_replies



<pyspark.sql.streaming.DataStreamWriter at 0x7f9216301780>

                                                                                

In [61]:
query = stream_writer_replies.start()

                                                                                

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_replies_query order by count desc limit 10").show()

                                                                                

+-----------------------+-----+
|in_reply_to_screen_name|count|
+-----------------------+-----+
|                   null|    3|
|        Gdr2VS8mZG1UEYg|    1|
+-----------------------+-----+



                                                                                

<a id='3.9'></a>
**9. Influences: Who are the users? The ratio of Verified accounts in the conversation of SquidGame.**

In [71]:
verified_status = (tweets.select("tweet.user.verified"))                 
verified_status

DataFrame[verified: boolean]

In [72]:
trending_status = (verified_status.withColumn("verified",lower("verified"))
                    .groupBy("verified").count())
trending_status

DataFrame[verified: string, count: bigint]

In [73]:
stream_writer_status = (trending_status.writeStream
                         .format("memory")
                         .queryName("trending_status_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_status

<pyspark.sql.streaming.DataStreamWriter at 0x7f9216319630>

In [74]:
query = stream_writer_status.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_status_query order by count desc limit 10").show()

+--------+-----+
|verified|count|
+--------+-----+
|   false|    3|
+--------+-----+



<a id='3.10'></a>
**10. Influencer: Who are the verified accounts tweeting about SquidGame?**

In [9]:
verified_users = (tweets.select("tweet.user.screen_name", col("tweet.user.verified").alias("verified")))                 
verified_users

DataFrame[screen_name: string, verified: boolean]

In [10]:
trending_verified_users = verified_users.where(col("verified") == "True").select("screen_name").groupBy("screen_name").count()
trending_verified_users

DataFrame[screen_name: string, count: bigint]

In [11]:
stream_writer_verified_users = (trending_verified_users.writeStream
                         .format("memory")
                         .queryName("trending_verified_users_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_verified_users

<pyspark.sql.streaming.DataStreamWriter at 0x7f95f0c52940>

In [12]:
query = stream_writer_verified_users.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_verified_users_query").show()

+-----------+-----+
|screen_name|count|
+-----------+-----+
+-----------+-----+



<a id='3.11'></a>
**11. Influencers: Top 5 most followed users in our SquidGame Conversation.**

In [19]:
most_followed = (tweets.select("tweet.user.screen_name", col("tweet.user.followers_count").alias("followers")))                 
most_followed

DataFrame[screen_name: string, followers: bigint]

In [21]:
trending_most_followed = most_followed.groupBy("screen_name").count()
trending_most_followed

DataFrame[screen_name: string, count: bigint]

In [11]:
stream_most_followed = (trending_most_followed.writeStream
                         .format("memory")
                         .queryName("trending_verified_users_query")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_most_followed

<pyspark.sql.streaming.DataStreamWriter at 0x7f5448864630>

In [12]:
query = stream_most_followed.start()

In [None]:
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select screen_name from trending_verified_users_query order by count desc limit 5").show()

+-----------+
|screen_name|
+-----------+
+-----------+



<a id='3.12'></a>
**12. Coins: Given the development of the SquidGame coin in the last season's launch, are there cashtags embedded in the SquidGame conversation? What are the top 5?**

In [9]:
cashtags = (tweets.select(explode("tweet.entities.symbols.text").alias("cashtag")))
cashtags

DataFrame[cashtag: string]

In [10]:
trending_cashtags = (cashtags.withColumn("cashtag",upper("cashtag"))
                    .groupBy("cashtag").count())
trending_cashtags

DataFrame[cashtag: string, count: bigint]

In [11]:
stream_writer_cashtags = (trending_cashtags.writeStream
                         .format("memory")
                         .queryName("trending_cashtags_now")
                         .outputMode("complete")
                         .trigger(processingTime='2 seconds'))
stream_writer_cashtags

<pyspark.sql.streaming.DataStreamWriter at 0x7f84d55f76a0>

In [12]:
query = stream_writer_cashtags.start()

In [None]:
import time
from IPython.display import clear_output
while(True):
    time.sleep(10)
    clear_output(wait=False)
    spark.sql("select * from trending_cashtags_now order by count desc limit 5").show()

+-------+-----+
|cashtag|count|
+-------+-----+
|  SQUID|    3|
+-------+-----+



<a id='4'></a>
## 4. Please, terminate the tools used. 🙅🏼‍♀️

<a id='4.1'></a>
### 3.1 Stop the Spark Streaming application
In order to stop the Spark Streaming application go to **Kernel -> Shutdown**.

<a id='4.2'></a>
### 4.2 Stop the Kafka producer
Go to the terminal where you started the producer and **press Ctrl + C**. 

<a id='4.3'></a>
### 4.3 Stop the Kafka service
<p>Go to a terminal windows and type the following command:</p>
<em>$ sudo service kafka stop</em>