In [17]:
# default_exp core

# tciaclient

> A TCIA (The Cancer Imaging Archive) download client for Python.

In [18]:
#hide
from nbdev.showdoc import *
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
#export
import os
import urllib.request, urllib.error, urllib.parse
import urllib.request, urllib.parse, urllib.error
import sys
import math
import json

In [37]:
#export
class TCIAClient:
    """Small client for the TCIA (The Cancer Imaging Archive) REST query API.

    Every query method targets ``<baseUrl>/<resource>/query/<endpoint>`` and
    sends its non-empty arguments as a URL query string. Methods that accept
    `outputFormat` return the decoded JSON payload when it is ``"json"`` and
    the raw, still open response object otherwise.
    """

    # Endpoint names appended to "<baseUrl>/<resource>/query/".
    GET_IMAGE = "getImage"
    GET_MANUFACTURER_VALUES = "getManufacturerValues"
    GET_MODALITY_VALUES = "getModalityValues"
    GET_COLLECTION_VALUES = "getCollectionValues"
    GET_BODY_PART_VALUES = "getBodyPartValues"
    GET_PATIENT_STUDY = "getPatientStudy"
    GET_SERIES = "getSeries"
    GET_PATIENT = "getPatient"
    GET_SERIES_SIZE = "getSeriesSize"
    CONTENTS_BY_NAME = "ContentsByName"

    def __init__(self, baseUrl="https://services.cancerimagingarchive.net/services/v4", resource="TCIA"):
        "Store the service root (`baseUrl` + \"/\" + `resource`) used by every request."
        self.baseUrl = baseUrl + "/" + resource

    def execute(self, url, queryParameters=None):
        """Send a GET request to `url` and return the open response object.

        Entries of `queryParameters` whose value is falsy (``None``, ``""``,
        ``0`` ...) are dropped before the query string is encoded. Nothing is
        caught here, so errors raised by ``urllib.request.urlopen`` propagate.
        The caller owns the returned response and should close it.
        """
        # `None` replaces the former shared mutable `{}` default.
        params = {k: v for k, v in (queryParameters or {}).items() if v}
        # The "?" is appended even when no parameter survives the filter.
        requestUrl = url + "?" + urllib.parse.urlencode(params)
        request = urllib.request.Request(url=requestUrl)
        return urllib.request.urlopen(request)

    def jsonify(self, resp):
        "Read `resp` to the end, decode it as JSON, and close it."
        try:
            return json.loads(resp.read())
        finally:
            # The body has been consumed; release the connection if possible.
            close = getattr(resp, "close", None)
            if close is not None:
                close()

    def _query(self, endpoint, queryParameters, outputFormat=None):
        "Call `endpoint` under `/query/`; decode the reply only when `outputFormat` is \"json\"."
        resp = self.execute(self.baseUrl + "/query/" + endpoint, queryParameters)
        if outputFormat == "json":
            return self.jsonify(resp)
        return resp

    def get_modality_values(self,collection = None , bodyPartExamined = None , modality = None , outputFormat = "json" ):
        "Query `getModalityValues`, optionally filtered by collection, body part and/or modality."
        queryParameters = {"Collection" : collection , "BodyPartExamined" : bodyPartExamined , "Modality" : modality , "format" : outputFormat }
        return self._query(self.GET_MODALITY_VALUES, queryParameters, outputFormat)

    def get_series_size(self, SeriesInstanceUID = None, outputFormat = "json"):
        "Query `getSeriesSize` for the series identified by `SeriesInstanceUID`."
        queryParameters = {"SeriesInstanceUID" : SeriesInstanceUID, "format" : outputFormat}
        return self._query(self.GET_SERIES_SIZE, queryParameters, outputFormat)

    def contents_by_name(self, name = None):
        "Query `ContentsByName` with `name` and return the raw response object."
        return self._query(self.CONTENTS_BY_NAME, {"name" : name})

    def get_manufacturer_values(self,collection = None , bodyPartExamined = None, modality = None , outputFormat = "json"):
        "Query `getManufacturerValues`, optionally filtered by collection, body part and/or modality."
        queryParameters = {"Collection" : collection , "BodyPartExamined" : bodyPartExamined , "Modality" : modality , "format" : outputFormat }
        return self._query(self.GET_MANUFACTURER_VALUES, queryParameters, outputFormat)

    def get_collection_values(self,outputFormat = "json" ):
        "Query `getCollectionValues` (no filters)."
        return self._query(self.GET_COLLECTION_VALUES, { "format" : outputFormat }, outputFormat)

    def get_body_part_values(self,collection = None , bodyPartExamined = None , modality = None , outputFormat = "json" ):
        "Query `getBodyPartValues`, optionally filtered by collection, body part and/or modality."
        queryParameters = {"Collection" : collection , "BodyPartExamined" : bodyPartExamined , "Modality" : modality , "format" : outputFormat }
        return self._query(self.GET_BODY_PART_VALUES, queryParameters, outputFormat)

    def get_patient_study(self,collection = None , patientId = None , studyInstanceUid = None , outputFormat = "json" ):
        "Query `getPatientStudy`, optionally filtered by collection, patient id and/or study UID."
        queryParameters = {"Collection" : collection , "PatientID" : patientId , "StudyInstanceUID" : studyInstanceUid , "format" : outputFormat }
        return self._query(self.GET_PATIENT_STUDY, queryParameters, outputFormat)

    def get_series(self, collection = None , modality = None , studyInstanceUid = None , outputFormat = "json" ):
        "Search for series by collection name and/or study name and/or modality."
        queryParameters = {"Collection" : collection , "StudyInstanceUID" : studyInstanceUid , "Modality" : modality , "format" : outputFormat }
        return self._query(self.GET_SERIES, queryParameters, outputFormat)

    def get_patient(self,collection = None , outputFormat = "json" ):
        "Query `getPatient`, optionally restricted to one collection."
        queryParameters = {"Collection" : collection , "format" : outputFormat }
        return self._query(self.GET_PATIENT, queryParameters, outputFormat)

    def get_image(self , seriesInstanceUid , downloadPath, zipFileName):
        """Get a set of images as a zip file.

        Requests `getImage` for `seriesInstanceUid` and streams the response
        body to ``os.path.join(downloadPath, zipFileName)``, creating
        `downloadPath` when it does not exist.

        Returns ``True`` once the body has been written. Returns ``False``
        after printing a message when the request raises
        ``urllib.error.HTTPError`` or ``urllib.error.URLError``. Other
        exceptions (for example ``OSError`` from the filesystem) propagate.
        """
        serviceUrl = self.baseUrl + "/query/" + self.GET_IMAGE
        queryParameters = { "SeriesInstanceUID" : seriesInstanceUid }
        CHUNK = 256 * 10240  # bytes read per iteration (2,621,440)
        # The umask is process-wide: apply 0o002 only while the directory and
        # file are created, then put the caller's value back.
        previous_umask = os.umask(0o002)
        try:
            # exist_ok avoids the check-then-create race of exists() + makedirs().
            os.makedirs(downloadPath, exist_ok=True)
            file = os.path.join(downloadPath, zipFileName)
            # The request is issued first, so a failed request leaves no empty
            # file behind; both handles are closed on every exit path.
            with self.execute(serviceUrl, queryParameters) as resp, open(file, 'wb') as fp:
                for chunk in iter(lambda: resp.read(CHUNK), b""):
                    fp.write(chunk)
        except urllib.error.HTTPError as e:
            print("HTTP Error:",e.code , serviceUrl)
            return False
        except urllib.error.URLError as e:
            print("URL Error:",e.reason , serviceUrl)
            return False
        finally:
            os.umask(previous_umask)

        return True

