In [None]:
import os
import re
import rasterio
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import traceback
import dask.dataframe as dd
from dask import delayed
import re
from datetime import datetime, timezone
import numpy as np

# Column names to use for the band values in the DataFrame.
# The order of this list is significant; entries are laid out by name prefix
# purely for readability.
band_names = [
    # BC*
    "BCANGSTR", "BCCMASS", "BCEXTTAU", "BCFLUXU", "BCFLUXV", "BCSCATAU",
    "BCSMASS",
    # DMS*
    "DMSCMASS", "DMSSMASS",
    # DU*
    "DUANGSTR", "DUCMASS25", "DUCMASS", "DUEXTT25", "DUEXTTAU", "DUFLUXU",
    "DUFLUXV", "DUSCAT25", "DUSCATAU", "DUSMASS25", "DUSMASS",
    # OC*
    "OCANGSTR", "OCCMASS", "OCEXTTAU", "OCFLUXU", "OCFLUXV", "OCSCATAU",
    "OCSMASS",
    # SO2* / SO4*
    "SO2CMASS", "SO2SMASS", "SO4CMASS", "SO4SMASS",
    # SS*
    "SSANGSTR", "SSCMASS25", "SSCMASS", "SSEXTT25", "SSEXTTAU", "SSFLUXU",
    "SSFLUXV", "SSSCAT25", "SSSCATAU", "SSSMASS25", "SSSMASS",
    # SU*
    "SUANGSTR", "SUEXTTAU", "SUFLUXU", "SUFLUXV", "SUSCATAU",
    # TOT*
    "TOTANGSTR", "TOTEXTTAU", "TOTSCATAU",
]

# Function to save one day's DataFrame to a Parquet file under <base_dir>/<year>/<month>/
def save_to_parquet(df, date_time,
                    base_dir="Z:\\Thesis\\Data\\GEE\\MERRA2_aer\\MERRA2_num_data\\thesis"):
    """Write ``df`` to a per-day Parquet file and return the file's path.

    The file is written to
    ``<base_dir>/<YYYY>/<MM>/merra2_numerical_data_<YYYY>_<MM>_<DD>.parquet``;
    missing directories are created. The frame is written as-is: this function
    performs no timezone conversion, so any UTC guarantee on timestamp columns
    must be established by the caller.

    Args:
        df: pandas DataFrame to serialise.
        date_time: object exposing ``year``, ``month`` and ``day`` attributes
            (a ``date`` or ``datetime``); used only to build the output path.
        base_dir: root directory of the output tree. Defaults to the
            original hard-coded location.

    Returns:
        The path of the written Parquet file, or ``None`` if anything failed
        (the error and its traceback are printed; nothing is raised).
    """
    try:
        year_str = str(date_time.year)
        month_str = f"{date_time.month:02d}"
        day_str = f"{date_time.day:02d}"

        parquet_dir = os.path.join(base_dir, year_str, month_str)
        os.makedirs(parquet_dir, exist_ok=True)

        parquet_file_path = os.path.join(
            parquet_dir,
            f"merra2_numerical_data_{year_str}_{month_str}_{day_str}.parquet",
        )

        table = pa.Table.from_pandas(df)
        pq.write_table(table, parquet_file_path)
    except Exception as e:
        # Best-effort by design: report the failure and signal it to the
        # caller with an explicit None instead of propagating.
        print(f"Error saving data to Parquet: {e}")
        traceback.print_exc()
        return None

    print(f"Data successfully saved to Parquet file: {parquet_file_path}")
    return parquet_file_path

# Helper: turn one raster window of band values into a flat, per-pixel DataFrame
def _chunk_to_frame(band_data, transform, window, date_time, column_names):
    """Build a per-pixel DataFrame for one raster window.

    Args:
        band_data: array read for the window; reshaped here as
            ``(n_bands, height * width)`` and transposed to one row per pixel.
        transform: the dataset's affine transform, used to map pixel
            row/column indices to coordinates.
        window: the raster window the data was read from (supplies
            ``row_off``, ``col_off``, ``height`` and ``width``).
        date_time: timestamp shared by every pixel of the file; may be naive
            (treated as UTC) or timezone-aware (converted to UTC).
        column_names: one column name per band.

    Returns:
        DataFrame with one column per band plus ``lat``, ``lon`` and ``time``.
    """
    rows, cols = np.meshgrid(
        np.arange(window.row_off, window.row_off + window.height),
        np.arange(window.col_off, window.col_off + window.width),
        indexing='ij',
    )
    # Flatten in row-major order so coordinates line up with the reshaped
    # band values below.
    lon, lat = rasterio.transform.xy(
        transform, rows.ravel(), cols.ravel(), offset='center'
    )

    frame = pd.DataFrame(
        band_data.reshape(band_data.shape[0], -1).T, columns=column_names
    )
    frame['lat'] = np.asarray(lat).ravel()
    frame['lon'] = np.asarray(lon).ravel()

    # pd.Timestamp rejects a tz-aware input combined with the tz= argument,
    # so build the timestamp first and then localise/convert explicitly.
    stamp = pd.Timestamp(date_time)
    if stamp.tzinfo is None:
        stamp = stamp.tz_localize('UTC')
    else:
        stamp = stamp.tz_convert('UTC')
    frame['time'] = stamp

    return frame


