In [1]:
from firebird import chip
from firebird import aardvark as av
from test import fixtures as fx

import numpy as np

from pyspark import SparkContext, SparkConf
from datetime import datetime

In [2]:
# Environment expected when launching the notebook kernel (NOT applied by this cell):
#   export PYSPARK_DRIVER_PYTHON=python3
#   export PYSPARK_PYTHON=python3

# Mesos master the SparkConf below points at.
LPW_MESOS_MASTER = "mesos://192.168.32.3:5050"
# Docker image the Mesos executors run in.
LPW_EXECUTOR_IMAGE = "usgseros/lcmap-pyccd-worker:spark"
# Cores requested per executor ("spark.executor.cores").
LPW_EXECUTOR_CORES = 3
# Kept as the string "true" because it is handed to SparkConf verbatim.
LPW_EXECUTOR_FORCE_PULL_IMAGE = "true"
# NOTE(review): not referenced by any visible cell yet.
LPW_SPARK_PARALLELIZATION = 2

In [3]:
# Band names processed below; every per-band loop iterates them in this order.
bands = tuple("blue cfmask green nir red swir1 swir2 thermal".split())
# Per-band results are written straight into the notebook namespace through this
# handle, e.g. g['blue_assoc'] becomes the global `blue_assoc` used later.
g = globals()

In [4]:
# retrieve chips and specs
for band in bands:
    # chip specs per band... pertinent data should be consistent across bands, no? 
    g[band+'_specs'] = fx.chip_specs(band)
    
    # retrieve chips
    g[band+'_chips'] = av.sort([i for i in fx.chips(band)])
    
    # list of observation dates for this bands chips
    g[band+'_dates'] = av.dates(g[band+'_chips'])

In [5]:
# Only dates observed by every band can be used, so intersect the per-band date lists.
per_band_dates = [g[band + '_dates'] for band in bands]
shared_dates = av.intersection(per_band_dates)

In [6]:
# For every band: restrict chips to the shared dates, convert them to numpy, build
# per-pixel rods, and key each rod by its pixel coordinate. Every intermediate is
# published as a `<band>_<stage>` global.
for band in bands:
    specs = g[band + "_specs"]

    # keep only observations on dates shared by all bands
    trimmed = av.trim(g[band + "_chips"], shared_dates)
    g[band + "_trimmed"] = trimmed

    # chip data as numpy arrays
    as_numpy = list(av.to_numpy(trimmed, av.byubid(specs)))
    g[band + "_numpy"] = as_numpy

    # per-pixel value series ("rods") for this band
    rods = av.rods(as_numpy)
    g[band + "_rods"] = rods

    # coordinate pair of every pixel in the chip. The printed values, e.g.
    # (3000.0, -3000.0) stepping by 30, are projected x/y, not lon/lat degrees.
    first_spec = specs[0]
    locs = chip.locations(first_spec['chip_x'], first_spec['chip_y'], first_spec)
    g[band + "_locs"] = locs

    # tie each rod to its pixel coordinate
    g[band + "_assoc"] = av.assoc(locs, rods)

In [7]:
# Each `<band>_assoc` maps a pixel's coordinate pair to that band's rod, i.e. its
# values over the shared dates. Keys such as (4920.0, -5850.0) are projected x/y
# coordinates, not lon/lat.
sample_pixel = (4920.0, -5850.0)
blue_assoc[sample_pixel]

array([ 8315,  5289, -9999, ...,  1163,  5220,   780], dtype=int16)

In [8]:
# Grid dimensions of the blue pixel-coordinate array: outer length, then row length.
print(len(blue_locs), len(blue_locs[0]), sep="\n")

100
100


In [9]:
# NOTE(review): repeats the first value already printed by the previous cell;
# candidate for removal.
len(blue_locs)

100

In [12]:
# Flatten the grid of pixel coordinates into one (x, y) pair per row.
# (-1, 2) lets numpy infer the pixel count (10000 for the 100x100 chip above)
# instead of hard-coding it, so chips of any size work.
pixel_coords = np.reshape(blue_locs, (-1, 2))

In [16]:
# Rows of pixel_coords are numpy arrays; convert to a tuple to use as a dict key.
first_pixel = tuple(pixel_coords[0])
blue_assoc[first_pixel]

array([8452, 3687, 6506, ..., 1400, 4912,  612], dtype=int16)

In [17]:
# One record per pixel: (pixel coordinate tuple, {band name: that band's rod}),
# the shape intended for handing to Spark.
rdd_able = [
    (pixel, {band: g[band + "_assoc"][pixel] for band in bands})
    for pixel in map(tuple, pixel_coords)
]

