In [None]:
%pip install tweepy==3.10.0

import json
import tweepy

In [None]:
# Twitter API secrets are read from a local JSON file instead of being hardcoded here.
with open('src/credential.json', mode='r') as credential_file:
    credential = json.load(credential_file)

# Keys used to build the tweepy OAuthHandler and its access token.
CONSUMER_KEY = credential['twitter_api_key']
CONSUMER_SECRET = credential['twitter_api_secret_key']
ACCESS_TOKEN = credential['twitter_access_token']
ACCESS_TOKEN_SECRET = credential['twitter_access_token_secret']
# Bearer token; not referenced by the visible streaming code.
bearer_token = credential['bearer_token']

In [None]:
class SimpleStreamListener(tweepy.StreamListener):
    """
    tweepy stream listener that prints each incoming tweet as a compact JSON record.
    """

    def on_status(self, status):
        """
        Serialize selected fields of a tweepy Status to JSON and print them.

        :param status: tweepy Status object delivered by the stream
        :return: the JSON string; any value other than False keeps the stream open
        """
        tweet = json.dumps({
            'id': status.id,
            'name': status.user.name,
            'user_location': status.user.location,
            'text': status.text,
            'fav': status.favorite_count,
            'tweet_date': status.created_at.strftime("%Y-%m-%d %H:%M:%S"),
            'tweet_location': status.place.full_name if status.place else None},
            default=str)
        # Emit the record; building it without using it would make the stream silent.
        print(tweet)
        return tweet

    def on_error(self, status_code):
        """
        Print the HTTP error code reported by the streaming endpoint.

        :param status_code: HTTP status code returned by Twitter
        :return: False on 420 so tweepy disconnects; None otherwise, which tweepy
            treats as "keep retrying"
        """
        print(status_code)
        # 420 is Twitter's rate-limit response; disconnect instead of reconnecting.
        if status_code == 420:
            return False

