In [1]:
import time
from time import sleep
from typing import Any, Iterator, List, Optional
from loguru import logger
from dateutil import parser as date_parse
from datetime import datetime, timezone, date
from curl_cffi import requests
import json
import logging
import os
from dotenv import load_dotenv

load_dotenv()  # take environment variables from .env.

logging.basicConfig(
    level=(
        logging.DEBUG
        if os.getenv("DEBUG") and os.getenv("DEBUG").lower() != "false"
        else logging.INFO
    )
)

BASE_URL = "https://truthsocial.com"
API_BASE_URL = "https://truthsocial.com/api"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"

# Oauth client credentials, from https://truthsocial.com/packs/js/application-d77ef3e9148ad1d0624c.js
CLIENT_ID = "9X1Fdd-pxNsAgEDNi_SfhJWi8T-vLuV2WVzKIbkTCw4"
CLIENT_SECRET = "ozF8jzI4968oTKFkEnsBC-UbLPCdrSv0MkXGQu2o_-M"

proxies = {"http": os.getenv("http_proxy"), "https": os.getenv("https_proxy")}

TRUTHSOCIAL_USERNAME = os.getenv("TRUTHSOCIAL_USERNAME")
TRUTHSOCIAL_PASSWORD = os.getenv("TRUTHSOCIAL_PASSWORD")
TRUTHSOCIAL_TOKEN = os.getenv("TRUTHSOCIAL_TOKEN")


class LoginErrorException(Exception):
    pass


