# Ingesting real-time tweets using Apache Kafka, Tweepy and Python

## Purpose:

Main data source for the lambda-architecture pipeline.
It periodically queries the Twitter search API to simulate new events arriving at a regular interval (e.g. every minute).
A Kafka producer sends the tweets as records to the Kafka broker.
Contents:

- Twitter setup
- Defining the Kafka producer
- Producing and sending records to the Kafka Broker
- Deployment

In [1]:
!pip install pyspark tweepy pymongo kafka-python "pymongo[srv]"

Collecting pyspark
  Using cached pyspark-3.3.0.tar.gz (281.3 MB)
Collecting tweepy
  Using cached tweepy-4.10.0-py3-none-any.whl (94 kB)
Collecting pymongo
  Downloading pymongo-4.1.1-cp37-cp37m-macosx_10_6_intel.whl (394 kB)
[K     |████████████████████████████████| 394 kB 89 kB/s eta 0:00:01
[?25hCollecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Collecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Collecting requests-oauthlib<2,>=1.2.0
  Using cached requests_oauthlib-1.3.1-py2.py3-none-any.whl (23 kB)
Collecting oauthlib<4,>=3.2.0
  Using cached oauthlib-3.2.0-py3-none-any.whl (151 kB)
Collecting dnspython<3.0.0,>=1.16.0
  Using cached dnspython-2.2.1-py3-none-any.whl (269 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=a587d64edeb6929b78afd9c78f1294dbf9

In [2]:
!python --version

Python 3.7.13


In [3]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user ubuntu on 2022-06-09T19:58:58Z
Revision f74867bddfbcdd4d08076db36851e88b15e66556
Url https://github.com/apache/spark
Type --help for more information.


In [4]:
import tweepy
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
from pyspark.streaming import StreamingContext
from kafka import KafkaConsumer, KafkaProducer
import time
from datetime import datetime, timedelta
import json

In [5]:
# Kafka topic shared by the tweet producer and the Spark streaming reader.
topic_name = 'tweet_stream'

## Base de données MongoDB

In [6]:
from pymongo import MongoClient

import os

# The connection string contains a database password, so it is read from the
# environment instead of being committed with the notebook (the password that
# used to be hard-coded here should be rotated). Example value:
#   mongodb+srv://<user>:<password>@<cluster-host>/?tls=true
# Avoid tlsAllowInvalidCertificates=true: it disables TLS certificate checks.
mongodb_uri = os.environ.get("MONGODB_URI")
if not mongodb_uri:
    raise RuntimeError("Set the MONGODB_URI environment variable to the MongoDB connection string")

client = MongoClient(mongodb_uri)

twitteranalysis_database = client.twitteranalysis

# Running a server command forces a round trip, so it doubles as a
# connectivity check.
try:
    twitteranalysis_database.command("serverStatus")
except Exception as e:
    print(e)
else:
    print("You are connected!")

# NOTE(review): sendTweets contains commented-out code that writes through
# twitteranalysis_database after this close(); confirm the client is reopened
# (or move close() to the end of the pipeline) before re-enabling it.
client.close()

You are connected!


#  Configuration de Twitter 

Obtain the API object using the authorization information.

In [7]:
import os

# Twitter setup.
# API keys and tokens are secrets: read them from the environment rather than
# committing them to the notebook (the values previously hard-coded here
# should be revoked and regenerated).
consumer_key = os.environ["TWITTER_CONSUMER_KEY"]
consumer_secret = os.environ["TWITTER_CONSUMER_SECRET"]
access_token = os.environ["TWITTER_ACCESS_TOKEN"]
access_token_secret = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]

# OAuth 1.0a user-context authentication object.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# REST API client used by the search helpers below.
api = tweepy.API(auth)
# tweepy 4.x Stream takes all four credentials as required positional
# arguments; passing only the consumer pair fails with a TypeError.
StreamTweepy = tweepy.Stream(
    consumer_key,
    consumer_secret,
    access_token,
    access_token_secret,
)

In [8]:
# Try the connection: verify_credentials() makes an authenticated request.
try:
    api.verify_credentials()
except tweepy.TweepyException as e:
    # tweepy 4.x reports request/auth failures as TweepyException subclasses
    # (e.g. Unauthorized). Catch only those, and show why it failed, instead
    # of a bare except that also hides bugs and KeyboardInterrupt.
    print("Authentication Error", e)
else:
    print("Authentication Successful")

Authentication Successful


In [9]:
def normalize_timestamp(time):
    """Return *time* formatted as ``"YYYY-mm-dd HH:MM:SS"``.

    Accepts a ``datetime`` object, or a string in ``"%Y-%m-%d %H:%M:%S"``
    form optionally followed by a UTC offset such as ``"+00:00"``. That
    offset form is what ``str()`` produces for a timezone-aware datetime,
    e.g. ``str(status.created_at)``. Any offset is dropped; the clock time
    is not converted.

    Raises:
        ValueError: if a string matches neither accepted format.
    """
    # NOTE: the parameter name shadows the ``time`` module inside this
    # function; it is kept for callers that pass it by keyword.
    if isinstance(time, datetime):
        mytime = time
    else:
        try:
            mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # %z accepts "+00:00" (with a colon) from Python 3.7 on.
            mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S%z")
    # Timestamps are kept in their original zone; shifting them (e.g. from
    # GMT to local +1 with timedelta) was considered but is not applied.
    return mytime.strftime("%Y-%m-%d %H:%M:%S")

# Création du producteur Kafka

- specify the Kafka Broker
- specify the topic name
- optional: specify partitioning strategy

In [10]:
def kafka_stream_tweets(data, producer=None, bootstrap_servers='localhost:9092'):
    """Publish one record to the Kafka topic ``topic_name`` and log it.

    Args:
        data: value to send. The default producer encodes it with
            ``json.dumps`` as UTF-8; ``len(data)`` is printed.
        producer: optional existing ``KafkaProducer`` to reuse across calls
            (its own serializer then applies). When omitted, a temporary
            producer is created for this call and closed afterwards.
        bootstrap_servers: broker address, used only when a temporary
            producer is created.

    Returns:
        True, after the record has been flushed.
    """
    owns_producer = producer is None
    if owns_producer:
        producer = KafkaProducer(
            value_serializer=lambda m: json.dumps(m).encode('utf-8'),
            bootstrap_servers=bootstrap_servers,
        )
    try:
        producer.send(topic_name, value=data)
        # send() only buffers the record for a background sender; flush()
        # blocks until the pending request completes, so the record is not
        # lost when this call returns.
        producer.flush()
    finally:
        # Release the connections/threads of a producer this call created.
        if owns_producer:
            producer.close()
    print(datetime.now(), "New Tweet ! (Len:", len(data), ")")
    return True

## Initialisation de Spark

In [26]:
# Structured Streaming's readStream.format("kafka") is provided by the
# spark-sql-kafka-0-10 connector, whose Scala suffix and version must match
# the running Spark (3.3.0 built with Scala 2.12, per `pyspark --version`).
# The previous spark-streaming-kafka-0-10_2.11:2.2.0 is the DStream connector
# for Spark 2.2 / Scala 2.11 and does not provide the "kafka" source.
# NOTE: spark.jars.packages only takes effect if no SparkContext is running yet.
spark = SparkSession.builder \
    .appName("TwitterAnalysis") \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .getOrCreate()

In [31]:
# Spark initialisation: reuse (or start) the SparkContext, keep console
# logging at WARN, and wrap it in a DStream StreamingContext with 5-second
# batches.
sc = SparkContext.getOrCreate()
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batchDuration=5)

In [None]:
# Subscribe to the tweet topic as an unbounded streaming DataFrame, starting
# from the earliest offset still retained by the broker.
tweets_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

## Produire et envoyer des enregistrements au courtier Kafka

1. Interroger l'objet de l'API Twitter
2. Extraire les informations pertinentes dans la réponse
3. Formater les données pertinentes et les envoyer au courtier Kafka.

## Les enregistrements résultants ont les attributs suivants :
- id de l'utilisateur (`user.id_str`)
- created_at
- followers_count
- localisation
- text
- description de l'utilisateur
- nombre de favoris
- nombre de retweets

In [27]:
def get_twitter_data(query="harcelements", count=700, client=None):
    """Search tweets and return them as ';'-separated text records.

    Each record holds, in order: the author's user id, the tweet creation
    time, the tweet text, the author's profile description, follower count,
    profile location, favourite count and retweet count. Every field is
    followed by ';' (including the last, as in the original layout).

    Args:
        query: search string passed to ``search_tweets`` as ``q``.
        count: number of tweets requested. NOTE(review): the standard v1.1
            search endpoint is documented to cap this at 100; confirm.
        client: object exposing ``search_tweets``; defaults to the
            module-level ``api``.

    Returns:
        A list with one record string per tweet returned by the search.
    """
    if client is None:
        client = api
    # Field values (tweet text, bios) may contain ';' or line breaks; replace
    # them so each record stays on one line with a fixed number of fields.
    separator_fix = str.maketrans({";": ",", "\r": " ", "\n": " "})

    records = []
    for status in client.search_tweets(q=query, count=count):
        user = status.user
        fields = (
            user.id_str,
            status.created_at,
            status.text,
            user.description,
            user.followers_count,
            user.location,
            status.favorite_count,
            status.retweet_count,
        )
        records.append("".join(str(value).translate(separator_fix) + ";" for value in fields))
    return records

In [28]:
import re

def make_my_tweet_cleaner(tweet):
    """Split a tweet record into words with Spark, print them and return them.

    Double quotes, single quotes, parentheses, backslashes and periods are
    treated as whitespace before splitting on whitespace.

    Args:
        tweet: one record string (e.g. as produced by ``get_twitter_data``).

    Returns:
        The list of resulting words (also printed, as before).
    """
    # Map every punctuation character to a space in a single pass.
    punctuation_to_space = str.maketrans({c: " " for c in "\"'()\\."})
    # One element needs only one partition; the default parallelism would
    # schedule a task per (mostly empty) partition.
    words = sc.parallelize([tweet], 1).flatMap(
        lambda line: line.translate(punctuation_to_space).split()
    )
    # TODO: emoji and @mention removal is not implemented yet.
    collected = words.collect()
    print(collected)
    return collected

In [34]:
def getTweets(query="harcelements", geocode="48.864716,2.349014,10km", count=200, client=None):
    """Search tweets near a location and return them as ';'-separated records.

    Each record is ``"<user id>;<created_at>;<text>"``. It uses the same ';'
    separator as ``get_twitter_data``; previously the three fields were glued
    together with no separator, which made them impossible to split apart.

    Args:
        query: search string; must be a plain string, because tweepy
            stringifies parameters (a list would be sent as
            ``"['harcelements']"``).
        geocode: ``"lat,long,radius"`` filter (default: 10 km around Paris).
        count: number of tweets requested.
        client: object exposing ``search_tweets``; defaults to the
            module-level ``api``.

    Returns:
        The records collected. On error, the error is printed and the
        records gathered so far (possibly none) are returned instead of None.
    """
    if client is None:
        client = api
    # Keep each record on one line with exactly three fields.
    separator_fix = str.maketrans({";": ",", "\r": " ", "\n": " "})
    rdd_tweets = []
    try:
        tweets = client.search_tweets(q=query, geocode=geocode, count=count)
        for i in tweets:
            fields = (i.user.id_str, i.created_at, i.text)
            rdd_tweets.append(";".join(str(field).translate(separator_fix) for field in fields))
    except Exception as e:
        print("Error : " + str(e))
    return rdd_tweets

def sendTweets():
    """Fetch tweets with ``get_twitter_data`` and pass each record to ``make_my_tweet_cleaner``.

    Any exception, whether from fetching or from processing a record, stops
    the whole run and is printed as ``"Error : <message>"``; nothing is
    raised to the caller.
    """
    # NOTE: despite the name, publishing to Kafka (kafka_stream_tweets) and
    # archiving to MongoDB (stream_tweets.insert_many) are currently disabled.
    try:
        batch = get_twitter_data()
        for record in batch:
            make_my_tweet_cleaner(record)
    except Exception as exc:
        print("Error : " + str(exc))

In [35]:
sendTweets()

Error : An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 1.0 failed 1 times, most recent failure: Lost task 6.0 in stage 1.0 (TID 14) (192.168.1.182 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 601, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return

22/06/23 21:11:00 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 601, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: module 'pyspark.rdd' has no attribute 'T'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org

In [None]:
# # Code de vectorisation des textes des tweets #
# from pyspark.ml.feature import HashingTF, IDF 
# # Définition du traitement de HashingTF 
# nbfeatures=6000
# hashingTF = HashingTF(inputCol="tokens",outputCol="raw_features",numFeatures=nbfeatures)
# features_df = hashingTF.transform(data)
# idf = IDF(inputCol="raw_features", outputCol="features") 
# idf_model = idf.fit(features_df)
# scaled_features_df = idf_model.transform(features_df)

In [None]:
# display(tweets_df)

In [None]:
def periodic_work(interval, task=None, max_iterations=None):
    """Run *task* repeatedly, sleeping *interval* seconds after each run.

    Args:
        interval: pause after each run, in seconds. ``time.sleep`` accepts
            ints and floats, e.g. ``60 * 0.1``.
        task: zero-argument callable to run each time; defaults to
            ``get_twitter_data``. NOTE(review): its return value is
            discarded, so with the default nothing is forwarded anywhere.
            ``sendTweets`` may be the intended task.
        max_iterations: stop after this many runs; ``None`` (the default)
            loops forever, as before.
    """
    if task is None:
        task = get_twitter_data
    runs = 0
    while max_iterations is None or runs < max_iterations:
        task()
        runs += 1
        time.sleep(interval)

In [None]:
# Poll until interrupted, pausing 60 * 0.1 = 6 seconds between runs.
periodic_work(interval=60 * 0.1)

In [None]:
# `tweet` was never defined at module level, so this cell raised NameError.
# Run the cleaner on the first fetched record instead (no-op if the search
# returns nothing).
for tweet in get_twitter_data()[:1]:
    make_my_tweet_cleaner(tweet)