## Streaming Twitter

Twitter Stream Processing with Tweepy, Kafka and Spark Structured Streaming.


This notebook guides you through the development of a basic stream-processing application using Twitter data.

We will fetch tweets from 
- Paris
- Rome
- Berlin
- Munich
- Vienna
- Zurich

via the "Filter realtime tweets" functionality of the Twitter API. By passing a bounding box for each of the six
cities to the API, we restrict the stream to all (public) tweets coming from these areas.



**Two use cases will be implemented:**
- Computing how many tweets per minute are produced in each city and storing these values (on a minute-by-minute basis) in an Azure PostgreSQL database. 
- Publishing an alert to a Kafka output topic whenever the number of tweets per minute in one of these cities exceeds a given threshold.
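
The Spark jobs that implement these use cases live in the Scala files. To make the logic concrete, here is a plain-Python sketch of both on a finite batch of JSON records shaped like the Tweepy output further down ('tweet', 'city', 'timestamp'). The function names and the threshold value are hypothetical; Spark Structured Streaming would compute the same tumbling one-minute windows incrementally on the stream rather than over an in-memory list.

```python
import json
from collections import Counter
from datetime import datetime


def minute_window(timestamp):
    """Truncate a 'YYYY-MM-DD HH:MM:SS' timestamp to the start of its minute."""
    ts = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return ts.replace(second=0, microsecond=0)


def count_per_minute(json_lines):
    """Use case 1: count tweets per (window_start, city) tumbling one-minute window."""
    counts = Counter()
    for line in json_lines:
        record = json.loads(line)
        counts[(minute_window(record['timestamp']), record['city'])] += 1
    return counts


def alerts(counts, threshold):
    """Use case 2: return (window_start, city, count) for windows above the threshold."""
    return sorted((window, city, n)
                  for (window, city), n in counts.items()
                  if n > threshold)


# Usage with three hand-written records (hypothetical data)
lines = [
    '{"tweet": "a", "city": "Rome", "timestamp": "2018-12-21 11:05:45"}',
    '{"tweet": "b", "city": "Rome", "timestamp": "2018-12-21 11:05:58"}',
    '{"tweet": "c", "city": "Paris", "timestamp": "2018-12-21 11:06:01"}',
]
counts = count_per_minute(lines)
print(alerts(counts, threshold=1))
```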


**The tools we use are:**
- A Python script leveraging the Tweepy package for fetching the tweets
- Kafka, running on an Azure VM, as the input layer for Spark Structured Streaming
- Spark Structured Streaming on Databricks Community Edition (Spark 2.3, Scala 2.11)
- An Azure PostgreSQL DB to persist the results

### Setup

In [None]:
import tweepy
import json 

Create a 'config/twitter-config.json' file with the following format (note that JSON requires double quotes):

    {"access-token": "XXX",
     "access-token-secret": "XXX",
     "api-key": "XXX",
     "api-secret-key": "XXX"}

In [None]:
# requires the file 'config/twitter-config.json' described above
with open('config/twitter-config.json') as f:
    config = json.load(f)
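
If a key is misspelled, Tweepy only fails later with an authentication error. A small loader that checks the four keys up front makes such mistakes obvious; load_twitter_config and REQUIRED_KEYS are hypothetical names, and the key list simply mirrors the format above.

```python
import json

# The four credentials expected in the config file (mirrors the format above)
REQUIRED_KEYS = ('access-token', 'access-token-secret', 'api-key', 'api-secret-key')


def load_twitter_config(path='config/twitter-config.json'):
    """Load the credentials file and fail early if a required key is missing."""
    with open(path) as f:
        config = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError('Missing keys in {}: {}'.format(path, ', '.join(missing)))
    return config
```

Used in place of the cell above, this becomes config = load_twitter_config().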

### Hello Tweepy!

Print the five most recent tweets from my home timeline:

In [None]:
auth = tweepy.OAuthHandler(config['api-key'], config['api-secret-key'])
auth.set_access_token(config['access-token'], config['access-token-secret'])

api = tweepy.API(auth)

public_tweets = api.home_timeline()

for tweet in public_tweets[:5]:
    print(tweet.text)

### Accessing Geolocations

Refer to https://gist.github.com/dev-techmoe/ef676cdd03ac47ac503e856282077bf2 for the structure of Tweepy's status object.

From [Twitter API](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects.html): *"The place object is always present when a Tweet is geo-tagged, while the coordinates object is only present (non-null) when the Tweet is assigned an exact location. If an exact location is provided, the coordinates object will provide a [long, lat] array with the geographical coordinates, and a Twitter Place that corresponds to that location will be assigned."*
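
Following this rule, a tweet's position can be taken from 'coordinates' when present and otherwise approximated by the centre of the 'place' bounding box. The sketch below works on the raw v1.1 JSON dict of a tweet (in Tweepy 3.x available as status._json); tweet_location is a hypothetical helper, not part of Tweepy.

```python
def tweet_location(status):
    """Return a (lon, lat) pair for a tweet given as a v1.1 JSON dict.

    Uses the exact 'coordinates' point when present, otherwise the centre
    of the 'place' bounding box; returns None for tweets without geo data.
    """
    if status.get('coordinates'):
        # GeoJSON point: {"type": "Point", "coordinates": [lon, lat]}
        lon, lat = status['coordinates']['coordinates']
        return lon, lat
    place = status.get('place')
    if place:
        # GeoJSON polygon: [[[lon, lat], [lon, lat], ...]]
        corners = place['bounding_box']['coordinates'][0]
        lons = [corner[0] for corner in corners]
        lats = [corner[1] for corner in corners]
        return (min(lons) + max(lons)) / 2, (min(lats) + max(lats)) / 2
    return None
```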

We want to extract posts from the following European cities. Each bounding box is given as south-west longitude, south-west latitude, north-east longitude, north-east latitude, which is the order the "locations" filter expects:
- Paris: 2.229335, 48.819298, 2.416376, 48.902579
- Rome: 12.358222, 41.777618, 12.632567, 42.016053
- Berlin: 13.097992, 52.379442, 13.712705, 52.667389
- Munich: 11.373648, 48.063202, 11.699651, 48.240582
- Vienna: 16.192143, 48.114450, 16.551614, 48.326583
- Zurich: 8.460501, 47.324890, 8.616888, 47.432676
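
A minimal sketch, assuming the boxes are kept in a dict (CITY_BOXES, locations_param and city_for_point are hypothetical names): the dict is flattened into the single list passed to stream.filter(locations=...), and the same boxes can map a (lon, lat) point back to a city, which could stand in for the integer-longitude check used for Berlin and Munich in the test script below. Since Twitter also matches tweets whose place merely intersects a box, city_for_point can return None for some matched tweets.

```python
# Bounding boxes as (sw_lon, sw_lat, ne_lon, ne_lat), copied from the list above
CITY_BOXES = {
    'Paris': (2.229335, 48.819298, 2.416376, 48.902579),
    'Rome': (12.358222, 41.777618, 12.632567, 42.016053),
    'Berlin': (13.097992, 52.379442, 13.712705, 52.667389),
    'Munich': (11.373648, 48.063202, 11.699651, 48.240582),
    'Vienna': (16.192143, 48.114450, 16.551614, 48.326583),
    'Zurich': (8.460501, 47.324890, 8.616888, 47.432676),
}


def locations_param(boxes):
    """Flatten the boxes into one list for stream.filter(locations=...)."""
    return [value for box in boxes.values() for value in box]


def city_for_point(lon, lat, boxes=CITY_BOXES):
    """Return the city whose bounding box contains (lon, lat), or None."""
    for city, (sw_lon, sw_lat, ne_lon, ne_lat) in boxes.items():
        if sw_lon <= lon <= ne_lon and sw_lat <= lat <= ne_lat:
            return city
    return None
```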

### Test script for Tweepy

The script should print JSON data in the format required for the Kafka topic. You will need to fill in the config file with the credentials of your Twitter Developer account.

In [None]:
import json
import tweepy  # Tweepy 3.x: StreamListener was removed in Tweepy 4.0
from datetime import datetime

# Map country codes to cities; Germany ('DE') is resolved by longitude below
mapping_dict = {'FR': 'Paris',
                'AT': 'Vienna',
                'CH': 'Zurich',
                'IT': 'Rome'}

class StdOutListener(tweepy.StreamListener):

    ''' Handles data received from the stream. '''

    def on_status(self, status):

        status_dict = {}

        try:
            # known error checking: skip tweets without a usable place
            if status.place is None:
                self.on_error('place_none')
                return
            elif len(status.place.country_code) != 2:
                self.on_error('invalid_cc')
                return

            # get text (full text for extended tweets)
            try:
                status_dict['tweet'] = status.extended_tweet['full_text']
            except AttributeError:
                status_dict['tweet'] = status.text

            # get city: Berlin and Munich share 'DE', so use the longitude
            if status.place.country_code == 'DE':

                lon = int(status.place.bounding_box.coordinates[0][0][0])

                if lon == 13:
                    status_dict['city'] = 'Berlin'
                elif lon == 11:
                    status_dict['city'] = 'Munich'
                else:
                    return  # place lies outside both German boxes

            elif status.place.country_code in mapping_dict:
                status_dict['city'] = mapping_dict[status.place.country_code]
            else:
                return  # country not in our list

            # get timestamp
            status_dict['timestamp'] = str(status.created_at)

            # get hashtags (collected, but not part of the emitted JSON)
            hashtags = [hashtag['text'] for hashtag in status.entities['hashtags']]

            print(json.dumps(status_dict, ensure_ascii=False))

        except Exception:
            # unspecified error, skip and continue listening
            self.on_error('unspecified')
    
    def on_error(self, status_code):
        print('Got an error with status code: ' + str(status_code))
        return True # To continue listening
 
    def on_timeout(self):
        print('Timeout...')
        return True # To continue listening
    
    
if __name__ == '__main__':
     
    listener = StdOutListener()
    auth = tweepy.OAuthHandler(config['api-key'], config['api-secret-key'])
    auth.set_access_token(config['access-token'], config['access-token-secret'])

    stream = tweepy.Stream(auth, listener)
    print('Stream started at {}'.format(datetime.now()))
    loc_ls = [2.229335, 48.819298, 2.416376, 48.902579,
              12.358222, 41.777618, 12.632567, 42.016053,
              13.097992, 52.379442, 13.712705, 52.667389,
              11.373648, 48.063202, 11.699651, 48.240582,
              16.192143, 48.114450, 16.551614, 48.326583,
              8.460501, 47.324890,  8.616888, 47.432676]
    stream.filter(locations=loc_ls)

**Sample output:**

    Stream started at 2018-12-21 12:05:46.941793
    {"tweet": "@authentic__one @STban91240 Mais moi je t'apprécie et tu le sais. \nEt si c'est à cause d'Hazard, dis toi qu'en 2011 c'était le meilleur joueur de la L1. Après tout c'est toujours ça que Neymar n'a pas.\nSakam.", "city": "Paris", "timestamp": "2018-12-21 11:05:44"}
    {"tweet": "immagine decisamente accurata https://t.co/KDN7mUtzZF", "city": "Rome", "timestamp": "2018-12-21 11:05:45"}
    {"tweet": "Outdoor sports - Best fitness equipment @ Kahlenberg https://t.co/HYHx4jbzyk", "city": "Vienna", "timestamp": "2018-12-21 11:05:46"}
    {"tweet": "@karypant @Mov5Stelle Peggio, ci considera malati da curare. Né più né meno che quello che pensano di omosessuali. Gravissimo! e tutti i docenti che hanno votato 5s muti! Che vergogna \nLa #scuolapubblica è laica non confessionale", "city": "Rome", "timestamp": "2018-12-21 11:05:58"}

### Kafka Producer Script

If this script runs, we know we are receiving the right data from Twitter. Next, we write the Python script 'streaming-twitter.py' (see file), which fetches the tweets and writes them directly into our Kafka topic on the VM.
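
'streaming-twitter.py' itself is not reproduced in this notebook. The following is only a sketch of its Kafka-specific part, assuming the kafka-python package installed on the VM below; serialize, make_producer and publish are hypothetical helpers, and localhost:9092 assumes the script runs on the same VM as the broker.

```python
import json


def serialize(record):
    """Encode a tweet dict as UTF-8 JSON bytes for the Kafka message value."""
    return json.dumps(record, ensure_ascii=False).encode('utf-8')


def make_producer(bootstrap_servers='localhost:9092'):
    """Create a kafka-python producer that serializes values with serialize()."""
    from kafka import KafkaProducer  # imported lazily: only needed on the VM
    return KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=serialize)


def publish(producer, record, topic='twitter-input'):
    """Send one tweet dict to the input topic (asynchronous, returns a future)."""
    return producer.send(topic, value=record)
```

In the listener, the print(...) call in on_status would become publish(producer, status_dict), with producer = make_producer() created once at start-up. Calling producer.flush() before shutting down makes sure buffered messages are actually delivered.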

### Setting up Kafka on a Virtual Machine

See: https://kafka.apache.org/quickstart

We use Ubuntu Server 18.04 running on Azure.

**Step 1)** Start an Ubuntu Server 18.04 VM (in our case running on Azure) and enable SSH access.

**Step 2)** Install the necessary packages on the VM, then download and extract Kafka as described in the quickstart (all 'bin/' and 'config/' paths below are relative to the Kafka directory):
- sudo apt install openjdk-8-jre-headless
- sudo apt install python3-pip
- pip3 install tweepy
- pip3 install kafka-python

**Step 3)** In Kafka's 'config/server.properties', uncomment and edit the following lines:
- listeners=PLAINTEXT://0.0.0.0:9092
- advertised.listeners=PLAINTEXT://*YOUR_VM_IP_HERE*:9092

Binding to 0.0.0.0 lets the broker accept external connections, and the advertised listener is the address the broker hands out to clients (such as Databricks), so it must be the VM's public IP.

**Step 4)** Open ports 2181, 2188 and 9092 for inbound connections in the Azure portal (VM > Network).

**Step 5)** Copy 'streaming-twitter.py' and 'config/twitter-config.json' onto the VM (e.g. by pasting their contents into nano).

**Step 6)** Launch ZooKeeper and the Kafka server, and create the input and alert topics (both servers keep running in the foreground, so start each in its own terminal session):
- *Launching ZooKeeper:* bin/zookeeper-server-start.sh config/zookeeper.properties
- *Launching the Kafka server:* bin/kafka-server-start.sh config/server.properties
- *Creating the input topic:* bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter-input
- *Creating the alert topic:* bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter-alerts

You can check whether the topics have been created via:
- bin/kafka-topics.sh --list --zookeeper localhost:2181

You can delete topics via:
- bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic twitter-input

**Step 7)** Write tweets into the input topic by running 'streaming-twitter.py':
- python3 streaming-twitter.py

**Step 8)** Check that the tweets arrive in the topic:
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter-input --from-beginning

The output should be your tweets as JSON, one per line.
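
As an alternative to kafka-console-consumer.sh, the input topic can also be checked from Python. This is a sketch assuming kafka-python; deserialize and read_tweets are hypothetical helpers, auto_offset_reset='earliest' plays the role of --from-beginning, and consumer_timeout_ms stops the loop if no message arrives for ten seconds.

```python
import json


def deserialize(raw):
    """Decode a UTF-8 JSON message value back into a dict."""
    return json.loads(raw.decode('utf-8'))


def read_tweets(topic='twitter-input', bootstrap_servers='localhost:9092', limit=5):
    """Print and return up to `limit` tweets read from the start of the topic."""
    from kafka import KafkaConsumer  # kafka-python, installed on the VM
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000,
                             value_deserializer=deserialize)
    tweets = []
    for message in consumer:
        tweets.append(message.value)
        print(message.value.get('city'), message.value.get('timestamp'))
        if len(tweets) >= limit:
            break
    consumer.close()
    return tweets
```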

### Setting up PostgreSQL on Azure

- Enable external access to the DB (the Spark jobs write to it from Databricks Community Edition) by setting up a corresponding firewall rule in the Azure portal, see https://docs.microsoft.com/en-us/azure/postgresql/concepts-firewall-rules.

- Creating a database on PostgreSQL: 
        CREATE DATABASE twitter;

- Creating a table in the database: 
    
        CREATE TABLE tweets (
            window_id SERIAL PRIMARY KEY,
            window_start TIMESTAMP,
            city VARCHAR(10),
            tweet_count INTEGER );
            
- Creating a window sequence:
    
        CREATE SEQUENCE w_sequence START 1 INCREMENT 1;
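
In the pipeline, the per-minute rows are written by the Spark job on Databricks (see the Scala files). To test the table by hand, a minimal sketch with psycopg2 could look like this; psycopg2 is an assumption (the notebook does not name a PostgreSQL client), connect and insert_window_count are hypothetical helpers, and sslmode='require' is used because Azure Database for PostgreSQL enforces SSL connections by default.

```python
# Parameterized insert; window_id is filled in automatically by SERIAL
INSERT_SQL = ('INSERT INTO tweets (window_start, city, tweet_count) '
              'VALUES (%s, %s, %s)')


def connect(host, user, password, dbname='twitter'):
    """Open an SSL connection to the Azure PostgreSQL server."""
    import psycopg2  # imported lazily so the helper below works without it
    return psycopg2.connect(host=host, user=user, password=password,
                            dbname=dbname, sslmode='require')


def insert_window_count(cursor, window_start, city, tweet_count):
    """Insert one per-minute count for a city."""
    cursor.execute(INSERT_SQL, (window_start, city, tweet_count))
```

Used as with conn: followed by with conn.cursor() as cur: insert_window_count(cur, ...), psycopg2 commits the transaction when the outer block exits without an error. The separate w_sequence is not used by this sketch.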

### Running Spark Structured Streaming Operations in Databricks

See the Scala files (corresponding to the Databricks notebooks).