In [None]:
import json
import os

import requests

import setup

# NOTE(review): presumably bootstraps the project (e.g. Django settings) so the
# model imports in later cells work — confirm against setup.py.
setup.init()

In [None]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment so the
# os.environ lookup for the API key in a later cell can find it.
load_dotenv()

In [None]:
from reddit.models import RedditPost
from snapshots.models import BrightDataSnapshot

In [None]:
# Read the Bright Data API token from the environment rather than hardcoding it.
BRIGHT_DATA_API_KEY = os.environ.get("BRIGHT_DATA_API_KEY")

# Fail fast on a missing *or empty* token: an empty value would pass an
# `is not None` check and only show up later as a rejected API request.
assert BRIGHT_DATA_API_KEY, (
    "BRIGHT_DATA_API_KEY is not set or is empty; "
    "define it in the environment or in the .env file"
)

In [None]:
def get_crawl_headers():
    """Build the HTTP headers sent with each Bright Data API request.

    Returns:
        A new dict with a Bearer ``Authorization`` header built from the
        module-level ``BRIGHT_DATA_API_KEY`` and a JSON ``Content-Type``.
    """
    bearer_value = f"Bearer {BRIGHT_DATA_API_KEY}"
    crawl_headers = dict()
    crawl_headers["Authorization"] = bearer_value
    crawl_headers["Content-Type"] = "application/json"
    return crawl_headers

In [None]:
def perform_scrape_snapshot(
    subreddit_url: str, num_of_posts: int = 20, timeout: float = 30.0
) -> str:
    """Trigger a Bright Data collection for a subreddit and record its snapshot.

    Sends a POST to the dataset "trigger" endpoint asking for today's top
    posts of ``subreddit_url`` and stores the returned snapshot id in a new
    ``BrightDataSnapshot`` row with status ``"Unknown"``.

    Args:
        subreddit_url: URL of the subreddit to discover posts from.
        num_of_posts: Value sent as ``num_of_posts`` in the request input.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the kernel indefinitely.

    Returns:
        The ``snapshot_id`` reported by the API.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts,
            or a non-2xx response (via ``raise_for_status``).
        ValueError: If the response body contains no ``snapshot_id``.
    """
    # Single source of truth for the dataset id: it is used both in the query
    # string and in the database row.
    dataset_id = "gd_lvz8ah06191smkebj4"
    url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": dataset_id,
        "notify": "false",
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "subreddit_url",
        "limit_per_input": "100",
    }
    payload = {
        "input": [
            {
                "url": str(subreddit_url),
                "sort_by": "Top",
                "sort_by_time": "Today",
                "num_of_posts": num_of_posts,
            },
        ],
    }

    response = requests.post(
        url=url,
        headers=get_crawl_headers(),
        params=params,
        data=json.dumps(payload),
        timeout=timeout,
    )
    response.raise_for_status()

    scrape_data = response.json()
    snapshot_id = scrape_data.get("snapshot_id")
    if not snapshot_id:
        # Without an id there is nothing to poll or download later, and the
        # row below would be keyed on None.
        raise ValueError(f"Bright Data response has no snapshot_id: {scrape_data!r}")

    BrightDataSnapshot.objects.create(
        snapshot_id=snapshot_id,
        dataset_id=dataset_id,
        status="Unknown",
    )

    return snapshot_id

In [None]:
# Trigger a scrape of r/django with the default number of posts and keep the
# returned snapshot id for the progress and download cells below.
scrape_snapshot_result = perform_scrape_snapshot("https://www.reddit.com/r/django")

print(scrape_snapshot_result)

In [None]:
def get_snapshot_progress(snapshot_id: str, timeout: float = 30.0) -> bool | None:
    """Poll Bright Data for a snapshot's status and persist it.

    Fetches the "progress" endpoint for ``snapshot_id`` and upserts a
    ``BrightDataSnapshot`` row with the reported status.

    Args:
        snapshot_id: Id of the snapshot to check.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        ``True`` if the reported status is ``"ready"``, ``False`` for any
        other status, or ``None`` if the HTTP request failed (the error is
        printed rather than raised).
    """
    url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"

    try:
        response = requests.get(url=url, headers=get_crawl_headers(), timeout=timeout)
        response.raise_for_status()
        result = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return None

    status = result.get("status")

    # Prefer the id echoed by the API, but fall back to the id we asked about
    # so a response without that key never upserts a row keyed on None.
    BrightDataSnapshot.objects.update_or_create(
        snapshot_id=result.get("snapshot_id") or snapshot_id,
        dataset_id=result.get("dataset_id"),
        defaults={
            "status": status,
        },
    )

    return status == "ready"

In [None]:
# Check whether the snapshot triggered above is ready; the cell displays
# True/False, or None if the progress request failed.
get_snapshot_progress(scrape_snapshot_result)

In [None]:
def download_snapshot(snapshot_id: str, timeout: float = 60.0) -> list[dict] | None:
    """Download a snapshot's contents from Bright Data as parsed JSON.

    Args:
        snapshot_id: Id of the snapshot to download.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The parsed JSON body, or ``None`` if the HTTP request failed (the
        error is printed rather than raised). The notebook iterates the
        result as a list of per-post dicts.
        NOTE(review): the body shape when the snapshot is not ready yet is
        not visible here — confirm against the Bright Data docs.
    """
    url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
    params = {
        "format": "json",
    }

    try:
        response = requests.get(
            url=url,
            headers=get_crawl_headers(),
            params=params,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return None

In [None]:
# Fetch the snapshot contents as parsed JSON; this is None if the request failed.
reddit_results = download_snapshot(scrape_snapshot_result)

print(reddit_results)

In [None]:
# Names of every field the RedditPost model reports through its _meta API;
# used below to decide which scraped keys can be written to the model.
model_field_names = [field.name for field in RedditPost._meta.get_fields()]

print(model_field_names)

In [None]:
# "post_id" and "url" are passed as lookup keys when saving posts, so they are
# kept out of the update payload; "id" is presumably the auto primary key.
skip_fields = ["id", "post_id", "url"]
valid_fields = [
    field_name for field_name in model_field_names if field_name not in skip_fields
]

print(valid_fields)

In [None]:
# download_snapshot() returns None when the request fails, so fall back to an
# empty list instead of raising TypeError on iteration.
for thread in reddit_results or []:
    post_id = thread.get("post_id")

    if not post_id:
        # NOTE(review): the trigger request asks for include_errors=true, so the
        # snapshot presumably can contain error records with no post_id. Skip
        # them rather than upserting a row keyed on None.
        print(f"Skipping record without post_id: {thread}")
        continue

    url = thread.get("url")
    # Only keep keys that correspond to writable RedditPost fields.
    update_data = {k: v for k, v in thread.items() if k in valid_fields}

    print(post_id, url, update_data)

    RedditPost.objects.update_or_create(
        post_id=post_id,
        url=url,
        defaults=update_data,
    )