## TW Shadowban

https://github.com/shadowban-eu/shadowban-eu-backend

In [None]:
import aiohttp
import argparse
import asyncio
import daemon
import json
import os
import re
import traceback
import urllib.parse
import sys
import time

from aiohttp import web
from bs4 import BeautifulSoup
from db import connect

In [None]:
# Default bearer token placed in the `Authorization` header of API requests.
# It is the default of the --twitter-auth-key command line option.
TWITTER_AUTH_KEY = 'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'

In [None]:
# Route table that collects the HTTP handlers registered with @routes.get(...)
routes = web.RouteTableDef()

class UnexpectedApiError(Exception):
    """Raised when an API response carries an error code the tester does not expect."""

def get_nested(obj, path, default=None):
    """Walk `obj` along the keys in `path` and return the value found there.

    Args:
        obj: Nested container (typically dicts) to descend into.
        path: Iterable of keys, applied one after another.
        default: Value returned as soon as a step cannot be taken.

    Returns:
        The value at the end of `path`, or `default` when an intermediate
        value is `None` or does not contain the next key. A `None` stored at
        the very end of the path is returned as-is.
    """
    current = obj
    for key in path:
        if current is not None and key in current:
            current = current[key]
            continue
        return default
    return current

def is_error(result, code=None):
    """Tell whether an API response reports an error.

    Args:
        result: Decoded JSON response (a dict); errors are expected as a list
            of dicts under the "errors" key.
        code: Error code to look for. With `None`, any entry in the error
            list counts.

    Returns:
        True if "errors" is a list that contains an entry with the given
        code (or any entry at all when `code` is None), False otherwise.
    """
    errors = result.get("errors", None)
    if not isinstance(errors, list):
        return False
    if code is None:
        # The original compared the list itself with 0 inside len(), which
        # raised TypeError instead of answering "is there any error?".
        return len(errors) > 0
    return any(entry.get("code", None) == code for entry in errors)

def is_another_error(result, codes):
    """Tell whether a response reports an error whose code is not in `codes`.

    Args:
        result: Decoded JSON response (a dict).
        codes: Collection of error codes that are considered expected.

    Returns:
        True if "errors" is a list with at least one entry whose "code"
        (or `None` when the entry has none) is not in `codes`.
    """
    errors = result.get("errors", None)
    if not isinstance(errors, list):
        return False
    return any(entry.get("code", None) not in codes for entry in errors)

In [None]:
# TwitterSession objects for configured accounts; filled by login_accounts()
account_sessions = []
# Counter incremented by the barrier test each time it reaches the reference lookup
account_index = 0
# Open file object for test results (--log); while None, log() prints instead
log_file = None
# Open file object for debug output (--debug); while None, debug() prints instead
debug_file = None
# Number of guest sessions created by login_guests()
guest_session_pool_size = 10
# Guest TwitterSession objects; the API handler picks from them in rotation
guest_sessions = []
# Running request counter used by the API handler to rotate over guest_sessions
test_index = 0

def next_session():
    """Pick the unlocked account session that should be used next.

    Sessions are ranked by the seconds left until their rate-limit window
    resets (smallest first). A session with at most 3 requests remaining and
    a reset still in the future is ranked with a fixed value of 900 instead.

    Returns:
        The best-ranked unlocked session from `account_sessions`, or `None`
        when there is no unlocked session.
    """
    def rank(session):
        seconds_to_reset = session.reset - time.time()
        nearly_exhausted = session.remaining <= 3 and seconds_to_reset > 0
        return 900 if nearly_exhausted else seconds_to_reset

    unlocked_sessions = [s for s in account_sessions if not s.locked]
    # min() returns the first of several equally ranked sessions, exactly like
    # taking element 0 of a stable sort.
    return min(unlocked_sessions, key=rank, default=None)

In [None]:
# Define the TwitterSession class, representing a session for accessing Twitter API

class TwitterSession:
    """State of one Twitter API session, either a guest or a logged-in account."""

    # Bearer token used for the Authorization header; shared by all sessions
    # and assigned at start-up from the --twitter-auth-key option.
    twitter_auth_key = None

    def __init__(self):
        # Guest token (anonymous access), CSRF token and the aiohttp
        # ClientSession all start out unset.
        self._guest_token = self._csrf_token = self._session = None

        # Rate limit monitoring; -1 marks values not yet seen.
        self.limit = self.reset = self.overshot = -1
        self.remaining = 180
        self.locked = False
        self.next_refresh = None

        # Session user's @username; this stays `None` for guest sessions.
        self.username = None

        self._headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
        }
        # reset_headers() assigns self._headers again
        self.reset_headers()

In [None]:
def set_csrf_header(self):
    """Copy the `ct0` cookie for twitter.com into the `X-Csrf-Token` header.

    The cookie jar of the current aiohttp session is filtered for
    https://twitter.com/; the header is left untouched when no `ct0` cookie
    is present.
    """
    twitter_cookies = self._session.cookie_jar.filter_cookies('https://twitter.com/')
    for cookie in twitter_cookies.values():
        if cookie.key == 'ct0':
            self._headers['X-Csrf-Token'] = cookie.value

