# Advanced Pipeline Operations

Beyond transforming individual records and basic grouping, `Flow` offers a suite of more advanced methods for manipulating and combining data streams. These operations can help you with tasks like sorting, joining different datasets, handling duplicates, and working with sequences of records.

Many of these methods, especially those that need to reorder or compare records across the stream (like `sort` or `join`), may need to materialize part or all of the stream into memory. 

```python
from penaltyblog.matchflow import Flow
from pprint import pprint

matches = [
    {
        "match_id": 123,
        "competition_name": "Premier League",
        "home_team": "Manchester City",
        "away_team": "Arsenal",
        "match_date": "2023-10-08",
    },
    {
        "match_id": 456,
        "competition_name": "Champions League",
        "home_team": "Real Madrid",
        "away_team": "Bayern Munich",
        "match_date": "2023-10-10",
    },
]

events = [
    {
        "event_id": 1,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:01:30.500",
        "type_name": "Pass",
        "player_name": "Kevin De Bruyne",
        "location": [60.1, 40.3],
        "pass_recipient_name": "Erling Haaland",
        "pass_outcome_name": "Complete",
        "team_name": "Manchester City",
    },
    {
        "event_id": 2,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:01:32.100",
        "type_name": "Shot",
        "player_name": "Erling Haaland",
        "location": [85.5, 50.2],
        "shot_xg": 0.05,
        "shot_outcome_name": "Goal",
        "team_name": "Manchester City",
    },
    {
        "event_id": 3,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:02:05.000",
        "type_name": "Duel",
        "player_name": "Rodri",
        "duel_type_name": "Tackle",
        "duel_outcome_name": "Won",
        "team_name": "Manchester City",
    },
    {
        "event_id": 4,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:02:10.000",
        "type_name": "Pass",
        "player_name": "Kevin De Bruyne",
        "location": [70.0, 25.0],
        "pass_recipient_name": "Jack Grealish",
        "pass_outcome_name": "Incomplete",
        "team_name": "Manchester City",
    },
    {
        "event_id": 5,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:03:00.000",
        "type_name": "Shot",
        "player_name": "Bukayo Saka",
        "location": [75.0, 60.0],
        "shot_xg": 0.01,
        "shot_outcome_name": "Post",
        "team_name": "Arsenal",
    },
]
```

## Sorting and Ordering

### Sorting Records (`.sort()`)
The `.sort()` method sorts the records in the `Flow` based on the values in a specified field.

Important: Sorting requires all records to be loaded into memory to determine their correct order. This means `.sort()` materializes and consumes the `Flow` it's called on, returning a new `Flow` with the sorted records.

Records where the sort key (the `by` field) is `None` are always placed at the very end of the sorted sequence, regardless of whether `reverse` is set.
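The `None` placement rule can be sketched in plain Python as follows. This only illustrates the documented behaviour and is not matchflow's implementation; the helper name `sort_none_last` is hypothetical, and treating a missing field the same as an explicit `None` is an assumption.

```python
# Illustrative sketch only, not matchflow's implementation.
def sort_none_last(records, by, reverse=False):
    """Sort dicts by `by`, always placing records whose key is None at the end."""
    # Assumption: a missing field is treated like an explicit None
    present = [r for r in records if r.get(by) is not None]
    missing = [r for r in records if r.get(by) is None]
    # sorted() is stable, so ties keep their original relative order
    return sorted(present, key=lambda r: r[by], reverse=reverse) + missing


rows = [{"xg": 0.05}, {"xg": None}, {"xg": 0.30}, {"xg": 0.01}]
print([r["xg"] for r in sort_none_last(rows, "xg")])                # [0.01, 0.05, 0.3, None]
print([r["xg"] for r in sort_none_last(rows, "xg", reverse=True)])  # [0.3, 0.05, 0.01, None]
```

Splitting the records before sorting avoids comparing `None` with numbers, which would raise a `TypeError` in Python 3.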

```python
# Sort events chronologically. The timestamps are zero-padded strings,
# so lexicographic order matches time order within a period.
sorted_events_flow = Flow(events).sort(by="timestamp")

# Example: Sorting shots by xG in descending order
shots_flow = (
    Flow(events)
    .filter(lambda r: r.get("type_name") == "Shot")
    .sort(by="shot_xg", reverse=True)
)

# Get the shot with the highest xG
shots_flow.first()
```

```
{'event_id': 2,
 'match_id': 123,
 'period': 1,
 'timestamp': '00:01:32.100',
 'type_name': 'Shot',
 'player_name': 'Erling Haaland',
 'location': [85.5, 50.2],
 'shot_xg': 0.05,
 'shot_outcome_name': 'Goal',
 'team_name': 'Manchester City'}
```

### Assigning Row Numbers (`.row_number()`)

The `.row_number()` method first sorts the records by the `by` field (as `.sort()` does) and then assigns a sequential integer rank, starting from 1, to each record in a new field named by `new_field`.

Like `.sort()`, this method also materializes and consumes the `Flow`.

Records where the `by` field is `None` are placed at the end and receive `None` as their row number.
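A plain-Python sketch of these semantics is shown below. The function name `row_number` here is a hypothetical stand-in, not matchflow's code; it returns new dicts rather than mutating the inputs, which is a design choice of the sketch.

```python
# Illustrative sketch only, not matchflow's implementation.
def row_number(records, by, new_field, reverse=False):
    """Number records 1, 2, 3, ... in sorted order; None-keyed records get None."""
    present = sorted(
        (r for r in records if r.get(by) is not None),
        key=lambda r: r[by],
        reverse=reverse,
    )
    missing = [r for r in records if r.get(by) is None]
    # enumerate(..., start=1) yields the 1-based rank; {**r, ...} copies each record
    ranked = [{**r, new_field: i} for i, r in enumerate(present, start=1)]
    return ranked + [{**r, new_field: None} for r in missing]


shots = [
    {"player": "Haaland", "xg": 0.05},
    {"player": "Saka", "xg": 0.01},
    {"player": "Unknown", "xg": None},
]
for r in row_number(shots, "xg", "xg_rank", reverse=True):
    print(r["player"], r["xg_rank"])
# Haaland 1
# Saka 2
# Unknown None
```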

```python
shots_flow = (
    Flow(events)
    .filter(lambda r: r.get("type_name") == "Shot")
    .row_number(by="shot_xg", new_field="xg_rank", reverse=True)
)

# Each shot record now has an 'xg_rank' field.
# The shot with the highest xG will have xg_rank = 1.
pprint(shots_flow.head(2).collect())
```

```
[{'event_id': 2,
  'location': [85.5, 50.2],
  'match_id': 123,
  'period': 1,
  'player_name': 'Erling Haaland',
  'shot_outcome_name': 'Goal',
  'shot_xg': 0.05,
  'team_name': 'Manchester City',
  'timestamp': '00:01:32.100',
  'type_name': 'Shot',
  'xg_rank': 1},
 {'event_id': 5,
  'location': [75.0, 60.0],
  'match_id': 123,
  'period': 1,
  'player_name': 'Bukayo Saka',
  'shot_outcome_name': 'Post',
  'shot_xg': 0.01,
  'team_name': 'Arsenal',
  'timestamp': '00:03:00.000',
  'type_name': 'Shot',
  'xg_rank': 2}]
```


## Slicing and Sampling the Stream

We've already seen `head()`, which is an alias for `limit()`. Here's a recap, plus a method for taking records from the end of the stream.

### Limiting Records (`.limit()` / `.head()`)

`limit(n)` returns a new `Flow` that will yield at most the first `n` records from the original `Flow`. The original `Flow`'s internal pointer advances if the limited `Flow` is consumed.

```python
events_flow = Flow(events)

first_five_events = events_flow.limit(5)
# equivalent to:
first_five_events = events_flow.head(5)
```
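The note about the internal pointer is how any single-pass iterator behaves. Assuming a `Flow` behaves like one here (an assumption about its internals), the standard library's `itertools.islice` shows the same effect on a plain iterator:

```python
from itertools import islice

# Plain-iterator illustration; this does not use matchflow.
source = iter(range(1, 8))           # a single-pass stream: 1..7
first_three = list(islice(source, 3))
print(first_three)                   # [1, 2, 3]

# The underlying iterator has advanced, so the next read resumes at 4
next_item = next(source)
print(next_item)                     # 4
```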

### Taking the Last Records (`.take_last()`)

`take_last(n)` returns a new `Flow` containing the last `n` records.

Important: This method needs to read the entire stream to determine which records are the last ones. It materializes and consumes the Flow it's called on.

```python
last_three_events_flow = events_flow.take_last(3)
```
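A bounded `collections.deque` is one standard way to keep the last `n` items of a stream: it still reads every record, but holds at most `n` at a time. Whether `take_last` works this way internally is an assumption; the hypothetical helper below only illustrates the behaviour.

```python
from collections import deque

# Illustrative sketch only, not matchflow's implementation.
def take_last(records, n):
    """Return the last n records, reading the whole stream once."""
    # A deque with maxlen=n discards its oldest item whenever a new one arrives
    return list(deque(records, maxlen=n))


event_ids = (e for e in range(1, 6))  # a generator standing in for a stream
print(take_last(event_ids, 3))        # [3, 4, 5]
```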

### Reservoir Sampling (`.sample()`)

`sample(n, seed=None)` performs reservoir sampling to select exactly `n` records uniformly at random from the stream.

This method also materializes and consumes the `Flow` as it needs to process all records to build the sample reservoir.

If the stream has fewer than `n` items, all items are returned.

```python
# set seed for reproducibility
random_sample_10_events = events_flow.sample(n=10, seed=42) 
```
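For reference, the classic reservoir-sampling algorithm (Algorithm R) is sketched below. It is shown to explain the technique only; matchflow's own implementation, and therefore the exact records chosen for a given `seed`, may differ.

```python
import random

# Illustrative sketch of Algorithm R, not matchflow's implementation.
def reservoir_sample(records, n, seed=None):
    """Uniform random sample of n records from a stream of unknown length."""
    rng = random.Random(seed)
    reservoir = []
    for i, record in enumerate(records):
        if i < n:
            # Fill the reservoir with the first n records
            reservoir.append(record)
        else:
            # Keep record i with probability n / (i + 1), replacing a random slot
            j = rng.randint(0, i)  # inclusive on both ends: i + 1 possible values
            if j < n:
                reservoir[j] = record
    return reservoir


sample = reservoir_sample(range(1000), n=10, seed=42)
print(len(sample))                       # 10
print(reservoir_sample(range(3), n=10))  # [0, 1, 2]  (fewer than n items: all returned)
```

Only `n` records are held at any time, but every record has to be read before the sample is final, which is why the stream is consumed.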

### Bernoulli Sampling (`.sample_frac()`)

`sample_frac(frac, seed=None)` includes each record in the output `Flow` with a probability of `frac` (a float between 0.0 and 1.0).

This method processes the stream lazily and does not need to materialize everything.

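The number of records in the output is therefore random: on average it is `frac` times the number of input records, but it will rarely be exactly that.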

```python
approx_20_percent_of_events = events_flow.sample_frac(frac=0.2, seed=123)
```
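Because each record is kept or dropped independently, Bernoulli sampling needs only one pass and no buffer, which is why it can stay lazy. A generator sketch of the idea (the name `bernoulli_sample` is hypothetical; this is not matchflow's code):

```python
import random

# Illustrative sketch only, not matchflow's implementation.
def bernoulli_sample(records, frac, seed=None):
    """Lazily yield each record independently with probability frac."""
    if not 0.0 <= frac <= 1.0:
        raise ValueError("frac must be between 0.0 and 1.0")
    rng = random.Random(seed)
    for record in records:
        # rng.random() is uniform on [0.0, 1.0), so this is True with probability frac
        if rng.random() < frac:
            yield record


kept = list(bernoulli_sample(range(10_000), frac=0.2, seed=123))
print(len(kept))  # close to 2000, but not exactly
```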

## Combining Flows (`.concat()`)

The `concat()` method allows you to append one or more other `Flow` objects to the end of the current `Flow`, creating a single, longer `Flow`.

```python
match1_events_flow = Flow.statsbomb.from_github_file(match_id=123, type="events")
match2_events_flow = Flow.statsbomb.from_github_file(match_id=456, type="events")

combined_events_flow = match1_events_flow.concat(match2_events_flow)
# Now combined_events_flow will stream all records from match1, followed by all from match2.
```

The original flows are effectively chained together; records are streamed lazily.
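The lazy chaining described here is the same idea as `itertools.chain` in the standard library. A plain-Python illustration with small hypothetical event lists (not a claim about how `concat` is implemented):

```python
from itertools import chain

# Hypothetical minimal event records for two matches
match1_events = [{"match_id": 123, "event_id": 1}, {"match_id": 123, "event_id": 2}]
match2_events = [{"match_id": 456, "event_id": 1}]

# chain() reads nothing up front; it yields all of match1, then all of match2
combined = chain(match1_events, match2_events)
ids = [(e["match_id"], e["event_id"]) for e in combined]
print(ids)  # [(123, 1), (123, 2), (456, 1)]
```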

## Joining Data (`.join()`)

Joining is a fundamental operation for combining data from two different sources based on a common key. `my_flow.join(other, left_on, right_on=None, fields=None, how="left")` merges records from `my_flow` (the "left" side) with records from `other` (the "right" side).

- **Important**: `other` (a `Flow` or a list of dicts) is fully materialized into memory to build an efficient lookup table for the join. `my_flow` (the left side) is streamed.
- `left_on`: The field name in `my_flow` to join on.
- `right_on`: The field name in `other` to join on. If `None`, it defaults to the `left_on` field name.
- `fields`: An optional list of field names to include from the matching records in `other`. If `None`, all fields from `other` (except the `right_on` key itself) are included.
- `how`:
    - `"left"` (default): Keeps all records from `my_flow`. If a match is found in `other`, its fields are merged in. If there is no match, no fields from `other` are added, so they are simply absent from the output record.
    - `"inner"`: Only keeps records from `my_flow` that have a match in `other`.
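The description above is a classic hash join: build a dictionary from the right side once, then stream the left side and look each key up. The hypothetical `hash_join` below sketches that technique in plain Python. It is not matchflow's implementation, and two behaviours are assumptions of the sketch: when several right-hand records share a key only the last one is used, and on a field-name clash the right-hand value wins.

```python
# Illustrative sketch of a hash join, not matchflow's implementation.
def hash_join(left, right, left_on, right_on=None, fields=None, how="left"):
    """Stream `left`, looking each record up in an in-memory index built from `right`."""
    if how not in ("left", "inner"):
        raise ValueError("how must be 'left' or 'inner'")
    if right_on is None:
        right_on = left_on
    # Materialize the right side once into a dict keyed on the join field.
    # Simplification: if several right records share a key, the last one wins.
    index = {r[right_on]: r for r in right if right_on in r}
    for rec in left:
        match = index.get(rec.get(left_on))
        if match is None:
            if how == "left":
                yield dict(rec)  # left join: keep the unmatched record as-is
            continue  # inner join: drop the unmatched record
        if fields is None:
            extra = {k: v for k, v in match.items() if k != right_on}
        else:
            extra = {k: match[k] for k in fields if k in match}
        # Assumption: on a name clash, the right-hand value overwrites the left
        yield {**rec, **extra}


demo_events = [{"event_id": 1, "match_id": 123}, {"event_id": 9, "match_id": 999}]
demo_matches = [
    {"match_id": 123, "competition_name": "Premier League", "home_team": "Manchester City"},
]

left_rows = list(hash_join(demo_events, demo_matches, "match_id", fields=["competition_name"]))
inner_rows = list(hash_join(demo_events, demo_matches, "match_id", fields=["competition_name"], how="inner"))
print(left_rows)
# [{'event_id': 1, 'match_id': 123, 'competition_name': 'Premier League'}, {'event_id': 9, 'match_id': 999}]
print(inner_rows)
# [{'event_id': 1, 'match_id': 123, 'competition_name': 'Premier League'}]
```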

### Example: Adding competition name to our events data


```python
events_flow = Flow(events)
matches_flow = Flow(matches)

events_with_matches_flow = events_flow.join(
    other=matches_flow,
    left_on="match_id",  # key in events_flow
    right_on="match_id",  # key in matches_flow
    fields=["competition_name"],  # only bring this field across from matches_flow
)

results = events_with_matches_flow.collect()

pprint(results[0])

# If a match_id in events_flow didn't exist in matches_flow,
# 'competition_name' would be missing from that output record.
```

### Example: Inner join to get events only for matches we have the competition name for

```python
events_flow = Flow(events)
matches_flow = Flow(matches)

events_with_matches_flow = events_flow.join(
    other=matches_flow,
    left_on="match_id",  # key in events_flow
    right_on="match_id",  # key in matches_flow
    fields=["competition_name"],  # only bring this field across from matches_flow
    how="inner",
)

results = events_with_matches_flow.collect()

# This time, if a match_id in events_flow didn't exist in matches_flow,
# that record would be dropped from the output.
```

The `other` argument to `join` can also be a plain Python list of dictionaries instead of a `Flow` object.

## Handling Duplicates

### Dropping Duplicate Records (`.drop_duplicates()`)

- `.drop_duplicates(*fields, keep="first")` removes duplicate records.
- If no `*fields` are specified, the entire record is compared when identifying duplicates.
- If `*fields` are provided, only those fields are used to identify duplicates.
- `keep`:
    - `"first"` (default): Keeps the first occurrence in each set of duplicates.
    - `"last"`: Keeps the last occurrence.
    - `False`: Drops every record that belongs to any set of duplicates.

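This method has to keep track of the records or keys it has already seen, so its memory use grows with the number of distinct keys and the size of your data. With `keep="last"` or `keep=False`, the whole stream must also be read before any record can be emitted, because a later record could still turn out to be a duplicate.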
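To make the three `keep` modes concrete, here is a plain-Python sketch of the semantics described above. It is not matchflow's implementation: the function is hypothetical, it materializes the input for simplicity, and it compares records via their JSON serialization, an assumption made so that list-valued fields such as `location` can be compared.

```python
import json
from collections import Counter

# Illustrative sketch only, not matchflow's implementation.
def drop_duplicates(records, *fields, keep="first"):
    """Remove duplicates keyed on `fields`, or on the whole record if none are given."""
    # Materialize for simplicity; keep="last" and keep=False need the whole stream anyway
    records = list(records)

    def key(r):
        # Serialize to JSON so unhashable values such as lists can be compared
        value = [r.get(f) for f in fields] if fields else r
        return json.dumps(value, sort_keys=True, default=str)

    if keep == "first":
        seen, out = set(), []
        for r in records:
            k = key(r)
            if k not in seen:
                seen.add(k)
                out.append(r)
        return out
    if keep == "last":
        last_index = {key(r): i for i, r in enumerate(records)}
        return [r for i, r in enumerate(records) if last_index[key(r)] == i]
    if keep is False:
        counts = Counter(key(r) for r in records)
        return [r for r in records if counts[key(r)] == 1]
    raise ValueError("keep must be 'first', 'last' or False")


dup_rows = [
    {"player_name": "Kevin De Bruyne", "type_name": "Pass", "event_id": 1},
    {"player_name": "Erling Haaland", "type_name": "Shot", "event_id": 2},
    {"player_name": "Kevin De Bruyne", "type_name": "Pass", "event_id": 4},
]
for mode in ("first", "last", False):
    result = drop_duplicates(dup_rows, "player_name", "type_name", keep=mode)
    print(mode, [r["event_id"] for r in result])
# first [1, 2]
# last [2, 4]
# False [2]
```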

### Example: Dropping exact and field-based duplicate events

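```python
# Drop records that are identical in every field
unique_events_flow = events_flow.drop_duplicates()

# Keep only the first event for each (player_name, type_name) combination
first_event_by_player_type_flow = events_flow.drop_duplicates("player_name", "type_name", keep="first")
```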

## Getting Unique Field Values or Combinations (`.unique()`)

`.unique(*fields)` returns a `Flow` of unique values or combinations of values for the specified fields.

If one field is given (e.g., `my_flow.unique("player_name")`), the output `Flow` will contain records like `{"player_name": "Player A"}, {"player_name": "Player B"}` for each unique player.

If multiple fields are given (e.g., `my_flow.unique("team_name", "type_name")`), the output `Flow` will contain records representing unique combinations, like `{"team_name": "Team A", "type_name": "Pass"}, {"team_name": "Team A", "type_name": "Shot"}`.

If no fields are specified, it behaves like `.drop_duplicates()` considering the whole record, yielding unique records.

Similar to `drop_duplicates`, this can be memory-intensive depending on the number of unique keys present.
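The record shapes described above can be illustrated with the plain-Python generator below. It is a hypothetical sketch, not matchflow's code: it projects each record onto the requested fields (treating a missing field as `None`, an assumption), and only the set of seen keys grows with the number of distinct combinations.

```python
import json

# Illustrative sketch only, not matchflow's implementation.
def unique(records, *fields):
    """Yield one record per distinct value (or combination) of `fields`."""
    seen = set()
    for r in records:
        # Project onto the requested fields; with no fields, use the whole record
        projected = {f: r.get(f) for f in fields} if fields else r
        # JSON keys let list-valued fields be compared
        key = json.dumps(projected, sort_keys=True, default=str)
        if key not in seen:
            seen.add(key)
            yield projected


plays = [
    {"team_name": "Manchester City", "type_name": "Pass", "player_name": "Rodri"},
    {"team_name": "Manchester City", "type_name": "Pass", "player_name": "Kevin De Bruyne"},
    {"team_name": "Arsenal", "type_name": "Shot", "player_name": "Bukayo Saka"},
]
print(list(unique(plays, "team_name", "type_name")))
# [{'team_name': 'Manchester City', 'type_name': 'Pass'}, {'team_name': 'Arsenal', 'type_name': 'Shot'}]
```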

### Example: Get a list of all unique player names

```python
unique_players_flow = events_flow.unique("player_name")
unique_player_names = [r["player_name"] for r in unique_players_flow.collect()]
```

These advanced operations provide significant power for more complex data wrangling and analysis scenarios. The `join` method, in particular, is useful for enriching datasets by combining information from multiple sources. Always be mindful of which operations might pull the entire stream into memory (such as `sort`, `row_number`, `take_last`, `sample`, and the right-hand side of `join`), especially when working with very large datasets.
