# IBM SETI Tutorial  
### Transfer Data to Local Object Store

This tutorial builds on the information presented in the [introduction to the HTTP API notebook](https://github.com/ibm-cds-labs/seti_at_ibm/blob/master/notebooks/ibmseti_intro_to_http_api.ipynb).

### Goal

The goal is to use the REST API to extract a set of data and store it in our own Object Store. We want to retain both the raw data and the SignalDB rows so that we can analyze them later.

In [1]:
import requests

#### Interesting Target

Start with the coordinates of our interesting target, Kepler 1229b. In the [HTTP API introduction notebook](https://github.com/ibm-cds-labs/seti_at_ibm/blob/master/notebooks/ibmseti_intro_to_http_api.ipynb), we confirmed that the SETI public data set contains data for this celestial location (RA, DEC).

In [2]:
ra=19.832
dec=46.997


### Strategy:
    
* Build an RDD from the meta-data (container/objectname)
* Map a function over the RDD to get the temporary URL and the data
* Save the data to Object Store in various ways
    * RDD as pickled Hadoop file on Object Store
    * Individual files on Object Store
    

## Build RDD with meta-data

Retrieve the meta-data for each raw data object.

Use the skip parameter to paginate through the results and extract all of the SignalDB rows for our particular RA/DEC coordinate.

In [3]:
skip = 0
skip_delta = 2000
all_rows = []

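while True:
    # request one page; 'limit' equals the skip step so that no rows are missed
    params = {'skip': skip, 'limit': skip_delta}
    r = requests.get('https://setigopublic.mybluemix.net/v1/aca/meta/{}/{}'.format(ra, dec),
                     params=params)
    r.raise_for_status()

    page = r.json()  # parse the response body once
    if page['returned_num_rows'] == 0:
        break  # an empty page means we have paged past the last row

    all_rows += page['rows']
    skip += skip_delta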
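
One way to make the pagination logic reusable, and testable without network access, is to separate the loop from the HTTP call. The sketch below is not part of the original notebook: `fetch_all_rows`, `fetch_page`, and the fake 4500-row table are hypothetical names and data introduced only for illustration.

```python
def fetch_all_rows(fetch_page, page_size=2000):
    """Collect rows from a paginated source until an empty page comes back.

    fetch_page(skip, limit) is a hypothetical callable returning a list of rows.
    """
    all_rows = []
    skip = 0
    while True:
        rows = fetch_page(skip, page_size)
        if not rows:
            break  # paged past the last row
        all_rows.extend(rows)
        skip += page_size
    return all_rows


# Stand-in for the HTTP call: serves slices of a made-up 4500-row table.
fake_table = [{'row_id': i} for i in range(4500)]

def fake_fetch_page(skip, limit):
    return fake_table[skip:skip + limit]

rows = fetch_all_rows(fake_fetch_page)
```

In the notebook, `fetch_page` would wrap the `requests.get` call and return `r.json()['rows']`.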

In [4]:
len(all_rows)  #We have 392 rows.

392

In [5]:
rdd = sc.parallelize( all_rows )

## Pull data into RDD

We now want to get the raw data and combine it with the SignalDB data. The SignalDB data should be retained since it could contain useful information to be used as features in a machine-learning analysis. You should at least retain the celestial coordinates of the raw data since one characteristic of an expected SETI signal is multiple observations of a signal from the same location in the sky. 

Recall that each raw data file can show up multiple times in SignalDB, for various reasons. We want to package all of these rows together with their single raw data file.

##### GroupBy

To do this, we use `groupBy` to re-organize the data returned from the setigopublic server into `(K, <iterator V>)` pairs, where `K` is the concatenation `<container>-<objectname>`, which should be unique for each raw data file, and `<iterator V>` is an iterator over the SignalDB rows for that file.

Recall that each `row` returned by the API server is a dictionary where each key is the name of a column in the SignalDB.

#### Group By Raw Data File Name

This creates an RDD of `(K, <iterator V>)` rows.

In [6]:
rdd_klv = rdd.groupBy(lambda row: row['container'] + '-' + row['objectname'])
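
To see what the grouping produces without a Spark cluster, here is a plain-Python analogue run on a few made-up rows. The helper `group_rows`, the object names, and the `uniqueid` values are invented for illustration; only the key expression is taken from the cell above.

```python
from collections import defaultdict

# Made-up rows: the first two point to the same raw data file.
sample_rows = [
    {'container': 'setiCompAmp', 'objectname': 'day1/file_a.archive-compamp', 'uniqueid': 'row-1'},
    {'container': 'setiCompAmp', 'objectname': 'day1/file_a.archive-compamp', 'uniqueid': 'row-2'},
    {'container': 'setiCompAmp', 'objectname': 'day2/file_b.archive-compamp', 'uniqueid': 'row-3'},
]

def group_rows(rows, key_func):
    """Plain-Python analogue of RDD.groupBy: returns {key: [rows with that key]}."""
    groups = defaultdict(list)
    for row in rows:
        groups[key_func(row)].append(row)
    return dict(groups)

# Same key expression as the groupBy call above.
grouped = group_rows(sample_rows, lambda row: row['container'] + '-' + row['objectname'])
group_sizes = {key: len(rows) for key, rows in grouped.items()}
```

Three rows collapse into two groups, one per raw data file, and no SignalDB row is lost.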

#### Add Temporary URLs to RDD
For each `(K, <iterator V>)` pair, we then request a temporary URL for the raw data file.

We define a function to request the temporary URLs.

Also, we modify each row into a `(K, V)` pair such that `V` is now a list containing the HTTP status code, the temporary URL, and the iterable of SignalDB rows. The key, `K`, becomes just the name of the raw data file.

##### Access Token

Before requesting the temporary URLs, you must first obtain an access token. This requires an IBM DSX account.

Click here: https://setigopublic.mybluemix.net/token

Copy the returned token into the cell below.

In [7]:
access_token='9798b5d71b649b36d5307919940372e7'

In [8]:
def add_temp_urls(row):
    '''
    This function will get a temporary URL to the data. 
    
    Note the new format of each Row.
    '''
    
    #each row in RDD is a tuple2, The first element is the container-objectname
    container, objectname = row[0].split('-',1) 
    
    temp_url_api = 'https://setigopublic.mybluemix.net/v1/data/url/{}/{}'.format(container, objectname)
    r = requests.get(temp_url_api, params={'access_token':access_token})
    
    temp_url = ''
    if r.status_code == 200:
        temp_url = r.json()['temp_url']
        
    #Object names look like: 2013-01-05/act1779/2013-01-05_07-21-33_UTC.act1779.dx3016.id-15.R.archive-compamp
    #We just want `2013-01-05_07-21-33_UTC.act1779.dx3016.id-15.R.archive-compamp`
    newkey = objectname.split('/')[-1]  
    
    return (newkey, [r.status_code, temp_url, row[1]])
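
The key handling in `add_temp_urls` depends on two string splits, which can be checked in isolation. This small sketch uses the example object name from the comment above and the container name `setiCompAmp`; the variable names are chosen here for illustration.

```python
key = ('setiCompAmp-2013-01-05/act1779/'
       '2013-01-05_07-21-33_UTC.act1779.dx3016.id-15.R.archive-compamp')

# maxsplit=1 splits only at the first hyphen, so the hyphens inside the
# object name (dates, 'id-15', 'archive-compamp') are left untouched.
container, objectname = key.split('-', 1)

# The last path component is the bare file name used as the new key.
file_name = objectname.split('/')[-1]

# Without maxsplit the key would break into many more than two pieces.
naive_parts = key.split('-')
```

This relies on the container name itself containing no hyphen, which holds for `setiCompAmp` but is an assumption for any other container.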

In [9]:
rdd_with_url = rdd_klv.map(add_temp_urls)

##### Cache and Count the RDD. 

The RDD is cached here so that Spark doesn't obtain the temporary data URLs for a second time when the data is saved to Object Storage later in this notebook. 

In [10]:
rdd_with_url.cache()

PythonRDD[5] at RDD at PythonRDD.scala:43

In [11]:
%time rdd_with_url.count()

CPU times: user 17.5 ms, sys: 8.98 ms, total: 26.5 ms
Wall time: 1min 4s


206

Note that 206 rows were returned here, whereas we initially started with 392 rows. This is because 186 of the 392 rows returned above referred to the same data file as another row. Grouping the `rdd` by `container-objectname` removed the duplicate raw data files but still let us retain each of the SignalDB rows for that file.

#### Get Data

Note that we again transform each RDD row, this time from a `(K, V)` tuple into a dictionary.

In [12]:
def get_data(row):
    '''
    We use the temporary URL to pull the data into each Row.
    '''
    r = requests.get(row[1][1])

    # here we transform the data to something a little easier to use
    retDataVal = {"aca_file_name":row[0],
                  "target_name":"kepler1229b", #could be helpful later to attach name
                  "raw_data":r.content, 
                  "http_status":r.status_code, 
                  "signaldb_rows":row[1][2]
                 }

    return retDataVal

In [13]:
#filter out rows where HTTP requests did not return 200
rdd_with_data = rdd_with_url.filter(lambda x: x[1][0] == 200)\
                .map(get_data)\
                .filter(lambda x: x['http_status'] == 200)
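
The filter, map, filter chain above can be traced on a few made-up rows in plain Python. In this sketch `fake_get_data` is a hypothetical stand-in for `get_data` that performs no HTTP request, and the file names and URLs are invented. The first filter drops rows for which no temporary URL was obtained, so no download is attempted for them; the second drops rows whose download failed.

```python
def fake_get_data(row):
    """Stand-in for get_data: every download 'succeeds' except for 'bad.file'."""
    name, (url_status, temp_url, sigdb_rows) = row
    ok = name != 'bad.file'
    return {
        'aca_file_name': name,
        'raw_data': b'\x00\x01' if ok else b'',
        'http_status': 200 if ok else 404,
        'signaldb_rows': sigdb_rows,
    }

# Rows shaped like the output of add_temp_urls: (key, [status, temp_url, rows]).
rows_with_url = [
    ('good.file', [200, 'https://example.invalid/good', [{'snr': 1.0}]]),
    ('no_url.file', [401, '', [{'snr': 2.0}]]),
    ('bad.file', [200, 'https://example.invalid/bad', [{'snr': 3.0}]]),
]

with_url = [row for row in rows_with_url if row[1][0] == 200]    # first filter
downloaded = [fake_get_data(row) for row in with_url]             # map
rows_with_data = [d for d in downloaded if d['http_status'] == 200]  # second filter
```

Only `good.file` survives both filters.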

## Various Ways To Save

### Using Object Storage

Moving the data from the SETI Object Storage to your personal Object Storage will provide the best performance for your analysis. This tutorial assumes that you've provisioned your services via IBM Data Science Experience, which automatically generates a Spark and Object Storage service for you.

 
#### Insert Credentials from this notebook

These instructions assume you're running this notebook from within DSX.

1. Click the ![10/01](../img/1001.png "10/01") button in the set of icons on the top right to reveal the files in your Object Storage container associated with this notebook.
> If there appears to be no Object Storage linked with your account, save and leave this notebook, go to your DSX project settings and add a new Object Storage instance.  
2. From any file listed in the right-side panel, select `Insert to Code` and then choose `Insert Credentials`.
> If no files are shown, the easiest way to obtain your credentials is to add a small file to your Object Storage. From your desktop, drag in a small text file. Then you may obtain credentials via the `Insert to Code` pull-down.

##### Alternatively, from Bluemix

Alternatively, you can obtain the credentials from Bluemix. 
1. Log into your IBM Bluemix account: https://bluemix.net. (Use your DSX credentials. Your Bluemix account was automatically created for you.) 
2. Scroll down and select your Object Storage service.
3. Select `Service Credentials` and then `View Credentials`. Any set of credentials should work. 

**Note for the code below:** if it is not already set, set the `credentials_1['container']` key to the name of an existing container in your Object Storage.

In [14]:
# @hidden_cell
credentials_1 = {
  'auth_uri':'',
  'global_account_auth_uri':'',
    'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_205c4f18_fee1_4d57_8b3d_a796f1125e04',
  'project_id':'aa22daa1070347848cbdd103df44dfbb',
  'region':'dallas',
  'user_id':'47404e1c8d8b46eeb84f45cff881bae0',
  'domain_id':'e15e35e73d58464389fbf14d81b52f20',
  'domain_name':'1318277',
  'username':'member_cdd310eeb7086f719fbe75a724578af5086579cb',
  'password':"""ws_.~Uw#64cRS_L#""",
  'container':'SETI1',
  'tenantId':'undefined',
  }
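
Before using the credentials, it can help to check that the keys read later in this notebook are present and non-empty. The helper below is a hypothetical convenience, not part of DSX or any library. Its key list covers only the entries this notebook reads directly (for the `swiftclient` connection and the container name); `ibmos2spark` may read additional keys. The example values are placeholders.

```python
REQUIRED_KEYS = ('auth_url', 'project_id', 'user_id', 'region', 'password', 'container')

def missing_credentials(credentials, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in the credentials dict."""
    return [key for key in required if not credentials.get(key)]

# Placeholder values for illustration only.
example_credentials = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': 'example-project-id',
    'user_id': 'example-user-id',
    'region': 'dallas',
    'password': 'example-password',
    'container': '',  # not set yet
}

missing = missing_credentials(example_credentials)
```

Calling `missing_credentials(credentials_1)` in the notebook would return an empty list once every listed key has a value.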

### Pickle RDD to Object Store

In [15]:
#!pip install --user --upgrade ibmos2spark
#This is a tool that configures the Spark Hadoop configuration, needed for connecting to Object Store via the swift driver.
#https://github.com/ibm-cds-labs/ibmos2spark

import ibmos2spark

In [16]:
configuration_name = 'my_dsx_object_storage' 
bmos = ibmos2spark.bluemix(sc, credentials_1, configuration_name) #note, I use version "2d"

In [17]:
%time rdd_with_data.saveAsPickleFile(bmos.url(credentials_1['container'], 'kepler1229b.sigdb.archive-compamps.rdd.dict.pickle'))

CPU times: user 12.3 ms, sys: 5.52 ms, total: 17.8 ms
Wall time: 24 s


In [18]:
%time rdd_with_data.count()

CPU times: user 8.63 ms, sys: 3.26 ms, total: 11.9 ms
Wall time: 12.3 s


206

###### Quick Verification

In [19]:
rdd_read_data = sc.pickleFile(bmos.url(credentials_1['container'], 'kepler1229b.sigdb.archive-compamps.rdd.dict.pickle'))
%time rdd_read_data.count()

CPU times: user 9.96 ms, sys: 583 µs, total: 10.5 ms
Wall time: 4.12 s


206

In [20]:
rdd_read_data_firsttwo = rdd_read_data.take(2)

In [21]:
print type(rdd_read_data_firsttwo[1]['signaldb_rows'])
print ''
print list(rdd_read_data_firsttwo[1]['signaldb_rows'])[0]

<class 'pyspark.resultiterable.ResultIterable'>

{u'inttimes': 94, u'pperiods': None, u'drifthzs': 0.041, u'tgtid': 150096, u'sigreason': u'Confrm', u'freqmhz': 9567.273377852, u'dec2000deg': 46.997, u'container': u'setiCompAmp', u'objectname': u'2014-10-07/act37464/2014-10-07_04-20-00_UTC.act37464.dx1014.id-1.R.archive-compamp', u'ra2000hr': 19.832, u'npul': None, u'acttype': u'target', u'power': 49.339, u'widhz': 0.087, u'catalog': u'keplerHZ', u'snr': 0.181, u'uniqueid': u'kepler8ghz_37464_1014_1_2281530', u'beamno': 1, u'sigclass': u'Cand', u'sigtyp': u'CwC', u'tscpeldeg': 0, u'pol': u'both', u'candreason': u'Confrm', u'time': u'2014-10-07T04:20:00Z', u'tscpazdeg': 0}


In [22]:
print rdd_read_data_firsttwo[1]['aca_file_name'], len(rdd_read_data_firsttwo[1]['raw_data'])

2014-10-07_04-20-00_UTC.act37464.dx1014.id-1.R.archive-compamp 1061928


### Save individual files

Saving each file as its own object performs poorly. **Do not do this.** Objects load most efficiently from Object Store into Spark when they are in the 64 to 128 MB range, so you are encouraged to save the entire RDD as a single pickled object.

We leave this method here for instructional purposes. 
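
A rough, back-of-the-envelope calculation shows how far individual files are from the recommended size. It assumes every raw data file is about as large as the one inspected in the verification step above (1,061,928 bytes), which this notebook does not check, and it treats 1 MB as 1024 * 1024 bytes.

```python
n_files = 206             # raw data files retrieved in this tutorial
bytes_per_file = 1061928  # size of the one file inspected earlier; assumed typical
MB = 1024 * 1024

total_bytes = n_files * bytes_per_file
total_mb = total_bytes / float(MB)

def objects_needed(total, target_bytes):
    """Smallest number of objects such that none exceeds target_bytes (ceiling division)."""
    return -(-total // target_bytes)

objects_at_128mb = objects_needed(total_bytes, 128 * MB)
objects_at_64mb = objects_needed(total_bytes, 64 * MB)
```

Under these assumptions each file is about 1 MB, far below the 64 MB lower bound, while the whole data set would fit in two to four objects of the recommended size.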

In [25]:
#!pip install --user --upgrade python-swiftclient

import swiftclient.client as swiftclient

conn = swiftclient.Connection(
    key=credentials_1['password'],
    authurl=credentials_1['auth_url']+"/v3",
    auth_version='3',
    os_options={
        "project_id": credentials_1['project_id'],
        "user_id": credentials_1['user_id'],
        "region_name": credentials_1['region']})

In [26]:
import cPickle 

def pickle_rows_to_objects(row):
    pickled_row = cPickle.dumps(row, protocol=cPickle.HIGHEST_PROTOCOL)
    #the key must match the one set in get_data ('aca_file_name')
    new_file_name = 'single_files/' + row['aca_file_name'] + '.pickle_with_sigdb'
    resp = {}
    etag = conn.put_object(credentials_1['container'], new_file_name , pickled_row, response_dict=resp)
    return (new_file_name, etag, resp, len(pickled_row))
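
The naming and pickling logic can be tried locally by passing the upload step in as a parameter. This is a sketch of a variant, not the notebook's method: `pickle_row`, `put_object`, and the in-memory `fake_container` are hypothetical, and it uses the standard `pickle` module rather than `cPickle`.

```python
import pickle

def pickle_row(row, put_object):
    """Pickle one row and pass it to put_object(name, payload); returns (name, size)."""
    payload = pickle.dumps(row, protocol=pickle.HIGHEST_PROTOCOL)
    name = 'single_files/' + row['aca_file_name'] + '.pickle_with_sigdb'
    put_object(name, payload)
    return (name, len(payload))

# In-memory stand-in for an Object Storage container.
fake_container = {}

example_row = {
    'aca_file_name': 'example.archive-compamp',
    'raw_data': b'\x00\x01\x02',
    'http_status': 200,
    'signaldb_rows': [{'ra2000hr': 19.832, 'dec2000deg': 46.997}],
}

name, size = pickle_row(example_row, fake_container.__setitem__)
```

In the notebook, `put_object` would wrap `conn.put_object` with the container name already filled in.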

In [27]:
rdd_pickle_rows_to_objects = rdd_with_data.map(pickle_rows_to_objects)

In [28]:
%time results = rdd_pickle_rows_to_objects.collect()

CPU times: user 8.43 ms, sys: 4.96 ms, total: 13.4 ms
Wall time: 46.6 s


###### Quick Verification

In [29]:
results[0]

(u'single_files/2014-06-13_11-26-48_UTC.act18681.dx2018.id-2.L.archive-compamp.pickle_with_sigdb',
 '216b6bd2fb5b95ea106c3197da1d7633',
 {'headers': {u'content-length': u'0',
   u'content-type': u'text/html; charset=UTF-8',
   u'date': u'Tue, 16 Aug 2016 19:06:20 GMT',
   u'etag': u'216b6bd2fb5b95ea106c3197da1d7633',
   u'last-modified': u'Tue, 16 Aug 2016 19:06:21 GMT',
   u'x-trans-id': u'tx4dbfb9263c094ba8be4ea-0057b3642b'},
  'reason': 'Created',
  'response_dicts': [{'headers': {u'content-length': u'0',
     u'content-type': u'text/html; charset=UTF-8',
     u'date': u'Tue, 16 Aug 2016 19:06:20 GMT',
     u'etag': u'216b6bd2fb5b95ea106c3197da1d7633',
     u'last-modified': u'Tue, 16 Aug 2016 19:06:21 GMT',
     u'x-trans-id': u'tx4dbfb9263c094ba8be4ea-0057b3642b'},
    'reason': 'Created',
    'status': 201}],
  'status': 201},
 1063278)

In [30]:
rdd_read_data_pickle_with_sigdb = sc.binaryFiles(bmos.url(credentials_1['container'], 'single_files/*'))

In [31]:
%time rdd_read_data_pickle_with_sigdb.count()

CPU times: user 15.6 ms, sys: 8.1 ms, total: 23.7 ms
Wall time: 16.4 s


206
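
Counting the objects confirms that they exist, but each one still holds pickled bytes. `sc.binaryFiles` yields `(path, content)` pairs, so restoring the dictionaries means unpickling the second element of each pair. The sketch below shows this on stand-in pairs built in memory; `unpickle_pair` and the example paths are hypothetical.

```python
import pickle

# Stand-in for what sc.binaryFiles yields: (path, file contents as bytes) pairs.
stored_pairs = [
    ('single_files/example_a.pickle_with_sigdb',
     pickle.dumps({'aca_file_name': 'example_a', 'raw_data': b'\x01\x02'})),
    ('single_files/example_b.pickle_with_sigdb',
     pickle.dumps({'aca_file_name': 'example_b', 'raw_data': b'\x03\x04'})),
]

def unpickle_pair(pair):
    """Drop the path and restore the pickled dictionary from the content bytes."""
    path, content = pair
    return pickle.loads(content)

restored_rows = [unpickle_pair(pair) for pair in stored_pairs]
```

With Spark, the analogous call would be `rdd_read_data_pickle_with_sigdb.map(unpickle_pair)`, assuming the objects are read with a Python version compatible with the one that pickled them.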