class Api:
    def __init__(
        self,
        username=TRUTHSOCIAL_USERNAME,
        password=TRUTHSOCIAL_PASSWORD,
        token=TRUTHSOCIAL_TOKEN,
    ):
        self.ratelimit_max = 300
        self.ratelimit_remaining = None
        self.ratelimit_reset = None
        self.__username = username
        self.__password = password
        self.auth_id = token

    def __check_login(self):
        """Runs before any login-walled function to check for login credentials and generates an auth ID token"""
        if self.auth_id is None:
            if self.__username is None:
                raise LoginErrorException("Username is missing.")
            if self.__password is None:
                raise LoginErrorException("Password is missing.")
            self.auth_id = self.get_auth_id(self.__username, self.__password)

    def _make_session(self):
        s = requests.Session()
        return s

    def _check_ratelimit(self, resp):
        if resp.headers.get("x-ratelimit-limit") is not None:
            self.ratelimit_max = int(resp.headers.get("x-ratelimit-limit"))
        if resp.headers.get("x-ratelimit-remaining") is not None:
            self.ratelimit_remaining = int(resp.headers.get("x-ratelimit-remaining"))
        if resp.headers.get("x-ratelimit-reset") is not None:
            self.ratelimit_reset = date_parse.parse(
                resp.headers.get("x-ratelimit-reset")
            )

        if (
            self.ratelimit_remaining is not None and self.ratelimit_remaining <= 50
        ):  # We do 50 to be safe; their tracking is a bit stochastic... it can jump down quickly
            now = datetime.utcnow().replace(tzinfo=timezone.utc)
            time_to_sleep = (
                self.ratelimit_reset.replace(tzinfo=timezone.utc) - now
            ).total_seconds()
            logger.warning(
                f"Approaching rate limit; sleeping for {time_to_sleep} seconds..."
            )
            sleep(time_to_sleep)

    def _get(self, url: str, params: dict = None) -> Any:
        headers = {
            "User-Agent": USER_AGENT,
        }

        if self.auth_id is not None:
            headers["Authorization"] = "Bearer " + self.auth_id

        resp = self._make_session().get(
            API_BASE_URL + url,
            params=params,
            proxies=proxies,
            impersonate="chrome110",
            headers=headers,
        )

        # Will also sleep
        self._check_ratelimit(resp)

        return resp.json()



    def _get_paginated(self, url: str, params: dict = None, resume: str = None) -> Any:
        next_link = API_BASE_URL + url

        if resume is not None:
            next_link += f"?max_id={resume}"

        while next_link is not None:
            resp = self._make_session().get(
                next_link,
                params=params,
                proxies=proxies,
                impersonate="chrome110",
                headers={
                    "Authorization": "Bearer " + self.auth_id,
                    "User-Agent": USER_AGENT,
                },
            )
            link_header = resp.headers.get("Link", "")
            next_link = None
            for link in link_header.split(","):
                parts = link.split(";")
                if len(parts) == 2 and parts[1].strip() == 'rel="next"':
                    next_link = parts[0].strip("<>")
                    break
            logger.info(f"Next: {next_link}, resp: {resp}, headers: {resp.headers}")
            yield resp.json()

            # Will also sleep
            self._check_ratelimit(resp)

    def lookup(self, user_handle: str = None) -> Optional[dict]:
        """Lookup a user's information."""

        self.__check_login()
        assert user_handle is not None
        return self._get("/v1/accounts/lookup", params=dict(acct=user_handle))

    def search(
        self,
        searchtype: str = None,
        query: str = None,
        limit: int = 40,
        resolve: bool = 4,
        offset: int = 0,
        min_id: str = "0",
        max_id: str = None,
    ) -> Optional[dict]:
        """Search users, statuses or hashtags."""

        self.__check_login()
        assert query is not None and searchtype is not None

        page = 0
        while page < limit:
            if max_id is None:
                resp = self._get(
                    "/v2/search",
                    params=dict(
                        q=query,
                        resolve=resolve,
                        limit=limit,
                        type=searchtype,
                        offset=offset,
                        min_id=min_id,
                    ),
                )

            else:
                resp = self._get(
                    "/v2/search",
                    params=dict(
                        q=query,
                        resolve=resolve,
                        limit=limit,
                        type=searchtype,
                        offset=offset,
                        min_id=min_id,
                        max_id=max_id,
                    ),
                )

            offset += 40
            if not resp[searchtype]:
                break

            yield resp

    def trending(self):
        """Return trending truths."""

        self.__check_login()
        return self._get("/v1/truth/trending/truths")

    def tags(self):
        """Return trending tags."""

        self.__check_login()
        return self._get("/v1/trends")

    def suggested(self, maximum: int = 50) -> dict:
        """Return a list of suggested users to follow."""
        self.__check_login()
        return self._get(f"/v2/suggestions?limit={maximum}")

    def ads(self, device: str = "desktop") -> dict:
        """Return a list of ads from Rumble's Ad Platform via Truth Social API."""

        return self._get(f"/v3/truth/ads?device={device}")

    def user_followers(
        self,
        user_handle: str = None,
        user_id: str = None,
        maximum: int = 1000,
        resume: str = None,
    ) -> Iterator[dict]:
        assert user_handle is not None or user_id is not None
        user_id = user_id if user_id is not None else self.lookup(user_handle)["id"]

        n_output = 0
        for followers_batch in self._get_paginated(
            f"/v1/accounts/{user_id}/followers", resume=resume
        ):
            for f in followers_batch:
                yield f
                n_output += 1
                if maximum is not None and n_output >= maximum:
                    return

    def user_following(
        self,
        user_handle: str = None,
        user_id: str = None,
        maximum: int = 1000,
        resume: str = None,
    ) -> Iterator[dict]:
        assert user_handle is not None or user_id is not None
        user_id = user_id if user_id is not None else self.lookup(user_handle)["id"]

        n_output = 0
        for followers_batch in self._get_paginated(
            f"/v1/accounts/{user_id}/following", resume=resume
        ):
            for f in followers_batch:
                yield f
                n_output += 1
                if maximum is not None and n_output >= maximum:
                    return

    def pull_statuses(
        self,
        username: str,
        replies=False,
        verbose=False,
        created_after: datetime = None,
        since_id=None,
        pinned=False,
    ) -> List[dict]:
        """Pull the given user's statuses.

        Params:
            created_after : timezone aware datetime object
            since_id : number or string

        Returns a list of posts in reverse chronological order,
            or an empty list if not found.
        """

        params = {}
        user_id = self.lookup(username)["id"]
        page_counter = 0
        keep_going = True
        while keep_going:
            try:
                url = f"/v1/accounts/{user_id}/statuses"
                if pinned:
                    url += "?pinned=true&with_muted=true"
                elif not replies:
                    url += "?exclude_replies=true"
                if verbose:
                    logger.debug("--------------------------")
                    logger.debug(f"{url} {params}")
                result = self._get(url, params=params)
                page_counter += 1
            except json.JSONDecodeError as e:
                logger.error(f"Unable to pull user #{user_id}'s statuses': {e}")
                break
            except Exception as e:
                logger.error(f"Misc. error while pulling statuses for {user_id}: {e}")
                break

            if "error" in result:
                logger.error(
                    f"API returned an error while pulling user #{user_id}'s statuses: {result}"
                )
                break

            if len(result) == 0:
                break

            if not isinstance(result, list):
                logger.error(f"Result is not a list (it's a {type(result)}): {result}")

            posts = sorted(
                result, key=lambda k: k["id"], reverse=True
            )  # reverse chronological order (recent first, older last)
            params["max_id"] = posts[-1][
                "id"
            ]  # when pulling the next page, get posts before this (the oldest)

            if verbose:
                logger.debug(f"PAGE: {page_counter}")

            if pinned:  # assume single page
                keep_going = False

            for post in posts:
                post["_pulled"] = datetime.now().isoformat()

                # only keep posts created after the specified date
                # exclude posts created before the specified date
                # since the page is listed in reverse chronology, we don't need any remaining posts on this page either
                post_at = date_parse.parse(post["created_at"]).replace(
                    tzinfo=timezone.utc
                )
                if (created_after and post_at <= created_after) or (
                    since_id and post["id"] <= since_id
                ):
                    keep_going = False  # stop the loop, request no more pages
                    break  # do not yeild this post or remaining (older) posts on this page

                if verbose:
                    logger.debug(f"{post['id']} {post['created_at']}")

                yield post

    
    def get_auth_id(self, username: str, password: str) -> str:
        """Logs in to Truth account and returns the session token"""
        url = BASE_URL + "/oauth/token"
        retries = 3  # Number of retry attempts
        delay = 1  # Initial delay in seconds

        for attempt in range(retries):
            try:
                payload = {
                    "client_id": CLIENT_ID,
                    "client_secret": CLIENT_SECRET,
                    "grant_type": "password",
                    "username": username,
                    "password": password,
                    "redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
                    "scope": "read",
                }

                sess_req = requests.request(
                    "POST",
                    url,
                    json=payload,
                    proxies=proxies,
                    impersonate="chrome110",
                    headers={
                        "User-Agent": USER_AGENT,
                    },
                )
                sess_req.raise_for_status()
                return sess_req.json()["access_token"]
            except requests.RequestsError as e:
                print(f"Failed login request: {str(e)}")
                if attempt < retries - 1:
                    print(f"Retrying in {delay} seconds...")
                    time.sleep(delay*2)
                    delay *= 2  # Exponential backoff
                else:
                    raise