In [None]:
def reset_headers(self):
    """Replace the request headers with the default set (User-Agent only).

    Any previously set header (Authorization, CSRF token, guest token) is
    dropped. The User-Agent is the same browser string the constructor
    assigns; the former literal "Mozilla/5.0 ..." was an elided placeholder
    that would have been sent to the server verbatim.
    """
    self._headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
    }

In [None]:
async def get_guest_token(self):
    """Request a guest token from the guest activation endpoint.

    Sets the `Authorization` header from `twitter_auth_key` as a side effect.

    Returns:
        The "guest_token" value of the JSON response, or `None` when the
        response has none (the response and the headers are then written to
        the debug log).
    """
    self._headers['Authorization'] = 'Bearer ' + self.twitter_auth_key

    activation = self._session.post("https://api.twitter.com/1.1/guest/activate.json", headers=self._headers)
    async with activation as reply:
        payload = await reply.json()

    token = payload.get("guest_token", None)
    if token is not None:
        return token

    # No token in the response: leave a trace of what was sent and received.
    for line in ("Failed to fetch guest token", str(payload), str(self._headers)):
        debug(line)
    return None


In [None]:
async def renew_session(self):
    """Replace the aiohttp session with a fresh one and reset the headers.

    Cookies held by the previous session are not carried over, because a new
    `aiohttp.ClientSession()` is created without arguments.
    """
    # Try to close the existing aiohttp ClientSession to release resources
    await self.try_close()
    # Create a new aiohttp ClientSession instance for a fresh network session
    self._session = aiohttp.ClientSession()
    # Reset HTTP headers to default values for the new session
    self.reset_headers()

In [None]:
async def refresh_old_token(self):
    """Log the guest session in again once its refresh time has passed.

    Nothing happens for account sessions (a username is set), for sessions
    without a scheduled refresh time, or while the scheduled time is still
    in the future.
    """
    if self.username is not None or self.next_refresh is None:
        return
    if time.time() < self.next_refresh:
        return

    debug("Refreshing token: " + str(self._guest_token))
    await self.login_guest()
    debug("New token: " + str(self._guest_token))

In [None]:
async def try_close(self):
    """Close the current aiohttp session, if there is one, on a best-effort basis.

    Ordinary errors raised while closing are ignored so that a broken session
    never prevents a new one from being created. Only `Exception` subclasses
    are suppressed: task cancellation, `KeyboardInterrupt` and `SystemExit`
    still propagate, which the former bare `except:` prevented.
    """
    if self._session is None:
        return
    try:
        await self._session.close()
    except Exception:
        # Deliberately ignored: the session is being discarded anyway.
        pass

In [None]:
async def login_guest(self):
    """Start a fresh guest session and fetch a guest token for it.

    The aiohttp session is renewed, the CSRF header is set from its cookies
    and a new guest token is requested. On success the token is stored and
    the next refresh is scheduled one hour ahead; on failure the previously
    known token (if any) is kept.

    The `X-Guest-Token` header is only written when a token is known: a
    `None` header value is not a valid header and would make every later
    request of this session fail while building the request.
    """
    await self.renew_session()
    self.set_csrf_header()

    new_token = await self.get_guest_token()
    if new_token is not None:
        self._guest_token = new_token
        self.next_refresh = time.time() + 3600

    if self._guest_token is not None:
        self._headers['X-Guest-Token'] = self._guest_token


In [None]:
async def login(self, username=None, password=None, email=None, cookie_dir=None):
    """Open a new aiohttp session and authenticate it.

    With a password the session is logged in as an account, re-using a cookie
    file from `cookie_dir` when one exists. Without a password the session is
    logged in as a guest.

    Args:
        username: Account name; used for log messages, as the cookie file
            name and stored as `self.username` on the account path.
        password: Account password; `None` selects guest login.
        email: Value submitted as `session[username_or_email]` in the login form.
        cookie_dir: Directory holding one cookie file per username, or `None`
            to neither load nor store cookies.
    """
    # Create a new aiohttp session for sending requests
    self._session = aiohttp.ClientSession()

    if password is not None:
        # If a password is provided, it means user login is required
        login_required = True
        cookie_file = None

        if cookie_dir is not None:
            # If a cookie directory is specified, try to load cookies from file to avoid repeated login
            cookie_file = os.path.join(cookie_dir, username)
            if os.path.isfile(cookie_file):
                log("Use cookie file for %s" % username)
                self._session.cookie_jar.load(cookie_file)
                login_required = False  # a cookie file was loaded, skip the login form

        store_cookies = True

        if login_required:
            # Access the login page to get the authenticity_token required for login
            async with self._session.get("https://twitter.com/login", headers=self._headers) as r:
                login_page = await r.text()

            form_data = {}
            soup = BeautifulSoup(login_page, 'html.parser')
            # Extract the authenticity_token from the login page
            form_data["authenticity_token"] = soup.find('input', {'name': 'authenticity_token'}).get('value')
            form_data["session[username_or_email]"] = email
            form_data["session[password]"] = password
            form_data["remember_me"] = "1"

            # Submit the login form
            async with self._session.post('https://twitter.com/sessions', data=form_data, headers=self._headers) as r:
                response = await r.text()
                # Landing on the home page URL is taken as the sign of a successful login
                if str(r.url) == "https://twitter.com/":
                    log("Login of %s successful" % username)
                else:
                    # Do not persist cookies of a failed login
                    store_cookies = False
                    log("Error logging in %s (%s)" % (username, r.url))
                    debug("ERROR PAGE\n" + response)
        else:
            # If using cookie, visit homepage to refresh session
            async with self._session.get('https://twitter.com', headers=self._headers) as r:
                await r.text()

        # Set CSRF request headers
        self.set_csrf_header()
        # NOTE(review): the username is stored even when the login form was rejected
        self.username = username

        if cookie_file is not None and store_cookies:
            # Save cookies to file
            self._session.cookie_jar.save(cookie_file)

    else:
        # No password given, login as guest
        await self.login_guest()

    # Set the Authorization header with the static twitter_auth_key
    self._headers['Authorization'] = 'Bearer ' + self.twitter_auth_key


