In [162]:
import geopandas as gpd
from shapely.geometry import Polygon, Point
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import uuid

# Generate grids for LA region

In [163]:
def convert_to_geopanda(df):
  ''' df must have a 'wkt' column'''
  df['geometry'] = gpd.GeoSeries.from_wkt(df['wkt']) #convert the wkt column to a polygon object
  df = df.drop(labels=['wkt'], axis=1)
  df = gpd.GeoDataFrame(df, geometry='geometry')     # set the wkt column to be the geometry column

  #idk what this is but it fixed the areas (they were like 0.001...)
  # df.crs = 'epsg:4326'
  # df = df.to_crs("+proj=cea +lat_0=35.68250088833567 +lon_0=139.7671 +units=m")

  df['area'] = df.area /1000000                                   #it looks like its computing areas in meters, so divide by 1mil
  
  df['boundary'] = df.boundary                        #compute the boundaries of each grid
  df['centroid'] = df.centroid                         #compute centroids

  return df

In [164]:
location_polygons = convert_to_geopanda(pd.read_csv('/mnt/d/airdata/la_metadata_with_station_v2.csv'))

grid_metadata = pd.read_csv("/mnt/d/airdata/grid_metadata.csv").set_index('grid_id')
grid_metadata = convert_to_geopanda(grid_metadata)
grid_to_color = create_colors(grid_metadata, 'hsv')
g = pd.DataFrame.from_dict(grid_to_color).transpose()
g['color'] = g.values.tolist()
grid_metadata = grid_metadata.join(g['color'].map(lambda x: tuple(x)))

In [165]:
location_polygons['centroid'].total_bounds

array([-118.54018764,   33.66484012, -116.87830437,   34.18660804])

In [166]:
def w_h(df):
    

    minx, miny, maxx, maxy = df['geometry'].bounds

    width = maxx - minx
    height = maxy - miny
    return width, height


location_polygons[['width', 'height']] = location_polygons.apply(w_h, axis=1, result_type ='expand')

In [167]:
location_polygons['width'].mean(), location_polygons['width'].std()

(0.04491576420599367, 3.005976711873475e-14)

In [168]:
location_polygons['height'].mean(), location_polygons['height'].std()

(0.037240274823499404, 6.89354812462686e-05)

In [169]:
minx, miny, maxx, maxy = location_polygons['centroid'].total_bounds
box_y = location_polygons['height'].mean()
box_x = location_polygons['width'].mean()

xs = np.arange(minx, maxx, box_x)
ys = np.arange(miny, maxy, box_y)

centers = []
for x in xs:
    for y in ys:
        p1 = Point(x+box_x, y+box_y)
        p2 = Point(x-box_x, y+box_y)
        p3 = Point(x+box_x, y-box_y)
        p4 = Point(x-box_x, y-box_y)
        centers.append(Polygon([p1, p2, p3, p4, p1]))

In [170]:
lat = [i.centroid.x for i in centers] 
long = [i.centroid.y for i in centers] 

In [171]:
df = pd.DataFrame({'wkt': centers, 'Latitudes': lat, 'Longitudes': long})
df.to_csv('la_generated_grid.csv', index=False)

# Extract AOD values


Note, we need to restart the kernel everytime because of a bug in multiprocess

In [1]:
import random
from pathlib import Path
from typing import Dict, List, Union

import geopandas as gpd
import pandas as pd
from cloudpathlib import S3Client, S3Path
from pqdm.processes import pqdm
# from pqdm.threads import pqdm 
from pyproj import CRS, Proj

import os
import re

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pyproj
from pyhdf.SD import SD, SDC, SDS

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
DATA_PATH = Path("/mnt/d/airdata")
RAW = DATA_PATH / "raw"
INTERIM = DATA_PATH / "interim"
pm_md = pd.read_csv(
    INTERIM / "pm25_satellite_metadata.csv",
        parse_dates=["time_start", "time_end"],
    index_col=0
)

grid_md = pd.read_csv(
    DATA_PATH / "la_generated_grid.csv",
)
grid_md['location'] = "Los Angeles (SoCAB)"
grid_md['grid_id'] = grid_md['wkt'].apply(lambda x: hash(str(x)) )

