In [2]:
import asyncio
from twscrape import API, gather
from twscrape.logger import set_log_level

async def main():
    api = API()  # or API("path-to.db") â€“ default is `accounts.db`

    # ADD ACCOUNTS (for CLI usage see next readme section)

    # Option 1. Adding account with cookies (more stable)
    cookies = "abc=12; ct0=xyz"  # or '{"abc": "12", "ct0": "xyz"}'
    await api.pool.add_account("user3", "pass3", "u3@mail.com", "mail_pass3", cookies=cookies)

    # Option2. Adding account with login / password (less stable)
    # email login / password required to receive the verification code via IMAP protocol
    # (not all email providers are supported, e.g. ProtonMail)
    await api.pool.add_account("user1", "pass1", "u1@example.com", "mail_pass1")
    await api.pool.add_account("user2", "pass2", "u2@example.com", "mail_pass2")
    await api.pool.login_all() # try to login to receive account cookies

    # API USAGE

    # search (latest tab)
    await gather(api.search("elon musk", limit=20))  # list[Tweet]
    # change search tab (product), can be: Top, Latest (default), Media
    await gather(api.search("elon musk", limit=20, kv={"product": "Top"}))

    # tweet info
    tweet_id = 20
    await api.tweet_details(tweet_id)  # Tweet
    await gather(api.retweeters(tweet_id, limit=20))  # list[User]

    # Note: this method have small pagination from X side, like 5 tweets per query
    await gather(api.tweet_replies(tweet_id, limit=20))  # list[Tweet]

    # get user by login
    user_login = "xdevelopers"
    await api.user_by_login(user_login)  # User

    # user info
    user_id = 2244994945
    await api.user_by_id(user_id)  # User
    await gather(api.following(user_id, limit=20))  # list[User]
    await gather(api.followers(user_id, limit=20))  # list[User]
    await gather(api.verified_followers(user_id, limit=20))  # list[User]
    await gather(api.subscriptions(user_id, limit=20))  # list[User]
    await gather(api.user_tweets(user_id, limit=20))  # list[Tweet]
    await gather(api.user_tweets_and_replies(user_id, limit=20))  # list[Tweet]
    await gather(api.user_media(user_id, limit=20))  # list[Tweet]

    # list info
    await gather(api.list_timeline(list_id=123456789))

    # trends
    await gather(api.trends("news"))  # list[Trend]
    await gather(api.trends("sport"))  # list[Trend]
    await gather(api.trends("VGltZWxpbmU6DAC2CwABAAAACHRyZW5kaW5nAAA"))  # list[Trend]

    # NOTE 1: gather is a helper function to receive all data as list, FOR can be used as well:
    async for tweet in api.search("elon musk"):
        print(tweet.id, tweet.user.username, tweet.rawContent)  # tweet is `Tweet` object

    # NOTE 2: all methods have `raw` version (returns `httpx.Response` object):
    async for rep in api.search_raw("elon musk"):
        print(rep.status_code, rep.json())  # rep is `httpx.Response` object

    # change log level, default info
    set_log_level("DEBUG")

    # Tweet & User model can be converted to regular dict or json, e.g.:
    doc = await api.user_by_id(user_id)  # User
    doc.dict()  # -> python dict
    doc.json()  # -> json string

if __name__ == "__main__":
    asyncio.run(main())

TypeError: unsupported operand type(s) for |: '_SpecialForm' and 'TypeVar'

In [None]:

s = r"H:\Dev\University\DataScience\Project\data_preparation\cookies\cookies.json"
print(base64.b64encode(s.encode()).decode())


SDpcRGV2XFVuaXZlcnNpdHlcRGF0YVNjaWVuY2VcUHJvamVjdFxkYXRhX3ByZXBhcmF0aW9uXGNvb2tpZXNcY29va2llcy5qc29u


In [6]:
import json
import base64

ck = json.load(open(r'h:\Dev\University\DataScience\Project\data_preparation\cookies\cookies2.json'))
print(base64.b64encode(json.dumps(ck).encode()).decode())