# Authenticate with the user-context keys loaded from credential.json.
auth = tweepy.OAuthHandler(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# Attach the listener and filter the live stream to English tweets mentioning Starbucks.
stream_listener = SimpleStreamListener()
twitterStream = tweepy.Stream(auth=auth, listener=stream_listener)
twitterStream.filter(track=['Starbucks'], languages=['en'])


In [None]:
%pip install confluent_kafka

In [None]:
### kafka
# ref: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html
import asyncio

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker on localhost and the scratch topic used by produce()/consumer().
BROKER_URL, TOPIC_NAME = "PLAINTEXT://localhost:9092", "test"

### twitter
import tweepy
from tweepy.auth import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import logging 


### logging 
import os

FORMAT = "%(asctime)s | %(name)s - %(levelname)s - %(message)s"
LOG_FILEPATH = "/workspace/twitter_topic_analysis_dashboard/logs/testing.log"
# basicConfig(filename=...) opens LOG_FILEPATH immediately and raises
# FileNotFoundError when the logs/ directory does not exist (e.g. on a fresh
# checkout), so make sure it exists first.
os.makedirs(os.path.dirname(LOG_FILEPATH), exist_ok=True)
# NOTE: basicConfig does nothing once the root logger already has handlers, so
# re-running this cell (or a later identical call) does not reconfigure logging.
# When it does take effect, filemode='w' truncates the log file.
logging.basicConfig(
    filename=LOG_FILEPATH,
    level=logging.INFO,
    filemode='w',
    format=FORMAT)

### Authenticate to Twitter
# Twitter API secrets are read from a local JSON file instead of being hardcoded here.
with open('src/credential.json', mode='r') as credential_file:
    credential = json.load(credential_file)

# Keys used to build the tweepy OAuthHandler and its access token.
CONSUMER_KEY = credential['twitter_api_key']
CONSUMER_SECRET = credential['twitter_api_secret_key']
ACCESS_TOKEN = credential['twitter_access_token']
ACCESS_TOKEN_SECRET = credential['twitter_access_token_secret']
# Bearer token; not referenced by the visible streaming code.
BEARER_TOKEN = credential['bearer_token']


from dataclasses import dataclass, field
import random


@dataclass
class Purchase:
    """
    Fake purchase event used as a sample Kafka message payload.

    NOTE(review): ``faker`` is not defined anywhere in this notebook; it is
    presumably a ``faker.Faker()`` instance from the third-party Faker package.
    It is looked up lazily inside the default factories, so the class can be
    defined (and instantiated with explicit username/currency) without it.
    TODO define ``faker`` before calling ``Purchase()`` with no arguments.
    """
    # Random user name; requires `faker` at instantiation time.
    username: str = field(default_factory=lambda: faker.user_name())
    # Random currency code; requires `faker` at instantiation time.
    currency: str = field(default_factory=lambda: faker.currency_code())
    # Random amount in [100, 200000] (random.randint is inclusive on both ends).
    amount: int = field(default_factory=lambda: random.randint(100, 200000))

    def serialize(self):
        """
        Serializes the object in JSON string format.

        :return: JSON object string with keys "username", "currency", "amount"
        """
        return json.dumps(
            {
                "username": self.username,
                "currency": self.currency,
                "amount": self.amount,
            }
        )




class SimpleStreamListener(tweepy.StreamListener):
    """
    tweepy stream listener that prints each incoming tweet as a JSON record.

    Records are only printed and returned; nothing here writes to external storage.
    """

    def on_status(self, status):
        """
        Serialize selected Status fields to JSON, then print and return the string.

        :param status: tweepy Status delivered by the stream
        :return: the JSON string (not False, so tweepy keeps the stream open)
        """
        record = {
            'id': status.id,
            'name': status.user.name,
            'user_location': status.user.location,
            'text': status.text,
            'fav': status.favorite_count,
            'tweet_date': status.created_at.strftime("%Y-%m-%d %H:%M:%S"),
            'tweet_location': status.place.full_name if status.place else None,
        }
        serialized = json.dumps(record, default=str)
        print(serialized)
        return serialized
    

def streaming_tweets():
    """
    Open a filtered Twitter stream for English tweets mentioning 'starbucks'.

    Each matching status is handed to SimpleStreamListener.on_status.
    NOTE(review): tweepy 3.x runs filter() in the calling thread unless
    is_async=True, so this call presumably blocks until the stream ends.
    """
    handler = tweepy.OAuthHandler(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    listener = SimpleStreamListener()
    stream = tweepy.Stream(handler, listener)
    stream.filter(track=['starbucks'], languages=['en'])



async def produce(topic_name):
    """
    Publish one fake Purchase record to a Kafka topic.

    :param topic_name: Kafka topic to write to
    :return: False if building or producing the message raised (the error is
        printed); otherwise None
    """
    producer = Producer({"bootstrap.servers": BROKER_URL})

    try:
        payload = Purchase().serialize()
        producer.produce(topic_name, payload)
        # Wait until queued messages are delivered (or fail) before yielding.
        producer.flush()
        await asyncio.sleep(0.01)
    except Exception as err:
        print(err)
        return False


async def consumer(topic_name):
    """
    Consume at most one message from a Kafka topic and log the outcome.

    :param topic_name: Kafka topic to subscribe to
    :return: None
    """
    # confluent_kafka requires 'group.id' for subscribe()-based consumption.
    kafka_consumer = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "test"})

    try:
        kafka_consumer.subscribe([topic_name])

        # poll() returns a single Message or None on timeout. consume() would
        # return a (possibly empty) list and never None, making a None check dead.
        message = kafka_consumer.poll(timeout=0.01)

        if message is None:
            await asyncio.sleep(0.01)
            # logging.error is the function; logging.ERROR is an int level constant.
            logging.error("No message is received by consumer")
        elif message.error():
            logging.error("Consumer error: %s", message.error())
        else:
            logging.info("Consumed message: %s", message.value())
    finally:
        # Leave the consumer group cleanly and release the client's resources.
        kafka_consumer.close()

async def produce_consume():
    """
    Schedule the producer and the consumer concurrently, then wait for each in turn.

    :return: None
    """
    pending = [
        asyncio.create_task(produce(TOPIC_NAME)),
        asyncio.create_task(consumer(TOPIC_NAME)),
    ]
    for task in pending:
        await task

def main():
    """
    Create the Kafka topic, then run one concurrent produce/consume round.

    :return: None
    """
    # Configure the AdminClient with 'bootstrap.servers'
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    # One partition; replication_factor=1 needs only a single broker.
    topic = NewTopic(TOPIC_NAME, num_partitions=1, replication_factor=1)

    # create_topics() is asynchronous and returns {topic_name: future}. Wait on
    # each future so the topic exists before producing, and log failures (such as
    # the topic already existing) instead of silently ignoring them.
    for name, future in client.create_topics([topic]).items():
        try:
            future.result()
        except Exception as exc:
            logging.warning("Topic %s not created: %s", name, exc)

    # NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
    # running (as inside a Jupyter kernel); from a notebook cell use
    # `await produce_consume()` instead.
    try:
        asyncio.run(produce_consume())
    except KeyboardInterrupt:
        # logging.info is the function; logging.INFO is an int level constant.
        logging.info('Shutting Down')



