In [None]:
import config
import datetime
import os
import pandas as pd
import requests
import tweepy as tw
import threading
import time
import queue
from pathlib import Path

In [None]:
def get_df():
    """
    Build a fresh, empty DataFrame with the tweet columns used by the miner.

    Returns:
        pd.DataFrame: zero rows, one column per tweet field listed below.
    """
    column_names = [
        # tweet identity / author
        "tweet_id",
        "name",
        "screen_name",
        # tweet content and timing
        "retweet_count",
        "text",
        "mined_at",
        "created_at",
        "favourite_count",
        "hashtags",
        # author statistics and metadata
        "status_count",
        "followers_count",
        "location",
        "source_device",
    ]
    empty_frame = pd.DataFrame(columns=column_names)
    return empty_frame

In [None]:
#### Twitter API ####
class TweetMiner(object):
    """
    Mine tweets for a hashtag through tweepy's search endpoint and dump them to CSV.

    Tweets are collected page by page; the accumulated rows are flushed to a
    numbered CSV file at regular page intervals and once more when the cursor
    is exhausted. All files are written into ``self.path`` (the working
    directory at construction time).
    """

    result_minit = 20
    ret = []
    api = False

    # A checkpoint file is written whenever the (already incremented) page
    # counter is a multiple of this value.
    # NOTE(review): 180 presumably mirrors the search rate-limit window — confirm.
    pages_per_checkpoint = 180

    twitter_keys = {
        "consumer_key": config.consumer_key,
        "consumer_secret": config.consumer_secret,
        "access_token_key": config.access_token_key,
        "access_token_secret": config.access_token_secret,
    }

    def __init__(self, keys_dict=twitter_keys, api=api):
        """
        Initialize the miner.

        Args:
            keys_dict: mapping with "consumer_key", "consumer_secret",
                "access_token_key" and "access_token_secret" entries.
            api: optional ready-made tweepy API object. When falsy (the
                default) a new client is built from ``keys_dict``.
                Previously this argument was accepted but always overwritten.
        """
        if api:
            self.api = api
        else:
            auth = tw.OAuthHandler(keys_dict["consumer_key"], keys_dict["consumer_secret"])
            auth.set_access_token(keys_dict["access_token_key"], keys_dict["access_token_secret"])
            self.api = tw.API(auth, wait_on_rate_limit=True)
        self.twitter_keys = keys_dict
        self.path = Path.cwd()

    @staticmethod
    def _tweet_to_record(tweet):
        """Flatten one tweepy status object into a plain dict (one CSV row)."""
        # Only retweets carry `retweeted_status`; everything else gets the
        # literal string "None", as before.
        retweeted = getattr(tweet, "retweeted_status", None)
        return {
            "tweet_id": tweet.id,
            "name": tweet.user.name,
            "screen_name": tweet.user.screen_name,
            "retweet_count": tweet.retweet_count,
            "text": tweet.full_text,
            "mined_at": datetime.datetime.now(),
            "created_at": tweet.created_at,
            "favourite_count": tweet.favorite_count,
            "hashtags": tweet.entities.get("hashtags"),
            "status_count": tweet.user.statuses_count,
            "followers_count": tweet.user.followers_count,
            "location": tweet.user.location,
            "source_device": tweet.source,
            "retweet_text": getattr(retweeted, "full_text", "None"),
        }

    @staticmethod
    def _csv_name(query, suffix):
        """Return the file name ``<YYYY-MM-DD>-<query>-<suffix>.csv`` for today."""
        date_label = datetime.datetime.now().strftime("%Y-%m-%d")
        return f"{date_label}-{query}-{suffix}.csv"

    def _write_csv(self, records, file_name):
        """Write the collected row dicts to ``self.path / file_name``."""
        # With no rows, fall back to the empty template so the file still
        # gets a header line.
        frame = pd.DataFrame(records) if records else get_df()
        frame.to_csv(self.path / file_name, index=False)

    def mine_tweets(self, query="BTC"):
        """
        Mine tweets from the query.

        Searches for ``#<query>`` (English, extended tweet mode) and writes
        the results to CSV files inside ``self.path``.

        Args:
            query: hashtag to search for, without the leading ``#``.

        Returns:
            None. Output is the CSV files written as a side effect.
        """
        crypto_query = f"#{query}"  # since:2022-01-24 until:2022-01-26"
        print("========", query, crypto_query)

        # Rows are gathered in a list and turned into a DataFrame once per
        # file: DataFrame.append was removed in pandas 2.0 and copied the
        # whole frame for every tweet.
        records = []
        page_num = 1
        document_count = 0

        pages = tw.Cursor(
            self.api.search_tweets,
            q=crypto_query,
            lang="en",
            tweet_mode="extended",
            count=200,
        ).pages()

        for page in pages:
            print("........... new page", page_num)
            page_num += 1

            records.extend(self._tweet_to_record(tweet) for tweet in page)

            if page_num % self.pages_per_checkpoint == 0:
                file_name = self._csv_name(query, document_count)
                print("Saving to file:", file_name, " after page number ", page_num)
                self._write_csv(records, file_name)
                document_count += 1
                print("Resetting df")
                records = []

        # Flush whatever is left after the last checkpoint.
        self._write_csv(records, self._csv_name(query, page_num))