# Function to process all files for a single day and save as one Parquet file
def process_files_for_day(date_key, file_paths):
    """Convert one day's raster files into a single Parquet file.

    Each file name must contain a ``YYYYMMDDHH`` stamp; files without one, or
    with an impossible date, are skipped without being opened. Every raster
    block window becomes a delayed task producing a per-pixel DataFrame, and
    the combined frame is handed to ``save_to_parquet``.

    Band columns use the module-level ``band_names`` when its length matches
    the file's band count (assumes that list is in raster band order - TODO
    confirm); otherwise generic ``band_<i>`` names are used.

    Args:
        date_key: the day being processed; passed to ``save_to_parquet`` and
            used in log messages.
        file_paths: iterable of raster file paths belonging to that day.

    Returns:
        None. Errors are printed with a traceback rather than raised.
    """
    try:
        tasks = []
        for file_path in file_paths:
            file_name = os.path.basename(file_path)
            print(f"Processing file: {file_name}")

            # Parse the datetime from the file name before touching the file
            match = re.search(r'(\d{4})(\d{2})(\d{2})(\d{2})', file_name)
            if not match:
                print(f"Skipping file with unmatched date format: {file_name}")
                continue
            year, month, day, hour = map(int, match.groups())
            try:
                date_time = datetime(year, month, day, hour, tzinfo=timezone.utc)
            except ValueError:
                # Ten digits that do not form a real date/hour.
                print(f"Skipping file with invalid date in name: {file_name}")
                continue

            with rasterio.open(file_path) as dataset:
                band_indexes = list(range(1, dataset.count + 1))
                if len(band_names) == dataset.count:
                    column_names = list(band_names)
                else:
                    column_names = [f'band_{i}' for i in band_indexes]

                # Extract transform once, avoid reopening dataset in the task
                transform = dataset.transform

                for _, window in dataset.block_windows(1):
                    # Data is read eagerly here; the task only reshapes it.
                    band_data = dataset.read(window=window, indexes=band_indexes)
                    tasks.append(
                        delayed(_chunk_to_frame)(
                            band_data, transform, window, date_time, column_names
                        )
                    )

        if not tasks:
            print(f"No data to process for date {date_key}.")
            return

        # The result is materialised as a single pandas frame, so no
        # intermediate row count or repartitioning is needed.
        final_df = dd.from_delayed(tasks).compute()
        parquet_file_path = save_to_parquet(final_df, date_key)

        if parquet_file_path is None:
            print(f"Data for date {date_key} was processed but could not be saved.")
        else:
            print(f"Data processing complete for date {date_key} and saved to {parquet_file_path}")

    except Exception as e:
        print(f"Error processing files for date {date_key}: {e}")
        traceback.print_exc()


# Process function to iterate over files for years and include the hourly timestamp
def process_aod_ducmass_data(start_year, end_year,
                             base_dir="Z:\\Thesis\\Data\\GEE\\MERRA2_aer\\AOD_ducmass"):
    """Group each year's ``.tif`` files by day and process them day by day.

    For every year in ``start_year..end_year`` (inclusive) the directory
    ``<base_dir>/<year>`` is scanned. Files whose names contain a
    ``YYYYMMDDHH`` stamp are grouped by calendar day and each group is passed
    to ``process_files_for_day``. Files without a stamp are ignored; files
    whose stamp is not a real date/hour are skipped with a message.

    Args:
        start_year: first year to process.
        end_year: last year to process (inclusive).
        base_dir: root directory holding one sub-directory per year.
            Defaults to the original hard-coded location.
    """
    for year in range(start_year, end_year + 1):
        year_dir = os.path.join(base_dir, str(year))

        if not os.path.isdir(year_dir):
            print(f"Year directory {year_dir} not found.")
            continue

        daily_files = {}
        # Sorted listing keeps the files of a day in name (hour) order.
        for file_name in sorted(os.listdir(year_dir)):
            if not file_name.endswith(".tif"):
                continue

            match = re.search(r'(\d{4})(\d{2})(\d{2})(\d{2})', file_name)
            if not match:
                continue

            # Distinct name so the outer loop variable `year` is not clobbered.
            file_year, month, day, hour = map(int, match.groups())
            try:
                date_key = datetime(
                    file_year, month, day, hour, tzinfo=timezone.utc
                ).date()
            except ValueError:
                print(f"Skipping file with invalid date in name: {file_name}")
                continue

            daily_files.setdefault(date_key, []).append(
                os.path.join(year_dir, file_name)
            )

        for date_key, file_paths in daily_files.items():
            print(f"Processing files for date: {date_key}")
            process_files_for_day(date_key, file_paths)

