_Note: This notebook is intended to be run as an AWS EMR notebook: it starts its Dask workers on the cluster's YARN resource manager via `dask_yarn`._

In [1]:
import pandas as pd
import boto3
import time
import dask.dataframe as dd
from dask_yarn import YarnCluster
from dask.distributed import Client

In [2]:
# Launch Dask workers as YARN containers, each with 1 core and 4 GiB of memory.
# `environment` is the packaged Python environment shipped to each worker container.
N_WORKERS = 8
cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz",
                      worker_vcores=1,
                      worker_memory="4GiB")

# Scale the cluster out to N_WORKERS such workers. YARN starts them asynchronously,
# so scale() returns before the workers are available.
cluster.scale(N_WORKERS)

# Connect to the cluster, then block until every requested worker has registered
# with the scheduler, so later cells never run on a partially started cluster.
client = Client(cluster)
client.wait_for_workers(N_WORKERS)

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://172.31.10.195:44409
distributed.scheduler - INFO -   dashboard at:                    :43499
distributed.scheduler - INFO - Receive client connection: Client-165af528-c235-11eb-a92b-0283bac7ac51
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.31.5.202:39943', name: dask.worker_5, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.31.5.202:39943
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.31.5.202:44887', name: dask.worker_2, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.31.5.202:44887
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.31.15.167:396

In [4]:
# Display the client's rich summary (scheduler address, dashboard link, and
# worker / core / memory totals) to confirm the cluster is up.
client

0,1
Client  Scheduler: tcp://172.31.10.195:44409  Dashboard: /proxy/43499/status,Cluster  Workers: 8  Cores: 8  Memory: 32.00 GiB


In [22]:
# Source data locations and how many dask partitions to split each table into.
SCENE_LIST_URL = 'https://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
POINT2PATHROW_URL = 'https://detecting-poverty.s3.amazonaws.com/lscale/point2pathrow.csv'
N_PARTITIONS = 8

# The scene list is gzip-compressed. gzip is not splittable, so dask could only read
# it as a single block; instead read it with pandas (which infers the compression
# from the ".gz" extension) and then partition it for dask.
scene_list = dd.from_pandas(pd.read_csv(SCENE_LIST_URL), npartitions=N_PARTITIONS)

# Reading this CSV directly with dask behaved oddly in earlier runs, so it is also
# read with pandas first and then converted to a dask DataFrame.
pathrows = dd.from_pandas(pd.read_csv(POINT2PATHROW_URL), npartitions=N_PARTITIONS)

In [23]:
# Add scene-date columns. The slice positions assume acquisitionDate strings of
# the form "YYYY-MM-DD...".
scene_list['year'] = scene_list['acquisitionDate'].str[:4].astype(int)
scene_list['month'] = scene_list['acquisitionDate'].str[5:7].astype(int)
scene_list['day'] = scene_list['acquisitionDate'].str[8:10].astype(int)
# True day of year (1-366). month * day is NOT a day of year: it collides
# (Jan 2 and Feb 1 both give 2) and misorders dates (Feb 2 -> 4 sorts before
# Jan 31 -> 31), which would make the later per-(path, row) minimum pick the
# wrong scene.
scene_list['day_of_year'] = dd.to_datetime(scene_list['acquisitionDate']).dt.dayofyear

# The pathrows file has no year column; attach one so the merge on 'year'
# only matches scenes acquired in this year.
SCENE_YEAR = 2016
pathrows['year'] = SCENE_YEAR
pathrows['path'] = pathrows['path'].astype(int)
pathrows['row'] = pathrows['row'].astype(int)
# String key identifying each point. A numeric lon + lat sum is not unique:
# distinct points such as (1, 2) and (2, 1) share the same sum.
pathrows['lonlat'] = pathrows['# lon'].astype(str) + ',' + pathrows['lat'].astype(str)