In [None]:
async def get(self, url, retries=0):
    """Send a GET request through this session and return the decoded JSON.

    Besides the request itself this refreshes the CSRF header and an expired
    guest token beforehand, and afterwards updates the rate-limit counters,
    re-logs in as guest when needed, retries on error 353 and marks the
    session locked on error 326.

    Args:
        url: Full URL to request.
        retries: How many times the request may be repeated when the
            response reports error code 353.

    Returns:
        The decoded JSON response.

    Raises:
        Whatever the request or the JSON decoding raises; the exception is
        written to the debug log and re-raised.
    """
    # Set the CSRF header to ensure request security
    self.set_csrf_header()

    # Refresh old guest token to keep it valid
    await self.refresh_old_token()

    try:
        # Send asynchronous GET request and get JSON response
        async with self._session.get(url, headers=self._headers) as r:
            result = await r.json()
    except Exception as e:
        # Catch exceptions, log debug info, then re-raise the exception
        debug("EXCEPTION: " + str(type(e)))
        debug("EXCEPTION text: " + str(e))
        raise e

    # Monitor and update rate limit information
    self.monitor_rate_limit(r.headers)

    # Re-login as guest when a guest session has fewer than 10 requests left,
    # or when the response reports error code 88 or 239.
    # NOTE(review): `and` binds tighter than `or`, so errors 88/239 trigger
    # login_guest() even when self.username is set - confirm this is intended.
    if self.username is None and self.remaining < 10 or is_error(result, 88) or is_error(result, 239):
        await self.login_guest()

    # If retries remain and error code 353 occurs, recursively retry the request
    if retries > 0 and is_error(result, 353):
        return await self.get(url, retries - 1)

    # If error code 326 returned, mark the session as locked
    if is_error(result, 326):
        self.locked = True

    # Return the fetched result
    return result

In [None]:
async def search_raw(self, query, live=True):
    """Run an adaptive search and return the raw JSON response.

    Args:
        query: Search query; it is URL-quoted before being sent.
        live: When truthy, `tweet_search_mode=live` is added to the request.
    """
    mode_suffix = "&tweet_search_mode=live" if live else ""
    url = (
        "https://api.twitter.com/2/search/adaptive.json?q="
        + urllib.parse.quote(query)
        + "&count=20&spelling_corrections=0"
        + mode_suffix
    )
    return await self.get(url)

async def typeahead_raw(self, query):
    """Return the raw typeahead (user suggestion) response for `query`.

    The query is URL-quoted before being sent.
    """
    quoted_query = urllib.parse.quote(query)
    url = "https://api.twitter.com/1.1/search/typeahead.json?src=search_box&result_type=users&q=" + quoted_query
    return await self.get(url)

async def profile_raw(self, username):
    """Return the raw `users/show` response for the given screen name.

    The screen name is URL-quoted before being sent.
    """
    quoted_name = urllib.parse.quote(username)
    url = "https://api.twitter.com/1.1/users/show.json?screen_name=" + quoted_name
    return await self.get(url)

async def get_profile_tweets_raw(self, user_id):
    """Return the raw profile timeline of a user.

    The request asks for replies to be included, for retweets not to be,
    and passes `count=1000`.

    Args:
        user_id: Numeric user id (int or str); converted with `str()`.
    """
    url = (
        "https://api.twitter.com/2/timeline/profile/"
        + str(user_id)
        + ".json?include_tweet_replies=1&include_want_retweets=0&include_reply_count=1&count=1000"
    )
    return await self.get(url)

