In [None]:
# !pip install -q condacolab -q
# import condacolab
# condacolab.install()
# !conda install geopandas


In [None]:
# !pip install awscli
# !pip install cloudpathlib
# !pip install geopandas
# !pip install rasterio
# !pip install pyhdf
# !pip install cloudpathlib[s3]
# !pip install rtree
# !pip install pqdm

In [None]:
import os
import re
import random
import pickle

from pathlib import Path
# Project root. Defaults to the original Colab Drive folder, but can be overridden
# with the AIRATHON_DIR environment variable so the notebook is not tied to one machine.
PROJECT_DIR = Path(os.environ.get('AIRATHON_DIR', '/content/drive/MyDrive/datadriven/airathon'))
os.chdir(PROJECT_DIR)
DATA_PATH = Path.cwd() / 'data'
RAW = DATA_PATH / 'raw'
PROCESSED = DATA_PATH / 'processed'

# Imported *after* the chdir above: presumably `utils` lives in the project directory
# and is only importable once it is the working directory (TODO confirm).
# NOTE(review): star import — the names this notebook uses from it appear to be
# `wgs84_crs`, `parallel_process` and `calculate_features`; consider importing them explicitly.
from utils import * 

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt

from osgeo import gdal
import geopandas as gpd
from pyhdf.SD import SD, SDC, SDS
from typing import Dict, List, Union

import pyproj
from pyproj import CRS, Proj
from pqdm.processes import pqdm

# import multiprocessing
# n_cpus = multiprocessing.cpu_count()

# Render inline figures at 100 dpi.
mpl.rcParams.update({'figure.dpi': 100})


In [None]:
# Satellite granule metadata (one row per file, with start/end timestamps).
satellite = pd.read_csv(RAW / 'pm25_satellite_metadata.csv', parse_dates=['time_start', 'time_end'], index_col=0)

# Grid-cell metadata, indexed by grid id.
grid = pd.read_csv(RAW / 'grid_metadata.csv', index_col=0)

# Training labels; the target column `value` is renamed to `pm25` for readability.
train_labels = (
    pd.read_csv(RAW / 'train_labels.csv', parse_dates=["datetime"])
    .rename(columns={'value': 'pm25'})
)

# Submission template: zeros in `value` are presumably placeholders, so mark them missing.
submission = pd.read_csv(RAW / 'submission_format.csv', parse_dates=['datetime'])
# `np.nan`, not the `np.NaN` alias, which was removed in NumPy 2.0.
submission['value'] = submission['value'].replace(0, np.nan)

In [None]:
# MAIAC granules for the training split, and the HDF dataset names to extract from them.
train_maiac = satellite[(satellite["product"] == "maiac") & (satellite["split"] == "train")].copy()
datasets1 = ['Optical_Depth_047', 'Optical_Depth_055', 'AOD_Uncertainty', 'Column_WV']
datasets2 = ['cosSZA', 'cosVZA', 'RelAZ', 'Scattering_Angle', 'Glint_Angle']

# Pairs of (location code used in `satellite.location`, location name used in `grid.location`).
locations = [['la', 'Los Angeles (SoCAB)'], ['dl', 'Delhi'], ['tpe', 'Taipei']]

# The CSV and pickle writes below fail if the output directory does not exist yet.
PROCESSED.mkdir(parents=True, exist_ok=True)

train_dict = {}
for loc_code, loc_name in locations:
    print(f'Processing: {loc_name}\n')
    loc_results = train_dict[loc_code] = {}

    satellite_subset = train_maiac[train_maiac.location == loc_code].copy()
    file_paths = list(satellite_subset.us_url)
    grid_subset = grid[grid.location == loc_name].copy()

    # Every grid cell of this location must have at least one training label.
    assert grid_subset.index.isin(train_labels.grid_id).all(), f'unlabelled grid cells in {loc_name}'

    polys = gpd.GeoSeries.from_wkt(grid_subset.wkt, crs=wgs84_crs)
    polys.name = "geometry"
    polys_gDF = gpd.GeoDataFrame(polys)

    # `parallel_process` / `calculate_features` / `wgs84_crs` come from the `utils` star import.
    loc_results['train1'] = parallel_process(file_paths, polys_gDF, datasets1, grid_type='grid1')
    loc_results['train2'] = parallel_process(file_paths, polys_gDF, datasets2, grid_type='grid2')

    # grid_subset already holds exactly this location's grid cells; no need to re-filter `grid`.
    train_labels_subset = train_labels[train_labels.grid_id.isin(grid_subset.index)].copy()
    loc_results['features1'] = calculate_features(feature_df=loc_results['train1'], labels_df=train_labels_subset, datasets=datasets1, type_='train')
    loc_results['features2'] = calculate_features(feature_df=loc_results['train2'], labels_df=train_labels_subset, datasets=datasets2, type_='train')

    # NOTE(review): no `on=` — pandas joins on every column the two frames share;
    # confirm those are only the key columns (e.g. grid id / datetime / label).
    loc_results['train_data'] = pd.merge(loc_results['features1'], loc_results['features2'], how='inner')
    loc_results['train_data'].to_csv(PROCESSED / f'train_{loc_code}.csv', index=False)