In [None]:

# Start the filtered Twitter stream configured in streaming_tweets().
# NOTE(review): tweepy 3.x Stream.filter runs synchronously by default, so this
# cell is expected to block the kernel until the stream is interrupted.
streaming_tweets()

In [None]:

# Kafka broker on localhost and the topic that receives streamed tweets.
BROKER_URL, TOPIC_NAME = "localhost:9092", "testing2"

### twitter
import tweepy
from tweepy.auth import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import logging 


### logging 
import os

FORMAT = "%(asctime)s | %(name)s - %(levelname)s - %(message)s"
LOG_FILEPATH = "/workspace/twitter_topic_analysis_dashboard/logs/testing.log"
# basicConfig(filename=...) opens LOG_FILEPATH immediately and raises
# FileNotFoundError when the logs/ directory does not exist (e.g. on a fresh
# checkout), so make sure it exists first.
os.makedirs(os.path.dirname(LOG_FILEPATH), exist_ok=True)
# NOTE: basicConfig does nothing once the root logger already has handlers, so
# if logging was configured by an earlier cell this call has no effect.
# When it does take effect, filemode='w' truncates the log file.
logging.basicConfig(
    filename=LOG_FILEPATH,
    level=logging.INFO,
    filemode='w',
    format=FORMAT)

### Authenticate to Twitter
# Twitter API secrets are read from a local JSON file instead of being hardcoded here.
with open('src/credential.json', mode='r') as credential_file:
    credential = json.load(credential_file)

# Keys used to build the tweepy OAuthHandler and its access token.
CONSUMER_KEY = credential['twitter_api_key']
CONSUMER_SECRET = credential['twitter_api_secret_key']
ACCESS_TOKEN = credential['twitter_access_token']
ACCESS_TOKEN_SECRET = credential['twitter_access_token_secret']
# Bearer token; not referenced by the visible streaming code.
BEARER_TOKEN = credential['bearer_token']

class SimpleStreamListener(tweepy.StreamListener):
    """
    tweepy stream listener that publishes each tweet, serialized as JSON, to Kafka.

    :param api: optional tweepy API object, forwarded to tweepy.StreamListener
    :param producer: Kafka producer used to publish. When None, a module-level
        ``producer`` name is looked up at publish time.
        NOTE(review): no such global is defined in the visible notebook; pass one in.
    :param topic_name: destination topic; when None, module-level TOPIC_NAME is used
    """

    def __init__(self, api=None, producer=None, topic_name=None):
        super().__init__(api=api)
        self.producer = producer
        self.topic_name = topic_name

    def on_status(self, status):
        """
        Serialize selected Status fields to JSON and publish them to Kafka.

        :param status: tweepy Status delivered by the stream
        :return: True, so tweepy keeps the stream open
        """
        tweet = json.dumps({
            'id': status.id,
            'name': status.user.name,
            'user_location': status.user.location,
            'text': status.text,
            'fav': status.favorite_count,
            'tweet_date': status.created_at.strftime("%Y-%m-%d %H:%M:%S"),
            'tweet_location': status.place.full_name if status.place else None
        }, default=str)

        # Resolve collaborators lazily so a default-constructed listener still
        # works once the globals exist.
        kafka_producer = self.producer if self.producer is not None else producer
        topic = self.topic_name if self.topic_name is not None else TOPIC_NAME

        # Publish the serialized record, not the Status object: str.encode() only
        # accepts str. kafka-python producers expose send(topic, value), while
        # confluent_kafka's Producer (used elsewhere in this notebook) exposes
        # produce(topic, value); support either.
        publish = getattr(kafka_producer, "send", None) or kafka_producer.produce
        publish(topic, tweet.encode("utf-8"))
        return True
    

async def produce_streaming_tweets():
    """
    Stream English tweets mentioning 'starbucks' through SimpleStreamListener.

    Takes no parameters. Declared ``async`` but contains no ``await``.
    NOTE(review): tweepy 3.x runs filter() in the calling thread unless
    is_async=True, so awaiting this presumably blocks the event loop until
    the stream ends.

    :return: None
    """
    handler = tweepy.OAuthHandler(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    listener = SimpleStreamListener()
    stream = tweepy.Stream(handler, listener)
    stream.filter(track=['starbucks'], languages=['en'])



Web Scraping