async def tweet_raw(self, tweet_id, count=20, cursor=None, retry_csrf=True):
    """Return the raw conversation timeline of a tweet.

    Args:
        tweet_id: Id of the tweet (str or int). It is converted with `str()`,
            like the user id in `get_profile_tweets_raw`, so an integer id no
            longer raises TypeError during concatenation.
        count: Value of the `count` request parameter.
        cursor: Optional pagination cursor; URL-quoted and sent as `cursor`.
        retry_csrf: Unused; kept so existing callers keep working.
    """
    cursor_param = "" if cursor is None else "&cursor=" + urllib.parse.quote(cursor)
    url = (
        "https://api.twitter.com/2/timeline/conversation/"
        + str(tweet_id)
        + ".json?include_reply_count=1&send_error_codes=true&count="
        + str(count)
        + cursor_param
    )
    return await self.get(url)


In [None]:
def monitor_rate_limit(self, headers):
    """Update the rate-limit counters of this session from response headers.

    Reads `x-rate-limit-limit`, `x-rate-limit-remaining` and
    `x-rate-limit-reset`; a counter is only updated when its header is
    present. When the remaining count went up for an account session with a
    positive overshoot count, the overshoot is written to the database (if
    one is configured) and reset to 0. Every response with 0 remaining
    requests increments the overshoot count.

    Args:
        headers: Mapping of response headers (needs a `.get` method).
    """
    previous_remaining = self.remaining

    header_to_attribute = (
        ('x-rate-limit-limit', 'limit'),
        ('x-rate-limit-remaining', 'remaining'),
        ('x-rate-limit-reset', 'reset'),
    )
    for header_name, attribute in header_to_attribute:
        raw_value = headers.get(header_name, None)
        if raw_value is not None:
            setattr(self, attribute, int(raw_value))

    # A growing remaining count means the rate-limit window has been reset.
    window_reset = previous_remaining < self.remaining
    if window_reset and self.overshot > 0 and self.username is not None:
        log('[rate-limit] Reset detected for ' + self.username + '. Saving overshoot count...')
        if db is not None:
            db.write_rate_limit({'screen_name': self.username, 'overshot': self.overshot})
        self.overshot = 0

    # Count requests made while no requests were left in the window.
    if self.remaining == 0:
        log('[rate-limit] Limit hit by ' + str(self.username) + '.')
        self.overshot += 1


In [None]:
@classmethod
def flatten_timeline(cls, timeline_items):
    """Collect the tweet ids contained in a list of timeline entries.

    An entry contributes either its own tweet id
    (`content.item.content.tweet.id`) or, failing that, the ids of all
    tweets inside its `content.timelineModule.items`. Entries with neither
    are skipped.

    Args:
        timeline_items: Iterable of timeline entry dicts.

    Returns:
        List of tweet ids in the order they were encountered.
    """
    tweet_ids = []
    for entry in timeline_items:
        # Entry that directly wraps a single tweet
        own_id = get_nested(entry, ["content", "item", "content", "tweet", "id"])
        if own_id is not None:
            tweet_ids.append(own_id)
            continue

        # Entry that wraps a module with several tweets
        module_items = get_nested(entry, ["content", "timelineModule", "items"])
        if module_items is None:
            continue
        for module_item in module_items:
            nested_id = get_nested(module_item, ["item", "content", "tweet", "id"])
            if nested_id is not None:
                tweet_ids.append(nested_id)

    return tweet_ids


In [None]:
@classmethod
def get_ordered_tweet_ids(cls, obj, filtered=True):
    """Return the tweet ids of a timeline response, highest sortIndex first.

    The entries of the first `addEntries` instruction are sorted in place by
    descending `sortIndex` and flattened with `flatten_timeline`.

    Args:
        obj: Decoded timeline response.
        filtered: When truthy, only ids that are also keys of
            `obj["globalObjects"]["tweets"]` are returned.

    Returns:
        List of tweet ids; empty when the response has no `addEntries`
        instruction or lacks the expected keys.
    """
    try:
        add_instructions = [ins for ins in obj["timeline"]["instructions"] if "addEntries" in ins]
        entries = add_instructions[0]["addEntries"]["entries"]
    except (IndexError, KeyError):
        return []

    # Descending by sortIndex; entries with equal indices keep their order.
    entries.sort(key=lambda entry: int(entry["sortIndex"]), reverse=True)
    ordered_ids = cls.flatten_timeline(entries)

    if not filtered:
        return list(ordered_ids)
    return [tid for tid in ordered_ids if tid in obj["globalObjects"]["tweets"]]


