In [13]:
from atproto import Client
from datetime import datetime
import os
import pandas as pd
import time

# write a chunk of post-data to CSV
def write_chunk(df: pd.DataFrame, target_user: str) -> None:
    """Write one chunk of post data to a CSV named after its date range.

    The file is written to
    ``usr_<target_user>/<target_user>.bsky.social_from_<start>_to_<end>.csv``
    where ``<start>`` / ``<end>`` are the earliest / latest values of the
    ``post_created_at`` column (normalised to UTC) formatted as
    ``%Y-%m-%d_%H-%M-%S``. The output directory is created if missing.

    Args:
        df: Post data. Must contain a ``post_created_at`` column holding
            ISO-8601 strings or datetime-like values.
        target_user: Handle used for both the output directory and file name.

    Raises:
        ValueError: If ``df`` has rows but none carries a usable timestamp.
    """
    # An empty chunk has no date range to name the file after; nothing to write.
    if df.empty:
        return

    # Values may arrive as ISO-8601 strings rather than datetimes; a plain
    # .min() on strings returns a `str`, which has no datetime methods.
    # Parsing element-by-element tolerates differing sub-second precision,
    # and utc=True makes naive/aware values comparable.
    created = pd.to_datetime(df['post_created_at'].map(pd.Timestamp), utc=True).dropna()
    if created.empty:
        raise ValueError("write_chunk: no usable 'post_created_at' values to build a filename from")

    stamp_format = '%Y-%m-%d_%H-%M-%S'
    start = created.min().strftime(stamp_format)
    end = created.max().strftime(stamp_format)

    # to_csv does not create parent directories, so make sure it exists first.
    out_dir = f"usr_{target_user}"
    os.makedirs(out_dir, exist_ok=True)
    filename = os.path.join(out_dir, f"{target_user}.bsky.social_from_{start}_to_{end}.csv")
    df.to_csv(filename)

def _empty_post_columns() -> dict:
    """Return a fresh column-name -> empty-list mapping for one chunk of posts."""
    column_names = (
        'content_id',
        'post_uri',
        'like_count',
        'quote_count',
        'reply_count',
        'repost_count',
        'post_created_at',
        'text',
        'tags',
        'embedded_link_title',
        'embedded_link_description',
        'embedded_link_uri',
        'author_username',
        'author_displayname',
        'author_account_created_at',
    )
    # A new list per column on every call, so chunks never share storage.
    return {name: [] for name in column_names}


def _append_post(data: dict, item) -> None:
    """Append the fields of one feed item to the column lists in ``data``."""
    post = item.post
    record = post.record

    data['content_id'].append(post.cid)
    data['post_uri'].append(post.uri)
    data['like_count'].append(post.like_count)
    data['quote_count'].append(post.quote_count)
    data['reply_count'].append(post.reply_count)
    data['repost_count'].append(post.repost_count)
    data['post_created_at'].append(record.created_at)
    data['text'].append(record.text)
    data['tags'].append(record.tags)

    # post may or may not have external links; missing ones are stored as 'null'
    external = getattr(getattr(record, 'embed', None), 'external', None)
    for field in ('title', 'description', 'uri'):
        data[f'embedded_link_{field}'].append(getattr(external, field, 'null'))

    data['author_username'].append(post.author.handle)
    data['author_displayname'].append(post.author.display_name)
    data['author_account_created_at'].append(post.author.created_at)


# ingest all posts for a specific BlueSky user as a collection of CSVs
def ingest_posts_by_user(target_user: str, output_filename: str) -> None:
    """Download every post in a user's author feed and store it as CSV chunks.

    Pages through ``Client.get_author_feed`` until no cursor is returned,
    accumulating post fields in memory. Whenever the accumulated data reaches
    100 MB (as measured by ``DataFrame.memory_usage(deep=True)``) it is handed
    to ``write_chunk`` and a fresh buffer is started; whatever remains after
    the last page is written as a final chunk.

    Args:
        target_user: Handle whose author feed is ingested.
        output_filename: Currently unused; output names are chosen by
            ``write_chunk``. Kept so existing callers keep working.

    Raises:
        RuntimeError: If the ``BSY_USR`` or ``BSY_KEY`` environment variable
            is not set.
    """
    # Retrieve login info from local env
    usr = os.getenv('BSY_USR')
    key = os.getenv('BSY_KEY')
    if not usr or not key:
        raise RuntimeError("Set the BSY_USR and BSY_KEY environment variables before ingesting posts.")

    # Instantiate a BlueSky session
    cli = Client()
    cli.login(usr.lower(), key)

    chunk_limit_mb = 100  # flush the in-memory buffer to disk at this size
    data = _empty_post_columns()
    page_num = 0
    csr = None

    # Iterate through every post in their account's post history
    while True:
        # Retrieve a paginated post-feed for a specific bluesky user
        page_num += 1
        print(f"Retrieving post data from page {page_num} for user @{target_user}.bsky.social...", end='\r')
        resp = cli.get_author_feed(target_user, cursor=csr)
        for item in resp.feed:
            _append_post(data, item)

        # If the buffer is "full", stash it as CSV and start a genuinely new
        # one (re-binding to a shared dict would keep the old rows around).
        if data['content_id']:
            df = pd.DataFrame(data)
            if df.memory_usage(deep=True).sum() / (1024 * 1024) >= chunk_limit_mb:
                write_chunk(df, target_user)
                data = _empty_post_columns()
            del df  # No need to lock up memory while the next buffer fills up

        csr = resp.cursor  # cursor for the next page, if any
        if not csr:
            break          # final page reached
        time.sleep(2)      # limit query rate to avoid throttling, etc

    # Write whatever was collected since the last flush, including the last page.
    if data['content_id']:
        write_chunk(pd.DataFrame(data), target_user)
    print(f"Post Ingestion for user @{target_user}.bsky.social Complete!")

In [153]:
# Run the ingestion for one account. Note: `output_filename` is accepted by
# ingest_posts_by_user but never read there; file names come from write_chunk.
ingest_posts_by_user(target_user='politico.com', output_filename='feed_output.csv')

>Retrieving post data from page 6 for user @politico.com.bsky.social...
>Writing to disk as feed_output.csv...
>Post Ingestion for user @politico.com.bsky.social Complete!


In [None]:
import pandas as pd
from datetime import datetime

# Scratch check: take the latest value of a datetime column and format it
# with strftime, the same call chain write_chunk relies on.
data = {
    'datecol': [
        datetime(2025, 3, 10, 12, 30, 30),
        datetime(2025, 3, 11, 12, 30, 30),
        datetime(2025, 3, 12, 12, 30, 30),
        datetime(2025, 3, 10, 13, 30, 30),
    ],
}
df = pd.DataFrame(data)
df['datecol'].max().to_pydatetime().strftime('%Y-%m-%d %H:%M:%S')

2025-03-12 12:30:30


In [12]:
# Scratch check: an empty string has length zero.
s = ""
len(s)

0