In [3]:
def transform_arrays(
    xv: Union[np.array, float],
    yv: Union[np.array, float],
    crs_from: CRS,
    crs_to: CRS
):
    """Transform points or arrays from one CRS to another CRS.
    
    Args:
        xv (np.array or float): x (longitude) coordinates or value.
        yv (np.array or float): y (latitude) coordinates or value.
        crs_from (CRS): source coordinate reference system.
        crs_to (CRS): destination coordinate reference system.
    
    Returns:
        lon, lat (tuple): x coordinate(s), y coordinate(s)
    """
    transformer = pyproj.Transformer.from_crs(
        crs_from,
        crs_to,
        always_xy=True,
    )
    lon, lat = transformer.transform(xv, yv)
    return lon, lat
def calculate_features(
    feature_df: gpd.GeoDataFrame,
):
    """Given processed AOD data and training labels (train) or 
    submission format (test), return a feature GeoDataFrame that contains
    features for mean, max, and min AOD.
    
    Args:
        feature_df (gpd.GeoDataFrame): GeoDataFrame that contains
            Points and associated values.
        label_df (pd.DataFrame): training labels (train) or
            submission format (test).
        stage (str): "train" or "test".
    
    Returns:
        full_data (gpd.GeoDataFrame): GeoDataFrame that contains `mean_aod`,
            `max_aod`, and `min_aod` for each grid cell and datetime.   
    """
    # Add `day` column to `feature_df` and `label_df`
    feature_df["datetime"] = pd.to_datetime(
        feature_df.granule_id.str.split("_", expand=True)[0],
        format="%Y%m%dT%H:%M:%S",
        utc=True
    )
    feature_df["day"] = feature_df.datetime.dt.date
    # label_df["day"] = label_df.datetime.dt.date

    # Calculate average AOD per day/grid cell for which we have feature data
    feature_df['geometry'] = feature_df['geometry'].apply(lambda x: str(x))
    avg_aod_day = feature_df.groupby(["day", "geometry"]).agg(
        {"value": ["mean", "min", "max"]}
    )
    avg_aod_day.columns = ["mean_aod", "min_aod", "max_aod"]
    avg_aod_day = avg_aod_day.reset_index()
    return avg_aod_day
    # # Join labels/submission format with feature data
    # how = "inner" if stage == "train" else "left"
    # full_data = pd.merge(
    #     label_df,
    #     avg_aod_day,
    #     how=how,
    #     left_on=["day", "grid_id"],
    #     right_on=["day", "grid_id"]
    # )
    # return full_data

def create_meshgrid(alignment_dict: Dict, shape: List[int]):
    """Given an image shape, create a meshgrid of points
    between bounding coordinates.
    
    Args:
        alignment_dict (Dict): dictionary containing, at a minimum,
            `upper_left` (tuple), `lower_right` (tuple), `crs` (str),
            and `crs_params` (tuple).
        shape (List[int]): dataset shape as a list of
            [orbits, height, width].
    
    Returns:
        xv (np.array): x (longitude) coordinates.
        yv (np.array): y (latitude) coordinates.
    """
    # Determine grid bounds using two coordinates
    x0, y0 = alignment_dict["upper_left"]
    x1, y1 = alignment_dict["lower_right"]
    
    # Interpolate points between corners, inclusive of bounds
    x = np.linspace(x0, x1, shape[2], endpoint=True)
    y = np.linspace(y0, y1, shape[1], endpoint=True)
    
    # Return two 2D arrays representing X & Y coordinates of all points
    xv, yv = np.meshgrid(x, y)
    return xv, yv


def calibrate_data(dataset: SDS, shape: List[int], calibration_dict: Dict):
    """Given a MAIAC dataset and calibration parameters, return a masked
    array of calibrated data.
    
    Args:
        dataset (SDS): dataset in SDS format (e.g. blue band AOD).
        shape (List[int]): dataset shape as a list of [orbits, height, width].
        calibration_dict (Dict): dictionary containing, at a minimum,
            `valid_range` (list or tuple), `_FillValue` (int or float),
            `add_offset` (float), and `scale_factor` (float).
    
    Returns:
        corrected_AOD (np.ma.MaskedArray): masked array of calibrated data
            with a fill value of nan.
    """
    corrected_AOD = np.ma.empty(shape, dtype=np.double)
    for orbit in range(shape[0]):
        data = dataset[orbit, :, :].astype(np.double)
        invalid_condition = (
            (data < calibration_dict["valid_range"][0]) |
            (data > calibration_dict["valid_range"][1]) |
            (data == calibration_dict["_FillValue"])
        )
        data[invalid_condition] = np.nan
        data = (
            (data - calibration_dict["add_offset"]) *
            calibration_dict["scale_factor"]
        )
        data = np.ma.masked_array(data, np.isnan(data))
        corrected_AOD[orbit, : :] = data
    corrected_AOD.fill_value = np.nan
    return corrected_AOD