In [None]:
async def test_ghost_ban(self, user_id):
    """Check whether conversations started by the user hide their parent tweet.

    The user's profile timeline is scanned for own tweets that have replies.
    For the first reply that can be loaded on its own, the result says
    whether the user's tweet is part of that reply's conversation.

    Args:
        user_id: Id of the user as a string (it is compared with `user_id_str`).

    Returns:
        `{"tweet": ..., "reply": ..., "ban": bool}` for the first reply that
        could be checked, `None` when no suitable tweet/reply pair was
        found, or `{"error": "EUNKNOWN"}` when an exception occurred.
    """
    try:
        # Fetch raw tweets and replies for the user
        tweets_replies = await self.get_profile_tweets_raw(user_id)

        # Tweet ids of the profile timeline, highest sortIndex first
        tweet_ids = self.get_ordered_tweet_ids(tweets_replies)

        # Keep tweets authored by the user that have at least one reply
        replied_ids = []
        for tid in tweet_ids:
            tweet = tweets_replies["globalObjects"]["tweets"][tid]
            if tweet["reply_count"] > 0 and tweet["user_id_str"] == user_id:
                replied_ids.append(tid)

        for tid in replied_ids:
            tweet = await self.tweet_raw(tid)
            for reply_id, reply_obj in tweet["globalObjects"]["tweets"].items():
                # Skip the tweet itself and anything that is not a direct reply to it
                if reply_id == tid or reply_obj.get("in_reply_to_status_id_str", None) != tid:
                    continue
                # Load the conversation as seen from the reply
                reply_tweet = await self.tweet_raw(reply_id)
                if reply_id not in reply_tweet["globalObjects"]["tweets"]:
                    continue
                # The parent tweet missing from the reply's conversation is
                # reported as a ban.
                return {
                    "tweet": tid,
                    "reply": reply_id,
                    "ban": tid not in reply_tweet["globalObjects"]["tweets"],
                }
    except Exception:
        # Only ordinary errors are turned into an error result; task
        # cancellation and interpreter exit must keep propagating.
        debug('Unexpected Exception:')
        debug(traceback.format_exc())
        return { "error": "EUNKNOWN" }


In [None]:
async def test_barrier(self, user_id, screen_name):
    """Check whether a reply of the user is hidden behind a "show more" cursor.

    The first eligible reply of the user (a reply in a conversation started
    by someone else, to a tweet with at most 500 replies) is looked up in the
    conversation of the tweet it answers: first in the initial page, then
    behind the `ShowMoreThreads` and `ShowMoreThreadsPrompt` cursors.

    Args:
        user_id: Id of the user as a string (compared with `user_id_str`).
        screen_name: Screen name, used for debug output only.

    Returns:
        - `{"error": "ENOREPLIES"}` when the timeline holds no candidate reply,
        - `{"ban": False, "tweet", "in_reply_to"}` when the reply is on the
          first page,
        - `{"ban": True, "tweet", "stage", "in_reply_to"}` when it only shows
          up behind a cursor,
        - `{"error": "EUNKNOWN"}` when it was not found at all or an
          exception occurred,
        - `None` when a conversation could not be loaded or every candidate
          was skipped.
    """
    try:
        # Get the raw timeline data of the user's tweets and replies
        tweets_replies = await self.get_profile_tweets_raw(user_id)
        # Get the ordered list of tweet IDs from the timeline data
        tweet_ids = self.get_ordered_tweet_ids(tweets_replies)

        reply_tweet_ids = []

        # Keep the user's own replies, except replies in conversations the user started
        for tid in tweet_ids:
            if "in_reply_to_status_id_str" not in tweets_replies["globalObjects"]["tweets"][tid] or tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] != user_id:
                continue
            tweet = tweets_replies["globalObjects"]["tweets"][tid]
            conversation_tweet = get_nested(tweets_replies, ["globalObjects", "tweets", tweet["conversation_id_str"]])
            if conversation_tweet is not None and conversation_tweet.get("user_id_str") == user_id:
                continue
            reply_tweet_ids.append(tid)

        # Return error if user has no reply tweets
        if not reply_tweet_ids:
            return {"error": "ENOREPLIES"}

        # Test each reply tweet for barrier condition
        for tid in reply_tweet_ids:
            # Get the tweet ID this reply is replying to
            replied_to_id = tweets_replies["globalObjects"]["tweets"][tid].get("in_reply_to_status_id_str", None)
            if replied_to_id is None:
                continue

            # Load the tweet being replied to together with part of its conversation
            replied_tweet_obj = await self.tweet_raw(replied_to_id, 50)
            if "globalObjects" not in replied_tweet_obj:
                continue
            if replied_to_id not in replied_tweet_obj["globalObjects"]["tweets"]:
                continue
            replied_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_to_id]

            # The first tweet of the conversation must exist
            if replied_tweet["conversation_id_str"] not in replied_tweet_obj["globalObjects"]["tweets"]:
                continue
            conversation_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_tweet["conversation_id_str"]]

            # Exclude replies to conversations started by the user themselves
            if conversation_tweet["user_id_str"] == user_id:
                continue

            # Skip tweets with over 500 replies to avoid interference from very popular tweets
            if replied_tweet["reply_count"] > 500:
                continue

            debug('[' + screen_name + '] Barrier Test: ')
            debug('[' + screen_name + '] Found:' + tid)
            debug('[' + screen_name + '] In reply to:' + replied_to_id)

            # NOTE(review): the session chosen by next_session() is discarded
            # right away in favour of `self`, so the None check below can
            # never trigger - confirm whether reference sessions should be
            # used again.
            reference_session = next_session()
            reference_session = self
            if reference_session is None:
                debug('No reference session')
                return

            global account_index
            account_index += 1

            # Load the conversation (count=1000) and look for the reply on the first page
            before_barrier = await reference_session.tweet_raw(replied_to_id, 1000)
            if get_nested(before_barrier, ["globalObjects", "tweets"]) is None:
                debug('notweets\n')
                return

            # Reply visible on the first page: not hidden
            if tid in self.get_ordered_tweet_ids(before_barrier):
                return {"ban": False, "tweet": tid, "in_reply_to": replied_to_id}

            # Otherwise follow the "show more" cursors, one stage after the other
            cursors = ["ShowMoreThreads", "ShowMoreThreadsPrompt"]
            last_result = before_barrier

            for stage in range(0, 2):
                entries = [x for x in last_result["timeline"]["instructions"] if "addEntries" in x][0]["addEntries"]["entries"]

                try:
                    cursor = [x["content"]["operation"]["cursor"]["value"] for x in entries if get_nested(x, ["content", "operation", "cursor", "cursorType"]) == cursors[stage]][0]
                except (KeyError, IndexError):
                    # No cursor of this type on the current page; try the next stage
                    continue

                # Load more replies using cursor pagination
                after_barrier = await reference_session.tweet_raw(replied_to_id, 1000, cursor=cursor)

                if get_nested(after_barrier, ["globalObjects", "tweets"]) is None:
                    debug('retinloop\n')
                    return

                # Reply only visible behind the cursor: hidden (ban=True)
                if tid in self.get_ordered_tweet_ids(after_barrier):
                    return {"ban": True, "tweet": tid, "stage": stage, "in_reply_to": replied_to_id}
                last_result = after_barrier

            # The first reply that reached this point decides the outcome;
            # later candidates are not examined.
            debug('[' + screen_name + '] outer loop return')
            return { "error": "EUNKNOWN" }
    except Exception:
        # Only ordinary errors become an error result; task cancellation and
        # interpreter exit keep propagating (a bare `except:` swallowed them).
        debug('Unexpected Exception in test_barrier:\n')
        debug(traceback.format_exc())
        return { "error": "EUNKNOWN" }