In [24]:
# Inner-join the points onto the scene list on (path, row, year); this keeps only
# scenes from the year assigned to the pathrows.
pathrows_mg = pathrows.merge(scene_list, on=['path', 'row', 'year'])

# Earliest available day of year per (path, row). The original author could not get
# this working purely in dask; the aggregate is small (<800 rows per the original
# note), so it is computed into pandas and then wrapped back into a dask DataFrame.
min_day = (
    pathrows_mg
    .groupby(['path', 'row'])['day_of_year']
    .min()
    .compute()
    .to_frame()
    .reset_index()
)
min_day_dd = dd.from_pandas(min_day, npartitions=8)

distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.84s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


In [25]:
# Keep only the scenes acquired on each pathrow's earliest available day of year
# (inner join against the per-(path, row) minimum).
pathrows_filt = pathrows_mg.merge(min_day_dd, on=['path', 'row', 'day_of_year'])

# Keep one scene per point (e.g. when several scenes were acquired on the same day).
# De-duplicate on the coordinate pair itself: a numeric lon + lat key would also
# collapse distinct points whose coordinates happen to sum to the same value.
pathrows_filt = pathrows_filt.drop_duplicates(subset=['# lon', 'lat'], ignore_index=True)

In [26]:
# Build per-band GeoTIFF links from each scene's index.html URL, i.e.
# <scene directory>/<productId>_<band>.TIF
# regex=False: 'index.html' is a literal string. As a regex, the '.' would match any
# character, and leaving `regex` at its default triggers a pandas FutureWarning.
pathrows_filt['link_prefix'] = (
    pathrows_filt['download_url'].str.replace('index.html', '', regex=False)
    + pathrows_filt['productId'] + '_'
)
BANDS = ['B2', 'B3', 'B4']
for band in BANDS:
    pathrows_filt[f'{band}_link'] = pathrows_filt['link_prefix'] + f'{band}.TIF'

# Keep only the columns we need.
pathrows_filt = pathrows_filt[['# lon', 'lat', 'path', 'row']
                              + [f'{band}_link' for band in BANDS]]

  out = getattr(getattr(obj, accessor, obj), attr)(*args, **kwargs)


In [27]:
# Materialise the lazy dask graph into a single in-memory pandas DataFrame in this
# (client) process; only done once the frame has been reduced to what is exported.
pathrows_export = pathrows_filt.compute()

# Sanity check: the de-duplication step should leave one row per (lon, lat) point.
assert not pathrows_export.duplicated(subset=['# lon', 'lat']).any(), \
    "expected one scene per (lon, lat) point"

distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.67s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


In [28]:
# Preview the first few exported rows (coordinates, path/row, band links).
pathrows_export.head()

Unnamed: 0,# lon,lat,path,row,B2_link,B3_link,B4_link
0,6.22495,11.044828,190,52,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...
1,5.104116,11.40166,190,52,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...
2,5.562867,11.632577,190,52,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...
3,6.513935,11.53564,190,52,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...
4,5.144629,10.999051,190,52,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...,https://s3-us-west-2.amazonaws.com/landsat-pds...


In [29]:
# (number of rows, number of columns) of the exported table.
pathrows_export.shape

(999827, 7)

In [30]:
# Spot-check one generated link: the B4 band link (last selected column) of the first row.
pathrows_export['B4_link'].iloc[0]

'https://s3-us-west-2.amazonaws.com/landsat-pds/c1/L8/190/052/LC08_L1TP_190052_20160102_20170404_01_T1/LC08_L1TP_190052_20160102_20170404_01_T1_B4.TIF'

In [44]:
# Export to a zip-compressed CSV without the pandas index. Naming the archive member
# explicitly makes the file inside the zip "pathrow_scenelinks.csv" regardless of
# pandas version (some older versions named the member after the full output name,
# including ".zip").
pathrows_export.to_csv(
    'pathrow_scenelinks.zip',
    index=False,
    compression={'method': 'zip', 'archive_name': 'pathrow_scenelinks.csv'},
)