# Step 2: Review Collection - Tutorial

**Purpose:** Collect Google Maps reviews for discovered business locations

**What you'll learn:**
- How to use the ReviewCollector
- Different output modes (CSV, JSON, JSON-per-city, JSON-per-business)
- Checkpoint and resume functionality
- Review filtering and validation

**For Junior Developers:**
- Tests progress from simple to complex
- Clear outputs show what data looks like
- Checkpoint system prevents data loss on interruption
- Multiple output formats for different use cases

## What's New: Enhanced Place ID Support

This notebook now supports the **enhanced discovery system** with:
- **Canonical Place IDs**: Validated `ChIJ...` format for reliable review collection
- **Multiple ID types**: Handles `canonical_place_id`, `place_id`, and `data_id`
- **Backward compatibility**: Works with both old and new discovery formats

**New columns you might see:**
- `canonical_place_id`: Resolved canonical Google Maps place ID
- `resolve_status`: How the ID was obtained (cache/api_resolved/already_canonical)
- `data_id`: Numeric CID-like identifier (fallback if canonical not available)

The collection system automatically uses the best available ID for each location!
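
The idea of "best available ID" can be pictured roughly as follows. This is a minimal sketch, not the collector's actual implementation: `pick_best_id` and `is_missing` are hypothetical helpers, and the preference order (canonical first, then `place_id`, then `data_id`) is an assumption based on the column descriptions above.

```python
import math

# Assumed preference order; the real collector may use a different one.
ID_COLUMNS = ("canonical_place_id", "place_id", "data_id")


def is_missing(value):
    """Treat None, NaN, and blank strings as missing."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return str(value).strip() == ""


def pick_best_id(row):
    """Return (column_name, id_value) for the first usable ID, or (None, None)."""
    for column in ID_COLUMNS:
        value = row.get(column)
        if not is_missing(value):
            return column, str(value).strip()
    return None, None


# Illustrative rows only; the ID values are made up.
example_rows = [
    {"canonical_place_id": "ChIJexample", "place_id": "abc", "data_id": "123"},
    {"canonical_place_id": float("nan"), "place_id": "", "data_id": "123"},
]
for row in example_rows:
    print(pick_best_id(row))
```

Because the helper only relies on `.get()`, it accepts a plain dict as well as a pandas row obtained from `DataFrame.iterrows()`.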

## Setup and Imports

In [None]:
# Add parent directory to path
import sys
from pathlib import Path

project_root = Path().resolve().parent
sys.path.insert(0, str(project_root / "src"))

print(f"Project root: {project_root}")
print(f"Python path updated")

In [None]:
# Import required libraries
import pandas as pd
import json
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

# Import our modules
from review_analyzer.collect import ReviewCollector
from review_analyzer import config

print(" All imports successful!")
print(f"\nAvailable output modes:")
print(f" - {config.OutputMode.CSV}")
print(f" - {config.OutputMode.SINGLE_JSON}")
print(f" - {config.OutputMode.JSON_PER_CITY}")
print(f" - {config.OutputMode.JSON_PER_BUSINESS}")

## Data Architecture Overview

The pipeline uses an organized folder structure:

```
data/
  00_config/          # Static configurations
    cities/           # City aliases, coordinates, regions.geojson
    templates/        # Business templates (banks_template.csv)
  0_raw/              # Immutable source data
    discovery/        # Discovered places (timestamped folders)
    reviews/          # Raw reviews (timestamped folders)
  0_interim/          # Recomputable cache
    collection/       # Reviews from collect step
    transform/        # Normalized reviews + regions
    checkpoints/      # Resume points for long operations
  0_processed/        # Final outputs
    discovery/        # Processed agency lists
    collection/       # Collected reviews
    classification/   # Classified reviews (with sentiments/topics)
  0_analysis/         # Reports, figures, dashboards
  99_archive/         # Deprecated data

logs/                 # Pipeline execution logs
```

**This notebook saves data to:**
- `data/0_interim/collection/` - Collected reviews (Parquet + CSV)
- `data/0_processed/collection/` - Test outputs


## Test 1: Initialize Collector & Check Input Data

**What this does:** 
- Creates ReviewCollector instance
- Loads agencies from Step 1 (discover_placeids.ipynb)

**Expected output:** Confirmation of agencies loaded

In [None]:
# Initialize collector
collector = ReviewCollector(debug=True)

print(" ReviewCollector initialized successfully!")
print(f" Debug mode: {collector.debug}")
print(f" Client ready: {collector.client is not None}")

In [None]:
# Check for input data from Step 1 (or use latest from processed/)
# Try new data architecture first
input_file = project_root / "data" / "0_processed" / "discovery" / "agencies_discovered.csv"

# Fallback to legacy path
if not input_file.exists():
    input_file = project_root / "data" / "output" / "agencies_for_collection.csv"

# Number of businesses/cities to list before summarizing the rest (adjust as needed)
max_shown = 5

if input_file.exists():
    agencies_df = pd.read_csv(input_file)

    print(f" INPUT DATA LOADED")
    print(f"="*60)
    print(f" File: {input_file.name}")
    print(f" Path: {input_file.parent}")
    print(f" Total agencies: {len(agencies_df)}")
    print(f" Columns: {list(agencies_df.columns)}")

    # Check for new columns from enhanced discovery
    if 'canonical_place_id' in agencies_df.columns:
        print(f"\n Enhanced discovery data detected!")
        canonical_count = agencies_df['canonical_place_id'].notna().sum()
        print(f" Canonical place IDs: {canonical_count}/{len(agencies_df)}")

    if 'resolve_status' in agencies_df.columns:
        print(f"\n Resolution status:")
        status_counts = agencies_df['resolve_status'].value_counts()
        for status, count in status_counts.items():
            print(f" - {status}: {count}")

    # Support both old (_bank) and new (_business) column names
    business_col = '_business' if '_business' in agencies_df.columns else '_bank'
    if business_col in agencies_df.columns:
        n_businesses = agencies_df[business_col].nunique()
        print(f"\n Businesses: {n_businesses}")
        for business in agencies_df[business_col].unique()[:max_shown]:
            count = len(agencies_df[agencies_df[business_col] == business])
            print(f" - {business}: {count} locations")
        if n_businesses > max_shown:
            print(f" ... and {n_businesses - max_shown} more")

    if '_city' in agencies_df.columns:
        n_cities = agencies_df['_city'].nunique()
        print(f"\n Cities: {n_cities}")
        for city in agencies_df['_city'].unique()[:max_shown]:
            count = len(agencies_df[agencies_df['_city'] == city])
            print(f" - {city}: {count} locations")
        if n_cities > max_shown:
            print(f" ... and {n_cities - max_shown} more")

    print(f"\n Sample:")
    display(agencies_df.head())

else:
    print(" No agencies file found!")
    print(f" Expected: {input_file}")
    print(f"\n Please run discover_placeids.ipynb first to generate agencies.")


## Test 2: Collect Reviews for Single Agency (CSV Mode)

**What this does:** Collects reviews for ONE agency to test the system

**Use case:** Quick test before full run

**Expected output:** CSV file with reviews and metadata

In [None]:
# Create test file with single agency
test_agencies = agencies_df.head(1).copy()  # head() alone would return 5 rows
test_input = project_root / "data" / "0_interim" / "collection" / "test_single_agency.csv"
test_input.parent.mkdir(parents=True, exist_ok=True)
test_agencies.to_csv(test_input, index=False)

print(f" TEST: Single Agency")
print(f"="*60)
print(f" Agency: {test_agencies['title'].iloc[0] if 'title' in test_agencies.columns else test_agencies['_place_id'].iloc[0]}")
print(f" Place ID: {test_agencies['_place_id'].iloc[0]}")

# Show additional info if available
if 'canonical_place_id' in test_agencies.columns:
    canonical = test_agencies['canonical_place_id'].iloc[0]
    if pd.notna(canonical):
        print(f" Canonical ID: {canonical}")

if 'resolve_status' in test_agencies.columns:
    status = test_agencies['resolve_status'].iloc[0]
    print(f" Resolution: {status}")

# Output paths (use new data architecture)
output_path = project_root / "data" / "0_processed" / "collection" / f"test_single_reviews_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
output_path.parent.mkdir(parents=True, exist_ok=True)
checkpoint_path = project_root / "data" / "0_interim" / "collection" / "checkpoints" / "test_single_checkpoint.json"
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

print(f"\n Output: {output_path.name}")
print(f" Checkpoint: {checkpoint_path.name}\n")

# Collect reviews
stats = collector.collect_reviews(
 input_file=test_input,
 output_mode="csv",
 output_path=output_path,
 checkpoint_file=checkpoint_path
)

print("\n" + "="*60)
print(" COLLECTION STATS")
print("="*60)
for key, value in stats.items():
    print(f" {key}: {value}")


In [None]:
# Inspect collected reviews
if output_path.exists():
    reviews_df = pd.read_csv(output_path)

    print(f"\n REVIEWS COLLECTED")
    print(f"="*60)
    print(f" Total reviews: {len(reviews_df)}")
    print(f" Columns: {list(reviews_df.columns)}\n")

    # Show sample reviews
    print("Sample reviews:")
    display(reviews_df.head())

    # Rating distribution as a text histogram
    if 'rating' in reviews_df.columns and len(reviews_df) > 0:
        bar_width = 50  # characters for a bar covering 100% of reviews (adjust as needed)
        print(f"\n Rating Distribution:")
        rating_counts = reviews_df['rating'].value_counts().sort_index()
        for rating, count in rating_counts.items():
            stars = '*' * int(rating)
            bar = '#' * (count * bar_width // len(reviews_df))
            print(f" {stars:<5} ({rating}): {count:>4} {bar}")

    # Review length statistics
    if 'text' in reviews_df.columns:
        reviews_df['text_length'] = reviews_df['text'].fillna('').str.len()
        print(f"\n Review Length Stats:")
        print(f" Average: {reviews_df['text_length'].mean():.0f} characters")
        print(f" Median: {reviews_df['text_length'].median():.0f} characters")
        print(f" Max: {reviews_df['text_length'].max():.0f} characters")
else:
    print(f" Output file not found: {output_path}")

## Test 3: Collect for Multiple Agencies (CSV Mode)

**What this does:** Collects reviews for 5 agencies

**Use case:** Medium-sized test run

**Expected output:** Combined CSV with reviews from all agencies

In [None]:
# Create test file with 5 agencies
test_agencies = agencies_df.head(5).copy()
test_input = project_root / "data" / "0_interim" / "collection" / "test_five_agencies.csv"
test_input.parent.mkdir(parents=True, exist_ok=True)
test_agencies.to_csv(test_input, index=False)

print(f" TEST: Five Agencies")
print(f"="*60)
print(f" Agencies: {len(test_agencies)}")
if 'title' in test_agencies.columns:
    # Number the agencies starting from 1
    for idx, title in enumerate(test_agencies['title'], start=1):
        print(f" {idx}. {title}")

# Output paths (use new data architecture)
output_path = project_root / "data" / "0_processed" / "collection" / f"test_five_reviews_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
output_path.parent.mkdir(parents=True, exist_ok=True)
checkpoint_path = project_root / "data" / "0_interim" / "collection" / "checkpoints" / "test_five_checkpoint.json"
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

print(f"\n Output: {output_path.name}")
print(f" Checkpoint: {checkpoint_path.name}\n")

# Collect reviews
stats = collector.collect_reviews(
 input_file=test_input,
 output_mode="csv",
 output_path=output_path,
 checkpoint_file=checkpoint_path
)

print("\n" + "="*60)
print(" COLLECTION STATS")
print("="*60)
for key, value in stats.items():
    print(f" {key}: {value}")


In [None]:
# Analyze reviews by agency
if output_path.exists():
    reviews_df = pd.read_csv(output_path)

    print(f"\n REVIEWS BY AGENCY")
    print(f"="*60)
    print(f" Total reviews: {len(reviews_df)}\n")

    # Group by place_id
    if '_place_id' in reviews_df.columns:
        review_counts = reviews_df.groupby('_place_id').size().sort_values(ascending=False)
        print(" Reviews per agency:")
        for place_id, count in review_counts.items():
            # Get agency name if available, else a shortened place ID
            agency_row = reviews_df[reviews_df['_place_id'] == place_id].iloc[0]
            name = agency_row.get('title', str(place_id)[:20])
            print(f" {name}: {count} reviews")

    # Overall rating stats
    if 'rating' in reviews_df.columns and len(reviews_df) > 0:
        total = len(reviews_df)
        five_star = (reviews_df['rating'] == 5).sum()
        one_star = (reviews_df['rating'] == 1).sum()
        print(f"\n Overall stats:")
        print(f" Average rating: {reviews_df['rating'].mean():.2f}")
        print(f" Median rating: {reviews_df['rating'].median():.1f}")
        print(f" 5-star reviews: {five_star} ({five_star / total * 100:.1f}%)")
        print(f" 1-star reviews: {one_star} ({one_star / total * 100:.1f}%)")

    # Sample
    print(f"\n Sample reviews:")
    display(reviews_df.head())
else:
    print(f" Output file not found: {output_path}")

## Test 4: JSON-per-City Mode

**What this does:** Collects reviews and saves separate JSON file per city

**Use case:** City-level analysis or reporting

**Expected output:** One JSON file per city
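
To make the expected layout concrete, here is a minimal sketch of how reviews could be split into one JSON file per city. It is not the `ReviewCollector` implementation: `write_json_per_city` and the file-naming scheme are hypothetical, and the `{place_id: [reviews]}` layout inside each file is an assumption that matches what the inspection cell later in this test reads.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path


def write_json_per_city(reviews, output_dir):
    """Group reviews by city, then by place ID, and write one JSON file per city."""
    by_city = defaultdict(lambda: defaultdict(list))
    for review in reviews:
        by_city[review["_city"]][review["_place_id"]].append(review)

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for city, places in by_city.items():
        # Hypothetical naming scheme: lower-cased city, spaces replaced by underscores
        path = output_dir / f"{city.lower().replace(' ', '_')}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(places, f, ensure_ascii=False, indent=2)
        written.append(path)
    return sorted(written)


# Made-up sample data for illustration
sample_reviews = [
    {"_city": "Casablanca", "_place_id": "p1", "rating": 5, "text": "Great"},
    {"_city": "Casablanca", "_place_id": "p1", "rating": 2, "text": "Slow"},
    {"_city": "Rabat", "_place_id": "p2", "rating": 4, "text": "Good"},
]

with tempfile.TemporaryDirectory() as tmp:
    files = write_json_per_city(sample_reviews, tmp)
    file_names = [p.name for p in files]
    casablanca = json.loads(files[0].read_text(encoding="utf-8"))

print(file_names)
print({place_id: len(items) for place_id, items in casablanca.items()})
```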

In [None]:
# Use agencies from multiple cities if available
# (head(0) would select no rows; 10 is an illustrative sample size, adjust as needed)
test_agencies = agencies_df.head(10).copy()
test_input = project_root / "data" / "0_interim" / "collection" / "test_json_per_city_agencies.csv"
test_input.parent.mkdir(parents=True, exist_ok=True)
test_agencies.to_csv(test_input, index=False)

print(f" TEST: JSON-per-City Mode")
print(f"="*60)
print(f" Agencies: {len(test_agencies)}")
if '_city' in test_agencies.columns:
    cities = test_agencies['_city'].dropna().unique()
    print(f" Cities: {len(cities)} ({', '.join(map(str, cities))})")

# Output directory (use new data architecture)
output_dir = project_root / "data" / "0_processed" / "collection" / f"json_per_city_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_dir.mkdir(exist_ok=True, parents=True)
checkpoint_path = project_root / "data" / "0_interim" / "collection" / "checkpoints" / "test_json_city_checkpoint.json"
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

print(f"\n Output dir: {output_dir.name}")
print(f" Checkpoint: {checkpoint_path.name}\n")

# Collect reviews
stats = collector.collect_reviews(
 input_file=test_input,
 output_mode="json-per-city",
 output_dir=output_dir,
 checkpoint_file=checkpoint_path
)

print("\n" + "="*60)
print(" COLLECTION STATS")
print("="*60)
for key, value in stats.items():
    print(f" {key}: {value}")


In [None]:
# Inspect JSON files created
json_files = sorted(output_dir.glob("*.json"))

print(f"\n JSON FILES CREATED")
print(f"="*60)
print(f" Total files: {len(json_files)}\n")

for json_file in json_files:
    with open(json_file, encoding="utf-8") as f:
        data = json.load(f)

    # Count reviews in this file (each key maps a place to its list of reviews)
    total_reviews = sum(len(reviews) for reviews in data.values())

    print(f" {json_file.name}")
    print(f" Places: {len(data)}")
    print(f" Reviews: {total_reviews}")

    # Show structure of first place
    if data:
        first_place_id = list(data.keys())[0]
        first_reviews = data[first_place_id]
        print(f" Sample place: {first_place_id}")
        print(f" Sample reviews: {len(first_reviews)}")
        if first_reviews:
            print(f" Sample review keys: {list(first_reviews[0].keys())}")
    print()

## Test 5: JSON-per-Business Mode

**What this does:** Collects reviews and saves separate JSON file per business

**Use case:** Business-level analysis or reporting

**Expected output:** One JSON file per business

In [None]:
# Use agencies from multiple businesses if available
# (head(0) would select no rows; 10 is an illustrative sample size, adjust as needed)
test_agencies = agencies_df.head(10).copy()
test_input = project_root / "data" / "0_interim" / "collection" / "test_json_per_business_agencies.csv"
test_input.parent.mkdir(parents=True, exist_ok=True)
test_agencies.to_csv(test_input, index=False)

print(f" TEST: JSON-per-Business Mode")
print(f"="*60)
print(f" Agencies: {len(test_agencies)}")

# Support both old (_bank) and new (_business) column names
business_col = '_business' if '_business' in test_agencies.columns else '_bank'
if business_col in test_agencies.columns:
    businesses = test_agencies[business_col].dropna().unique()
    print(f" Businesses: {len(businesses)} ({', '.join(map(str, businesses))})")

# Output directory (use new data architecture)
output_dir = project_root / "data" / "0_processed" / "collection" / f"json_per_business_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_dir.mkdir(exist_ok=True, parents=True)
checkpoint_path = project_root / "data" / "0_interim" / "collection" / "checkpoints" / "test_json_business_checkpoint.json"
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

print(f"\n Output dir: {output_dir.name}")
print(f" Checkpoint: {checkpoint_path.name}\n")

# Collect reviews
stats = collector.collect_reviews(
 input_file=test_input,
 output_mode="json-per-business",
 output_dir=output_dir,
 checkpoint_file=checkpoint_path
)

print("\n" + "="*60)
print(" COLLECTION STATS")
print("="*60)
for key, value in stats.items():
    print(f" {key}: {value}")


In [None]:
# Inspect JSON files created
json_files = sorted(output_dir.glob("*.json"))

print(f"\n JSON FILES CREATED")
print(f"="*60)
print(f" Total files: {len(json_files)}\n")

for json_file in json_files:
    with open(json_file, encoding="utf-8") as f:
        data = json.load(f)

    # Count reviews (each key maps a place to its list of reviews)
    total_reviews = sum(len(reviews) for reviews in data.values())

    print(f" {json_file.name}")
    print(f" Places: {len(data)}")
    print(f" Reviews: {total_reviews}")
    print()

## Test 6: Checkpoint and Resume

**What this does:** Tests checkpoint/resume functionality

**Scenario:** Simulates interrupted collection that can be resumed

**Expected output:** Second run resumes from where first run stopped
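
The resume behaviour can be illustrated with a small stand-alone sketch. It is not how `ReviewCollector` stores its checkpoints: `run_with_checkpoint`, `load_done`, and the `{"processed": [...]}` file layout are hypothetical, chosen only to show the pattern of recording each finished place and skipping it on the next run.

```python
import json
import tempfile
from pathlib import Path


def load_done(checkpoint_file):
    """Return the set of place IDs recorded in the checkpoint (empty if none)."""
    if checkpoint_file.exists():
        return set(json.loads(checkpoint_file.read_text())["processed"])
    return set()


def run_with_checkpoint(place_ids, checkpoint_file, fetch):
    """Process each place once, saving progress after every successful fetch."""
    done = load_done(checkpoint_file)
    newly_processed = []
    for place_id in place_ids:
        if place_id in done:
            continue  # already handled in an earlier run
        fetch(place_id)
        done.add(place_id)
        newly_processed.append(place_id)
        checkpoint_file.write_text(json.dumps({"processed": sorted(done)}))
    return newly_processed


def interrupted_fetch(place_id):
    """Stand-in for a real API call that fails partway through the run."""
    if place_id == "p3":
        raise RuntimeError("simulated interruption")


places = ["p1", "p2", "p3", "p4"]
with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "checkpoint.json"

    # Run 1: stops at p3, but p1 and p2 are already saved in the checkpoint
    try:
        run_with_checkpoint(places, checkpoint, interrupted_fetch)
    except RuntimeError:
        pass
    saved_after_first_run = sorted(load_done(checkpoint))

    # Run 2: resumes and only processes what is left
    resumed = run_with_checkpoint(places, checkpoint, lambda place_id: None)

print(saved_after_first_run)
print(resumed)
```

Writing the checkpoint after every place costs a little I/O, but it means an interruption loses at most the place that was in flight.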

In [None]:
# Create test input with a few agencies (head() returns the first 5 rows by default)
test_agencies = agencies_df.head().copy()
test_input = project_root / "data" / "0_interim" / "collection" / "test_checkpoint_agencies.csv"
test_input.parent.mkdir(parents=True, exist_ok=True)
test_agencies.to_csv(test_input, index=False)

output_path = project_root / "data" / "0_processed" / "collection" / f"test_checkpoint_reviews.csv"
output_path.parent.mkdir(parents=True, exist_ok=True)
checkpoint_path = project_root / "data" / "0_interim" / "collection" / "checkpoints" / "test_checkpoint_demo.json"
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

# Delete old checkpoint if exists
if checkpoint_path.exists():
    checkpoint_path.unlink()
    print(" Deleted old checkpoint\n")

print(f" TEST: Checkpoint and Resume")
print(f"="*60)
print(f" Agencies: {len(test_agencies)}")
print(f"\n RUN 1: Initial collection")

# First run
stats = collector.collect_reviews(
 input_file=test_input,
 output_mode="csv",
 output_path=output_path,
 checkpoint_file=checkpoint_path
)

print("\n First run complete")
print(f" Places processed: {stats['total_places']}")
print(f" Checkpoint created: {checkpoint_path.exists()}")

# Check checkpoint contents
if checkpoint_path.exists():
    with open(checkpoint_path) as f:
        checkpoint_data = json.load(f)
    print(f" Checkpoint contains: {list(checkpoint_data.keys())}")


In [None]:
# Second run - should use checkpoint
print(f"\n RUN 2: Resume from checkpoint")
print(f" (Should skip already processed places)\n")

# Keep the first run's count before `stats` is overwritten by the second run
first_run_places = stats['total_places']

stats = collector.collect_reviews(
    input_file=test_input,
    output_mode="csv",
    output_path=output_path,
    checkpoint_file=checkpoint_path
)

print("\n" + "="*60)
print(" COMPARISON")
print("="*60)
print(f" Run 1 places: {first_run_places}")
print(f" Run 2 places: {stats['total_places']}")
print(f"\n Checkpoint system working!")
print(f" Second run used existing checkpoint to avoid re-processing.")


## Test 7: Visualize Collection Results

**What this does:** Creates visualizations of collected reviews

**Outputs:**
- Rating distribution histogram
- Reviews per agency bar chart
- Review length distribution

In [None]:
# Load most recent reviews CSV (check new data architecture first)
output_dir = project_root / "data" / "0_processed" / "collection"
if not output_dir.exists():
    output_dir = project_root / "data" / "output"  # Fallback to legacy

# Sort by modification time so the newest file comes first
# (sorting by name would mix the different test_* prefixes)
review_files = sorted(
    output_dir.glob("*reviews*.csv"),
    key=lambda p: p.stat().st_mtime,
    reverse=True,
)

if review_files:
    latest_file = review_files[0]
    print(f" Loading: {latest_file.name}\n")

    reviews_df = pd.read_csv(latest_file)

    print(f" REVIEW VISUALIZATIONS")
    print(f"="*60)
    print(f"Total reviews: {len(reviews_df)}\n")

    # Detect business column (support both old and new)
    business_col = None
    if '_business' in reviews_df.columns:
        business_col = '_business'
    elif '_bank' in reviews_df.columns:
        business_col = '_bank'

    # Set style
    plt.style.use('seaborn-v0_8-whitegrid')

    # 1. Rating distribution
    if 'rating' in reviews_df.columns:
        fig, ax = plt.subplots(figsize=(10, 6))
        rating_counts = reviews_df['rating'].value_counts().sort_index()
        # One color per star rating, so colors stay aligned if a rating is absent
        rating_colors = {1: '#d62728', 2: '#ff7f0e', 3: '#ffbb78', 4: '#98df8a', 5: '#2ca02c'}
        colors = [rating_colors.get(int(r), 'gray') for r in rating_counts.index]
        rating_counts.plot(kind='bar', ax=ax, color=colors)
        ax.set_title('Review Rating Distribution', fontsize=14, fontweight='bold')
        ax.set_xlabel('Rating (Stars)', fontsize=12)
        ax.set_ylabel('Number of Reviews', fontsize=12)
        ax.set_xticklabels([f'{int(x)}' for x in rating_counts.index], rotation=0)
        ax.grid(axis='y', alpha=0.3)

        # Add value labels on bars
        for i, v in enumerate(rating_counts):
            ax.text(i, v + 0.5, str(v), ha='center', va='bottom')

        plt.tight_layout()
        plt.show()

    # 2. Reviews per business (top 10 by review count)
    if business_col and business_col in reviews_df.columns:
        review_counts = reviews_df.groupby(business_col).size().sort_values(ascending=False).head(10)

        fig, ax = plt.subplots(figsize=(10, 6))
        review_counts.plot(kind='barh', ax=ax, color='steelblue')
        ax.set_title('Reviews per Business Location (Top 10)', fontsize=14, fontweight='bold')
        ax.set_xlabel('Number of Reviews', fontsize=12)
        ax.set_ylabel('Business', fontsize=12)
        ax.grid(axis='x', alpha=0.3)
        plt.tight_layout()
        plt.show()

    # 3. Review length distribution
    if 'text' in reviews_df.columns:
        reviews_df['text_length'] = reviews_df['text'].fillna('').str.len()
        mean_len = reviews_df['text_length'].mean()
        median_len = reviews_df['text_length'].median()

        fig, ax = plt.subplots(figsize=(10, 6))
        # bins=0 is invalid; 30 is an illustrative choice, adjust as needed
        ax.hist(reviews_df['text_length'], bins=30, color='coral', edgecolor='black', alpha=0.7)
        ax.set_title('Review Length Distribution', fontsize=14, fontweight='bold')
        ax.set_xlabel('Review Length (characters)', fontsize=12)
        ax.set_ylabel('Frequency', fontsize=12)
        ax.axvline(mean_len, color='red', linestyle='--', label=f'Mean: {mean_len:.0f}')
        ax.axvline(median_len, color='green', linestyle='--', label=f'Median: {median_len:.0f}')
        ax.legend()
        ax.grid(axis='y', alpha=0.3)
        plt.tight_layout()
        plt.show()

else:
    print(" No review files found. Run a test first!")


## Export for Next Step

**What this does:** Prepares review data for the next steps (transform, then classification)

**Output:** Clean CSV file ready for classification notebook
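
The core cleaning rule, keeping only reviews that actually contain text, can be sketched without pandas as shown here. `keep_reviews_with_text` is a hypothetical helper and the sample records are made up; note that it also drops whitespace-only text, which is slightly stricter than a plain not-null check.

```python
def keep_reviews_with_text(reviews):
    """Return only the reviews whose 'text' field is a non-blank string."""
    kept = []
    for review in reviews:
        text = review.get("text")
        if isinstance(text, str) and text.strip():
            kept.append(review)
    return kept


# Made-up sample records for illustration
sample_reviews = [
    {"_place_id": "p1", "rating": 5, "text": "Fast and friendly service."},
    {"_place_id": "p1", "rating": 4, "text": None},   # rating-only review
    {"_place_id": "p2", "rating": 1, "text": "   "},  # whitespace only
]

cleaned = keep_reviews_with_text(sample_reviews)
print(f"Kept {len(cleaned)} of {len(sample_reviews)} reviews")
```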

In [None]:
# Export for classification or transform
if review_files:
    reviews_df = pd.read_csv(review_files[0])

    # Detect business column (support both old and new)
    business_col = '_business' if '_business' in reviews_df.columns else '_bank'

    # Columns the next step relies on (informational; all columns are exported)
    required_columns = ['_place_id', '_city', business_col, 'text', 'rating']
    available_columns = [col for col in reviews_df.columns if col in required_columns or col.startswith('_')]

    # Keep only reviews with text
    if 'text' in reviews_df.columns:
        df_clean = reviews_df[reviews_df['text'].notna()].copy()
    else:
        df_clean = reviews_df.copy()

    # Save to interim for transform step (new data architecture)
    next_step_file = project_root / "data" / "0_interim" / "collection" / "reviews.parquet"
    next_step_file.parent.mkdir(parents=True, exist_ok=True)
    df_clean.to_parquet(next_step_file, index=False)

    # Also save CSV for backward compatibility
    next_step_csv = project_root / "data" / "0_interim" / "collection" / "reviews.csv"
    df_clean.to_csv(next_step_csv, index=False)

    print(f" EXPORT COMPLETE")
    print(f"="*60)
    print(f" Parquet: {next_step_file.name}")
    print(f" CSV: {next_step_csv.name}")
    print(f" Records: {len(df_clean)}")
    print(f" Columns: {list(df_clean.columns)}")

    if 'text' in df_clean.columns and len(df_clean) > 0:
        reviews_with_text = df_clean['text'].notna().sum()
        print(f" Reviews with text: {reviews_with_text} ({reviews_with_text / len(df_clean) * 100:.1f}%)")

    # Show business breakdown
    if business_col in df_clean.columns:
        max_shown = 5  # businesses to list before summarizing the rest
        business_counts = df_clean[business_col].value_counts()
        print(f"\n Reviews by business:")
        for business, count in business_counts.head(max_shown).items():
            print(f" - {business}: {count}")
        if len(business_counts) > max_shown:
            print(f" ... and {len(business_counts) - max_shown} more")

    print(f"\n Data saved to: data/0_interim/collection/")
    print(f"\n Next steps:")
    print(f" 1. Transform reviews (normalize + add region)")
    print(f"    Run: python -m review_analyzer.main transform")
    print(f" 2. Classify reviews (extract sentiments/topics)")
    print(f"    Open: classify_reviews.ipynb")
else:
    print(" No review files found. Run a test first!")


## Summary for New Developers

**What you learned:**

1. **ReviewCollector** - The main class for collecting reviews from Google Maps
2. **Enhanced Place ID Support** - Automatic handling of canonical place IDs, resolution status, and data IDs
3. **Output modes:**
   - `csv` - Single CSV file (easiest for analysis)
   - `json` - Single JSON file (structured data)
   - `json-per-city` - Separate files per city
   - `json-per-business` - Separate files per business location
4. **Checkpoint system** - Automatic save/resume for long operations (prevents data loss)
5. **Review metadata** - Ratings, dates, authors, full review text
6. **New data architecture** - Organized folder structure (00_config, 0_raw, 0_interim, 0_processed, 0_analysis)

**Key takeaways:**
- System auto-detects canonical place IDs from enhanced discovery output
- Start with single business location tests before running full collections
- Use checkpoints for large collections to enable safe interruption and resume
- Choose output mode based on your analysis needs
- Always validate collected data with visualizations
- Data is saved to `data/0_interim/collection/` for next pipeline stage

**Output mode decision guide:**
- `csv`: for spreadsheet analysis, simple processing, data exploration
- `json`: for programmatic access, single file simplicity
- `json-per-city`: for city-level reports or parallel processing by city
- `json-per-business`: for business-level reports or competitive analysis
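
In the tests above, `csv` mode is called with `output_path` while the two per-file modes are called with `output_dir`. The sketch below wraps that distinction in a small helper. `output_kwargs` is hypothetical and not part of `review_analyzer`, and treating `json` as a single-file mode that takes `output_path` is an assumption, since this notebook never calls it.

```python
from pathlib import Path

# Single-file modes and their file extensions ("json" here is an assumption)
SINGLE_FILE_MODES = {"csv": ".csv", "json": ".json"}
# Modes that write several files into a directory
DIRECTORY_MODES = {"json-per-city", "json-per-business"}


def output_kwargs(mode, base_dir, name):
    """Build the output-related keyword arguments for a given output mode."""
    base_dir = Path(base_dir)
    if mode in SINGLE_FILE_MODES:
        suffix = SINGLE_FILE_MODES[mode]
        return {"output_mode": mode, "output_path": base_dir / f"{name}{suffix}"}
    if mode in DIRECTORY_MODES:
        return {"output_mode": mode, "output_dir": base_dir / name}
    raise ValueError(f"Unknown output mode: {mode!r}")


for mode in ("csv", "json-per-city"):
    print(output_kwargs(mode, "data/0_processed/collection", "reviews"))
```

The returned dict could then be unpacked into a call such as `collector.collect_reviews(input_file=..., checkpoint_file=..., **kwargs)`.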

**New features (Enhanced Discovery):**
- **Canonical Place IDs**: System validates and resolves place IDs to standardized ChIJ format
- **Resolution Status**: Track how place IDs were resolved (cache, api_resolved, already_canonical, etc.)
- **OSM Integration**: Automatic city alias resolution (e.g., "Kénitra" → "Kenitra")
- **Backward Compatibility**: Works with both old (`_bank`) and new (`_business`) data formats

**Data Flow (New Architecture):**
```
discover   →  collect     →  transform   →  classify
   ↓             ↓              ↓              ↓
0_raw/        0_interim/     0_interim/     0_processed/
discovery     collection     transform      classification
```

**Next steps:**
1. You've discovered place_ids (Step 1: discover_placeids.ipynb)
2. You've collected reviews (Step 2: collect_reviews.ipynb - this notebook!)
3. Transform reviews - normalize fields, add regions (CLI: `python -m review_analyzer.main transform`)
4. Classify reviews to extract topics and sentiments (classify_reviews.ipynb)

**Pipeline Command (Run all steps):**
```bash
python -m review_analyzer.main pipeline \
 --businesses "Attijariwafa Bank" \
 --cities "Casablanca" \
 --business-type "bank"
```