In [None]:
async def test(self, username):
    """Run all visibility tests for a user and return the combined result.

    Args:
        username: Screen name to test.

    Returns:
        A dict with "timestamp" and "profile"; when the profile exists, is
        neither suspended nor protected and has tweets, it also holds "tests"
        with the keys "search", "typeahead", "ghost" and "more_replies".

    Raises:
        UnexpectedApiError: if the profile lookup reports an error code other
            than 50 or 63.
    """
    result = {"timestamp": time.time()}  # Record the test timestamp
    profile = {}

    # Get raw user profile data
    profile_raw = await self.profile_raw(username)
    debug('Testing ' + str(username))

    # Error codes other than 50 and 63 (which are evaluated further down) are unexpected
    if is_another_error(profile_raw, [50, 63]):
        debug("Other error:" + str(username))
        raise UnexpectedApiError

    # Parse user ID
    try:
        user_id = str(profile_raw["id"])
    except KeyError:
        user_id = None

    # Parse screen_name, fallback to username
    try:
        profile["screen_name"] = profile_raw["screen_name"]
    except KeyError:
        profile["screen_name"] = username

    # Parse restriction status (profile_interstitial_type); an empty value is dropped
    try:
        profile["restriction"] = profile_raw["profile_interstitial_type"]
    except KeyError:
        pass
    if profile.get("restriction", None) == "":
        del profile["restriction"]

    # Parse protected status
    try:
        profile["protected"] = profile_raw["protected"]
    except KeyError:
        pass

    # Check if user exists (no error 50)
    profile["exists"] = not is_error(profile_raw, 50)
    # Check if user is suspended (error 63)
    suspended = is_error(profile_raw, 63)
    if suspended:
        profile["suspended"] = suspended

    # Check if user has tweets
    try:
        profile["has_tweets"] = int(profile_raw["statuses_count"]) > 0
    except KeyError:
        profile["has_tweets"] = False

    result["profile"] = profile

    # If the user doesn't exist, is suspended or protected, or has no tweets, return early
    if not profile["exists"] or profile.get("suspended", False) or profile.get("protected", False) or not profile.get('has_tweets'):
        return result

    result["tests"] = {}

    # Search test: False unless the search for the user's tweets returns any
    search_raw = await self.search_raw("from:@" + username)
    result["tests"]["search"] = False

    try:
        tweets = search_raw["globalObjects"]["tweets"]
        # Report the id of the tweet with the highest "id" value found
        for tweet_id, tweet in sorted(tweets.items(), key=lambda t: t[1]["id"], reverse=True):
            result["tests"]["search"] = str(tweet_id)
            break
    except (KeyError, IndexError):
        pass

    # Use typeahead API to test if user appears in autocomplete
    typeahead_raw = await self.typeahead_raw("@" + username)
    result["tests"]["typeahead"] = False
    try:
        # Check if username appears (case-insensitive)
        result["tests"]["typeahead"] = len([1 for user in typeahead_raw["users"] if user["screen_name"].lower() == username.lower()]) > 0
    except KeyError:
        pass

    # If the search test found nothing, test for ghost ban
    if "search" in result["tests"] and result["tests"]["search"] == False:
        result["tests"]["ghost"] = await self.test_ghost_ban(user_id)
    else:
        # Otherwise, assume no ghost ban
        result["tests"]["ghost"] = {"ban": False}

    # If no ghost ban detected, test for more replies visibility barrier
    if not get_nested(result, ["tests", "ghost", "ban"], False):
        result["tests"]["more_replies"] = await self.test_barrier(user_id, profile['screen_name'])
    else:
        result["tests"]["more_replies"] = { "error": "EISGHOSTED"}

    debug('[' + profile['screen_name'] + '] Writing result to DB')

    # Write result to database if available
    if db is not None:
        db.write_result(result)

    return result


