In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import configparser
import time
import tweepy

In [2]:
# Read the Twitter credentials from the [KEYS] section of the settings file.
path_conf = 'conf/settings.ini'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot find and returns the list
# of files it did parse; fail here with a clear message instead of surfacing a
# bare KeyError: 'KEYS' on the next line.
if not config.read(path_conf):
  raise FileNotFoundError(f'settings file not found: {path_conf}')
keys = config['KEYS']

In [3]:
# Pull the four credentials out of the [KEYS] section (in this order) and
# build an authenticated tweepy API client from them.
TWITTER_APP_KEY, TWITTER_APP_SECRET, TWITTER_KEY, TWITTER_SECRET = (
  keys[name]
  for name in ('TWITTER_APP_KEY', 'TWITTER_APP_SECRET', 'TWITTER_KEY', 'TWITTER_SECRET')
)

auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
api = tweepy.API(auth)

In [103]:
from streamz import Stream
from streamz.dataframe import DataFrame
from datetime import datetime as dt
import pandas as pd
import json
from IPython.display import clear_output

# build listener for new tweets

# Create a listener that counts the hashtags of every tweet (retweets excluded) that comes from the Twitter streaming API.
class StreamListener(tweepy.StreamListener):
  """Tweepy listener that counts hashtags in incoming tweets with a streamz pipeline.

  Every status that is not a retweet is turned into a dict with keys 'id',
  'name', 'text' and 'ts' and emitted into ``self.source``. The pipeline maps
  each dict to its distinct hashtags and adds them to the running totals kept
  in ``self.sink`` ({hashtag: count}).

  Args:
    time_limit: seconds, measured from construction, after which on_status()
      returns False so tweepy disconnects the stream.
  """

  def __init__(self, time_limit=60):
    # set time limit and counter
    self.start_time = time.time()
    self.limit = time_limit
    self.counter = 0  # number of non-retweet statuses processed
    # define source and sink
    self.source = Stream()
    self.sink = {}
    clear_output()
    # count hashtags. start=self.sink makes the sink dict the accumulator state
    # itself, so clean_sink() prunes the real running totals. Without it the
    # accumulator keeps a separate dict (the first emitted one) and the
    # .sink(update) step would copy pruned hashtags straight back. With the
    # shared dict the final update is a self-update and changes nothing.
    self.source\
      .map(self.hashtags)\
      .accumulate(self.sum_hashtags, start=self.sink)\
      .sink(self.sink.update)
    super(StreamListener, self).__init__()

  def on_status(self, status):
    """Handle one status from tweepy; return False once the time limit has passed.

    Retweets are skipped, but they still go through the time check, so a
    stream delivering only retweets still stops on schedule.
    """
    if not hasattr(status, "retweeted_status"):
      self._emit_status(status)
    # returning False tells tweepy to disconnect the stream
    return (time.time() - self.start_time) < self.limit

  def _emit_status(self, status):
    """Build the tweet data dict, emit it into the pipeline, and periodically prune."""
    # prefer the full text when the status carries an extended_tweet
    if hasattr(status, "extended_tweet"):
      text = status.extended_tweet["full_text"]
    else:
      text = status.text
    data = {
      'id': status.id_str,
      'name': status.user.screen_name,
      'text': text,
      'ts': dt.now().timestamp(),
    }
    self.source.emit(data)
    self.counter += 1
    # every 1000 tweets drop hashtags seen only once so far, so the totals
    # dict does not grow without limit
    if self.counter % 1000 == 0:
      self.clean_sink()

  def on_error(self, status_code):
    """Report a non-200 stream response; disconnect on 420 (rate limited).

    Returning False stops tweepy from reconnecting. tweepy's docs recommend
    this for 420 because repeated reconnect attempts lengthen the required
    wait. For other codes None is returned and tweepy's own retry applies.
    """
    print(f'Error: {status_code}')
    if status_code == 420:
      print('Too many requests!')
      return False

  def hashtags(self, data):
    """Return {hashtag: 1} for each distinct word of data['text'] starting with '#'.

    NOTE: words come from str.split(), so trailing punctuation is kept
    (e.g. '#Biden,').
    """
    words = data['text'].split()
    return dict.fromkeys((word for word in words if word.startswith('#')), 1)

  def sum_hashtags(self, x, y):
    """Add the counts in y to the running totals x (mutated in place) and return x."""
    for key, count in y.items():
      x[key] = x.get(key, 0) + count
    return x

  def get_counter(self):
    """Return the number of non-retweet statuses processed so far."""
    return self.counter

  def get_hashtags(self):
    """Return the live {hashtag: count} dict (still mutated by the stream)."""
    return self.sink

  def clean_sink(self):
    """Remove hashtags whose count is exactly 1 from the running totals."""
    # iterate over a snapshot: deleting from a dict while iterating over it
    # raises RuntimeError ("dictionary changed size during iteration")
    for key, value in list(self.sink.items()):
      if value == 1:
        del self.sink[key]

  def get_filter_hashtags(self):
    """Return a new {hashtag: count} dict of the hashtags seen more than once."""
    # copy first: with is_async=True the stream thread keeps adding keys while
    # the notebook reads, and iterating a dict that grows raises RuntimeError
    snapshot = self.sink.copy()
    return {key: value for key, value in snapshot.items() if value > 1}

  def get_filter_hashtags_df(self):
    """Return the hashtags seen more than once as a DataFrame sorted by count, descending."""
    htags = self.get_filter_hashtags()
    out = pd.DataFrame({'hashtags': list(htags.keys()), 'count': list(htags.values())})
    return out.sort_values('count', ascending=False)

