In [1]:
from importlib import reload

from cloudpathlib import AnyPath

from app.settings import settings

# AnyPath yields either a local path or a cloud path (e.g. gs://...) depending on
# the configured value, so the same code works locally and against a bucket.
root_path = AnyPath(settings.root_path)
data_root = root_path / "data"

# NOTE(review): kept after the path construction above so that any import-time
# side effects of app.file_utils cannot influence how root_path is built.
import app.file_utils as fu

# Re-execute app.file_utils so edits to it are picked up without a kernel restart.
reload(fu)

# Presumably creates data_root when missing; logs "Bucket already exists" otherwise.
fu.maybe_mkdir(data_root)

# Locations whose weekly posts are downloaded and processed by the cells below.
target_locations = [
    'Peckham',
    'Gorton and Denton',
    'Kensington and Bayswater',
    'Bolsover',
    'Makerfield',
]

[2026-02-17T15:57:03] INFO app.file_utils: Bucket already exists: categorum-test2


In [17]:
import os
from datetime import timedelta, timezone

import dotenv
import elasticsearch
from cloudpathlib import AnyPath

import app.file_utils as fu
import app.posts_and_comments as pacs
import app.process_week as pw

# Extra time appended to the end of the *posts* query window.
# NOTE(review): the comments query below uses the unpadded end_dt; confirm this
# asymmetry is intended (comments posted late in the window may be missed).
POSTS_END_PADDING = timedelta(hours=5)
N_TOP_COMMENTS = 10  # comments kept per post
MIN_COMMENTS = 1     # passed through as min_comments to get_posts_with_top_n_comments

dotenv.load_dotenv()

print(f"Elasticsearch client version: {elasticsearch.__version__}")

es = elasticsearch.Elasticsearch(
    cloud_id=os.getenv("ES_CLOUD_ID"),
    api_key=os.getenv("API_KEY"),
)
print("Created basic ES client")

start_dt, end_dt = pw.get_most_recent_week(tz=timezone.utc, week_start=0, weeks_ago=1)


def posts_file_path(location, start, end):
    """Return the slugified feather path holding one location's posts for [start, end].

    Single source of truth for the naming scheme, so the existence check and the
    write below cannot drift apart.
    """
    name = f"posts_{location}_{start.strftime('%Y-%m-%d')}_to_{end.strftime('%Y-%m-%d')}.feather"
    return fu.file_name_to_slug(data_root / name)


# Only locations without a cached file are downloaded; existing files are reused,
# rather than being overwritten whenever any other location happens to be missing.
missing_locations = []
for location in target_locations:
    fout = posts_file_path(location, start_dt, end_dt)
    if AnyPath(fout).exists():
        print(f"File {fout} already exists")
    else:
        missing_locations.append(location)

if missing_locations:
    print(f"Downloading {start_dt} to {end_dt} for: {missing_locations}")
    posts_df = pw.download_entries_for_period(
        es, "dalmation-fb-posts", start_dt, end_dt + POSTS_END_PADDING,
        target_locations=missing_locations,
    )
    # NOTE(review): if posts_df is empty, post_ids is [] -- confirm how
    # download_entries_for_period treats an empty post_ids filter.
    post_ids = list(set(posts_df['post.id'].values))
    comments_df = pw.download_entries_for_period(
        es, "dalmation-fb-comments", start_dt, end_dt, post_ids=post_ids,
    )

    print(f"Downloaded {len(comments_df)} comments and {len(posts_df)} posts")
    df_all_posts_with_top_n_comments = pacs.get_posts_with_top_n_comments(
        posts_df, comments_df, n_comments=N_TOP_COMMENTS, min_comments=MIN_COMMENTS,
    )

    for location in missing_locations:
        df_location = df_all_posts_with_top_n_comments[
            df_all_posts_with_top_n_comments['tags.location'] == location
        ]
        fu.write_feather_to_anypath(df_location, posts_file_path(location, start_dt, end_dt))


Elasticsearch client version: (9, 3, 0)
Created basic ES client
File gs://categorum-test2/hyperlocal/data/posts_peckham_2026-02-02_to_2026-02-09.feather already exists
File gs://categorum-test2/hyperlocal/data/posts_gorton-and-denton_2026-02-02_to_2026-02-09.feather already exists
File gs://categorum-test2/hyperlocal/data/posts_kensington-and-bayswater_2026-02-02_to_2026-02-09.feather already exists
File gs://categorum-test2/hyperlocal/data/posts_bolsover_2026-02-02_to_2026-02-09.feather already exists
File gs://categorum-test2/hyperlocal/data/posts_makerfield_2026-02-02_to_2026-02-09.feather already exists


In [18]:
import os
import re
from importlib import reload

from cloudpathlib import AnyPath

import app.file_utils as fu
import app.post_processing as pp

reload(pp)

# Restrict processing to these locations; an empty list means "every location found".
locations_to_process = ['Gorton and Denton',]

# Where the pipeline writes intermediary files. Override with the INTERMEDIARY_PATH
# environment variable instead of editing a machine-specific absolute path.
intermediary_path = AnyPath(os.getenv("INTERMEDIARY_PATH", "/home/john/projects/hyperlocal/junk"))


def processed_filename(slug, date_from, date_to):
    """Name of the processed output that corresponds to posts_<slug>_<from>_to_<to>.feather."""
    return f"processed_{slug}_{date_from}_to_{date_to}.feather"