In [2]:
truth_social = Api()
resp = truth_social.search(searchtype="users", query="ranjeetsn96")
resp

<generator object Api.search at 0x1058caae0>

In [None]:
import time
from datetime import datetime, timezone

def poll_and_process(api, username_to_stream, since_post_id=None, created_after_datetime=None):
    while True:
        # Get the latest posts
        latest_posts = api.pull_statuses(username_to_stream)
        
        # Process the posts based on your criteria
        for post in latest_posts:
            post_id = int(post['id'])  # Convert post ID to integer
            post_created_at = datetime.fromisoformat(post['created_at'])
            
            # Check if the post meets your criteria
            if (since_post_id is None or post_id > since_post_id) and \
               (created_after_datetime is None or post_created_at > created_after_datetime):
                # Process or print the post as needed
                print(post)
        
        # Sleep for 5 seconds before checking again
        time.sleep(5)


if __name__ == "__main__":
    # Set your Truth Social credentials (username and password) or token
    # Note: Ensure you have set the environmental variables TRUTHSOCIAL_USERNAME and TRUTHSOCIAL_PASSWORD
    # or TRUTHSOCIAL_TOKEN before running the script.
    
    # Example usage
    username_to_stream = "ranjeetsn96"
    since_post_id = 112081324703705770  # Replace with the actual post ID
    created_after_datetime = datetime(2023, 3, 10, tzinfo=timezone.utc)

    # Create an instance of the Api class
    api = Api()

    # Start polling and processing
    poll_and_process(api, username_to_stream, since_post_id, created_after_datetime)