def convert_array_to_df(
    corrected_arr: np.ma.MaskedArray,
    lat:np.ndarray,
    lon: np.ndarray,
    granule_id: str,
    crs: CRS,
    total_bounds: np.ndarray = None
):
    """Align data values with latitude and longitude coordinates
    and return a GeoDataFrame.
    
    Args:
        corrected_arr (np.ma.MaskedArray): data values for each pixel.
        lat (np.ndarray): latitude for each pixel.
        lon (np.ndarray): longitude for each pixel.
        granule_id (str): granule name.
        crs (CRS): coordinate reference system
        total_bounds (np.ndarray, optional): If provided, will filter out points that fall
            outside of these bounds. Composed of xmin, ymin, xmax, ymax.
    """
    lats = lat.ravel()
    lons = lon.ravel()
    n_orbits = len(corrected_arr)
    size = lats.size
    values = {
        "value": np.concatenate([d.data.ravel() for d in corrected_arr]),
        "lat": np.tile(lats, n_orbits),
        "lon": np.tile(lons, n_orbits),
        "orbit": np.arange(n_orbits).repeat(size),
        "granule_id": [granule_id] * size * n_orbits
        
    }
    
    df = pd.DataFrame(values).dropna()
    if total_bounds is not None:
        x_min, y_min, x_max, y_max = total_bounds
        df = df[df.lon.between(x_min, x_max) & df.lat.between(y_min, y_max)]
    
    gdf = gpd.GeoDataFrame(df)
    gdf["geometry"] = gpd.points_from_xy(gdf.lon, gdf.lat)
    gdf.crs = crs
    return gdf[["granule_id", "orbit", "geometry", "value"]].reset_index(drop=True)


def create_calibration_dict(data: SDS):
    """Define calibration dictionary given a SDS dataset,
    which contains:
        - name
        - scale factor
        - offset
        - unit
        - fill value
        - valid range
    
    Args:
        data (SDS): dataset in the SDS format.
    
    Returns:
        calibration_dict (Dict): dict of calibration parameters.
    """
    return data.attributes()


def create_alignment_dict(hdf: SD):
    """Define alignment dictionary given a SD data file, 
    which contains:
        - upper left coordinates
        - lower right coordinates
        - coordinate reference system (CRS)
        - CRS parameters
    
    Args:
        hdf (SD): hdf data object
    
    Returns:
        alignment_dict (Dict): dict of alignment parameters.
    """
    group_1 = hdf.attributes()["StructMetadata.0"].split("END_GROUP=GRID_1")[0]
    hdf_metadata = dict([x.split("=") for x in group_1.split() if "=" in x])
    alignment_dict = {
        "upper_left": eval(hdf_metadata["UpperLeftPointMtrs"]),
        "lower_right": eval(hdf_metadata["LowerRightMtrs"]),
        "crs": hdf_metadata["Projection"],
        "crs_params": hdf_metadata["ProjParams"]
    }
    return alignment_dict



def preprocess_maiac_data(
    granule_path: str,
    grid_cell_gdf: gpd.GeoDataFrame,
    dataset_name: str,
    total_bounds: np.ndarray = None
):
    """
    Given a granule s3 path and competition grid cells, 
    create a GDF of each intersecting point and the accompanying
    dataset value (e.g. blue band AOD).
    
    Args:
        granule_path (str): a path to a granule on s3.
        grid_cell_gdf (gpd.GeoDataFrame): GeoDataFrame that contains,
            at a minimum, a `grid_id` and `geometry` column of Polygons.
        dataset_name (str): specific dataset name (e.g. "Optical_Depth_047").
        total_bounds (np.ndarray, optional): If provided, will filter out points that fall
            outside of these bounds. Composed of xmin, ymin, xmax, ymax.    
    Returns:
        GeoDataFrame that contains Points and associated values.
    """
    # Load blue band AOD data
    # s3_path = S3Path(granule_path, S3Client(no_sign_request=True))
    hdf = SD(str(granule_path), SDC.READ)
    aod = hdf.select(dataset_name)
    shape = aod.info()[2]

    # Calibrate and align data
    calibration_dict = aod.attributes()
    alignment_dict = create_alignment_dict(hdf)
    corrected_AOD = calibrate_data(aod, shape, calibration_dict)
    xv, yv = create_meshgrid(alignment_dict, shape)
    lon, lat = transform_arrays(xv, yv, sinu_crs, wgs84_crs)

    # Save values that align with granules
    granule_gdf = convert_array_to_df(corrected_AOD, lat, lon, granule_path.name, wgs84_crs, grid_cell_gdf.total_bounds)
    df = gpd.sjoin(grid_cell_gdf, granule_gdf, how="inner")
    
    # Clean up files
    # Path(s3_path.fspath).unlink()
    hdf.end()
    return df.drop(columns="index_right").reset_index()

    
