In [59]:
import os
import numpy as np
import botocore
import boto3
from glob import glob
import pandas as pd
import string
from joblib import Parallel, delayed
from time import time

from aqmsp.debug_utils import verbose_print, set_verbose

In [62]:
def create_station_ds(self, save_dir: str, n_jobs: int = 1) -> None:
    """Create a dataset containing latitude-longitude information by reading historical data from OpenAQ. The latitude-longitude information is not available in the raw data files from CPCB.
    
    Args:
        save_dir (str): Directory to save the dataset.
        n_jobs (int, optional): Number of parallel jobs to run. Defaults to 1.
    """
    
    s3 = boto3.client('s3', config=botocore.config.Config(signature_version=botocore.UNSIGNED))
    
    source_bucket = 'openaq-data-archive'
    
    # Get all locations in India provided by CPCB
    prefix = 'records/csv.gz/provider=caaqm/country=in/'
    object = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix, Delimiter='/')
    
    all_location_prefixes = [common_prefix['Prefix'] for common_prefix in object.get('CommonPrefixes', [])]
    verbose_print(f"Found {len(all_location_prefixes)} locations")
    
    # For each location, get one file
    folder_name = ''.join(np.random.choice(list(string.ascii_lowercase), 10))
    tmp_dir = os.path.join("/tmp", folder_name)
    os.makedirs(tmp_dir, exist_ok=True)
    verbose_print(f"Using temporary directory {tmp_dir}")
    def get_one_file(prefix):
        s3 = boto3.client('s3', config=botocore.config.Config(signature_version=botocore.UNSIGNED))
        object = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
        all_files = [file['Key'] for file in object.get('Contents', [])]
        file = np.random.choice(all_files)
        
        # Download the file
        file_name = file.split('/')[-1]
        download_path = os.path.join(tmp_dir, file_name)
        s3.download_file(source_bucket, file, download_path)
    
    init = time()
    Parallel(n_jobs=n_jobs)(delayed(get_one_file)(prefix) for prefix in all_location_prefixes)
    check_files = glob(os.path.join(tmp_dir, '*.csv.gz'))
    assert len(check_files) == len(all_location_prefixes), f"Downloaded {len(check_files)} files, expected {len(all_location_prefixes)}"
    verbose_print(f"Downloaded all files in {time() - init} seconds")
    
    # Find the latitude-longitude information
    def get_lat_lon(file):
        df = pd.read_csv(file)
        return {df.location.iloc[0]: {"latitude": float(df.latitude.iloc[0]), "longitude": float(df.longitude.iloc[0])}}
    
    init = time()
    lat_lon_dict = Parallel(n_jobs=n_jobs)(delayed(get_lat_lon)(file) for file in check_files)
    verbose_print(f"Found latitude-longitude information for {len(lat_lon_dict)} locations in {time() - init} seconds")
    
    # create a dataframe
    df = pd.DataFrame.from_dict({k: v for d in lat_lon_dict for k, v in d.items()}, orient='index')
    verbose_print(df.head())
    
    # delete all files
    for file in check_files:
        os.remove(file)

set_verbose(True)
create_station_ds(None, None, n_jobs=32)

Found 589 locations
Using temporary directory /tmp/fzgabeunyo
Downloaded all files in 35.51127529144287 seconds


In [15]:
# !aws s3 --no-sign-request cp s3://openaq-data-archive/records/csv.gz/provider=caaqm/country=in/locationid=10484/year=2022/month=10/location-10484-20221031.csv.gz .
!aws s3 --no-sign-request ls s3://openaq-data-archive/records/csv.gz/provider=caaqm/country=in

download: s3://openaq-data-archive/records/csv.gz/provider=caaqm/country=in/locationid=10484/year=2022/month=10/location-10484-20221031.csv.gz to ./location-10484-20221031.csv.gz


In [16]:
import pandas as pd

df = pd.read_csv('location-10484-20221031.csv.gz')
df