In [None]:
# Shared miner instance used by the listener thread.
miner = TweetMiner()

# Hashtags (without '#') to mine; "DOGE" is currently disabled.
handle_list = ["BTC"]

In [None]:
# Gate for the publisher: start_publisher() blocks on should_publish.wait()
# until this event is set, and can be paused again with should_publish.clear().
should_publish = threading.Event()
# Hand-off queue of (poll_iteration, name) tuples from publisher to listener.
update_queue = queue.Queue()


def start_publisher():
    """
    Publisher loop: enqueue mining jobs for the listener thread.

    Runs ten rounds. In each round the first entry of ``handle_list`` is put
    on ``update_queue`` as a ``(sequence number, name)`` tuple; the loop then
    sleeps 900 seconds, waits until ``should_publish`` is set, and finally
    blocks until the queue reports every queued item as done.
    """
    started_at = time.time()
    print("Start polling", started_at)

    published = 0
    for cycle in range(10):
        # Only the first handle is published per round.
        targets = handle_list[:1]
        for handle in targets:
            published += 1
            print(cycle, published, "\rpublishing update ", end="")
            update_queue.put((published, handle))

            time.sleep(900)

            print("\rawaiting for publishing update", end="")
            should_publish.wait()
            update_queue.join()

def start_update_listener():
    """
    Listener loop: take jobs off ``update_queue`` and mine tweets for each.

    Runs forever. Each queue item is a ``(poll_iteration, name)`` tuple;
    ``name`` is passed to ``miner.mine_tweets``. Failures are reported and
    the loop carries on with the next item.
    """
    while True:
        _poll_iteration, name = update_queue.get()

        print(" --- ", name)
        try:
            miner.mine_tweets(query=name)
        except Exception as e:
            print("Failed to mine tweets for " + str(name) + ": " + str(e))
        finally:
            # Acknowledge the item even when mining failed; otherwise the
            # publisher's update_queue.join() would block forever.
            update_queue.task_done()

# Create the worker threads (they are not started here).
# daemon=True so they do not keep the interpreter alive on exit.
listener_thread = threading.Thread(target=start_update_listener, daemon=True)
publisher_thread = threading.Thread(target=start_publisher, daemon=True)

In [None]:
# Start both worker threads. A Thread object can only be started once, so
# re-running this cell without re-creating the threads raises RuntimeError.
publisher_thread.start()
listener_thread.start()

# Set the event so the publisher is not held at should_publish.wait().
should_publish.set()

In [None]:
# Pause publishing: with the event cleared, the publisher blocks at its next
# should_publish.wait() until should_publish.set() is called again.
should_publish.clear()