with open(PROCESSED / 'train_dict.pickle', 'wb') as handle:
    pickle.dump(train_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)


Processing: Los Angeles (SoCAB)



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1065 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1065 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1065 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1065 [00:00<?, ?it/s]

Processing: Delhi



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1065 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1065 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1065 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1065 [00:00<?, ?it/s]

Processing: Taipei



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/2130 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/2130 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/2130 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/2130 [00:00<?, ?it/s]

In [None]:
# MAIAC granules for the test split, and the grid cells that appear in the submission template.
test_maiac = satellite[(satellite["product"] == "maiac") & (satellite["split"] == "test")].copy()
grid_test = grid[grid.index.isin(submission.grid_id)]

# Same dataset / location lists as the training split, redefined so this cell runs on its own.
datasets1 = ['Optical_Depth_047', 'Optical_Depth_055', 'AOD_Uncertainty', 'Column_WV']
datasets2 = ['cosSZA', 'cosVZA', 'RelAZ', 'Scattering_Angle', 'Glint_Angle']
# Pairs of (location code used in `satellite.location`, location name used in `grid.location`).
locations = [['la', 'Los Angeles (SoCAB)'], ['dl', 'Delhi'], ['tpe', 'Taipei']]

# The CSV and pickle writes below fail if the output directory does not exist yet.
PROCESSED.mkdir(parents=True, exist_ok=True)

test_dict = {}
for loc_code, loc_name in locations:
    print(f'Processing: {loc_name}\n')
    loc_results = test_dict[loc_code] = {}

    satellite_subset = test_maiac[test_maiac.location == loc_code].copy()
    file_paths = list(satellite_subset.us_url)
    grid_subset = grid_test[grid_test.location == loc_name].copy()

    # Holds by construction of `grid_test`; kept as a cheap sanity check.
    assert grid_subset.index.isin(submission.grid_id).all()

    polys = gpd.GeoSeries.from_wkt(grid_subset.wkt, crs=wgs84_crs)
    polys.name = "geometry"
    polys_gDF = gpd.GeoDataFrame(polys)

    # `parallel_process` / `calculate_features` / `wgs84_crs` come from the `utils` star import.
    loc_results['test1'] = parallel_process(file_paths, polys_gDF, datasets1, grid_type='grid1')
    loc_results['test2'] = parallel_process(file_paths, polys_gDF, datasets2, grid_type='grid2')

    # grid_subset already holds exactly this location's submission grid cells.
    submission_subset = submission[submission.grid_id.isin(grid_subset.index)].copy()
    loc_results['features1'] = calculate_features(feature_df=loc_results['test1'], labels_df=submission_subset, datasets=datasets1, type_='test')
    loc_results['features2'] = calculate_features(feature_df=loc_results['test2'], labels_df=submission_subset, datasets=datasets2, type_='test')

    # NOTE(review): inner merge with no `on=` joins on all shared columns and drops any row
    # missing from either feature frame — confirm no submission rows are lost before predicting.
    loc_results['test_data'] = pd.merge(loc_results['features1'], loc_results['features2'], how='inner')
    loc_results['test_data'].to_csv(PROCESSED / f'test_{loc_code}.csv', index=False)


with open(PROCESSED / 'test_dict.pickle', 'wb') as handle:
    pickle.dump(test_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)


Processing: Los Angeles (SoCAB)



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/611 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/611 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/611 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/611 [00:00<?, ?it/s]

Processing: Delhi



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/611 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/611 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/611 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/611 [00:00<?, ?it/s]

Processing: Taipei



QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1222 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1222 [00:00<?, ?it/s]

QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1222 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1222 [00:00<?, ?it/s]