async def close(self):
    """Close the underlying aiohttp session.

    Unlike `try_close`, this neither checks for a missing session nor
    suppresses errors raised while closing.
    """
    await self._session.close()


In [None]:
def debug(message):
    """Write a debug message to the debug file, or print it when none is open.

    A trailing newline is appended to the message if it has none.
    """
    line = message if message.endswith('\n') else message + '\n'

    if debug_file is None:
        print(line)
        return

    # Flush right away so the file is current even if the process dies
    debug_file.write(line)
    debug_file.flush()


def log(message):
    """Write a message to the log file, or print it when none is open.

    A trailing newline is appended to the message if it has none.
    """
    line = message if message.endswith('\n') else message + '\n'

    if log_file is None:
        # No log file configured: fall back to the console
        print(line)
        return

    # Flush right away so the file is current even if the process dies
    log_file.write(line)
    log_file.flush()


In [None]:
def print_session_info(sessions):
    """Format one line of rate-limit figures per session.

    Each line starts with a newline and lists the locked flag (0/1), the
    limit, the remaining requests and the seconds until the reset time.

    Returns:
        The concatenated lines; an empty string for no sessions.
    """
    rows = [
        "\n%6d %5d %9d %5d" % (
            int(session.locked),
            session.limit,
            session.remaining,
            session.reset - int(time.time()),
        )
        for session in sessions
    ]
    return "".join(rows)

In [None]:
@routes.get('/.stats')
async def stats(request):
    """Handle GET /.stats: plain-text rate-limit table for all sessions.

    The response lists the guest sessions first, then the account sessions.
    """
    sections = [
        "--- GUEST SESSIONS ---\n\nLocked Limit Remaining Reset",
        print_session_info(guest_sessions),
        "\n\n\n--- ACCOUNTS ---\n\nLocked Limit Remaining Reset",
        print_session_info(account_sessions),
    ]
    return web.Response(text="".join(sections))


@routes.get('/.unlocked/{screen_name}')
async def unlocked(request):
    """Handle GET /.unlocked/{screen_name}: clear the locked flag of an account.

    Every account session whose username matches `screen_name`
    (case-insensitively) is unlocked.

    Returns:
        A plain-text response, "Unlocked" if at least one session matched,
        "Not unlocked" otherwise.
    """
    wanted = request.match_info['screen_name'].lower()
    text = "Not unlocked"
    for session in account_sessions:
        # Account entries that were logged in as guest have no username;
        # calling .lower() on None would turn this request into an error.
        if session.username is None or session.username.lower() != wanted:
            continue
        session.locked = False
        text = "Unlocked"
    return web.Response(text=text)


In [None]:
@routes.get('/{screen_name}')
async def api(request):
    """Handle GET /{screen_name}: run the tests for a user and return JSON.

    Guest sessions are used in rotation. The result is written to the log
    and returned, with an Access-Control-Allow-Origin header when
    --cors-allow was given.
    """
    global test_index
    screen_name = request.match_info['screen_name']

    # Rotate over the guest session pool
    slot = test_index % len(guest_sessions)
    session = guest_sessions[slot]
    test_index += 1

    result = await session.test(screen_name)
    log(json.dumps(result) + '\n')

    if args.cors_allow is None:
        return web.json_response(result)
    return web.json_response(result, headers={"Access-Control-Allow-Origin": args.cors_allow})


In [None]:
async def login_accounts(accounts, cookie_dir=None):
    """Create one session per account and log all of them in concurrently.

    Args:
        accounts: Sequence of argument tuples/lists that are unpacked into
            `TwitterSession.login`; `None` or empty means nothing to do.
        cookie_dir: Directory for cookie files, created with mode 0o700 when
            it does not exist yet; `None` disables cookie storage.
    """
    if not accounts:
        return

    if cookie_dir is not None and not os.path.isdir(cookie_dir):
        os.mkdir(cookie_dir, 0o700)

    pending_logins = []
    for credentials in accounts:
        session = TwitterSession()
        pending_logins.append(session.login(*credentials, cookie_dir=cookie_dir))
        # Sessions are registered before their login has completed
        account_sessions.append(session)

    await asyncio.gather(*pending_logins)