def preprocess_aod_47(granule_paths, grid_cell_gdf, n_jobs=16):
    """
    Given a set of granule s3 paths and competition grid cells, 
    parallelizes creation of GDFs containing AOD 0.47 µm values and points.
    
    Args:
        granule_paths (List): list of paths on s3.
        grid_cell_gdf (gpd.GeoDataFrame): GeoDataFrame that contains,
            at a minimum, a `grid_id` and `geometry` column of Polygons.
        n_jobs (int, Optional): The number of parallel processes. Defaults to 2.
    
    Returns:
        GeoDataFrame that contains Points and associated values for all granules.
    """    
    args = ((gp, grid_cell_gdf, "Optical_Depth_047") for gp in granule_paths)
    
    results = pqdm(args, preprocess_maiac_data, n_jobs=n_jobs, argument_type="args")
    return pd.concat(results)

In [4]:

reg_code, reg_name = ("la", "Los Angeles (SoCAB)")
split = 'train'

maiac_md = pm_md[(pm_md["product"] == "maiac") & (pm_md["split"] == split)].copy()
la_file = maiac_md[maiac_md.location == reg_code].iloc[0]
la_file_path = S3Path(la_file.us_url, S3Client(no_sign_request=True))

hdf = SD(la_file_path.fspath, SDC.READ)
print(hdf.info())
for dataset, metadata in hdf.datasets().items():
    dimensions, shape, _, _ = metadata
    print(f"{dataset}\n    Dimensions: {dimensions}\n    Shape: {shape}")
    
blue_band_AOD = hdf.select("Optical_Depth_047")

name, num_dim, shape, types, num_attr = blue_band_AOD.info()

print(
f"""Dataset name: {name}
Number of dimensions: {num_dim}
Shape: {shape}
Data type: {types}
Number of attributes: {num_attr}"""
)
calibration_dict = blue_band_AOD.attributes()
print(calibration_dict)
raw_attr = hdf.attributes()["StructMetadata.0"]

group_1 = raw_attr.split("END_GROUP=GRID_1")[0]
hdf_metadata = dict([x.split("=") for x in group_1.split() if "=" in x])

# Parse expressions still wrapped in apostrophes
for key, val in hdf_metadata.items():
    try:
        hdf_metadata[key] = eval(val)
    except:
        pass
    
alignment_dict = {
    "upper_left": hdf_metadata["UpperLeftPointMtrs"],
    "lower_right": hdf_metadata["LowerRightMtrs"],
    "crs": hdf_metadata["Projection"],
    "crs_params": hdf_metadata["ProjParams"]
}

# DATA PROCESSING

# Loop over orbits to apply the attributes

corrected_AOD = calibrate_data(blue_band_AOD, shape, calibration_dict)




xv, yv = create_meshgrid(alignment_dict, shape)



# Source: https://spatialreference.org/ref/sr-org/modis-sinusoidal/proj4js/
sinu_crs = Proj(f"+proj=sinu +R={alignment_dict['crs_params'][0]} +nadgrids=@null +wktext").crs
wgs84_crs = CRS.from_epsg("4326")

# Project sinu grid onto wgs84 grid
lon, lat = transform_arrays(xv, yv, sinu_crs, wgs84_crs)


gdf = convert_array_to_df(corrected_AOD, lat, lon, la_file_path.stem, wgs84_crs)

# Identify LA granule filepaths (2019) and grid cells
maiac_md = maiac_md[maiac_md.location == reg_code]

la_fp = sorted(list(Path('/mnt/d/airdata/raw/train/maiac').glob(r'**/*.hdf')))

la_gc = grid_md[grid_md.location == reg_name].copy()

