# Pipeline

Now that we have seen the queries, let us see how we can use the `pipeline` component
of IQB to run such queries and store the results locally.

The source code for it lives inside `./library/src/iqb/pipeline`.

As a starting point, let's instantiate the pipeline with:

1. a directory in which to cache the query results

2. a billing project on BigQuery

Optionally, we can configure a remote cache to avoid running
queries multiple times. This is not needed for now.

In [3]:
from iqb import IQBPipeline

pipeline = IQBPipeline(project="measurement-lab", data_dir="01-pipeline.dir")

Next, we create a cache entry. This is a lazy operation that does not
download *any* data until we ask it to do so.

In [4]:
from iqb import IQBDatasetGranularity, IQBDatasetMLabTable, iqb_dataset_name_for_mlab

entry = pipeline.get_cache_entry(
    dataset_name=iqb_dataset_name_for_mlab(
        granularity=IQBDatasetGranularity.COUNTRY_CITY_ASN,
        table=IQBDatasetMLabTable.DOWNLOAD,
    ),
    enable_bigquery=True,  # costly/slow so we must opt-in
    start_date="2025-12-01",  # start date is *inclusive*
    end_date="2026-01-01",  # end date is *exclusive*
)
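The inclusive start and exclusive end form a half-open interval. The following is a minimal sketch of what that means for the dates above. The helpers `days_covered` and `covers` are hypothetical names used only for illustration and are not part of `iqb`.

```python
from datetime import date


def days_covered(start_date: str, end_date: str) -> int:
    """Number of days in the half-open range [start_date, end_date)."""
    return (date.fromisoformat(end_date) - date.fromisoformat(start_date)).days


def covers(start_date: str, end_date: str, day: str) -> bool:
    """True if day falls in [start_date, end_date): start included, end excluded."""
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    return start <= date.fromisoformat(day) < end


# The range used above spans all of December 2025 and nothing of January 2026.
print(days_covered("2025-12-01", "2026-01-01"))
print(covers("2025-12-01", "2026-01-01", "2025-12-31"))
print(covers("2025-12-01", "2026-01-01", "2026-01-01"))
```

One benefit of this convention is that consecutive ranges, such as December followed by January, share a boundary date without counting any day twice.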

OK, now that we have an entry, we must sync it to get the
data we need from BigQuery.

*Note*: For this code to work you need to be subscribed to the
`discuss@` mailing list and you need to run

```
gcloud auth application-default login
```

to authorize BigQuery queries with the subscribed email address.

Each subscribed user has a daily quota of queries they can run.

Here we use the most defensive code pattern, which
protects against concurrent writes.

In [5]:
with entry.lock():
    if not entry.exists():
        entry.sync()
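To show why checking `exists()` *inside* the lock matters, here is a self-contained sketch of the same lock, check, then sync pattern. It is not how `entry.lock()` is implemented: `dir_lock`, `sync_once`, the `.lock` file, and the `data.bin` file name are all assumptions made for illustration.

```python
import os
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def dir_lock(dir_path: Path, timeout: float = 10.0):
    """Hypothetical lock: atomically create a lock file, remove it on exit."""
    dir_path.mkdir(parents=True, exist_ok=True)
    lock_path = dir_path / ".lock"
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the lock file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"could not acquire {lock_path}")
            time.sleep(0.05)
    try:
        yield
    finally:
        os.close(fd)
        os.unlink(lock_path)


def sync_once(dir_path: Path, fetch) -> bool:
    """Call fetch() only if the data is missing; return True if it ran."""
    data_path = dir_path / "data.bin"
    with dir_lock(dir_path):
        # Checking under the lock means a concurrent writer has either
        # finished (file exists) or not started (we do the work).
        if data_path.exists():
            return False
        tmp_path = dir_path / "data.bin.tmp"
        tmp_path.write_bytes(fetch())
        os.replace(tmp_path, data_path)  # atomic rename into place
        return True
```

Calling `sync_once` twice on the same directory runs `fetch` only the first time. A sketch this simple leaves a stale lock file behind if the process is killed while holding it, which a production implementation would need to handle.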




Once this is completed, we have synced data inside the cache.

Let us print information about the entry and the files it contains:

In [7]:
print(entry.dir_path())
print(entry.data_parquet_file_path())
print(entry.stats_json_file_path())

01-pipeline.dir/cache/v1/20251201T000000Z/20260101T000000Z/downloads_by_country_city_asn
01-pipeline.dir/cache/v1/20251201T000000Z/20260101T000000Z/downloads_by_country_city_asn/data.parquet
01-pipeline.dir/cache/v1/20251201T000000Z/20260101T000000Z/downloads_by_country_city_asn/stats.json


The directory structure starts with `cache/v1`, which identifies the version of the cache layout.

The start date (inclusive) and end date (exclusive) are written in ISO 8601 basic format.
Using both in the path allows the cache to hold arbitrary, possibly overlapping date ranges.

The dataset has a name (which may change later) that encodes the
granularity we are using, which includes:

1. country
2. subdivision1
3. city
4. asn
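The layout described above can be reproduced with a few lines of Python. This is only a sketch that matches the paths printed earlier; `cache_entry_dir` is a hypothetical helper, not the function the pipeline uses internally.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath

# ISO 8601 basic format, as seen in the printed paths (UTC, no separators).
TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"


def cache_entry_dir(
    data_dir: str, start: datetime, end: datetime, dataset_name: str
) -> PurePosixPath:
    """Build <data_dir>/cache/v1/<start>/<end>/<dataset_name>."""
    return (
        PurePosixPath(data_dir)
        / "cache"
        / "v1"
        / start.strftime(TIMESTAMP_FORMAT)
        / end.strftime(TIMESTAMP_FORMAT)
        / dataset_name
    )


entry_dir = cache_entry_dir(
    "01-pipeline.dir",
    datetime(2025, 12, 1, tzinfo=timezone.utc),
    datetime(2026, 1, 1, tzinfo=timezone.utc),
    "downloads_by_country_city_asn",
)
print(entry_dir)
print(entry_dir / "data.parquet")
print(entry_dir / "stats.json")
```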

The data is a parquet file containing just the downloads.

There is also a stats file. Let's inspect it first:

In [9]:
import json

with open(entry.stats_json_file_path()) as fp:
    print(json.dumps(json.load(fp), indent=4))

{
    "query_start_time": "2026-01-14T13:16:46.609000Z",
    "query_duration_seconds": 43.947,
    "template_hash": "5a02c1e253053cf9619f49d92c0e9edb96c97d0b9110d304e209677551fc6c9b",
    "total_bytes_processed": 1478555536174,
    "total_bytes_billed": 1478556123136
}


As you can see, we record the query start time and duration.

We also record the template hash, which is useful for checking that cached results were produced by the same query template.

We also have accounting information on the number of bytes processed.
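To put these numbers in perspective, the byte counts can be converted into more readable units. The sketch below copies the values from the stats file shown above; `summarize` is a hypothetical helper, and the throughput figure is only a rough ratio of bytes processed to query duration.

```python
# Values copied from the stats.json output shown above.
stats = {
    "query_duration_seconds": 43.947,
    "total_bytes_processed": 1478555536174,
    "total_bytes_billed": 1478556123136,
}

TIB = 2**40
GIB = 2**30


def summarize(stats: dict) -> dict:
    """Derive human-readable figures from the raw accounting fields."""
    processed = stats["total_bytes_processed"]
    billed = stats["total_bytes_billed"]
    return {
        "processed_tib": processed / TIB,
        "billed_tib": billed / TIB,
        "billed_minus_processed_bytes": billed - processed,
        "processed_gib_per_second": processed / GIB / stats["query_duration_seconds"],
    }


for key, value in summarize(stats).items():
    print(f"{key}: {value}")
```

For this query, roughly 1.34 TiB were processed, which is why running against BigQuery is opt-in and why caching the result locally matters.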

The queries are not designed to minimize the number of queries to run. Rather,
they aim for clarity and readability.

The result is a parquet file containing the dumped query results.

We have efficient routines for reading the parquet file using PyArrow, such
that we only end up reading the bytes we need on disk rather than having
to load the whole file in memory (possibly useful in general).

Let us exercise this functionality as well (this is not how you use `iqb` when
doing research, but I want you to understand the details):

In [13]:
from iqb.pipeline import iqb_parquet_read

table = iqb_parquet_read(
    entry.data_parquet_file_path(),
    country_code="US",
    subdivision1="Massachusetts",
    city="Boston",
)

The return value is a `pandas.DataFrame`.

Let's see what it contains:

In [17]:
table.columns.values

array(['country_code', 'subdivision1_iso_code', 'subdivision1_name',
       'city', 'asn', 'as_name', 'sample_count', 'download_p1',
       'download_p5', 'download_p10', 'download_p25', 'download_p50',
       'download_p75', 'download_p90', 'download_p95', 'download_p99',
       'latency_p1', 'latency_p5', 'latency_p10', 'latency_p25',
       'latency_p50', 'latency_p75', 'latency_p90', 'latency_p95',
       'latency_p99', 'loss_p1', 'loss_p5', 'loss_p10', 'loss_p25',
       'loss_p50', 'loss_p75', 'loss_p90', 'loss_p95', 'loss_p99'],
      dtype=object)

Let's conclude our overview by just selecting some columns for illustrative purposes:

In [19]:
table[
    [
        "country_code",
        "subdivision1_name",
        "city",
        "asn",
        "as_name",
        "sample_count",
        "download_p50",
        "latency_p50",
        "loss_p50",
    ]
]

| | country_code | subdivision1_name | city | asn | as_name | sample_count | download_p50 | latency_p50 | loss_p50 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | US | Massachusetts | Boston | 111 | Boston University | 328 | 79.922534 | 8.681 | 0.000559 |
| 1 | US | Massachusetts | Boston | 156 | Northeastern University | 339 | 88.643479 | 10.991 | 0.007602 |
| 2 | US | Massachusetts | Boston | 174 | Cogent Communications | 1108 | 99.204716 | 8.485 | 0.000192 |
| 3 | US | Massachusetts | Boston | 701 | MCI Communications Services, Inc. d/b/a Verizo... | 5921 | 243.179888 | 9.420 | 0.002561 |
| 4 | US | Massachusetts | Boston | 714 | Apple Inc. | 5 | 73.260620 | 9.928 | 0.000000 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 96 | US | Massachusetts | Boston | 394548 | netBlazr | 30 | 136.196817 | 10.000 | 0.048689 |
| 97 | US | Massachusetts | Boston | 394784 | Massachusetts Eye and Ear Infirmary | 1 | 560.583307 | 7.723 | 0.072631 |
| 98 | US | Massachusetts | Boston | 395354 | Starry, Inc. | 1478 | 169.514935 | 12.862 | 0.001091 |
| 99 | US | Massachusetts | Boston | 396128 | Mount Holyoke College | 1 | 801.655241 | 10.134 | 0.214498 |