# Discover posts files that don't have a corresponding processed file yet
if locations_to_process:
    # Slugify location names to match the filenames on disk/GCS
    loc_slugs = [fu.file_name_to_slug(loc) for loc in locations_to_process]
    loc_group = "|".join(re.escape(s) for s in loc_slugs)
    posts_pattern = re.compile(rf"^posts_({loc_group})_(\d{{4}}-\d{{2}}-\d{{2}})_to_(\d{{4}}-\d{{2}}-\d{{2}})\.feather$")
else:
    posts_pattern = re.compile(r"^posts_(.+)_(\d{4}-\d{2}-\d{2})_to_(\d{4}-\d{2}-\d{2})\.feather$")

# List data_root once and test membership locally: on GCS each .exists() call
# would be a separate network round-trip.
data_files = list(data_root.iterdir())
existing_names = {f.name for f in data_files}

unprocessed = []
for f in data_files:
    m = posts_pattern.match(f.name)
    if not m:
        continue
    location_slug, date_from, date_to = m.groups()
    if processed_filename(location_slug, date_from, date_to) not in existing_names:
        unprocessed.append((date_from, date_to, location_slug, f))

# Sort chronologically (earliest first); the key avoids ever comparing path objects.
unprocessed.sort(key=lambda item: item[:3])

print(f"Found {len(unprocessed)} unprocessed posts file(s)")
for date_from, date_to, location_slug, input_path in unprocessed:
    print(f"  {input_path.name}")

# NOTE: `pipeline` and `location_slug` stay bound after this cell and are read by
# the inspection/merge cells below. `location_slug` is also rebound by the discovery
# loop above even when nothing is processed, so the two can fall out of sync.
for date_from, date_to, location_slug, input_path in unprocessed:
    print(f"\nProcessing {input_path.name} ...")
    # Tags live in a per-location directory, so a pipeline is built per file.
    pipeline = pp.PostProcessingPipeline(
        categories_path=root_path/"categories_to_study.json",
        tags_path=data_root/"tags"/location_slug,
        intermediary_path=intermediary_path,
    )
    processed_df = pipeline.process(input_path, save_intermediary_files=True)
    output_filename = processed_filename(location_slug, date_from, date_to)
    fu.write_feather_to_anypath(processed_df, data_root / output_filename)
    print(f"  Saved {output_filename}")


Found 0 unprocessed posts file(s)


In [14]:
# Category distribution in the pipeline's stage-2 frame. `pipeline` is the object
# left over from the processing cell; it is stale (or undefined on a fresh kernel)
# when that cell found nothing to process.
stage2_categories = pipeline._df_stage2.category
stage2_categories.value_counts()

category
Public services.Roads & Infrastructure         16
Public services.Waste & Sanitation             11
Public services.Transport & Transit            10
Environmental.Biodiversity & Animal Welfare     6
Transport.Road Safety & Behavior                4
Crime.Road & Traffic Offenses                   4
Environmental.Waste Management & Recycling      4
Crime.Anti-Social Behavior                      3
Education.Governance & Policy                   3
Housing.Construction Standards                  3
Housing.Tenancy & Landlord Relations            3
Environmental.Water Management & Pollution      3
Public services.Council Governance              2
Environmental.Urban Planning & Green Spaces     2
Environmental.Local Government Maintenance      1
Education.Safety & Facilities                   1
Transport.Road Maintenance                      1
Public services.Social Care                     1
Economy.Retail & High Street                    1
Public services.Postal & Delivery        

In [7]:
# Category distribution in the pipeline's stage-3 frame. `pipeline` is the object
# left over from the processing cell; it is stale (or undefined on a fresh kernel)
# when that cell found nothing to process.
stage3_categories = pipeline._df_stage3.category
stage3_categories.value_counts()

category
Public services.Roads & Infrastructure        3
Public services.Social Care                   2
Democracy.Electoral Integrity                 1
Housing.Tenancy & Landlord Relations          1
Environmental.Waste Management & Recycling    1
Democracy.Leadership & Party Critique         1
Cultural issues.Urban Transformation          1
Name: count, dtype: int64

In [15]:
import app.tag_manager as tm
from importlib import reload
reload(tm)

# NOTE(review): `pipeline` and `location_slug` are globals left over from the
# processing cell. That cell's discovery loop rebinds `location_slug` even when no
# file is processed, while `pipeline` is only rebuilt when a file IS processed, so
# the two can refer to different locations. merge_tags saves a tag record under
# the directory passed here (see the "Saving tag record" log), so a mismatch writes
# one location's tags into another's record -- check the printed target first.
merge_tags_dir = data_root / "tags" / location_slug
print(f"Merging pipeline stage-3 tags into {merge_tags_dir}")
df_merged = tm.merge_tags(pipeline._df_stage3, merge_tags_dir)
df_merged.tag.value_counts().head(20)


[2026-02-17T16:15:26] INFO app.tag_manager: TagManager initialized | extra={'path': 'gs://categorum-test2/hyperlocal/data/tags/bolsover/tag_record.csv'}
[2026-02-17T16:15:26] DEBUG app.tag_manager: Saving tag record | extra={'path': 'gs://categorum-test2/hyperlocal/data/tags/bolsover/tag_record.csv', 'rows': 74}


tag
Roadworks & Closures                12
Waste Collection                    11
Dangerous Driving                    4
Bus Service Changes                  4
Parking Enforcement                  3
Youth Welfare                        3
Fly-tipping                          3
Council House Swaps                  3
Escaped livestock                    2
Unfinished & Defective Housing       2
Potholes & Road Condition            2
Dogs worrying livestock              2
Localized Flooding                   2
Great Wolf Development               2
Dog fouling                          2
Council Communications               1
Vegetation and Graffiti              1
School Crossing Safety               1
School-related Traffic & Parking     1
Road Closures                        1
Name: count, dtype: int64