In [1]:
!pip install pyspark



In [10]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import streaming

import pandas as pd
from IPython.display import display, clear_output
import seaborn as sns

# Session for the Structured Streaming cells below; getOrCreate() returns an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Streaming")
    .getOrCreate()
)

In [11]:
# Unbounded table of text lines read from a TCP socket on localhost:9999
# (something must be serving there, e.g. `nc -lk 9999`); each line is in column `value`.
lines = (
    spark.readStream.format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# One row per word. Split on runs of whitespace and drop empty tokens so that
# repeated, leading or trailing spaces are not counted as an empty "word".
words = (
    lines.select(f.explode(f.split(lines.value, r"\s+")).alias("word"))
    .where(f.col("word") != "")
)

# Running count of each distinct word across the whole stream.
wordCounts = words.groupBy("word").count()

In [15]:
# Start the streaming query: after each micro-batch the whole running
# word-count table ("complete" mode) is printed to the console.
query = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
# start() returns immediately and the query keeps running in the background.
# Call `query.awaitTermination()` to block this cell, or `query.stop()` to end it.

In [15]:
from TwitterSecrets import twitter_secrets as ts

# Directory that receives one raw-JSON file per streamed tweet.
OUT_PATH: str = "/content/twitterdata"
# Phrase used for the streaming API's `track` filter.
QUERY: str = "euro 2021"
# How many messages to stream before disconnecting.
STOP_AFTER: int = 500

In [16]:
import json
import tempfile
import requests
import pathlib
from datetime import datetime as dt
from uuid import uuid4
from requests_oauthlib import OAuth1Session

In [14]:
# Stream up to STOP_AFTER tweets matching QUERY and save each one's raw JSON
# as its own file under OUT_PATH.
pathlib.Path(OUT_PATH).mkdir(parents=True, exist_ok=True)

# `track` gets the query as plain lowercase keywords (any "#" is stripped).
query_data = {"track": QUERY.replace("#", "").lower(), "language": "en"}

twitter = OAuth1Session(
    client_key=ts.CONSUMER_KEY,
    client_secret=ts.CONSUMER_SECRET,
    resource_owner_key=ts.ACCESS_TOKEN,
    resource_owner_secret=ts.ACCESS_SECRET,
)

# API version segment is "1.1" (the previous "/1/1/" path was a typo).
url = "https://stream.twitter.com/1.1/statuses/filter.json"

print(f"STREAMING {STOP_AFTER} TWEETS")

saved = 0  # number of tweets written so far
# Let requests build and percent-encode the query string (QUERY contains a space).
with twitter.get(url, params=query_data, stream=True) as response:
    # Fail loudly on auth / rate-limit errors instead of parsing an error body as tweets.
    response.raise_for_status()
    for raw_tweet in response.iter_lines():
        # Empty lines (keep-alive newlines) carry no JSON; don't report or count them.
        if not raw_tweet:
            continue
        try:
            tweet = json.loads(raw_tweet)
            summary = f"{tweet['user']['screen_name']} @ {tweet['created_at']}: {tweet['text']}"
        except (json.JSONDecodeError, KeyError) as err:
            # Messages that are not tweets lack these keys; skip them.
            print(f"ERROR: skipping message ({err!r})")
            continue
        saved += 1
        print(f"{saved}/{STOP_AFTER}: {summary}\n")
        # Path is not a file context manager (that protocol was removed in Python 3.13);
        # write_bytes opens, writes and closes the file itself.
        out_file = pathlib.Path(OUT_PATH) / f"{dt.now().timestamp()}_{uuid4()}.json"
        out_file.write_bytes(raw_tweet)
        # Stop right after the last wanted tweet instead of blocking for one more line.
        if saved >= STOP_AFTER:
            break

STREAMING 500 TWEETS


# **Test: stream tweets with sentiment labels to Spark over a TCP socket**

In [4]:
import socket
import sys
import requests
import requests_oauthlib
import json
# from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [5]:
from TwitterSecrets import twitter_secrets as ts

# Output location, search phrase and message budget for this section.
OUT_PATH: str = "/content/twitterdata"
QUERY: str = "euro 2021"
STOP_AFTER: int = 500

# Credentials copied from the local secrets module.
CONSUMER_KEY, CONSUMER_SECRET = ts.CONSUMER_KEY, ts.CONSUMER_SECRET
ACCESS_TOKEN, ACCESS_SECRET = ts.ACCESS_TOKEN, ts.ACCESS_SECRET

# OAuth 1.0a signer, handed to requests as `auth=`.
my_auth = requests_oauthlib.OAuth1(
    client_key=CONSUMER_KEY,
    client_secret=CONSUMER_SECRET,
    resource_owner_key=ACCESS_TOKEN,
    resource_owner_secret=ACCESS_SECRET,
)

In [6]:
def get_tweets(follow="25073877", language="en"):
    """Open a streaming request to Twitter's statuses/filter endpoint.

    Args:
        follow: Value of the ``follow`` filter (user ID(s) to follow). The default
            is the ID this notebook originally hard-coded.
        language: Value of the ``language`` filter.

    Returns:
        The open streaming ``requests.Response``. Iterate it with ``iter_lines()``
        and close it (or use it in a ``with`` block) when done.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status
            (e.g. bad credentials). The response is closed before re-raising.
    """
    url = "https://stream.twitter.com/1.1/statuses/filter.json"
    query_data = {"language": language, "follow": follow}
    # `my_auth` is the module-level OAuth1 signer built from the secrets module.
    response = requests.get(url, params=query_data, auth=my_auth, stream=True)
    print(response.url, response)
    # Surface errors here instead of streaming an error body into the consumer.
    try:
        response.raise_for_status()
    except requests.HTTPError:
        response.close()
        raise
    return response

In [7]:
def _sentiment_label(score, threshold=0.05):
    """Map a compound polarity score to "POSITIVE", "NEGATIVE" or "NEUTRAL".

    Scores >= threshold are positive, scores <= -threshold are negative,
    and anything strictly in between is neutral.
    """
    if score >= threshold:
        return "POSITIVE"
    if score <= -threshold:
        return "NEGATIVE"
    return "NEUTRAL"


def process_send_tweets_to_spark(http_resp, tcp_connection, analyzer=None):
    """Label each streamed tweet's sentiment and forward it to Spark over TCP.

    Every tweet is sent as one UTF-8 line ``"<LABEL>||||<text>\\n"``. Line breaks
    inside the tweet are replaced by spaces so each tweet stays one record.

    Args:
        http_resp: Object whose ``iter_lines()`` yields one JSON message per line
            (e.g. the response from ``get_tweets()``).
        tcp_connection: Connected socket the Spark receiver reads from.
        analyzer: Object with ``polarity_scores(text)`` returning a dict with a
            ``"compound"`` float (e.g. vaderSentiment's ``SentimentIntensityAnalyzer()``).
            Defaults to a module-level ``analyzer`` if one is defined.

    Raises:
        RuntimeError: if no analyzer is passed and no global ``analyzer`` exists.
        OSError: if sending fails (e.g. the Spark side disconnected). Socket
            errors are not swallowed, because retrying a dead socket would loop forever.
    """
    if analyzer is None:
        analyzer = globals().get("analyzer")
    if analyzer is None:
        raise RuntimeError(
            "No sentiment analyzer: pass analyzer=SentimentIntensityAnalyzer() "
            "or define a global `analyzer`."
        )

    for line in http_resp.iter_lines():
        if not line:  # empty keep-alive line, nothing to parse
            continue
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet["text"]
        except (ValueError, KeyError, TypeError) as err:
            # Malformed JSON or a non-tweet message: report it and keep streaming.
            print("Error: %r" % (err,))
            continue

        # Spark's socket receiver splits records on line breaks, so flatten the text.
        tweet_text = " ".join(tweet_text.splitlines())
        sentiment = _sentiment_label(analyzer.polarity_scores(tweet_text)["compound"])

        # The "||||" separator splits the label from the tweet content.
        mess = sentiment + "||||" + tweet_text + "\n"
        # sendall, unlike send, keeps writing until every byte is sent.
        tcp_connection.sendall(mess.encode("utf-8"))

In [8]:
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None

# Serve one client: the Spark job's socketTextStream("localhost", 9009) connects here.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # Lets this cell be re-run right away instead of failing with "Address already in use".
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    conn, addr = s.accept()
    with conn:
        print("Connected... Starting getting tweets.")
        # The sender defined above is process_send_tweets_to_spark
        # (the old call to send_tweets_to_spark was a NameError).
        with get_tweets() as resp:
            process_send_tweets_to_spark(resp, conn)

Waiting for TCP connection...


KeyboardInterrupt: ignored

In [None]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import re

# create spark configuration
# Spark configuration for the DStream-based Twitter job.
conf = SparkConf().setAppName("TwitterStreamApp")
# A SparkSession (and so a SparkContext) was already created at the top of the
# notebook, and a second SparkContext in the same process fails. Reuse it instead.
# Note: when a context already exists, its settings (including app name) win over `conf`.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")
# Create the Streaming Context from the above spark context with a 2-second batch interval.
ssc = StreamingContext(sc, 2)
# Checkpoint directory so Spark can recover RDDs / streaming state.
ssc.checkpoint("checkpoint_TwitterApp")
# Read the "<LABEL>||||<text>" lines served on port 9009.
dataStream = ssc.socketTextStream("localhost", 9009)