W3siZG9tYWluIjogIi54LmNvbSIsICJleHBpcmF0aW9uRGF0ZSI6IDE3OTYwMTcxNTUuMTkzNDAyLCAiaG9zdE9ubHkiOiBmYWxzZSwgImh0dHBPbmx5IjogdHJ1ZSwgIm5hbWUiOiAiYXV0aF90b2tlbiIsICJwYXRoIjogIi8iLCAic2FtZVNpdGUiOiAibm9fcmVzdHJpY3Rpb24iLCAic2VjdXJlIjogdHJ1ZSwgInNlc3Npb24iOiBmYWxzZSwgInN0b3JlSWQiOiBudWxsLCAidmFsdWUiOiAiMTAyMGNmYWViZTc5ZGRiZTllNDVkMDgwY2E5ZTgyYTM0NjY4MjAxYyJ9LCB7ImRvbWFpbiI6ICIueC5jb20iLCAiZXhwaXJhdGlvbkRhdGUiOiAxNzYxNDY2MTMwLjEwMjE1MiwgImhvc3RPbmx5IjogZmFsc2UsICJodHRwT25seSI6IGZhbHNlLCAibmFtZSI6ICJndCIsICJwYXRoIjogIi8iLCAic2FtZVNpdGUiOiBudWxsLCAic2VjdXJlIjogdHJ1ZSwgInNlc3Npb24iOiBmYWxzZSwgInN0b3JlSWQiOiBudWxsLCAidmFsdWUiOiAiMTk4MjMyMDk5NjM1NjA4NDIxMiJ9LCB7ImRvbWFpbiI6ICIueC5jb20iLCAiZXhwaXJhdGlvbkRhdGUiOiAxNzk2MDE3MTMwLjA0Mjk0NywgImhvc3RPbmx5IjogZmFsc2UsICJodHRwT25seSI6IGZhbHNlLCAibmFtZSI6ICJndWVzdF9pZCIsICJwYXRoIjogIi8iLCAic2FtZVNpdGUiOiAibm9fcmVzdHJpY3Rpb24iLCAic2VjdXJlIjogdHJ1ZSwgInNlc3Npb24iOiBmYWxzZSwgInN0b3JlSWQiOiBudWxsLCAidmFsdWUiOiAidjElM0ExNzYxNDU3MTI5OTA1OTY3MDgifSwgeyJkb21haW4iOiAi

In [2]:
import asyncio
from twscrape import API

async def main():
    api = API()  # uses the database (twscrape.db)
    
    # Fetch up to 10 tweets from Elon Musk
    async for tweet in api.search("from:elonmusk", limit=10):
        print(tweet.date, "-", tweet.rawContent)

asyncio.run(main())


RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
import asyncio
from twscrape import API, gather
from twscrape.logger import set_log_level

async def main():
    api = API()  

    # API USAGE

    # search (latest tab)
    await gather(api.search("elon musk", limit=20))  # list[Tweet]
    # change search tab (product), can be: Top, Latest (default), Media
    await gather(api.search("elon musk", limit=20, kv={"product": "Top"}))

    # tweet info
    tweet_id = 20
    await api.tweet_details(tweet_id)  # Tweet
    await gather(api.retweeters(tweet_id, limit=20))  # list[User]

    # Note: this method have small pagination from X side, like 5 tweets per query
    await gather(api.tweet_replies(tweet_id, limit=20))  # list[Tweet]

    # # get user by login
    # user_login = "xdevelopers"
    # await api.user_by_login(user_login)  # User

    # user info
    user_id = 2244994945
    await api.user_by_id(user_id)  # User
    await gather(api.following(user_id, limit=20))  # list[User]
    await gather(api.followers(user_id, limit=20))  # list[User]
    await gather(api.verified_followers(user_id, limit=20))  # list[User]
    await gather(api.subscriptions(user_id, limit=20))  # list[User]
    await gather(api.user_tweets(user_id, limit=20))  # list[Tweet]
    await gather(api.user_tweets_and_replies(user_id, limit=20))  # list[Tweet]
    await gather(api.user_media(user_id, limit=20))  # list[Tweet]

    # # list info
    # await gather(api.list_timeline(list_id=123456789))

    # # trends
    # await gather(api.trends("news"))  # list[Trend]
    # await gather(api.trends("sport"))  # list[Trend]
    # await gather(api.trends("VGltZWxpbmU6DAC2CwABAAAACHRyZW5kaW5nAAA"))  # list[Trend]

    # NOTE 1: gather is a helper function to receive all data as list, FOR can be used as well:
    async for tweet in api.search("elon musk"):
        print(tweet.id, tweet.user.username, tweet.rawContent)  # tweet is `Tweet` object

    # NOTE 2: all methods have `raw` version (returns `httpx.Response` object):
    async for rep in api.search_raw("elon musk"):
        print(rep.status_code, rep.json())  # rep is `httpx.Response` object

    # change log level, default info
    set_log_level("DEBUG")

    # Tweet & User model can be converted to regular dict or json, e.g.:
    doc = await api.user_by_id(user_id)  # User
    doc.dict()  # -> python dict
    doc.json()  # -> json string

if __name__ == "__main__":
    asyncio.run(main())

In [5]:
if __name__ == "__main__":
    asyncio.run(main())

  m = tuple(map(os.fspath, m))


RuntimeError: asyncio.run() cannot be called from a running event loop