# Mining Frequent Co‑growth Patterns in PM₂.₅ Data with **Dask**

<br>

In this tutorial, we will apply [Dask](https://docs.dask.org/en/latest/) and a scalable Apriori algorithm to explore frequent co-growth patterns from multi-year PM₂.₅ sensor data. The workflow will guide you through the following key steps:

- Represent and preprocess multi-station time series data with Dask DataFrame
- Construct daily transaction sets using Dask Bag
- Implement and run a distributed Apriori algorithm
- Explore frequent co-growth patterns across cities
- Visualize and interpret high-support itemsets


**Credits**

- This tutorial is **adapted from:** Zhang, L., & Yang, G. (2022). Cluster analysis of PM2. 5 pollution in China using the frequent itemset clustering approach. Environmental Research, 204, 112009. <https://www.sciencedirect.com/science/article/pii/S0013935121013049>

- The **dataset is downloaded from:** Bai, K., Li, K., Wu, C., Chang, N. B., & Guo, J. (2020). A homogenized daily in situ PM 2.5 concentration dataset from national air quality monitoring network in China. Earth System Science Data Discussions, 2020, 1-30. <https://doi.pangaea.de/10.1594/PANGAEA.917557>

**Compatibility**

| Platform                      | Compatible | Recommended | Notes                                                                                                                                    |
| ----------------------------- | ---------- | ----------- | ---------------------------------------------------------------------------------------------------------------------------------------- |
| **Local (e.g., 16GB laptop)** | ⚠️ Limited     | ❌ No      | Limited compatibility with a local dask `Client()`. |
| **Google Colab**              | ⚠️ Limited     | ❌ No  |  Limited compatibility with Colab. 2-station co-growths calculation might take too long to complete.   |
| **Midway3 Login Node**        | ✅ Yes      | ✅ Yes       | This notebook is designed to run from a Midway3 Login Node with a dask `SLURMCluster()`.    |
| **Midway3 Compute Node**      | ✅ Yes     | ❌ No      | Runs with a `LocalCluster()` on a compute node. |

## 1. Why Dask?

When analyzing data from hundreds of air quality stations spanning multiple years, a pure‑Pandas approach may exceed memory limits or run very slowly. By deferring execution and parallelizing operations, [Dask](https://docs.dask.org/en/latest/) preserves an interactive experience similar to pandas while distributing computations efficiently. Specifically, Dask enables us to:
- **Scale out** computations across multiple cores or a cluster
- **Partition** large tables into manageable chunks
- Work with familiar APIs such as `DataFrame`, `Bag`, and `Delayed`. Each API constructs **a lazy execution graph** that you trigger with `.compute()` or `.persist()`. This design enables out-of-core processing and parallelism. More details can be found at https://docs.dask.org/en/latest/overview.html.

| API           | Use Case                                 | Key Methods                                                                                                                                                                                                                                                                                                                                                                                                               |
| ------------- | ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **DataFrame** | Handles tabular operations (diff, groupby, join) | [`.read_csv()`, `.map_partitions()`](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions), `.compute()`                                                                                                                                                                                                                                                                            |
| **Bag**       | Manages unstructured collections (transactions)  | [`.to_bag()`](https://docs.dask.org/en/latest/generated/dask.dataframe.to_bag.html), [`.map()`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.map), [`.flatten()`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.flatten), [`.frequencies()`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.frequencies), [`.fold()`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.fold) |
| **Delayed**   | Builds custom task graphs for arbitrary Python functions                       | `dask.delayed`, `.compute()`, `.persist()`                                                                                                                                                                                                                                                                 |


                                                                        
                                       



### Dependencies

In [None]:
%pip install -q dask[complete] dask-jobqueue

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/52.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.0/52.0 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m59.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

import dask.dataframe as dd

## 2. Launching a Dask `SLURMCluster`

**Launching a Dask Cluster *from Colab***

In [None]:
client = Client()
client

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:42801
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44365'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34607'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:33057 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33057
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:52466
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:38255 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38255
INFO:distributed.core:Starting established connection to tcp://127

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 12.67 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42801,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:38255,Total threads: 1
Dashboard: http://127.0.0.1:38593/status,Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:44365,
Local directory: /tmp/dask-scratch-space/worker-2o25n63a,Local directory: /tmp/dask-scratch-space/worker-2o25n63a

0,1
Comm: tcp://127.0.0.1:33057,Total threads: 1
Dashboard: http://127.0.0.1:33371/status,Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:34607,
Local directory: /tmp/dask-scratch-space/worker-jhqc6npg,Local directory: /tmp/dask-scratch-space/worker-jhqc6npg


**Launching a Dask Cluster *from Midway Login Node***

We use the following block to configure and launches a **Dask cluster on a SLURM-managed HPC system** (for example, Midway) using the `dask-jobqueue` package:

- `SLURMCluster(...)` allocates compute resources via SLURM (e.g., Midway3).
- Parameters define cores, memory, wall time, and network interface.
- `cluster.scale(jobs=1)` requests 1 SLURM job (each with 10 Dask workers here).
- Output and error logs are saved to `./logs/`.

The `SLURMCluster` class handles Slurm job submission under the hood and allow you to launch and interact with the cluster directly **from the login node**.

In [None]:
SLURM_ACCOUNT = "macs40123"

In [None]:
# Create logging dir
os.makedirs("./logs", exist_ok=True)

In [None]:
# Compose SLURM script
cluster = SLURMCluster(
    queue="amd",
    cores=10,
    memory="60GB",
    processes=10,
    walltime="04:00:00",
    interface="ib0",
    job_extra_directives=[
        f"--account={SLURM_ACCOUNT}",
        "--output=logs/dask-%j.out",
        "--error=logs/dask-%j.err",
    ],
)

# Request resources
cluster.scale(jobs=1)

In [None]:
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.25.0.65:8787/status,

0,1
Dashboard: http://172.25.0.65:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://172.25.0.65:40815,Workers: 0
Dashboard: http://172.25.0.65:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


## 3. Data Preprocessing

The [PM2.5 dataset](https://doi.pangaea.de/10.1594/PANGAEA.917557) (Bai et al., 2020) is a five-year homogenized, daily, in-situ record of PM2.5 concentrations compiled from data retrieved via a web crawler from the China National Environmental Monitoring Center (CNEMC) for the period of 2015-2019. A set of methods for gap filling, data merging, homogeneity test, and bias correction was applied to enhance data integrity and adjust discontinuities in the original observations. After rigorous quality control, the resulting dataset provides 1,309 long-term, temporally complete PM2.5 time series at daily resolution.

Each row corresponds to a **date**, and each column represents a **monitoring station** (`S0001`, `S0002`, ..., `S1309`), capturing the spatial-temporal dynamics of air pollution across the country.

We now load the homogenized PM₂.₅ text file into a Dask DataFrame for scalable processing.

In [None]:
# Run this if you are on Colab or your local machine

!mkdir ./datasets
!wget -q -P "./datasets" https://store.pangaea.de/Publications/Bai-etal_2020/PM2.5_data_in_China.zip
!cd ./datasets && unzip -o PM2.5_data_in_China.zip

DATA_DIR = "./datasets/PM2.5 data in China"

Archive:  PM2.5_data_in_China.zip
   creating: PM2.5 data in China/
  inflating: PM2.5 data in China/Homogenized_PM25_data.txt  
  inflating: PM2.5 data in China/Goelocation.txt  


In [None]:
# Run this if you are on Midway
DATA_DIR = "../datasets/pm25"

In [None]:
# Read the data, skipping metadata lines
df = dd.read_csv(
    os.path.join(DATA_DIR, "Homogenized_PM25_data.txt"),
    sep=r"\s+",
    skiprows=4,
    parse_dates=["Date"],
).set_index("Date")

In [None]:
df.head()

Unnamed: 0_level_0,S0001,S0002,S0003,S0004,S0005,S0006,S0007,S0008,S0009,S0010,...,S1300,S1301,S1302,S1303,S1304,S1305,S1306,S1307,S1308,S1309
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2015-01-01,106,89,102,106,112,99,104,97,81,108,...,143,138,85,152,100,113,113,31,67,65
2015-01-02,53,46,44,61,56,56,59,28,37,42,...,137,88,60,176,57,60,158,39,44,70
2015-01-03,154,160,192,173,179,140,123,136,96,138,...,213,151,23,209,89,65,133,46,62,42
2015-01-04,143,173,213,169,221,150,176,176,95,183,...,208,240,48,199,167,106,187,44,82,60
2015-01-05,72,114,98,83,108,84,144,105,77,142,...,465,349,110,166,33,119,132,32,48,46


## 4. Building Your Transaction Bag

To extract association rules, we will convert each day’s Boolean mask into a **transaction**: a list of station IDs that recorded an increase on that day.


The[`.diff()`](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.diff.html) method computes the difference between each element and its predecessor along the index (i.e., the date). In this PM₂.₅ context:

- **Operation:** for every station column, `df.diff()` subtracts the previous day’s concentration from the current day’s concentration.
- **Result:** a DataFrame of the same shape where each value represents the **day‑to‑day change** in PM₂.₅.
- **Why it matters:** By checking whether the change is `> 0`, we transform numeric differences into a Boolean mask—`True` when PM₂.₅ **increased** compared to the previous day, and `False` otherwise.


In [None]:
# Compute Boolean mask: True if PM2.5 increased vs. previous day
inc_mask = df.map_partitions(lambda pdf: pdf.diff() > 0)

In [None]:
inc_mask.head()

Unnamed: 0_level_0,S0001,S0002,S0003,S0004,S0005,S0006,S0007,S0008,S0009,S0010,...,S1300,S1301,S1302,S1303,S1304,S1305,S1306,S1307,S1308,S1309
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2015-01-01,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
2015-01-02,False,False,False,False,False,False,False,False,False,False,...,False,False,False,True,False,False,True,True,False,True
2015-01-03,True,True,True,True,True,True,True,True,True,True,...,True,True,False,True,True,True,False,True,True,False
2015-01-04,False,True,True,False,True,True,True,True,False,True,...,False,True,True,False,True,True,True,False,True,True
2015-01-05,False,False,False,False,False,False,False,False,False,False,...,True,True,True,False,False,True,False,False,False,False


We then use the `inc_mask` dask DataFrame to create a **dask bag** of Python lists, where each list contains the station IDs that recorded an increase on a given day.


### What Are Dask Bags?

A [**Dask Bag**](https://docs.dask.org/en/stable/bag.html) is a high‑level Dask collection for processing **unordered** and **unstructured** data — essentially a parallelized list of Python objects such as dictionaries, lists, or pandas objects.

In our workflow:

1. We start with a Boolean Dask DataFrame (`inc_mask`).
2. Using **`.to_bag(record=True)`**, each row is converted into a pandas.Series, producing one element per day.
3. Applying **`.map(lambda row: …)`** to transform each Series into a plain Python `list` of station IDs marked `True` — representing our daily “transaction.”

Check [Dask Bag API reference](https://docs.dask.org/en/latest/bag-api.html) for more details.


In [None]:
# Convert DataFrame → Bag of daily Series
bag = inc_mask.to_bag(format="dict")

# Map each Series → list of station IDs with True flags
transactions = bag.map(lambda rec: [col for col, val in rec.items() if val])

In [None]:
print(transactions.take(2)[1])

['S0022', 'S0042', 'S0070', 'S0071', 'S0072', 'S0073', 'S0074', 'S0076', 'S0077', 'S0078', 'S0079', 'S0080', 'S0081', 'S0082', 'S0083', 'S0084', 'S0129', 'S0154', 'S0180', 'S0184', 'S0295', 'S0296', 'S0299', 'S0302', 'S0303', 'S0304', 'S0305', 'S0306', 'S0307', 'S0308', 'S0309', 'S0310', 'S0311', 'S0313', 'S0314', 'S0315', 'S0316', 'S0320', 'S0321', 'S0326', 'S0328', 'S0329', 'S0330', 'S0331', 'S0332', 'S0333', 'S0334', 'S0335', 'S0336', 'S0337', 'S0339', 'S0341', 'S0342', 'S0346', 'S0347', 'S0348', 'S0349', 'S0350', 'S0351', 'S0352', 'S0353', 'S0354', 'S0355', 'S0356', 'S0357', 'S0358', 'S0359', 'S0360', 'S0361', 'S0362', 'S0363', 'S0364', 'S0365', 'S0366', 'S0367', 'S0368', 'S0369', 'S0370', 'S0371', 'S0372', 'S0373', 'S0374', 'S0375', 'S0376', 'S0377', 'S0378', 'S0379', 'S0380', 'S0381', 'S0382', 'S0383', 'S0384', 'S0385', 'S0386', 'S0388', 'S0389', 'S0390', 'S0391', 'S0392', 'S0393', 'S0394', 'S0395', 'S0396', 'S0397', 'S0411', 'S0412', 'S0413', 'S0414', 'S0415', 'S0416', 'S0417', 

## 5. Distributed Apriori Step‑by‑Step

The Apriori algorithm finds **frequent itemsets** (e.g. city pairs) that co-occur in transactions (days) with PM₂.₅ increases. It applies the **downward closure property** (also called the Apriori property):

> “If an itemset is frequent, all of its subsets must also be frequent.”

This property allows us to **prune** a large number of candidate early to save time and memory when processing large-scale datasets: there's no need to test a $k$-itemset if any of its $(k-1)$ subsets are infrequent.

We first calculate the **support** of an itemset $X$  as the fraction of transactions where **all elements** of $X$ appear together:

$$
s(X) = \frac{|\{ t \in \mathbb{D} \mid X \subseteq t \}|}{|\mathbb{D}|}
$$

In this task, it is the fraction of days where *all stations in $X$* reported increase in PM₂.₅.


**The Algorithm:**

1. **Level-1** (singletons): count $s(\{x\})$ using `flatten().frequencies()`.
2. **Join Step**: generate $k$-item candidates **from frequent $(k-1)$-itemsets**.
3. **Calculate support**: check which $X \subseteq t$ across all transactions $t$ in parallel.
4. **Filter**: retain itemsets where $s(X) \geq \text{min_support}$.

Repeat until no new frequent itemsets or `max_length` is reached. The result is a list of dictionaries from 1-itemsets to k-itemsets with their support ratios.


In [None]:
import dask.bag as db
from itertools import combinations


def apriori_dask(transactions, min_support=0.5, max_length=None):
    """
    Apriori algorithm for frequent itemset mining using Dask,
    *returning support ratios* for each itemset.

    Parameters:
    ------------
    transactions : dask.bag.Bag
        A Dask Bag where each element is an iterable of items (one transaction).
    min_support : float or int, default=0.5
        Minimum support threshold. If <=1, treated as fraction of total transactions;
        if >1, treated as absolute transaction count.
    max_length : int or None, default=None
        Maximum size of itemsets to consider. None for no limit.

    Returns:
    --------
    freq_itemsets : list of dict
        freq_itemsets[k-1] is a dict mapping each frequent k-itemset (frozenset)
        to its support ratio (count / total_transactions).
    """
    # Total number of transactions
    total_transactions = transactions.count().compute()

    # Determine absolute support count threshold
    if min_support <= 1:
        min_count = min_support * total_transactions
    else:
        min_count = min_support

    freq_itemsets = []

    # 1-itemsets: count single items
    freq1 = transactions.flatten().frequencies().compute()
    L1 = {
        frozenset([item]): count / total_transactions
        for item, count in freq1
        if count >= min_count
    }
    freq_itemsets.append(L1)

    # Prepare for k ≥ 2
    prev_L = list(L1.keys())
    k = 2

    # Iteratively generate larger itemsets
    while prev_L and (max_length is None or k <= max_length):
        print(f"Processing level {k}...")
        # All unique items from previous level
        items = set().union(*prev_L)
        # Generate all k‑combinations as candidates
        candidates = [frozenset(c) for c in combinations(sorted(items), k)]

        # Count support for each candidate in parallel
        def find_candidates_in_txn(txn):
            txn_set = set(txn)
            return [c for c in candidates if c.issubset(txn_set)]

        candidate_counts = (
            transactions.map(find_candidates_in_txn).flatten().frequencies().compute()
        )

        # Filter by support threshold and compute support ratio
        Lk = {
            itemset: count / total_transactions
            for itemset, count in candidate_counts
            if count >= min_count
        }
        if not Lk:
            break

        freq_itemsets.append(Lk)
        prev_L = list(Lk.keys())
        k += 1

    return freq_itemsets

## 6. Putting It All Together

### Mining Frequent Itemsets (min_support = 0.5)

We first apply the self-defined function `apriori_dask` to identify frequent itemsets (combinations of stations that frequently co-grow) in the daily PM₂.₅ change transactions:

* We set `min_support = 0.5` to capture only **very frequent** itemsets—those appearing in at least 50% of all days.
* We use `max_length=6` to limit the size of discovered patterns to at most 6 stations.
* The input `transactions` is repartitioned and persisted in memory to optimize repeated access.

This process produces a list of dictionaries: one per itemset length (1‑itemsets, 2‑itemsets, etc.), where each entry maps an itemset to its support ratio.

In [None]:
# This cell might take a couple minutes on a slower maching such as Colab
n_days = inc_mask.shape[0].compute()

# Persist transactions in memory for faster repeated use
transactions_bag = transactions.repartition(50).persist()

MIN_SUPPORT = 0.5
freq_itemsets = apriori_dask(transactions_bag, MIN_SUPPORT, max_length=6)

In [None]:
freq_itemsets

[{frozenset({'S0022'}): 0.5470974808324206,
  frozenset({'S0042'}): 0.5323110624315444,
  frozenset({'S0070'}): 0.5290251916757941,
  frozenset({'S0071'}): 0.5306681270536692,
  frozenset({'S0072'}): 0.536144578313253,
  frozenset({'S0073'}): 0.5328587075575028,
  frozenset({'S0074'}): 0.5268346111719606,
  frozenset({'S0076'}): 0.5372398685651698,
  frozenset({'S0077'}): 0.5279299014238773,
  frozenset({'S0078'}): 0.5279299014238773,
  frozenset({'S0079'}): 0.5372398685651698,
  frozenset({'S0080'}): 0.5388828039430449,
  frozenset({'S0081'}): 0.5339539978094195,
  frozenset({'S0082'}): 0.5405257393209201,
  frozenset({'S0083'}): 0.523001095290252,
  frozenset({'S0084'}): 0.5175246440306681,
  frozenset({'S0154'}): 0.5049288061336255,
  frozenset({'S0180'}): 0.5208105147864184,
  frozenset({'S0184'}): 0.515881708652793,
  frozenset({'S0349'}): 0.5016429353778752,
  frozenset({'S0352'}): 0.5021905805038335,
  frozenset({'S0359'}): 0.5268346111719606,
  frozenset({'S0360'}): 0.529572836

### Visualizing Pairwise Co-Growth with a Support Heatmap

This step visualizes the support of all 2‑station co-growths patterns using a heatmap:

* We re-run the Apriori algorithm with `min_support = 0.1` and `max_length = 2` to capture **a broader range of ** 2-item patterns.
* For each 2‑station itemset, we extract its pairwise support value and fill in a symmetric matrix.
* The matrix is visualized as a heatmap using a diverging `bwr` colormap: red (≈0.5) indicates strong co-growth; while blue represents weak or infrequent co-growth.

This visualization provides an interpretable overview of which station pairs tend to experience PM₂.₅ increases **simultaneously**, revealing possible spatial synchrony or shared pollution sources.

**Note: It's recommended to run this section on an HPC environment such as Midway** as 2-itemset mining with a low min_support is particularly computationally intensive.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Mine frequent itemsets up to size 2, with a more inclusive min_support of 0.1
freq_sets = apriori_dask(transactions, min_support=0.1, max_length=2)

# 2. Extract the 2-itemset support dictionary
two_item_support = freq_sets[1]  # level 2 corresponds to index 1

# 3. Gather all unique items involved in 2-itemsets
items = sorted({item for itemset in two_item_support for item in itemset})

# 4. Build a square support matrix
support_matrix = pd.DataFrame(0.0, index=items, columns=items)
for itemset, support in two_item_support.items():
    i, j = sorted(itemset)
    support_matrix.at[i, j] = support
    support_matrix.at[j, i] = support

In [None]:
# 5. Plot the heatmap
plt.figure(figsize=(12, 10))
sns.heatmap(
    support_matrix,
    cmap=sns.color_palette("bwr", as_cmap=True),
    vmin=0.0,
    vmax=0.5,
)
plt.title("Support Heatmap of 2-Itemsets")
plt.xlabel("Station ID")
plt.ylabel("Station ID")
plt.tight_layout()
plt.show()

The figure above shows a 2D matrix of **pairwise support values** between all city pairs whose PM₂.₅ concentrations increased on the same day in at least 10% of the observed days between 2015 and 2018.

- Each **row and column** represents a weather station (ID: `S0001` to `S0338`), grouped into 7 macro-regions:  
  **N (1–35), C (36–78), E (79–156), S (157–195), NE (196–232), SW (233–286), NW (287–338)**.
- The **color** in each grid cell reflects the **support** of that 2-city co-growth pattern:  
  - **Red**: high co-growth frequency (cities often increase together)  
  - **Blue**: low or no co-growth (rarely increase together)

> This visualization reproduces **Figure 4** from Zhang & Yang (2020), offering a spatial lens into the temporal co-variation of urban PM₂.₅ pollution.

### What This Means:

- **Bright red diagonal blocks** indicate **intra-region coherence**, where cities within the same region (e.g. Eastern or Northern China) frequently experience simultaneous PM₂.₅ increases—likely due to shared weather patterns, industrial belts, or transport.
- **Off-diagonal red patches** reveal **inter-region linkages**, potentially identifying long-range pollution transport corridors.
- The grid offers a **data-driven similarity measure** for cities’ pollution dynamics. These can serve as input to clustering, rule-mining, or network models of air quality propagation.

## 7. Tuning, Tips & Next Steps

- **Adjust `min_support`**: Lower thresholds yield more itemsets but increase compute.
- **Optimize partitions**: Use `df.repartition(npartitions=...)` to control task sizes.
- **Monitor**: Use the Dask dashboard to inspect task progress, memory, and CPU usage.
- **Extensions**: Compute rule confidences/lifts, visualize with NetworkX, or cluster high‑lift patterns for spatial analysis.

With this scalable pattern mining workflow, you can efficiently analyze large environmental datasets and reveal hidden co‑growth relationships across cities.


---

## **Exercise: Mine Association Rules to Identify Potential Pollution Sources**

Your goal is to extend the pattern mining analysis to find **directional association rules**. Specifically, you'll identify which station's PM₂.₅ increase might be a precursor to another's. This can help generate hypotheses about pollution transport pathways or shared atmospheric conditions.

### **From Itemsets to Rules**

An association rule takes the form $X \rightarrow Y$, where $X$ (the **antecedent**) and $Y$ (the **consequent**) are non-empty itemsets that have no items in common. To evaluate the strength of a rule, we use two key metrics:

* **Confidence**: The probability of seeing itemset $Y$, given we see itemset $X$, measuring the reliability of a rule:

    $$
    \text{Confidence}(X \rightarrow Y) = \frac{\text{support}(X \cup Y)}{\text{support}(X)}
    $$

* **Lift**: Measures how much more likely $X$ and $Y$ are to co-occur than if they were independent. **lift > 1** suggests a positive correlation, making the rule more interesting than random chance.

    $$
    \text{Lift}(X \rightarrow Y) = \frac{\text{support}(X \cup Y)}{\text{support}(X) \times \text{support}(Y)}
    $$

### **Your Task**

Now, you'll generate rules from **all** the frequent itemsets you discovered (`freq_itemsets`), not just pairs.

1.  **Iterate Through Frequent Itemsets**: Loop through each level in your `freq_itemsets` list, starting from itemsets of size 2.

2.  **Generate Candidate Rules**: For each frequent itemset `I`, you need to generate all possible rules. This involves splitting `I` into an antecedent `X` and a consequent `Y`.
    * For a given frequent itemset `I`, find all of its non-empty, proper subsets. Each of these subsets can serve as an antecedent `X`.
    * The consequent `Y` is simply the set of items in `I` that are not in `X`.
    * For example, if `{A, B, C}` is a frequent itemset, you would generate rules like $\{A\} \rightarrow \{B, C\}$, $\{B\} \rightarrow \{A, C\}$, $\{C\} \rightarrow \{A, B\}$, $\{A, B\} \rightarrow \{C\}$, and so on.

3.  **Calculate Confidence and Lift**: For each generated rule $X \rightarrow Y$, calculate its confidence and lift. You will need the support values for the itemsets `X`, `Y`, and their union `X` $\cup$ `Y` (which is the original itemset `I`).

4.  **Filter and Interpret**: Collect all the generated rules into a pandas DataFrame. Filter them based on meaningful thresholds (e.g., `confidence > 0.7`, `lift > 1.2`) and analyze the results. Do more complex rules (with more items) provide deeper insights into pollution dynamics?


## **Bonus Challenge: Introduce a Time Lag**

The current model only looks at same-day increases. A more powerful analysis would be to see if an increase at a station *today* predicts an increase at another station **tomorrow**.

Modify the initial transaction creation process to test this hypothesis. You'll likely need to use Dask DataFrame's `shift()` method to align data from different days before creating the boolean mask and subsequent transactions.