In [3]:
import time
from datetime import datetime
import pytz
from google.cloud import bigquery

# Set the path to your service account key JSON file
key_path = 'marine-embassy-416705-19cd2730e55e.json'

# Set the environment variable
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path

def get_last_row_tweet_id(project_id, dataset_id, table_id):
    bigquery_client = bigquery.Client(project=project_id)

    query = f"SELECT MAX(id) AS max_id FROM `{project_id}.{dataset_id}.{table_id}`"
    query_job = bigquery_client.query(query)

    results = query_job.result()
    last_row = None
    latest_id = None

    for row in results:
        last_row = row
        latest_id = row['max_id']
    return last_row, latest_id

def write_tweets_to_bigquery(tweet, project_id, dataset_id, table_id):
    bigquery_client = bigquery.Client(project=project_id)

    dataset_ref = bigquery_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    tweet_data = {
        "id": tweet["id"],
        "created_at": tweet["created_at"],  
        "text": tweet["content"],
        "user_id": tweet["account"]["id"],
        "username": tweet['account']['username']
    }

    errors = bigquery_client.insert_rows_json(table_ref, [tweet_data])

    if errors:
        print(f"Error inserting data into BigQuery: {errors}")
    else:
        print(f"Tweet inserted into BigQuery: {tweet_data}")

def poll_twitter_stream(query, bigquery_project_id, bigquery_dataset_id, 
                        bigquery_table_id, searchtype='statuses',
                        limit=1, offset=0):
    while True:
        _, latest_id = get_last_row_tweet_id(bigquery_project_id, bigquery_dataset_id, bigquery_table_id)
        latest_post_id = int(latest_id) if latest_id else None
        api = Api()
        time.sleep(2)
        latest_posts = api.search(searchtype=searchtype, query=query, limit=limit, offset=offset, min_id=latest_post_id)
        # Check if the generator is empty
        try:
            first_post = next(latest_posts)
            statuses_list_overall = [first_post]
        except StopIteration:
            print("No posts found.")
            continue        

        if len(statuses_list_overall) > 0:
            statuses_list = statuses_list_overall[0][searchtype]
            if len(statuses_list) != 0:
                for post in statuses_list:
                    post_id = int(post['id']) 
                    if (latest_post_id is None) or (post_id > latest_post_id):
                        write_tweets_to_bigquery(tweet=post, project_id=bigquery_project_id, dataset_id=bigquery_dataset_id, table_id=bigquery_table_id)
                        time.sleep(5)

if __name__ == "__main__":

    # Set your BigQuery project, dataset, and table information
    bigquery_project_id = "marine-embassy-416705"
    bigquery_dataset_id = "truth_social_dataset"
    bigquery_table_id = "truth_social"
    
    
    query = "ranjeetsn96"

    poll_twitter_stream(bigquery_project_id = bigquery_project_id, 
                        bigquery_dataset_id = bigquery_dataset_id,
                        bigquery_table_id = bigquery_table_id,
                        query=query)

INFO:numexpr.utils:Note: NumExpr detected 10 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


Failed login request: HTTP Error 429: 
Retrying in 1 seconds...
Failed login request: HTTP Error 429: 
Retrying in 2 seconds...
Failed login request: HTTP Error 429: 


RequestsError: HTTP Error 429: 

In [48]:
from google.cloud import bigquery
import os

# Set the path to your service account key JSON file
key_path = 'marine-embassy-416705-19cd2730e55e.json'

# Set the environment variable
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path