In [18]:
# Every flattened pixel coordinate must have produced exactly one record; assert it
# rather than relying on eyeballing the displayed count.
assert len(rdd_able) == len(pixel_coords), "expected one record per pixel"
len(rdd_able)

10000

In [19]:
# Spark configuration: run on the Mesos master with dockerized executors.
# Parentheses already allow the chain to span lines, so no backslashes are needed.
conf = (
    SparkConf()
    .setAppName("foo")
    .setMaster(LPW_MESOS_MASTER)
    .set("spark.mesos.executor.docker.image", LPW_EXECUTOR_IMAGE)
    .set("spark.executor.cores", LPW_EXECUTOR_CORES)
    .set("spark.mesos.executor.docker.forcePullImage", LPW_EXECUTOR_FORCE_PULL_IMAGE)
)

In [20]:
# getOrCreate reuses an already-running context, so re-running this cell does not
# raise the way a second SparkContext(...) constructor call does. If a context
# already exists, `conf` is ignored -- restart the kernel after changing it.
# NOTE(review): the last run failed with "Could not find valid SPARK_HOME"; set
# SPARK_HOME to the Spark distribution before starting the kernel.
sc = SparkContext.getOrCreate(conf=conf)

Could not find valid SPARK_HOME while searching ['/home/caustin/workspace/lcmap-firebird', '/home/caustin/workspace/spark/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark', '/home/caustin/workspace/spark/dist/python/lib/pyspark', '/home/caustin/workspace/spark/dist/python']


NameError: name 'exit' is not defined

In [None]:
# TODO: distribute the per-pixel work with Spark, e.g. presumably
#   rdd = sc.parallelize(rdd_able, LPW_SPARK_PARALLELIZATION)
# NOTE(review): an earlier draft used np.reshape(blue_locs, 10000, 2), which passes
# 2 as reshape's `order` argument. The shape must be a single tuple, e.g. (-1, 2);
# `pixel_coords` already holds that reshape.

In [17]:
# Preview only the first 10 x 10 pixels (the chip is 100 x 100) so the output
# stays readable.
preview_size = 10
for x in range(preview_size):
    for y in range(preview_size):
        print("x: {}, y: {}".format(x, y))
        pixel_key = tuple(blue_locs[x][y])
        print(pixel_key)
        print(blue_assoc[pixel_key])
        # TODO: call a ccd.detect wrapper (run by Spark) with the pixel coordinate,
        # chip_x, chip_y and the rods of every band at pixel_key, e.g.
        # run_detect(pixel_key, chip_x, chip_y,
        #            blue_assoc[pixel_key], cfmask_assoc[pixel_key],
        #            green_assoc[pixel_key], nir_assoc[pixel_key],
        #            red_assoc[pixel_key], swir1_assoc[pixel_key],
        #            swir2_assoc[pixel_key], thermal_assoc[pixel_key])
        

x: 0, y: 0
(3000.0, -3000.0)
[8452 3687 6506 ..., 1400 4912  612]
x: 0, y: 1
(3000.0, -3030.0)
[8380 3887 6472 ..., 1353 4760  618]
x: 0, y: 2
(3000.0, -3060.0)
[8525 4091 6622 ..., 1343 4839  638]
x: 0, y: 3
(3000.0, -3090.0)
[8461 4103 6448 ..., 1369 4973  671]
x: 0, y: 4
(3000.0, -3120.0)
[ 8518 -9999  6212 ...,  1395  4921   699]
x: 0, y: 5
(3000.0, -3150.0)
[ 8458 -9999  6189 ...,  1563  4813   664]
x: 0, y: 6
(3000.0, -3180.0)
[ 8551 -9999  6281 ...,  1528  4852   722]
x: 0, y: 7
(3000.0, -3210.0)
[ 8560 -9999  6355 ...,  1541  4911   747]
x: 0, y: 8
(3000.0, -3240.0)
[-9999 -9999  6281 ...,  1609  4920   686]
x: 0, y: 9
(3000.0, -3270.0)
[-9999 -9999  6076 ...,  1659  4962   689]
x: 1, y: 0
(3030.0, -3000.0)
[8459 3678 6587 ..., 1458 4797  637]
x: 1, y: 1
(3030.0, -3030.0)
[8413 3758 6451 ..., 1365 4759  611]
x: 1, y: 2
(3030.0, -3060.0)
[8511 3912 6386 ..., 1399 4863  574]
x: 1, y: 3
(3030.0, -3090.0)
[8461 3997 6322 ..., 1423 4921  650]
x: 1, y: 4
(3030.0, -3120.0)
[ 8458 -999