# Process data for AOD DUCMASS for 1980 through 1981 (both years inclusive)
process_aod_ducmass_data(1980, 1981)

print("Processing complete for all years!")

Processing files for date: 1980-01-01
Processing file: 1980010100.tif
Processing file: 1980010101.tif
Processing file: 1980010102.tif
Processing file: 1980010103.tif
Processing file: 1980010104.tif
Processing file: 1980010105.tif
Processing file: 1980010106.tif
Processing file: 1980010107.tif
Processing file: 1980010108.tif
Processing file: 1980010109.tif
Processing file: 1980010110.tif
Processing file: 1980010111.tif
Processing file: 1980010112.tif
Processing file: 1980010113.tif
Processing file: 1980010114.tif
Processing file: 1980010115.tif
Processing file: 1980010116.tif
Processing file: 1980010117.tif
Processing file: 1980010118.tif
Processing file: 1980010119.tif
Processing file: 1980010120.tif
Processing file: 1980010121.tif
Processing file: 1980010122.tif
Processing file: 1980010123.tif
Error processing files for date 1980-01-01: Cannot pass a datetime or Timestamp with tzinfo with the tz parameter. Use tz_convert instead.
Processing files for date: 1980-01-02
Processing file: 

Traceback (most recent call last):
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 101, in process_files_for_day
    full_ddf = dd.from_delayed(tasks)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask_expr\io\_delayed.py", line 118, in from_delayed
    meta = delayed(make_meta)(dfs[0]).compute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 92, in process_chunk
    chunk_df['time'] = pd.Timestamp(date_time, tz=timezone.utc)
                 

Processing file: 1980010202.tif
Processing file: 1980010203.tif
Processing file: 1980010204.tif
Processing file: 1980010205.tif
Processing file: 1980010206.tif
Processing file: 1980010207.tif
Processing file: 1980010208.tif
Processing file: 1980010209.tif
Processing file: 1980010210.tif
Processing file: 1980010211.tif
Processing file: 1980010212.tif
Processing file: 1980010213.tif
Processing file: 1980010214.tif
Processing file: 1980010215.tif
Processing file: 1980010216.tif
Processing file: 1980010217.tif
Processing file: 1980010218.tif
Processing file: 1980010219.tif
Processing file: 1980010220.tif
Processing file: 1980010221.tif
Processing file: 1980010222.tif
Processing file: 1980010223.tif
Error processing files for date 1980-01-02: Cannot pass a datetime or Timestamp with tzinfo with the tz parameter. Use tz_convert instead.
Processing files for date: 1980-01-03
Processing file: 1980010300.tif
Processing file: 1980010301.tif


Traceback (most recent call last):
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 101, in process_files_for_day
    full_ddf = dd.from_delayed(tasks)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask_expr\io\_delayed.py", line 118, in from_delayed
    meta = delayed(make_meta)(dfs[0]).compute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 92, in process_chunk
    chunk_df['time'] = pd.Timestamp(date_time, tz=timezone.utc)
                 

Processing file: 1980010302.tif
Processing file: 1980010303.tif
Processing file: 1980010304.tif
Processing file: 1980010305.tif
Processing file: 1980010306.tif
Processing file: 1980010307.tif
Processing file: 1980010308.tif
Processing file: 1980010309.tif
Processing file: 1980010310.tif
Processing file: 1980010311.tif
Processing file: 1980010312.tif
Processing file: 1980010313.tif
Processing file: 1980010314.tif
Processing file: 1980010315.tif
Processing file: 1980010316.tif
Processing file: 1980010317.tif
Processing file: 1980010318.tif
Processing file: 1980010319.tif
Processing file: 1980010320.tif
Processing file: 1980010321.tif
Processing file: 1980010322.tif
Processing file: 1980010323.tif
Error processing files for date 1980-01-03: Cannot pass a datetime or Timestamp with tzinfo with the tz parameter. Use tz_convert instead.
Processing files for date: 1980-01-04
Processing file: 1980010400.tif
Processing file: 1980010401.tif