In [None]:
async def login_guests():
    """Fill the guest session pool and log every guest session in.

    `guest_session_pool_size` new sessions are appended to `guest_sessions`;
    afterwards all sessions in that list are logged in concurrently as guests.
    """
    guest_sessions.extend(TwitterSession() for _ in range(guest_session_pool_size))

    # login() without arguments performs a guest login
    await asyncio.gather(*(session.login() for session in guest_sessions))

    log("Guest sessions created")


In [None]:
def ensure_dir(path):
    """Create the directory `path` (including parents) if it does not exist.

    Args:
        path: Directory path. `None` and the empty string are ignored: the
            former is the default of --cookie-dir, the latter is what
            `os.path.dirname` returns for a bare file name, and neither names
            a directory that could be created.
    """
    if not path or os.path.isdir(path):
        return
    print('Creating directory %s' % path)
    # makedirs also creates missing parent directories; exist_ok covers the
    # case of the directory appearing between the check and the call.
    os.makedirs(path, exist_ok=True)

# Command line interface
parser = argparse.ArgumentParser(description='Twitter Shadowban Tester')
parser.add_argument('--account-file', type=str, default='.htaccounts', help='json file with reference account credentials')
parser.add_argument('--cookie-dir', type=str, default=None, help='directory for session account storage')
parser.add_argument('--log', type=str, default=None, help='log file where test results are written to')
parser.add_argument('--daemon', action='store_true', help='run in background')
parser.add_argument('--debug', type=str, default=None, help='debug log file')
parser.add_argument('--port', type=int, default=8080, help='port which to listen on')
parser.add_argument('--host', type=str, default='127.0.0.1', help='hostname/ip which to listen on')
parser.add_argument('--mongo-host', type=str, default=None, help='hostname or IP of mongoDB service to connect to')
parser.add_argument('--mongo-port', type=int, default=27017, help='port of mongoDB service to connect to')
parser.add_argument('--mongo-db', type=str, default='tester', help='name of mongo database to use')
parser.add_argument('--twitter-auth-key', type=str, default=TWITTER_AUTH_KEY, help='auth key for twitter guest session')
parser.add_argument('--cors-allow', type=str, default=None, help='value for Access-Control-Allow-Origin header')
args = parser.parse_args()

# All sessions share the bearer token given on the command line
TwitterSession.twitter_auth_key = args.twitter_auth_key

if args.cors_allow is None:
    debug('[CORS] Running without CORS headers')
else:
    debug('[CORS] Allowing requests from: ' + args.cors_allow)


# Reference account credentials; stays empty when no account file is available
accounts = []
if args.account_file is None:
    debug('No account file specified.')
elif not os.path.exists(args.account_file):
    debug('Account file does not exist')
else:
    # --cookie-dir is optional; without it there is no directory to prepare
    if args.cookie_dir is not None:
        ensure_dir(args.cookie_dir)
    with open(args.account_file, "r") as f:
        accounts = json.load(f)

# The log and debug files stay open for the lifetime of the process
if args.log is not None:
    print("Logging test results to %s" % args.log)
    log_dir = os.path.dirname(args.log)
    # dirname is empty for a bare file name, i.e. the current directory
    if log_dir:
        ensure_dir(log_dir)
    log_file = open(args.log, "a")

if args.debug is not None:
    print("Logging debug output to %s" % args.debug)
    debug_dir = os.path.dirname(args.debug)
    # dirname is empty for a bare file name, i.e. the current directory
    if debug_dir:
        ensure_dir(debug_dir)
    debug_file = open(args.debug, "a")

def run():
    """Connect to the database (if configured) and serve the HTTP API.

    Account and guest sessions are logged in from an `on_startup` handler,
    i.e. inside the event loop that serves the application and before it
    starts accepting requests. Logging in on a loop obtained from
    `asyncio.get_event_loop()` beforehand ties the client sessions to a loop
    that `web.run_app` does not necessarily use, and relies on an implicit
    loop creation that newer Python versions deprecate.
    """
    global db
    db = None
    if args.mongo_host is not None:
        # NOTE(review): --mongo-db is parsed but not passed on here - confirm
        # whether connect() should receive the database name.
        db = connect(host=args.mongo_host, port=args.mongo_port)

    async def login_sessions(app):
        # Same order as before: reference accounts first, then the guest pool
        await login_accounts(accounts, args.cookie_dir)
        await login_guests()

    app = web.Application()
    app.add_routes(routes)
    app.on_startup.append(login_sessions)
    web.run_app(app, host=args.host, port=args.port)

# Start the service: detached from the terminal inside a DaemonContext when
# --daemon was given, in the foreground otherwise.
if args.daemon:
    with daemon.DaemonContext():
        run()
else:
    run()