Unnamed: 0,location_id,sensors_id,location,datetime,lat,lon,parameter,units,value
0,10484,34781,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,pm10,µg/m³,309.0
1,10484,34772,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,pm25,µg/m³,232.0
2,10484,34757,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,o3,µg/m³,6.1
3,10484,34761,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,co,µg/m³,1300.0
4,10484,34758,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,no2,µg/m³,25.4
5,10484,34759,"Sri Aurobindo Marg, Delhi - DPCC-10484",2022-10-31T06:15:00+05:30,28.531346,77.190156,so2,µg/m³,3.8


In [33]:
from io import BytesIO

# S3 object details
bucket_name = 'openaq-data-archive'
object_key = 'records/csv.gz/provider=caaqm/country=in/locationid=10484/year=2022/month=10/location-10484-20221031.csv.gz'

# Read the S3 object into a pandas DataFrame
s3_object = s3.get_object(Bucket=bucket_name, Key=object_key)
# df = pd.read_csv(BytesIO(s3_object['Body'].read()))
# df
s3_object

{'ResponseMetadata': {'RequestId': 'C500FDSVP3JBAEMX',
  'HostId': 'N3vFgGpxdYO3HWuqeXS51PxRLdR/aB7KPW8FceJUPTTN9Mnms5nZTLqwlgbiWeUYTkNVZkB4bAaTjXAMmWeVmw==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'N3vFgGpxdYO3HWuqeXS51PxRLdR/aB7KPW8FceJUPTTN9Mnms5nZTLqwlgbiWeUYTkNVZkB4bAaTjXAMmWeVmw==',
   'x-amz-request-id': 'C500FDSVP3JBAEMX',
   'date': 'Fri, 22 Sep 2023 04:54:10 GMT',
   'last-modified': 'Mon, 31 Oct 2022 18:34:12 GMT',
   'etag': '"946df341881e1eb43fcb83a2beff45e0"',
   'accept-ranges': 'bytes',
   'content-type': 'binary/octet-stream',
   'server': 'AmazonS3',
   'content-length': '246'},
  'RetryAttempts': 0},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2022, 10, 31, 18, 34, 12, tzinfo=tzutc()),
 'ContentLength': 246,
 'ETag': '"946df341881e1eb43fcb83a2beff45e0"',
 'ContentType': 'binary/octet-stream',
 'Metadata': {},
 'Body': <botocore.response.StreamingBody at 0x7f546d4104c0>}

In [35]:
s3_object['Body'].read()

b'\x1f\x8b\x08\x00#\x15`c\x02\xff\xb4\xd0Mj\xc30\x10\x05\xe0}N!\xb2\xedX\x1d\xfdY\xb2w%\xd9\x16\n\xed\xbe\xa8\xb1H\x05\xb2\x14$\xb9\x17+\xbd@N\xd6\xb4\x14\xdb=\x80w\x8f\xc7\xc0\xc7\xbc\x90N\xb6\xfa\x14_\xfd\x00\xc5\xc5\x92r\xf9\x89\xe1\xaf\x86\xc1VW\xfd\xe8 \xd8zk#\\l\xb6\xa3\xab.\xc3\x14}-\xf0a\xc3\xe4v\x0c\xa5\x91 \xa46\x0c\xf6\xcf\xd9\x93\x87)\xa77\x1f\x87D\x1em>\x039\xba\xf0\xeeIC\x8eO\x87C\xf3{\xbd\x07\x8e\x9c\xdfr#\xd8\x0b\xb6=S=\xe2\x1d\xaa^ pC\x95`B\xb6\xa05e\x1d2\xd5\xc2ed\x08\xd7\xaf\xf3\xfdx\xfd\x04\x81\x1d\xc5E\xd5|;\x95\xabY\xe5\x82\xafU\xa5\xb7R\x93\x98\xcd\x96\xb2El7[\xf7\x94f\x91\t\xc4\x7fo\x9a\xad\xd0\x98\xf8\xb2\xad\xa2rev[\x99ee\njv\xdf\x00\x00\x00\xff\xff\x03\x00t\x1e\xb8\xa0\x01\x03\x00\x00'