In [38]:
# Render the signature and docstring of `TCIAClient.get_series` inline.
show_doc(TCIAClient.get_series)

<h4 id="TCIAClient.get_series" class="doc_header"><code>TCIAClient.get_series</code><a href="__main__.py#L78" class="source_link" style="float:right">[source]</a></h4>

> <code>TCIAClient.get_series</code>(**`collection`**=*`None`*, **`modality`**=*`None`*, **`studyInstanceUid`**=*`None`*, **`outputFormat`**=*`'json'`*)

Search for series by collection name and/or study name and/or modality.

In [39]:
# Live example: needs network access to the TCIA service (default base URL).
# Fetch the series of the "TCGA-GBM" collection and display the first two records.
tc = TCIAClient()
tc.get_series(collection="TCGA-GBM")[:2]

[{'PatientID': 'TCGA-08-0244',
  'StudyInstanceUID': '1.3.6.1.4.1.14519.5.2.1.7695.4001.130563880911723253267280582465',
  'SeriesInstanceUID': '1.3.6.1.4.1.14519.5.2.1.7695.4001.306204232344341694648035234440',
  'Modality': 'MR',
  'SeriesDate': '1998-12-08',
  'SeriesDescription': '3DSPGR AXIAL',
  'BodyPartExamined': 'BRAIN',
  'SeriesNumber': '2.000000',
  'Collection': 'TCGA-GBM',
  'Manufacturer': 'GE MEDICAL SYSTEMS',
  'ManufacturerModelName': 'GENESIS_SIGNA',
  'SoftwareVersions': '07',
  'Visibility': '1',
  'ImageCount': 124},
 {'PatientID': 'TCGA-08-0244',
  'StudyInstanceUID': '1.3.6.1.4.1.14519.5.2.1.7695.4001.130563880911723253267280582465',
  'SeriesInstanceUID': '1.3.6.1.4.1.14519.5.2.1.7695.4001.180700359927709468630440576839',
  'Modality': 'MR',
  'SeriesDate': '1998-12-08',
  'SeriesDescription': 'FMPSPGR SAG',
  'BodyPartExamined': 'BRAIN',
  'SeriesNumber': '1.000000',
  'Collection': 'TCGA-GBM',
  'Manufacturer': 'GE MEDICAL SYSTEMS',
  'ManufacturerModelName':

In [33]:
# Render the signature and docstring of `TCIAClient.get_image` inline.
show_doc(TCIAClient.get_image)

<h4 id="TCIAClient.get_image" class="doc_header"><code>TCIAClient.get_image</code><a href="__main__.py#L92" class="source_link" style="float:right">[source]</a></h4>

> <code>TCIAClient.get_image</code>(**`seriesInstanceUid`**, **`downloadPath`**, **`zipFileName`**)

Get a set of images as a zip file

In [40]:
# Live example: needs network access and writes ./images.zip in the working directory.
# `get_image` returns True when the download completes, False on an HTTP/URL error.
tc = TCIAClient()
tc.get_image(seriesInstanceUid = "1.3.6.1.4.1.14519.5.2.1.7695.4001.180700359927709468630440576839",
             downloadPath = "./", zipFileName = "images.zip")

True

In [34]:
#hide
# nbdev export step: converts the project's notebooks (see the "Converted ..." output below).
from nbdev.export import notebook2script; notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