def insert_tweet_into_bigquery(tweet, project_id, dataset_id, table_id):
    bigquery_client = bigquery.Client()

    dataset_ref = bigquery_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    # Check if the table exists; if not, create it with the specified schema
    table = bigquery_client.get_table(table_ref)
    if not table.schema:
        schema = [
            bigquery.SchemaField("id", "STRING"),
            bigquery.SchemaField("created_at", "TIMESTAMP"),
            bigquery.SchemaField("text", "STRING"),
            bigquery.SchemaField("user_id", "STRING"),
            # Add more fields as needed
        ]
        table.schema = schema
        table = bigquery_client.update_table(table, ["schema"])

    # Convert tweet data to match the schema
    tweet_data = {
        "id": tweet["id"],
        "created_at": tweet["created_at"],
        "text": tweet["text"],
        "user_id": tweet["user"]["id"],
        "username": tweet["account"]["username"]
        # Add more fields as needed
    }

    # Insert data into BigQuery table
    errors = bigquery_client.insert_rows_json(table_ref, [tweet_data])

    if errors:
        print(f"Error inserting data into BigQuery: {errors}")
    else:
        print(f"Tweet inserted into BigQuery: {tweet_data}")

if __name__ == "__main__":
    # Set your BigQuery project, dataset, and table information
    bigquery_project_id = "marine-embassy-416705"
    bigquery_dataset_id = "truth_social_dataset"
    bigquery_table_id = "truth_social"

    # Example usage
    # tweet_id = "1234567890"  # Replace with the actual tweet ID
    # tweet_created_at = "2024-03-12T12:34:56.789Z"  # Replace with the actual tweet creation timestamp
    # tweet_text = "This is a sample tweet text."  # Replace with the actual tweet text
    # user_id = "987654321"  # Replace with the actual user ID

    # # Create a sample tweet
    # sample_tweet = {
    #     "id": tweet_id,
    #     "created_at": tweet_created_at,
    #     "text": tweet_text,
    #     "user": {"id": user_id},
    #     "username": tweet["account"]["username"]
    #     # Add more fields as needed
    # }

    # Insert the sample tweet into BigQuery
    insert_tweet_into_bigquery(sample_tweet, bigquery_project_id, bigquery_dataset_id, bigquery_table_id)


Tweet inserted into BigQuery: {'id': '1234567890', 'created_at': '2024-03-12T12:34:56.789Z', 'text': 'This is a sample tweet text.', 'user_id': '987654321'}


In [319]:
list(truth_social.pull_statuses("ranjeetsn96"))

