# Event Ingestion Pipeline Testing

This notebook tests the **config-driven** event ingestion pipeline.
All sources (Ra.co, Ticketmaster, etc.) are created through `PipelineFactory`
from YAML configuration, so no source-specific code is needed.

**Pipeline flow:**
1. Factory reads `ingestion.yaml` and creates pipelines
2. Each pipeline fetches raw data via its adapter (GraphQL / REST)
3. `FieldMapper` extracts and transforms fields according to the config
4. `TaxonomyMapper` assigns Human Experience Taxonomy dimensions
5. Events are normalized to `EventSchema` and optionally enriched by LLM
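
Step 3 of the flow above can be illustrated with a minimal sketch of config-driven field mapping. The function names `get_path` and `map_fields`, the dotted-path syntax, and the sample record are assumptions made for illustration; the real `FieldMapper` may work differently.

```python
from typing import Any


def get_path(record: dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'venue.name'; return None if any key is missing."""
    current: Any = record
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def map_fields(record: dict[str, Any], mappings: dict[str, str]) -> dict[str, Any]:
    """Build a flat event dict from {target_field: dotted_source_path} mappings."""
    return {target: get_path(record, path) for target, path in mappings.items()}


# Hypothetical raw record and mapping config (not taken from ingestion.yaml)
raw = {"title": "Club Night", "venue": {"name": "Sala Example", "area": {"name": "Barcelona"}}}
mappings = {"title": "title", "venue_name": "venue.name", "city": "venue.area.name", "price": "cost"}

mapped = map_fields(raw, mappings)
print(mapped)
```

Because the mapping is plain data, supporting a new source in this sketch means writing a new `mappings` dictionary rather than new extraction code. Missing source fields come back as `None` instead of raising.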

In [None]:
pip install

ERROR: You must give at least one requirement to install (see "pip help install")
Note: you may need to restart the kernel to use updated packages.


In [13]:
import sys
import os
import logging

import yaml

# Setup path — point to services/api so src.* imports work
API_ROOT = os.path.abspath(os.path.join("..", "services", "api"))
if API_ROOT not in sys.path:
    sys.path.insert(0, API_ROOT)

# Enable logging
logging.basicConfig(
    level=logging.INFO,
    format="%(name)s - %(levelname)s - %(message)s",
)

import pandas as pd

print(f"API root: {API_ROOT}")
print("Setup complete")

API root: /Users/josegarcia/Documents/GitHub/event-intelligence-platform/services/api
Setup complete


## Step 1: PipelineFactory — List All Configured Sources

The factory reads `ingestion.yaml` and can create pipelines for any enabled source.

In [14]:
from src.ingestion.factory import PipelineFactory

factory = PipelineFactory()

print("Configured Sources:")
print("=" * 50)
for name, info in factory.list_sources().items():
    status = "ENABLED" if info["enabled"] else "disabled"
    print(f"  {name:20} type={info['type']:10} [{status}]")

print(f"\nEnabled sources: {factory.list_enabled_sources()}")

Configured Sources:
  ra_co                type=api        [ENABLED]
  ticketmaster         type=api        [disabled]

Enabled sources: ['ra_co']
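
For orientation, the listing above is consistent with a configuration shaped roughly like the dictionary below, which is what a YAML loader would return after parsing. The key layout (`sources`, `enabled`, `type`) and the helper `enabled_sources` are assumptions inferred from the printed output, not the actual `ingestion.yaml` schema or the `PipelineFactory` implementation.

```python
# Assumed shape of the parsed config; values mirror the output printed above
config = {
    "sources": {
        "ra_co": {"enabled": True, "type": "api"},
        "ticketmaster": {"enabled": False, "type": "api"},
    }
}


def enabled_sources(cfg: dict) -> list[str]:
    """Return the names of sources whose 'enabled' flag is truthy."""
    return [name for name, info in cfg["sources"].items() if info.get("enabled")]


print(enabled_sources(config))  # ['ra_co']
```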


## Step 2: Ra.co Pipeline — Multi-City Ingestion

The Ra.co pipeline is created entirely from config. It uses:
- GraphQL API adapter
- Multi-city execution (Barcelona + Madrid via `defaults.areas`)
- Date-window splitting for complete coverage
- FieldMapper for extraction + transformations
- FeatureExtractor (LLM) for taxonomy enrichment, when one is attached (the next cell reports whether it is)
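
Date-window splitting, listed above, can be sketched as follows. This is a minimal illustration that assumes fixed-size, non-overlapping windows; the function `split_windows` is hypothetical, the 120-day horizon and 7-day window are example values, and the pipeline's actual windowing logic is not shown in this notebook.

```python
from datetime import date, timedelta


def split_windows(start: date, end: date, window_days: int) -> list[tuple[date, date]]:
    """Split [start, end] into consecutive inclusive windows of at most window_days days."""
    if window_days < 1:
        raise ValueError("window_days must be at least 1")
    windows = []
    cursor = start
    while cursor <= end:
        # Clamp the last window so it never runs past the requested end date
        window_end = min(cursor + timedelta(days=window_days - 1), end)
        windows.append((cursor, window_end))
        cursor = window_end + timedelta(days=1)
    return windows


# Example: a 120-day horizon fetched in 7-day windows
start = date(2026, 2, 10)
end = start + timedelta(days=120)
windows = split_windows(start, end, window_days=7)
print(len(windows), windows[0], windows[-1])
```

Smaller windows keep each request under a per-call result cap, at the cost of more requests per city.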

In [15]:
ra_co = factory.create_pipeline("ra_co")

print(f"Pipeline: {ra_co.config.source_name}")
print(f"Source type: {ra_co.source_type.value}")
print(f"Protocol: {ra_co.source_config.protocol}")
print(f"Endpoint: {ra_co.source_config.endpoint}")
print(f"Areas: {ra_co.source_config.defaults.get('areas', {})}")
print(f"Days ahead: {ra_co.source_config.defaults.get('days_ahead')}")
print(f"Feature extractor: {ra_co.feature_extractor is not None}")

Pipeline: ra_co
Source type: api
Protocol: graphql
Endpoint: https://ra.co/graphql
Areas: {'Barcelona': '20', 'Madrid': '28'}
Days ahead: 120
Feature extractor: False


In [16]:
# Execute Ra.co pipeline (multi-city: Barcelona + Madrid)
raco_result = ra_co.execute()

print("Ra.co Pipeline Results")
print("=" * 60)
print(f"Status: {raco_result.status.value}")
print(f"Total raw events: {raco_result.total_events_processed}")
print(f"Successful: {raco_result.successful_events}")
print(f"Failed: {raco_result.failed_events}")
print(f"Duration: {raco_result.duration_seconds:.2f}s")
print(f"Success rate: {raco_result.success_rate:.1f}%")
print(f"Cities: {raco_result.metadata.get('cities', [])}")

if raco_result.errors:
    print(f"\nErrors: {raco_result.errors}")

pipeline.ra_co - INFO - Starting multi-city execution: ra_co_20260210_205508_aa0e2b24 (2 cities)
pipeline.ra_co - INFO - Fetching events for Barcelona (area_id=20)...
pipeline.ra_co - INFO -   Barcelona: sliding window fetch [2026-02-10..2026-06-10] (capacity=500/call, window=7d)
src.ingestion.pipelines.apis.base_api - INFO - Fetching page 1/10...
src.ingestion.pipelines.apis.base_api - ERROR - API errors: [{'message': 'Int cannot represent non-integer value: "20"'}, {'message': 'Int cannot represent non-integer value: "50"'}, {'message': 'Int cannot represent non-integer value: "1"'}]
src.ingestion.pipelines.apis.base_api - INFO - Pagination complete: fetched 0 total events across 1 pages
src.ingestion.pipelines.apis.base_api - INFO - Fetching page 1/10...
src.ingestion.pipelines.apis.base_api - ERROR - API errors: [{'message': 'Int cannot represent non-integer value: "20"'}, {'message': 'Int cannot represent non-integer value: "50"'}, {'message': 'Int cannot represent non-integer val

Ra.co Pipeline Results
Status: failed
Total raw events: 0
Successful: 0
Failed: 0
Duration: 38.69s
Success rate: 0.0%
Cities: ['Barcelona', 'Madrid']
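
The `Int cannot represent non-integer value` errors in the log indicate that some GraphQL variables are being sent as JSON strings (`"20"`, `"50"`, `"1"`) where the schema declares `Int`. One plausible cause is that config values such as the area IDs (printed earlier as `'20'` and `'28'`) are strings and are passed through unchanged. A minimal sketch of coercing such variables before the request is sent is shown below. The helper `coerce_int_variables` and the variable names (`area_id`, `page_size`, `page`) are hypothetical, and which variables the Ra.co query actually declares as `Int` is an assumption.

```python
from typing import Any


def coerce_int_variables(variables: dict[str, Any], int_fields: set[str]) -> dict[str, Any]:
    """Return a copy of variables with the named fields converted to int.

    Raises ValueError if a named field cannot be parsed as an integer.
    """
    coerced = dict(variables)
    for field in int_fields:
        if field in coerced and coerced[field] is not None:
            coerced[field] = int(coerced[field])
    return coerced


# Hypothetical variables as they might arrive from string-valued config
variables = {"area_id": "20", "page_size": "50", "page": "1", "start_date": "2026-02-10"}
fixed = coerce_int_variables(variables, {"area_id", "page_size", "page"})
print(fixed)
```

Once the values are Python integers, JSON serialization emits `20` rather than `"20"`, which is the form a GraphQL `Int` variable accepts. Fixing the types in the config itself (unquoted integers in YAML) would be an alternative to coercing at request time.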


In [22]:
# Show sample normalized events
if raco_result.events:
    print(f"Sample Events ({len(raco_result.events)} total):")
    print("=" * 70)

    for i, event in enumerate(raco_result.events[:5]):
        print(f"\n[{i+1}] {event.title}")
        print(f"    City: {event.location.city} | Venue: {event.location.venue_name}")
        print(f"    Date: {event.start_datetime}")
        print(f"    Type: {event.event_type} | Price: {event.price.price_raw_text}")
        print(f"    Source URL: {event.source.source_url}")
        desc = (event.description or 'N/A')[:120]
        print(f"    Description: {desc}...")
        print(f"    Quality: {event.data_quality_score:.2f}")

## Step 3: Ticketmaster Pipeline (REST API)

Ticketmaster uses a REST API rather than GraphQL, so it exercises the same config-driven pipeline against a different protocol.

**Note:** Requires `TICKETMASTER_API_KEY` environment variable. If not set, this section is skipped.

In [18]:
tm_result = None
tm_api_key = os.environ.get("TICKETMASTER_API_KEY")

if tm_api_key:
    ticketmaster = factory.create_pipeline("ticketmaster")
    print(f"Pipeline: {ticketmaster.config.source_name}")
    print(f"Protocol: {ticketmaster.source_config.protocol}")
    print(f"Endpoint: {ticketmaster.source_config.endpoint}")

    tm_result = ticketmaster.execute(
        city="Barcelona",
        country_code="ES",
    )

    print(f"\nTicketmaster Results")
    print("=" * 60)
    print(f"Status: {tm_result.status.value}")
    print(f"Total: {tm_result.total_events_processed}")
    print(f"Successful: {tm_result.successful_events}")
    print(f"Duration: {tm_result.duration_seconds:.2f}s")
else:
    print("TICKETMASTER_API_KEY not set — skipping Ticketmaster pipeline.")
    print("Set the env var and re-run to test REST API ingestion.")

TICKETMASTER_API_KEY not set — skipping Ticketmaster pipeline.
Set the env var and re-run to test REST API ingestion.


## Step 4: Multi-Source Aggregation

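Combine events from all sources and count them by source and by city. Note that this cell only concatenates the event lists; it does not deduplicate them.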

In [19]:
all_events = list(raco_result.events) if raco_result.events else []

if tm_result and tm_result.events:
    all_events.extend(tm_result.events)

print(f"Total events across all sources: {len(all_events)}")

# Count by source
from collections import Counter

source_counts = Counter(e.source.source_name for e in all_events)
print("\nEvents by source:")
for source, count in source_counts.items():
    print(f"  {source}: {count}")

# Count by city, most common first
city_counts = Counter(e.location.city for e in all_events)
print("\nEvents by city:")
for city, count in city_counts.most_common():
    print(f"  {city}: {count}")

Total events across all sources: 0

Events by source:

Events by city:
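
Deduplication is not performed by the cell above. A minimal sketch of one approach, keeping the first event seen for each (title, start time, city) key, is shown below. The choice of key and the `dedupe_events` helper are assumptions, and plain dictionaries stand in for `EventSchema` objects.

```python
from typing import Any, Hashable, Iterable


def dedupe_events(events: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep the first event for each normalized (title, start, city) key, preserving order."""
    seen: set[Hashable] = set()
    unique = []
    for event in events:
        # Normalize case and surrounding whitespace so trivial variations still match
        key = (
            event["title"].strip().casefold(),
            event["start_datetime"],
            event["city"].strip().casefold(),
        )
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique


# Hypothetical events from two sources; the first two describe the same night
events = [
    {"title": "Club Night", "start_datetime": "2026-03-01T23:00", "city": "Barcelona", "source": "ra_co"},
    {"title": "club night ", "start_datetime": "2026-03-01T23:00", "city": "barcelona", "source": "ticketmaster"},
    {"title": "Club Night", "start_datetime": "2026-03-08T23:00", "city": "Barcelona", "source": "ra_co"},
]
unique = dedupe_events(events)
print([e["source"] for e in unique])  # ['ra_co', 'ra_co']
```

An exact-match key like this misses listings whose titles differ between sources; fuzzy matching on title or venue would be a possible refinement.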


## Step 5: DataFrame Visualization

Convert events to a comprehensive DataFrame using `pipeline.to_dataframe()`.

In [20]:
# Use the pipeline's built-in to_dataframe method
df = ra_co.to_dataframe(all_events)

print(f"DataFrame shape: {df.shape}")
print(f"\nColumns ({len(df.columns)} total):")
for col in df.columns:
    print(f"  {col}")

DataFrame shape: (0, 0)

Columns (0 total):


In [21]:
# Show key columns
key_cols = [
    "title", "city", "venue_name", "start_datetime",
    "event_type", "price_is_free", "price_minimum",
    "source_name", "data_quality_score",
]
df[key_cols].head(10)

KeyError: "None of [Index(['title', 'city', 'venue_name', 'start_datetime', 'event_type',\n       'price_is_free', 'price_minimum', 'source_name', 'data_quality_score'],\n      dtype='str')] are in the [columns]"
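
The `KeyError` follows from the empty DataFrame (shape `(0, 0)`): none of the requested columns exist. A minimal sketch of a guard that selects only the columns present, mirroring the filter used in Step 6, is shown below. The helper `available_columns` is hypothetical and operates on plain column names so that it can be tried without a DataFrame.

```python
from typing import Iterable


def available_columns(existing: Iterable[str], wanted: list[str]) -> list[str]:
    """Return the wanted column names that exist, in the order they were requested."""
    existing_set = set(existing)
    return [col for col in wanted if col in existing_set]


key_cols = ["title", "city", "venue_name", "start_datetime"]

# Empty DataFrame: no columns, so nothing is selected and no KeyError is raised
print(available_columns([], key_cols))  # []

# Partially populated DataFrame: only the matching columns are kept
print(available_columns(["title", "city", "source_name"], key_cols))  # ['title', 'city']
```

In the notebook this could be applied as `df[available_columns(df.columns, key_cols)].head(10)`, which yields an empty frame instead of an exception when ingestion returns no events.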

## Step 6: Taxonomy Enrichment

View the taxonomy dimensions populated by TaxonomyMapper and FeatureExtractor.

In [None]:
enrichment_cols = [
    "title",
    "taxonomy_subcategory",
    "taxonomy_subcategory_name",
    "taxonomy_energy_level",
    "taxonomy_social_intensity",
    "taxonomy_cognitive_load",
    "taxonomy_physical_involvement",
    "taxonomy_cost_level",
    "taxonomy_time_scale",
    "taxonomy_environment",
    "taxonomy_emotional_output",
    "taxonomy_age_accessibility",
    "taxonomy_repeatability",
]

# Only show columns that exist in the DataFrame
available = [c for c in enrichment_cols if c in df.columns]
print(f"Taxonomy Enrichment Data ({len(available)} columns):")
df[available].head(10)

## Step 7: Summary Statistics

In [None]:
print("=" * 60)
print("INGESTION SUMMARY")
print("=" * 60)

print(f"\nTotal events: {len(df)}")

if df.empty:
    # An empty DataFrame has no columns, so the lookups below would raise KeyError
    print("No events ingested; skipping summary statistics.")
else:
    print(f"Average quality score: {df['data_quality_score'].mean():.3f}")

    print("\n--- By Source ---")
    print(df.groupby("source_name").size().to_string())

    print("\n--- By City ---")
    print(df.groupby("city").size().sort_values(ascending=False).to_string())

    print("\n--- By Event Type ---")
    print(df.groupby("event_type").size().sort_values(ascending=False).to_string())

    print("\n--- Free vs Paid ---")
    print(df.groupby("price_is_free").size().to_string())

    print("\n--- Date Range ---")
    print(f"Earliest: {df['start_datetime'].min()}")
    print(f"Latest:   {df['start_datetime'].max()}")

## Step 8: Save Results (Optional)

In [None]:
output_dir = "../data/raw"
os.makedirs(output_dir, exist_ok=True)
output_path = f"{output_dir}/events_all_sources.parquet"
df.to_parquet(output_path, index=False)
print(f"Saved {len(df)} events to {output_path}")

## Cleanup

In [None]:
ra_co.close()
if tm_api_key:
    ticketmaster.close()
print("Resources released.")