In [104]:
# Keywords handed to stream.filter(track=...) when the stream is started.
tracker = ['trump', 'donald trump']
# Hashtag-counting listener whose 7200 s (two-hour) limit starts counting now,
# at construction, not when the stream is opened.
stream_listener = StreamListener(time_limit=7200)

In [105]:
# Stop a stream left running by an earlier execution of the next cell before a
# new one is started. tweepy keeps its run flag on the Stream object and clears
# it in Stream.disconnect(); a `running` attribute on the listener is not read.
if 'stream' in globals():
  stream.disconnect()

In [106]:
# Open the filtered stream on a background thread (is_async=True) so the
# notebook stays usable; each matching status is handed to stream_listener.
stream = tweepy.Stream(listener=stream_listener, auth=api.auth)
stream.filter(is_async=True, track=tracker)

# Get top 10 hashtags

In [107]:
def count_capitals(word):
  """Return how many characters of ``word`` are uppercase."""
  return len([ch for ch in word if ch.isupper()])

def merge_hashtags(df):
  """Merge hashtags that differ only in letter case, summing their counts.

  Within each case-insensitive group, the spelling with the most capital
  letters is kept. On a tie, the spelling already kept wins.

  Args:
    df: DataFrame with columns 'hashtags' (str) and 'count' (number).

  Returns:
    A new DataFrame with columns ['hashtags', 'count'], sorted by count
    descending (ties keep first-seen order) and indexed from 0.
  """
  # lowercased hashtag -> [display spelling, summed count]; a single O(n) pass
  # instead of rescanning every kept key for each row
  merged = {}
  for key, val in zip(df['hashtags'], df['count']):
    entry = merged.get(key.lower())
    if entry is None:
      merged[key.lower()] = [key, val]
      continue
    if count_capitals(key) > count_capitals(entry[0]):
      entry[0] = key
    entry[1] += val
  out = pd.DataFrame(list(merged.values()), columns=['hashtags', 'count'])
  # mergesort is stable, so equal counts come out in a deterministic order
  return out.sort_values('count', ascending=False, kind='mergesort').reset_index(drop=True)

# Hashtags seen more than once so far, with case variants merged; show the ten most frequent.
df = merge_hashtags(stream_listener.get_filter_hashtags_df())
df.head(10)

Unnamed: 0,hashtags,count


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/home/max/minimamba/envs/aai/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/max/minimamba/envs/aai/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/max/minimamba/envs/aai/lib/python3.8/site-packages/tweepy/streaming.py", line 320, in _run
    six.reraise(*exc_info)
  File "/home/max/minimamba/envs/aai/lib/python3.8/site-packages/six.py", line 703, in reraise
    raise value
  File "/home/max/minimamba/envs/aai/lib/python3.8/site-packages/tweepy/streaming.py", line 289, in _run
    self._read_loop(resp)
  File "/home/max/minimamba/envs/aai/lib/python3.8/site-packages/tweepy/streaming.py", line 351, in _read_loop
    self._data(next_status_obj)
  File "/home/max/minimamba/envs/aai/lib/python3.8/site-packages/tweepy/streaming.py", line 323, in _data
    if self.listener.on_data(data) is False:
  File "/home/max/minimamba