[{'id': '112092114452628901',
  'created_at': '2024-03-14T04:18:49.994Z',
  'in_reply_to_id': None,
  'quote_id': None,
  'in_reply_to_account_id': None,
  'sensitive': False,
  'spoiler_text': '',
  'visibility': 'public',
  'language': 'en',
  'uri': 'https://truthsocial.com/@ranjeetsn96/112092114452628901',
  'url': 'https://truthsocial.com/@ranjeetsn96/112092114452628901',
  'content': '<p><span class="h-card"><a href="https://truthsocial.com/@ranjeetsn96" class="u-url mention">@<span>ranjeetsn96</span></a></span> Hello 13</p>',
  'account': {'id': '111841474664274681',
   'username': 'ranjeetsn96',
   'acct': 'ranjeetsn96',
   'display_name': '',
   'locked': False,
   'bot': False,
   'discoverable': True,
   'group': False,
   'created_at': '2024-01-29T21:57:50.344Z',
   'note': '<p></p>',
   'url': 'https://truthsocial.com/@ranjeetsn96',
   'avatar': 'https://truthsocial.com/avatars/original/missing.png',
   'avatar_static': 'https://truthsocial.com/avatars/original/missing.png

In [227]:
hashtag_search_results = truth_social.search(searchtype="statuses", query="ranjeetsn96", limit=11, offset=0, min_id=None)

In [266]:
hashtag_search_results = truth_social.search(searchtype="statuses", query="ranjeetsn96", limit=11, offset=0, min_id=112088003857783358)

In [263]:
truth_social.search()

<generator object Api.search at 0x13cb0e980>

In [267]:
result_list = list(hashtag_search_results).copy()

In [268]:
len(result_list[0]['statuses'])

4

In [261]:
for diction in result_list[0]['statuses']:
    print(diction['id'])

112087999404467494
112088003095505733
112088006600759719
112088002001424961
112088001259747846
112088000535603215
112088345297935912
112088003857783358
112088004766493508
112090910633977869


In [20]:
for result in hashtag_search_results:
    # Extract and handle the relevant information from the search result
    print(result)

In [18]:
# Assuming the response structure is a dictionary
for result in hashtag_search_results:
    # Extract and handle the relevant information from the search result
    if 'hashtag' in result:
        hashtags = result['hashtag']
        for hashtag_info in hashtags:
            # Process or print the hashtag information as needed
            print(hashtag_info)


In [4]:
html_message = list(truth_social.pull_statuses("ranjeetsn96"))[0]["content"]

In [15]:
list(truth_social.pull_statuses("ranjeetsn96"))

[{'id': '112073682293955478',
  'created_at': '2024-03-10T22:11:17.575Z',
  'in_reply_to_id': None,
  'quote_id': None,
  'in_reply_to_account_id': None,
  'sensitive': False,
  'spoiler_text': '',
  'visibility': 'public',
  'language': 'en',
  'uri': 'https://truthsocial.com/@ranjeetsn96/112073682293955478',
  'url': 'https://truthsocial.com/@ranjeetsn96/112073682293955478',
  'content': '<p>hello again</p>',
  'account': {'id': '111841474664274681',
   'username': 'ranjeetsn96',
   'acct': 'ranjeetsn96',
   'display_name': '',
   'locked': False,
   'bot': False,
   'discoverable': True,
   'group': False,
   'created_at': '2024-01-29T21:57:50.344Z',
   'note': '<p></p>',
   'url': 'https://truthsocial.com/@ranjeetsn96',
   'avatar': 'https://truthsocial.com/avatars/original/missing.png',
   'avatar_static': 'https://truthsocial.com/avatars/original/missing.png',
   'header': '',
   'header_static': '',
   'followers_count': 0,
   'following_count': 0,
   'statuses_count': 2,
   'la

In [5]:
from bs4 import BeautifulSoup

In [6]:
soup = BeautifulSoup(html_message, 'html.parser')

paragraph_tag = soup.find('p')

# Find the anchor tag with the class 'u-url' and 'mention'
mention_tag = soup.find('a', class_='u-url mention')

# Extract the username from the href attribute
if paragraph_tag:
    message = paragraph_tag.text.strip()
    print(message)
else:
    print("Paragraph tag not found.")

@ranjeetsn96 this is a test message


In [38]:
diction = {"test_message":"This is a test message"}

In [None]:
import os
from mastodon import Mastodon

print("Mastodon Bot starting...")

# Set up Mastodon instance
mastodon = Mastodon(
    client_id=os.environ['CLIENT_KEY'],
    client_secret=os.environ['CLIENT_SECRET'],
    access_token=os.environ['ACCESS_TOKEN'],
    api_base_url='https://botsin.space/'
)

# Specify the hashtag you want to follow
hashtag_to_follow = '#ranjeet_company'

# Start streaming for the specified hashtag
stream = mastodon.stream('/api/v1/streaming/tags', tag=hashtag_to_follow)

# Define callback function for stream listener
def on_update(status):
    status_id = status['id']
    acct = status['account']['acct']
    content = status['content']

    # Your logic for handling updates with the specified hashtag
    print(f"Received update with hashtag: {hashtag_to_follow}")
    print(f"Content: {content}")

# Set up callback for updates
stream.on('update', on_update)

# Keep the script running
stream.run_forever()


In [22]:
!pip3 install Mastodon.py

Collecting Mastodon.py
  Downloading Mastodon.py-1.8.1-py2.py3-none-any.whl.metadata (3.9 kB)
Collecting blurhash>=1.1.4 (from Mastodon.py)
  Downloading blurhash-1.1.4-py2.py3-none-any.whl.metadata (769 bytes)
Downloading Mastodon.py-1.8.1-py2.py3-none-any.whl (65 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.9/65.9 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading blurhash-1.1.4-py2.py3-none-any.whl (5.3 kB)
Installing collected packages: blurhash, Mastodon.py
Successfully installed Mastodon.py-1.8.1 blurhash-1.1.4

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
load_dotenv()

True

In [4]:
os.environ['CLIENT_KEY']

'ENCpqOPfbm6w3UeV6JclTvTFXzUKKORXLY54u-uO6dE'