-
Notifications
You must be signed in to change notification settings - Fork 184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Thunder integration with OCP #130
Changes from 10 commits
35f3bf8
f092ae2
883b403
72ba1ad
8188580
96b5b69
9f84707
5d91646
2440379
a89ccf7
c1525c0
42d90ea
b55f102
6f15b49
e0e4b38
7ffa635
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
""" | ||
from matplotlib.pyplot import imread | ||
from io import BytesIO | ||
from numpy import array, dstack, frombuffer, ndarray, prod, transpose | ||
from numpy import array, dstack, frombuffer, ndarray, prod, transpose, load, swapaxes | ||
from thunder.rdds.fileio.readers import getParallelReaderForPath | ||
from thunder.rdds.images import Images | ||
|
||
|
@@ -133,6 +133,98 @@ def toArray(idxAndBuf): | |
newDims = tuple(list(dims[:-1]) + [nplanes]) if nplanes else dims | ||
return Images(readerRdd.flatMap(toArray), nrecords=nrecords, dims=newDims, dtype=dtype) | ||
|
||
def fromOCP(self, dataPath, resolution, serverName='ocp.me', startIdx=None, stopIdx=None, minBound=None,
            maxBound=None):
    """Sets up a new Images object with data to be read from OCP (the Open Connectome Project).

    The project metadata is queried once from the server to discover the valid time range and spatial
    extent. One record is then created per requested time point; each record is downloaded as a
    zlib-compressed numpy array restricted to the requested spatial bounds.

    Parameters
    ----------
    dataPath: string
        Name of the token/bucket in OCP. You can use the token name you created in OCP here. You can also
        access publicly available data on OCP at this URL "http://ocp.me/ocp/ca/public_tokens/"

    resolution: nonnegative int
        Resolution of the data in OCP

    serverName: string, optional, default 'ocp.me'
        Name of the server in OCP which has the corresponding token. By default this is always ocp.me but
        if you have an alternate server, you can set it here.

    startIdx, stopIdx: nonnegative int, optional
        Indices of the first and last-plus-one time point to load, interpreted according to python slice
        indexing conventions. In OCP these are the start time and end time of your data. Each defaults to
        the corresponding end of the time range reported by the server.

    minBound, maxBound: tuple of nonnegative int, optional
        X,Y,Z bounds of the data you want to fetch from OCP. minBound contains the (xMin,yMin,zMin) while
        maxBound contains (xMax,yMax,zMax). They default to the full extent reported by the server.

    Returns
    -------
    A newly-created Images object wrapping an RDD of <int index, numpy array> key-value pairs, one per
    requested time point.

    Raises
    ------
    Exception
        If the project information URL cannot be opened.
    ValueError
        If startIdx, stopIdx, minBound or maxBound fall outside the ranges reported by the server.
    """
    import json
    import urllib2
    from contextlib import closing

    # Query the project metadata (JSON) for this token
    infoUrl = 'http://{}/ocp/ca/{}/info/'.format(serverName, dataPath)
    try:
        response = urllib2.urlopen(infoUrl)
    except urllib2.URLError as e:
        raise Exception("Failed URL {} ({})".format(infoUrl, e))
    # urllib2 responses are not context managers, so close explicitly via closing()
    with closing(response):
        projInfo = json.loads(response.read())

    # Loading information from the JSON object
    dataset = projInfo['dataset']
    ximageSize, yimageSize = dataset['imagesize'][str(resolution)]
    zimageStart, zimageStop = dataset['slicerange']
    timageStart, timageStop = dataset['timerange']

    # Checking that the requested time range lies within the range stored on the server
    if startIdx is None:
        startIdx = timageStart
    elif not timageStart <= startIdx <= timageStop:
        raise ValueError("startIdx out of bounds {},{}".format(timageStart, timageStop))

    if stopIdx is None:
        stopIdx = timageStop
    elif not timageStart <= stopIdx <= timageStop:
        raise ValueError("stopIdx out of bounds {},{}".format(timageStart, timageStop))

    lowerLimit = (0, 0, zimageStart)
    upperLimit = (ximageSize, yimageSize, zimageStop)

    def checkBound(name, bound):
        """Raise ValueError unless every axis of `bound` lies within the limits reported by the server"""
        # Compare axis by axis: comparing the tuples directly is lexicographic, so only the first
        # differing axis would actually be checked.
        axes = zip(bound, lowerLimit, upperLimit)
        if len(bound) != 3 or any(value < low or value > high for value, low, high in axes):
            raise ValueError("{} is incorrect {},{}".format(name, lowerLimit, upperLimit))

    if minBound is None:
        minBound = lowerLimit
    else:
        checkBound("minBound", minBound)

    if maxBound is None:
        maxBound = upperLimit
    else:
        checkBound("maxBound", maxBound)

    # Given the bounds, build one URL per requested time point
    urlTemplate = "http://{}/ocp/ca/{}/npz/{},{}/{}/{},{}/{},{}/{},{}/"
    urlList = [urlTemplate.format(serverName, dataPath, t, t + 1, resolution,
                                  minBound[0], maxBound[0], minBound[1], maxBound[1], minBound[2], maxBound[2])
               for t in range(startIdx, stopIdx)]

    def read(url):
        """Fetch one time point from the server and return it as a 3d numpy array in x,y,z order"""
        # Imported here so the function is self-contained when it is shipped to the workers
        import urllib2
        import zlib
        from contextlib import closing

        try:
            response = urllib2.urlopen(url)
        except urllib2.URLError:
            print("Failed URL {}.".format(url))
            raise
        with closing(response):
            imgData = response.read()

        data = load(BytesIO(zlib.decompress(imgData)))
        # Data comes in as a 4d numpy array in t,z,y,x order. Drop the (length one) time dimension and
        # swap the axes to give back a 3d numpy array in x,y,z order.
        return swapaxes(data[0, :, :, :], 0, 2)

    # One partition per URL; never request zero partitions when no time points are selected
    nslices = max(1, len(urlList))
    rdd = self.sc.parallelize(list(enumerate(urlList)), nslices).map(lambda kv: (kv[0], read(kv[1])))
    return Images(rdd, nrecords=len(urlList))
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove extra blank line |
||
def fromTif(self, dataPath, ext='tif', startIdx=None, stopIdx=None, recursive=False, nplanes=None, | ||
npartitions=None): | ||
"""Sets up a new Images object with data to be read from one or more tif files. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,12 +94,10 @@ def loadSeries(self, dataPath, nkeys=None, nvalues=None, inputFormat='binary', m | |
keyType=keyType, valueType=valueType) | ||
return data | ||
|
||
def loadImages(self, dataPath, dims=None, inputFormat='stack', ext=None, dtype='int16', | ||
startIdx=None, stopIdx=None, recursive=False, nplanes=None, npartitions=None, | ||
|
||
def loadImages(self, dataPath, dims=None, inputFormat='stack', ext=None, dtype='int16', startIdx=None, stopIdx=None, serverName='ocp.me', minBound=None, maxBound=None, resolution=None, recursive=False, nplanes=None, npartitions=None, | ||
renumber=False): | ||
""" | ||
Loads an Images object from data stored as a binary image stack, tif, or png files. | ||
|
||
Supports single files or multiple files, stored on a local file system, a networked file system | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did the first line get deleted here? |
||
(mounted and available on all nodes), or Amazon S3. HDFS is not currently supported for image file data. | ||
|
||
|
@@ -148,6 +146,15 @@ def loadImages(self, dataPath, dims=None, inputFormat='stack', ext=None, dtype=' | |
stopIdx: nonnegative int, optional | ||
See startIdx. | ||
|
||
serverName: string. optional. | ||
Name of the server in OCP which has the corresponding token. By default this is always ocp.me but if you have an alternate server, you can set it here. | ||
|
||
minBound, maxBound: tuple of nonnegative int. optional. | ||
X,Y,Z bounds of the data you want to fetch from OCP. minBound contains the (xMin,yMin,zMin) while maxBound contains (xMax,yMax,zMax) | ||
|
||
resolution: nonnegative int | ||
Resolution of the data in OCP | ||
|
||
recursive: boolean, default False | ||
If true, will recursively descend directories rooted at dataPath, loading all files in the tree that | ||
have an appropriate extension. Recursive loading is currently only implemented for local filesystems | ||
|
@@ -178,11 +185,15 @@ def loadImages(self, dataPath, dims=None, inputFormat='stack', ext=None, dtype=' | |
A newly-created Images object, wrapping an RDD of <int index, numpy array> key-value pairs. | ||
|
||
""" | ||
checkParams(inputFormat, ['stack', 'png', 'tif', 'tif-stack']) | ||
checkParams(inputFormat, ['stack', 'png', 'tif', 'tif-stack', 'ocp']) | ||
|
||
from thunder.rdds.fileio.imagesloader import ImagesLoader | ||
loader = ImagesLoader(self._sc) | ||
|
||
# Checking StartIdx is smaller or equal to StopIdx | ||
if startIdx!=None and stopIdx!=None and startIdx > stopIdx: | ||
raise Exception ( "Error. startIdx {} is larger than stopIdx {}".format(startIdx,stopIdx) ) | ||
|
||
if not ext: | ||
ext = DEFAULT_EXTENSIONS.get(inputFormat.lower(), None) | ||
|
||
|
@@ -192,6 +203,12 @@ def loadImages(self, dataPath, dims=None, inputFormat='stack', ext=None, dtype=' | |
elif inputFormat.lower().startswith('tif'): | ||
data = loader.fromTif(dataPath, ext=ext, startIdx=startIdx, stopIdx=stopIdx, recursive=recursive, | ||
nplanes=nplanes, npartitions=npartitions) | ||
elif inputFormat.lower() == 'ocp': | ||
if nplanes: | ||
raise NotImplementedError("nplanes argument is not implemented for ocp") | ||
if npartitions: | ||
raise NotImplementedError("npartitions argument is not implemented for ocp") | ||
data = loader.fromOCP(dataPath, startIdx=startIdx, stopIdx=stopIdx, minBound=minBound, maxBound=maxBound, serverName=serverName, resolution=resolution ) | ||
else: | ||
if nplanes: | ||
raise NotImplementedError("nplanes argument is not supported for png files") | ||
|
@@ -772,4 +789,4 @@ def setAWSCredentials(self, awsAccessKeyId, awsSecretAccessKey): | |
"mat": "mat", | ||
"npy": "npy", | ||
"txt": "txt" | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throughout line lengths should be no longer than 120
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are quite a few deviations from PEP conventions for spacing throughout here, can you please open this in PyCharm and try to fix them? That will be more efficient than my marking all of them.