## Test section

In [199]:
# Load the saved tweet dump (a list of tweet dicts) and pick out one known
# tweet to exercise the helper functions on.
SAMPLE_ID = '1282929875997749249'
with open('twitter.json', 'r', encoding='utf-8') as f:
  dump = json.load(f)
# Stop at the first match, and fail loudly if the tweet is missing instead of
# leaving `sample` undefined (or stale from an earlier run) for later cells.
sample = next((data for data in dump if data['id'] == SAMPLE_ID), None)
if sample is None:
  raise KeyError(f'tweet {SAMPLE_ID} not found in twitter.json')
def hashtags(data):
  """Map each distinct '#'-prefixed word of data['text'] to 1, in first-seen order."""
  tags = [word for word in data['text'].split() if word.startswith('#')]
  return dict.fromkeys(tags, 1)
# Hashtags extracted from the sample tweet; the assert turns the visual check into a real test.
test = hashtags(sample)
print(test)
assert test == {'#trump2020': 1, '#MakeChinaPay': 1, '#Vote2020': 1, '#voteoutdemocrats': 1}

{'#trump2020': 1, '#MakeChinaPay': 1, '#Vote2020': 1, '#voteoutdemocrats': 1}


In [201]:
# Per-tweet hashtag dicts shaped like the output of `hashtags`; split() keeps
# trailing punctuation, hence keys such as '#Biden,'.
x = dict.fromkeys(['#trump2020', '#MakeChinaPay', '#Vote2020', '#voteoutdemocrats'], 1)
y = dict.fromkeys(['#Trump', '#Biden,'], 1)
z = dict.fromkeys(['#Trump', '#Corona,'], 1)
def sum_hashtags(x, y):
  """Add the counts in y to the running totals x (mutated in place) and return x.

  Adding y[key] rather than a constant 1 makes this a true sum of two count
  dicts. For per-tweet dicts from `hashtags` every value is 1, so results are
  unchanged.
  """
  for key, count in y.items():
    x[key] = x.get(key, 0) + count
  return x
# Fold y and then z into x; sum_hashtags mutates x in place and returns it, so
# the first call's result needs no name of its own.
sum_hashtags(x, y)
test = sum_hashtags(x, z)
print(test)
assert test is x and test['#Trump'] == 2 and test['#Biden,'] == 1 and test['#Corona,'] == 1

{'#trump2020': 1, '#MakeChinaPay': 1, '#Vote2020': 1, '#voteoutdemocrats': 1, '#Trump': 2, '#Biden,': 1, '#Corona,': 1}


In [213]:
from streamz import Stream

# Raw text of the sample tweet, kept for reference; the pipeline below is fed `sample`.
tsample = '@realDonaldTrump Let China pay for this\n\nVote Trump this November \n\n#trump2020 #MakeChinaPay #Vote2020 #voteoutdemocrats'

# Same shape as the StreamListener pipeline: tweet dict -> {hashtag: 1} ->
# running totals, printed after every emit. No start value is given to
# accumulate, so sum_hashtags folds later tweets into an earlier dict in place
# (NOTE(review): presumably streamz seeds the state with the first element).
source = Stream()
(source
  .map(hashtags)
  .accumulate(sum_hashtags)
  .sink(print))

Output()

In [215]:
# Push the sample tweet through the test pipeline; the print sink shows the running totals.
source.emit(sample)

{'#trump2020': 2, '#MakeChinaPay': 2, '#Vote2020': 2, '#voteoutdemocrats': 2}


```
description = status.user.description
loc = status.user.location
text = status.text
coords = status.coordinates
name = status.user.screen_name
user_created = status.user.created_at
followers = status.user.followers_count
id_str = status.id_str
created = status.created_at
retweets = status.retweet_count
bg_color = status.user.profile_background_color
```

Interesting attributes of the status object:
```
text: short, possibly truncated text with a link (the full text is in extended_tweet.full_text)
user: object(id, name, screen_name, location)
extended_tweet(full_text, user_mentions)
```