Traceback (most recent call last):
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 101, in process_files_for_day
    full_ddf = dd.from_delayed(tasks)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask_expr\io\_delayed.py", line 118, in from_delayed
    meta = delayed(make_meta)(dfs[0]).compute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 92, in process_chunk
    chunk_df['time'] = pd.Timestamp(date_time, tz=timezone.utc)
                 

Processing file: 1980010402.tif
Processing file: 1980010403.tif
Processing file: 1980010404.tif
Processing file: 1980010405.tif
Processing file: 1980010406.tif
Processing file: 1980010407.tif
Processing file: 1980010408.tif
Processing file: 1980010409.tif
Processing file: 1980010410.tif
Processing file: 1980010411.tif
Processing file: 1980010412.tif
Processing file: 1980010413.tif
Processing file: 1980010414.tif
Processing file: 1980010415.tif
Processing file: 1980010416.tif
Processing file: 1980010417.tif
Processing file: 1980010418.tif
Processing file: 1980010419.tif
Processing file: 1980010420.tif
Processing file: 1980010421.tif
Processing file: 1980010422.tif
Processing file: 1980010423.tif
Error processing files for date 1980-01-04: Cannot pass a datetime or Timestamp with tzinfo with the tz parameter. Use tz_convert instead.
Processing files for date: 1980-01-05
Processing file: 1980010500.tif
Processing file: 1980010501.tif


Traceback (most recent call last):
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 101, in process_files_for_day
    full_ddf = dd.from_delayed(tasks)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask_expr\io\_delayed.py", line 118, in from_delayed
    meta = delayed(make_meta)(dfs[0]).compute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 92, in process_chunk
    chunk_df['time'] = pd.Timestamp(date_time, tz=timezone.utc)
                 

Processing file: 1980010502.tif
Processing file: 1980010503.tif
Processing file: 1980010504.tif
Processing file: 1980010505.tif
Processing file: 1980010506.tif
Processing file: 1980010507.tif
Processing file: 1980010508.tif
Processing file: 1980010509.tif
Processing file: 1980010510.tif
Processing file: 1980010511.tif
Processing file: 1980010512.tif
Processing file: 1980010513.tif
Processing file: 1980010514.tif
Processing file: 1980010515.tif
Processing file: 1980010516.tif
Processing file: 1980010517.tif
Processing file: 1980010518.tif
Processing file: 1980010519.tif
Processing file: 1980010520.tif
Processing file: 1980010521.tif
Processing file: 1980010522.tif
Processing file: 1980010523.tif
Error processing files for date 1980-01-05: Cannot pass a datetime or Timestamp with tzinfo with the tz parameter. Use tz_convert instead.
Processing files for date: 1980-01-06
Processing file: 1980010600.tif
Processing file: 1980010601.tif


Traceback (most recent call last):
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 101, in process_files_for_day
    full_ddf = dd.from_delayed(tasks)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask_expr\io\_delayed.py", line 118, in from_delayed
    meta = delayed(make_meta)(dfs[0]).compute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\anaconda3\envs\gee\Lib\site-packages\dask\base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Charl\AppData\Local\Temp\ipykernel_21264\293223962.py", line 92, in process_chunk
    chunk_df['time'] = pd.Timestamp(date_time, tz=timezone.utc)
                 

Processing file: 1980010602.tif
Processing file: 1980010603.tif
Processing file: 1980010604.tif
Processing file: 1980010605.tif
Processing file: 1980010606.tif
Processing file: 1980010607.tif
Processing file: 1980010608.tif
Processing file: 1980010609.tif
Processing file: 1980010610.tif
Processing file: 1980010611.tif
Processing file: 1980010612.tif
Processing file: 1980010613.tif
Processing file: 1980010614.tif
Processing file: 1980010615.tif
Processing file: 1980010616.tif
Processing file: 1980010617.tif
Processing file: 1980010618.tif
Processing file: 1980010619.tif
Processing file: 1980010620.tif
Processing file: 1980010621.tif
Processing file: 1980010622.tif
Processing file: 1980010623.tif
