In [1]:
import os

from time import time
from glob import glob

import numpy as np
import pandas as pd
import xarray as xr
from os.path import join
import re
from multiprocessing import Pool
from joblib import Parallel, delayed

from tqdm import tqdm

from shapely import Point
import matplotlib.pyplot as plt


In [2]:
# Root of the PurpleAir data tree. Set the PURPLEAIR_BASE_PATH environment
# variable to run this notebook elsewhere; the default is the original location.
base_path = os.environ.get("PURPLEAIR_BASE_PATH", "/home/patel_zeel/OpenAQ/PurpleAir")
# Index of raw CSV files: file_path, location_id, latitude, longitude.
cal_df = pd.read_csv(join(base_path, "california_files.csv.gz"))
cal_df

Unnamed: 0,file_path,location_id,latitude,longitude
0,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,224873,37.80689,-122.213740
1,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,224873,37.80689,-122.213740
2,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,224873,37.80689,-122.213740
3,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,224873,37.80689,-122.213740
4,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,224873,37.80689,-122.213740
...,...,...,...,...
4398782,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,63314,36.97484,-121.989395
4398783,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,63314,36.97484,-121.989395
4398784,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,63314,36.97484,-121.989395
4398785,/home/patel_zeel/OpenAQ/PurpleAir/data/locatio...,63314,36.97484,-121.989395


In [3]:
# One row per location_id, holding the list of that location's raw CSV paths.
loc_wise_files = cal_df.groupby("location_id")["file_path"].agg(list).to_frame()
loc_wise_files

Unnamed: 0_level_0,file_path
location_id,Unnamed: 1_level_1
61935,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
61938,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
61942,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
61943,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
61948,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
...,...
1049903,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
1631817,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
1636543,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...
1637277,[/home/patel_zeel/OpenAQ/PurpleAir/data/locati...


In [4]:
def combine_files(files):
    """Combine one location's raw CSV files into an hourly PM2.5 NetCDF file.

    The CSVs are concatenated, filtered to ``parameter == "pm25"``, resampled
    to hourly means in UTC and written as a ``(time, location_id)`` Dataset
    with ``lat``/``lon`` coordinates to ``<base_path>/loc_combined/<id>.nc``.

    Parameters
    ----------
    files : sequence of str
        CSV paths for a single location; the location id is parsed from the
        ``locationid=<digits>`` part of the first path.

    Returns
    -------
    None
        Output goes to disk only. Nothing is written when the target file
        already exists (so the cell can be re-run cheaply) or when the files
        contain no ``pm25`` rows.

    Raises
    ------
    IndexError
        If the first path contains no ``locationid=<digits>`` component.
    """
    location_id = re.findall(r"locationid=(\d+)", files[0])[0]
    save_path = join(base_path, "loc_combined", f"{location_id}.nc")
    if os.path.exists(save_path):
        return

    raw = pd.concat([pd.read_csv(file) for file in files], ignore_index=True)
    # .copy() so the column assignment below doesn't operate on a view of `raw`.
    df = raw[raw["parameter"] == "pm25"].copy()
    if df.empty:
        print(f"{location_id}: no pm25 rows, skipping")
        return

    # NOTE(review): OpenAQ archive CSVs usually name this column "datetime";
    # the rename is a no-op when it is already called "time". Confirm.
    df = df.rename(columns={"datetime": "time"})
    # Timestamps look like 2023-02-03T00:00:27-08:00. Convert to UTC, then drop
    # the tz so xarray/NetCDF receive a plain datetime64[ns] axis.
    df["time"] = (
        pd.to_datetime(df["time"], format="%Y-%m-%dT%H:%M:%S%z", utc=True)
        .dt.tz_convert(None)
        .astype("datetime64[ns]")
    )
    df = df.sort_values("time").set_index("time")

    # Files are grouped per location, so id and position come from the first
    # row. NOTE(review): assumes the sensor position is constant across files.
    loc_id = df["location_id"].iloc[0]
    lat = df["lat"].iloc[0]
    lon = df["lon"].iloc[0]

    try:
        # Resample only the measurement: empty hours get a NaN value but keep a
        # valid location_id (aggregating location_id itself produced NaN ids).
        hourly = df["value"].resample("h").mean().to_frame("value")
    except Exception:
        print(location_id)
        raise
    hourly["location_id"] = loc_id

    ds = hourly.reset_index().set_index(["time", "location_id"]).to_xarray()
    # assign_coords attaches new per-location coordinates; set_coords can only
    # promote variables that already exist in the Dataset.
    ds = ds.assign_coords(lat=("location_id", [lat]), lon=("location_id", [lon]))
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    ds.to_netcdf(save_path)


file_groups = loc_wise_files["file_path"].tolist()

start = time()
with Pool(48) as pool:
    # Each worker writes its own NetCDF file; the None results are only drained
    # through tqdm to drive the progress bar.
    for _ in tqdm(pool.imap(combine_files, file_groups), total=len(file_groups)):
        pass

print(f"Time taken: {(time() - start)/60} minutes")

100%|██████████| 11618/11618 [00:02<00:00, 5226.58it/s]


Time taken: 0.05336660941441854 minutes


## Combine all per-location files into one dataset

In [5]:
# Sorted so that the chunk assignment below is identical on every run
# (glob returns files in arbitrary filesystem order).
file_paths = sorted(glob(join(base_path, "loc_combined", "*.nc")))
len(file_paths)

11618

In [6]:
# Partition file_paths into 64 roughly equal chunks; each chunk is merged by
# one job in the parallel merge cell below.
n_chunks = 64
chunks = np.array_split(file_paths, n_chunks)

In [8]:
%%time

def merger(files):
    """Merge the per-location NetCDF files in ``files`` into one Dataset.

    ``xr.load_dataset`` reads each file into memory and closes it right away,
    so a worker does not leave thousands of file handles open (the previous
    ``xr.open_dataset`` calls were never closed).

    Parameters
    ----------
    files : iterable of str
        Paths to per-location ``.nc`` files.

    Returns
    -------
    xarray.Dataset
        Outer-joined merge of all input datasets.
    """
    datasets = [xr.load_dataset(file) for file in files]
    # Spell out xarray's current merge defaults so the result does not silently
    # change if a future xarray release changes them.
    return xr.merge(datasets, join="outer", compat="no_conflicts")


ds_list = Parallel(64)(delayed(merger)(files) for files in chunks)

CPU times: user 3.14 s, sys: 5.29 s, total: 8.43 s
Wall time: 19min 24s


The chunked merge above took about 19 minutes on a 64-core machine and used around 200 GB of RAM.

In [9]:
%%time

# Outer-join the per-chunk datasets into a single Dataset; the arguments are
# xarray's current merge defaults, written out explicitly.
final_ds = xr.merge(ds_list, join="outer", compat="no_conflicts")

CPU times: user 5min 12s, sys: 2min 20s, total: 7min 32s
Wall time: 7min 34s


The final merge above took about 8 minutes (7 min 34 s wall time) on a 64-core machine and used around 250 GB of RAM.

In [10]:
# Display the merged Dataset (dims, coords, variables) as a check before it is written to disk.
final_ds

In [11]:
# Persist the merged California dataset next to the other PurpleAir outputs.
output_path = join(base_path, "purpleair_california.nc")
final_ds.to_netcdf(output_path)

In [12]:
# Echo the data root used for every input and output path in this notebook.
base_path

'/home/patel_zeel/OpenAQ/PurpleAir'