# Load training labels
# train_labels = pd.read_csv(DATA_PATH / "train_labels.csv", parse_dates=["datetime"])
# train_labels.rename(columns={"value": "pm25"}, inplace=True)
la_polys = gpd.GeoSeries.from_wkt(la_gc.wkt, crs=wgs84_crs) # used for WGS 84
la_polys.name = "geometry"
la_polys_gdf = gpd.GeoDataFrame(la_polys)
la_polys_gdf['grid_id'] = la_polys_gdf['geometry'].apply(lambda x: hash(str(x))) 
xmin, ymin, xmax, ymax = la_polys_gdf.total_bounds
gpd.sjoin(la_polys_gdf, gdf.cx[xmin:xmax, ymin:ymax], how="inner").groupby("grid_id")["value"].mean()

# la_train_gdf = preprocess_aod_47(la_fp, la_polys_gdf)
# # la_train_gdf.to_file(f"{reg_code}_{split}_{i}.shp")
# full_data = calculate_features(la_train_gdf)
# full_data.to_csv(f"{reg_code}_{split}.csv", index=False)


(13, 8)
Optical_Depth_047
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
Optical_Depth_055
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
AOD_Uncertainty
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
FineModeFraction
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
Column_WV
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
AOD_QA
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
AOD_MODEL
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
Injection_Height
    Dimensions: ('Orbits:grid1km', 'YDim:grid1km', 'XDim:grid1km')
    Shape: (4, 1200, 1200)
cosSZA
    Dimensions: ('Orbits:grid5km', 'YDim:grid5km', 'XDim:grid5km')
    Shape: (4, 240, 240)
cosVZA
    Dimensions: ('Orb

grid_id
-9209984482513666380    0.122083
-9170713913398490059    0.083295
-9149710595722273778    0.067205
-9136909972871109155    0.110354
-9134378635282384679    0.124435
                          ...   
 9082022442658484871    0.089872
 9085670462040955654    0.064207
 9096064789242280470    0.084000
 9164369667763217457    0.095200
 9217372132709696705    0.077207
Name: value, Length: 551, dtype: float64

In [5]:
p1,p2,p3,p4,p5 = np.array_split(la_polys_gdf, 5)

In [6]:
# la_train_gdf = preprocess_aod_47(la_fp, p1)
# # # la_train_gdf.to_file(f"{reg_code}_{split}_{i}.shp")
# full_data = calculate_features(la_train_gdf)
# full_data.to_csv(f"{reg_code}_{split}_1.csv", index=False)

In [7]:
# la_train_gdf = preprocess_aod_47(la_fp, p2)
# # # la_train_gdf.to_file(f"{reg_code}_{split}_{i}.shp")
# full_data = calculate_features(la_train_gdf)
# full_data.to_csv(f"{reg_code}_{split}_2.csv", index=False)

In [8]:
# la_train_gdf = preprocess_aod_47(la_fp, p3)
# # # la_train_gdf.to_file(f"{reg_code}_{split}_{i}.shp")
# full_data = calculate_features(la_train_gdf)
# full_data.to_csv(f"{reg_code}_{split}_3.csv", index=False)

In [9]:
# la_train_gdf = preprocess_aod_47(la_fp, p4)
# # # la_train_gdf.to_file(f  "{reg_code}_{split}_{i}.shp")
# full_data = calculate_features(la_train_gdf)
# full_data.to_csv(f"{reg_code}_{split}_4.csv", index=False)

In [10]:
la_train_gdf = preprocess_aod_47(la_fp, p5)
# # la_train_gdf.to_file(f"{reg_code}_{split}_{i}.shp")
full_data = calculate_features(la_train_gdf)
full_data.to_csv(f"{reg_code}_{split}_5.csv", index=False)

QUEUEING TASKS | : 4260it [00:00, 7830.94it/s] 
PROCESSING TASKS | : 100%|██████████| 4260/4260 [13:02<00:00,  5.45it/s]
COLLECTING RESULTS | : 100%|██████████| 4260/4260 [00:00<00:00, 896479.61it/s]


# Merge all the data together

In [12]:
import pandas as pd

files = [f'la_train_{i}.csv' for i in range(1, 6)]
dfs = [pd.read_csv(i) for i in files]
df = pd.concat(dfs)

df.to_csv("la_train_grid.csv")

In [15]:
df.groupby('day').count()['geometry'].value_counts()

555    305
554     29
553     22
525     20
551     18
      ... 
54       1
442      1
429      1
191      1
283      1
Name: geometry, Length: 278, dtype: int64