# Dependencies

In [67]:
!pip install macrocosmos dotenv pandas fastparquet

I0000 00:00:1758873824.225941 1023362 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers


Collecting fastparquet
  Using cached fastparquet-2024.11.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (4.2 kB)
Collecting cramjam>=2.3 (from fastparquet)
  Using cached cramjam-2.11.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (5.6 kB)
Collecting fsspec (from fastparquet)
  Using cached fsspec-2025.9.0-py3-none-any.whl.metadata (10 kB)
Using cached fastparquet-2024.11.0-cp313-cp313-macosx_11_0_arm64.whl (683 kB)
Using cached cramjam-2.11.0-cp313-cp313-macosx_11_0_arm64.whl (1.7 MB)
Using cached fsspec-2025.9.0-py3-none-any.whl (199 kB)
Installing collected packages: fsspec, cramjam, fastparquet
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3/3[0m [fastparquet]
[1A[2KSuccessfully installed cramjam-2.11.0 fastparquet-2024.11.0 fsspec-2025.9.0


## Setup

In [22]:
import pandas as pd
import json

from datetime import datetime, timezone, timedelta
from dotenv import load_dotenv
import os
import macrocosmos

# Pull settings (e.g. MACROCOSMOS_API_KEY) from a local .env file into os.environ, if one exists.
load_dotenv()

# Fail fast with an explicit message rather than constructing the client with api_key=None
# and only discovering the missing key on the first API call.
api_key = os.environ.get("MACROCOSMOS_API_KEY")
if not api_key:
    raise RuntimeError("MACROCOSMOS_API_KEY is not set; add it to your environment or a .env file")

client = macrocosmos.GravityClient(api_key=api_key)

In [14]:
# Collection window: the trailing LOOKBACK_DAYS days ending now. Both bounds are
# timezone-aware UTC datetimes, so they serialize with an explicit +00:00 offset.
LOOKBACK_DAYS = 7

end_dt = datetime.now(timezone.utc)
start_dt = end_dt - timedelta(days=LOOKBACK_DAYS)

start_dt.isoformat(), end_dt.isoformat()

('2025-09-18T08:15:11.206842+00:00', '2025-09-25T08:15:11.206842+00:00')

### Create a job

- Gravity tasks follow a similar pattern to the OnDemand API, so you can target hashtags, subreddits, channel names, etc.
- We suggest starting with the OnDemand API to work out what you want to scrape before launching a Gravity task.


```py
class GravityTask(BaseModel):
    """GravityTask defines a crawler's criteria for a single job (platform/topic)."""

    # topic: the topic of the job (e.g. '#ai' for X, 'r/ai' for Reddit)
    topic: typing.Optional[str] = Field(default="")
    # platform: the platform of the job ('x', 'reddit', or 'youtube')
    platform: str = Field(default="")
    # keyword: the keyword to search for in the job (optional)
    keyword: typing.Optional[str] = Field(default="")
    # post_start_datetime: the start date of the job (optional)
    post_start_datetime: typing.Optional[datetime] = Field(default_factory=datetime.now)
    # post_end_datetime: the end date of the job (optional)
    post_end_datetime: typing.Optional[datetime] = Field(default_factory=datetime.now)
```

In [36]:
# Name the overall Gravity task with a UTC timestamp so it's easy to find later.
task_name = f"Macrocosmos Sample - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%SZ')}"

# One crawler per (platform, topic). Topic syntax is platform-specific, per the GravityTask
# model: '#hashtag' for X, 'r/<subreddit>' for Reddit. (An earlier run that used '#ai' for
# Reddit collected 0 rows.)
GRAVITY_TOPICS = [
    ("x", "#ai"),
    ("reddit", "r/ai"),
    ("youtube", "MrBeast"),  # presumably a channel name — confirm the expected YouTube topic format
]

gravity_tasks = [
    {"topic": topic, "platform": platform, "post_start_datetime": start_dt, "post_end_datetime": end_dt}
    for platform, topic in GRAVITY_TOPICS
]

# Read the notification address from the environment (or .env) instead of hard-coding a
# personal email in a shared notebook. With no address set, no notification is requested.
# NOTE(review): confirm the API accepts an empty notification_requests list.
notification_email = os.environ.get("GRAVITY_NOTIFICATION_EMAIL")
notification_requests = (
    [{'type': 'email', 'address': notification_email}] if notification_email else []
)

In [None]:
# Submit all crawlers as a single Gravity task. The response carries the task id, which is
# persisted below so the (multi-hour) job can be monitored later from a fresh kernel.
# NOTE: presumably every run of this cell submits a NEW task — run it once per job.
response = client.gravity.CreateGravityTask(
    gravity_tasks=gravity_tasks,
    name=task_name,
    notification_requests=notification_requests,
)

response

gravity_task_id: "multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"

In [32]:
# The task id is the only field of the create response needed from here on.
task_id = getattr(response, "gravity_task_id")
task_id

'multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe'

In [33]:
# Persist the task id so the monitoring cells below can resume from a fresh kernel.
job_state = {'gravity_task_id': task_id}
with open('job.json', 'w') as job_file:
    json.dump(job_state, job_file)

### Monitor its status (this can take a few hours to complete)

In [34]:
# Reload the saved task id so monitoring can resume without re-creating the task.
with open('job.json', 'r') as job_file:
    saved_job = json.load(job_file)

task_id = saved_job['gravity_task_id']
task_id

'multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe'

In [51]:
# Fetch the current state of our task. include_crawlers=True adds the per-crawler
# workflows (criteria and collection state) that the next cells read.
resp = client.gravity.GetGravityTasks(include_crawlers=True, gravity_task_id=task_id)
resp

gravity_task_states {
  gravity_task_id: "multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  name: "Macrocosmos Sample - 2025-09-25 09:00:20Z"
  status: "Running"
  start_time {
    seconds: 1758790820
    nanos: 771210000
  }
  crawler_ids: "crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  crawler_ids: "crawler-1-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  crawler_ids: "crawler-2-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  crawler_workflows {
    crawler_id: "crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
    criteria {
      platform: "x"
      topic: "#ai"
      notification {
        to: "theo.ntakouris@macrocosmos.ai"
        link: "https://app.macrocosmos.ai/gravity/datasets/multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
      }
      user_id: "c6d07878-ea54-4592-9677-7988de1bddc3"
      keyword: ""
      post_start_datetime {
        seconds: 1758183311
      }
      post_end_datetime {
        seconds: 1758788111
      }
    }

In [52]:
# Only one task was requested, so its state is the first (and only) entry.
task_state = resp.gravity_task_states[0]
print(f'Status: {task_state.status}')

# Report how many records each crawler (one per platform) has collected so far.
for workflow in task_state.crawler_workflows:
    platform = workflow.criteria.platform
    rows = workflow.state.records_collected
    print(f'Workflow: {workflow.crawler_id} ({platform})')
    print(f'Collected Rows: {rows}')

Status: Running
Workflow: crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe (x)
Collected Rows: 766
Workflow: crawler-1-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe (reddit)
Collected Rows: 0
Workflow: crawler-2-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe (youtube)
Collected Rows: 2


In [56]:
# Map each platform to its crawler id so a dataset can be built per platform.
# NOTE: assumes one crawler per platform; a repeated platform keeps only the last id.
workflow_ids_per_platform = dict(
    (workflow.criteria.platform, workflow.crawler_id)
    for workflow in task_state.crawler_workflows
)
workflow_ids_per_platform

{'x': 'crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe',
 'reddit': 'crawler-1-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe',
 'youtube': 'crawler-2-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe'}

### Build your dataset(s)

Once you're happy with the amount of data collected (or the crawlers have stopped picking up new data), you can build your dataset(s).

In [None]:
# Build a dataset for ONE crawler (i.e. one platform). To get another platform's data,
# re-run with a different key of `workflow_ids_per_platform` (e.g. 'reddit', 'youtube').
DATASET_PLATFORM = 'x'
DATASET_MAX_ROWS = 1000  # maximum number of rows requested for this dataset

# NOTE: this reuses the name `resp`, overwriting the GetGravityTasks response above;
# re-run that cell before re-running the monitoring cells.
resp = client.gravity.BuildDataset(
    crawler_id=workflow_ids_per_platform[DATASET_PLATFORM],
    max_rows=DATASET_MAX_ROWS,
    notification_requests=notification_requests,
)
resp

dataset_id: "dataset-47b3581f-1e44-4dda-9d77-c0780612eac8"
dataset {
  crawler_workflow_id: "crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  create_date {
    seconds: 1758873456
    nanos: 766847000
  }
  status: "Running"
  status_message: "Initializing"
  steps {
    step_name: "Initializing"
  }
  total_steps: 10
}

In [58]:
# Keep the dataset id; it's what GetDataset needs to poll the build.
dataset_id = getattr(resp, "dataset_id")
dataset_id

'dataset-47b3581f-1e44-4dda-9d77-c0780612eac8'

In [59]:
# Save everything needed to resume from a fresh kernel: task, crawler ids, and dataset id.
job_state = {
    'gravity_task_id': task_id,
    'workflow_ids_per_platform': workflow_ids_per_platform,
    'dataset_id': dataset_id,
}
with open('job.json', 'w') as job_file:
    json.dump(job_state, job_file)

### Download your Data!
Alternatively, use parquet files you've already downloaded from the UI.

In [60]:
# Poll the dataset build. When it has finished (status "Completed"), the response lists
# the parquet files along with their download URLs.
dataset_resp = client.gravity.GetDataset(
    dataset_id=dataset_id,
)
dataset_resp

dataset {
  crawler_workflow_id: "crawler-0-multicrawler-532b5b59-ac58-4d9e-9806-3f0aa93beffe"
  create_date {
    seconds: 1758873456
    nanos: 766847000
  }
  expire_date {
  }
  files {
    file_name: "data_2025-09-26_x_377dq5zw.parquet"
    file_size_bytes: 105096
    last_modified {
      seconds: 1758873477
    }
    num_rows: 306
    s3_key: "production_datasets/rift/datetime=2025-09-26/platform=x/label=ai/keyword=no_keyword_requested/chunk=jiql6q1n/data_2025-09-26_x_377dq5zw.parquet"
    url: "https://data-universe-storage.nyc3.digitaloceanspaces.com/production_datasets/rift/datetime=2025-09-26/platform=x/label=ai/keyword=no_keyword_requested/chunk=jiql6q1n/data_2025-09-26_x_377dq5zw.parquet"
  }
  status: "Completed"
  status_message: "Dataset ready for download"
  steps {
    progress: 1
    step: 8
    step_name: "Billing correction"
  }
  total_steps: 10
  nebula {
    file_size_bytes: 537083
    url: "https://data-universe-storage.nyc3.digitaloceanspaces.com/nebulas/produ

In [63]:
# Check the build before downloading: while it was still "Running" (BuildDataset output
# above) the response had no files, so indexing files[0] would raise an opaque IndexError.
dataset = dataset_resp.dataset
assert dataset.status == "Completed", f"Dataset not ready yet (status={dataset.status!r}); re-run GetDataset later"
assert len(dataset.files) > 0, "Dataset is completed but lists no files"

# Status, number of files, and total rows across all files.
dataset.status, len(dataset.files), sum(dataset_file.num_rows for dataset_file in dataset.files)

('Completed', 1, 306)

In [69]:
# Load EVERY parquet file listed for the dataset (the response holds a list of files,
# not a single one) into one frame, reading each directly from its URL.
dataset_files = list(dataset_resp.dataset.files)
if not dataset_files:
    raise ValueError(
        f"Dataset has no files yet (status={dataset_resp.dataset.status!r}); re-run GetDataset later"
    )

df = pd.concat(
    [pd.read_parquet(dataset_file.url, engine='fastparquet') for dataset_file in dataset_files],
    ignore_index=True,  # fresh 0..n-1 index across files
)
df.head()

Unnamed: 0,url,label,text,timestamp,uri,username,user_id,user_display_name,tweet_id,conversation_id,is_reply,is_quote,user_verified,datetime
0,https://x.com/BharatSing_h/status/197112444823...,#ai,Impressive progress. Dominating a tough coding...,2025-09-25T08:07:44+00:00,https://x.com/BharatSing_h/status/197112444823...,BharatSing_h,,,,1971120810485223930,True,False,False,2025-09-25T08:07:44+00:00
1,https://x.com/EoEo1224_/status/197111769173086...,#ai,Yupp’s design philosophy really shines here — ...,2025-09-25T07:40:53+00:00,https://x.com/EoEo1224_/status/197111769173086...,EoEo1224_,,,,1971117691730866203,False,True,False,2025-09-25T07:40:53+00:00
2,https://x.com/FpeSre/status/1971123589400219975,#ai,Turns out “vibe coding” is like cooking by sme...,2025-09-25T08:04:20+00:00,https://x.com/FpeSre/status/1971123589400219975,FpeSre,,,,1971123589400219975,False,False,False,2025-09-25T08:04:20+00:00
3,https://x.com/KaijoRiskManage/status/197111290...,#ai,⚠️ People need to start acting now — AI won’t ...,2025-09-25T07:21:53+00:00,https://x.com/KaijoRiskManage/status/197111290...,KaijoRiskManage,,,,1971112908487786809,False,False,True,2025-09-25T07:21:53+00:00
4,https://x.com/NeuralLatent/status/197111815220...,#ai,Working on Retrieval System beyond RAG #AI #RA...,2025-09-25T07:42:43+00:00,https://x.com/NeuralLatent/status/197111815220...,NeuralLatent,,,,1971118152206729564,False,False,False,2025-09-25T07:42:43+00:00


In [70]:
# Every column comes back as `object` dtype, including the `timestamp`/`datetime` strings
# and the `is_reply` / `is_quote` / `user_verified` flags. Convert explicitly (e.g. with
# pd.to_datetime) before any time-based or boolean analysis.
df.dtypes

url                  object
label                object
text                 object
timestamp            object
uri                  object
username             object
user_id              object
user_display_name    object
tweet_id             object
conversation_id      object
is_reply             object
is_quote             object
user_verified        object
datetime             object
dtype: object