In [1]:
import datetime
import math
import os
import string

import matplotlib.pyplot
import numpy as np
import pylab
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import Row
# SQL entry point used to build and query DataFrames.
# NOTE(review): `sc` is not defined anywhere in this notebook — presumably the
# SparkContext pre-created by the hosting notebook environment; confirm.
sqlContext = SQLContext(sc)

%matplotlib inline

In [2]:
def config_softlayer_acct(name, auth_url, username, password):
    """Write the Swift connection settings for one service into the Hadoop configuration.

    Every property is stored under the key prefix ``fs.swift.service.<name>`` on
    the Hadoop configuration object obtained from the global ``sc``.

    Args:
        name: Service name appended to ``fs.swift.service.`` to build the prefix.
        auth_url: Value stored as ``.auth.url``.
        username: Value stored as both ``.username`` and ``.tenant``.
        password: Value stored as both ``.apikey`` and ``.password``.

    Returns:
        None. The Hadoop configuration is modified in place.
    """
    hadoop_conf = sc._jsc.hadoopConfiguration()
    service_key = "fs.swift.service." + name

    # (setter, key suffix, value) triples, applied in this exact order.
    settings = (
        (hadoop_conf.set, ".auth.url", auth_url),
        (hadoop_conf.set, ".username", username),
        (hadoop_conf.set, ".tenant", username),
        (hadoop_conf.set, ".auth.endpoint.prefix", "endpoints"),
        (hadoop_conf.setInt, ".http.port", 8080),
        (hadoop_conf.set, ".apikey", password),
        (hadoop_conf.setBoolean, ".public", True),
        (hadoop_conf.set, ".use.get.auth", "true"),
        (hadoop_conf.setBoolean, ".location-aware", False),
        (hadoop_conf.set, ".password", password),
    )
    for setter, suffix, value in settings:
        setter(service_key + suffix, value)

# Register the "seti" Swift service used by the swift:// URLs in this notebook.
# The account name and API key are read from the environment so that no secret is
# stored in the notebook: set SETI_SWIFT_USERNAME and SETI_SWIFT_APIKEY before
# starting the kernel (a missing variable raises KeyError here, before any data
# access is attempted). SETI_SWIFT_AUTH_URL optionally overrides the auth endpoint.
config_softlayer_acct(
    "seti",
    os.environ.get("SETI_SWIFT_AUTH_URL", "https://dal05.objectstorage.softlayer.net/auth/v1.0"),
    os.environ["SETI_SWIFT_USERNAME"],
    os.environ["SETI_SWIFT_APIKEY"],
)

In [3]:
def parseLineIgnoreHeaders(line):
    """Return ``line`` unchanged, or an empty list when it is a header line.

    A header line is one that starts with ``'UniqueId'``. The empty list lets
    the caller drop headers with a ``len(...) > 0`` filter.
    """
    return [] if line.startswith('UniqueId') else line

# SQL entry point used to build the DataFrame below.
sqlContext = SQLContext(sc)

# Read every object in the setiSignalDB container as lines of text.
data = sc.textFile("swift://setiSignalDB.seti/*")

# The column names are taken from the first line of the "xaa" object,
# split on tabs.
dataForRowNames = sc.textFile("swift://setiSignalDB.seti/xaa")
rowNames = dataForRowNames.first().split("\t")

# Column names containing "/" cause problems when the columns are referenced
# later, so strip it everywhere. (str.find returns -1 when the character is
# absent and 0 when it is the first character, so it must not be used as a
# truth test; replace() is already a no-op for names without "/".)
rowNamesClear = [name.replace("/", "") for name in rowNames]

# Drop header lines (parseLineIgnoreHeaders returns an empty list for them).
cleanData = data.map(parseLineIgnoreHeaders).filter(lambda words: len(words) > 0)

# Build one tuple per record with exactly as many values as there are columns
# in the schema. Indexing (rather than slicing) keeps a short/corrupt row a
# hard error instead of silently producing a narrower tuple.
columnCount = len(rowNamesClear)
rowRDD = cleanData.map(lambda line: line.split("\t")).map(
    lambda d: tuple(d[i] for i in range(columnCount))
)

# Every column is loaded as a nullable string.
fields = [StructField(field_name, StringType(), True) for field_name in rowNamesClear]
schema = StructType(fields)

# Create a Dataframe of the entire Signal database ~ 200M rows
fullSetiDataFrame = sqlContext.createDataFrame(rowRDD, schema)

In [4]:
# Number of rows in the full signal DataFrame; the value is shown as the cell output.
fullSetiDataFrame.count()

196816238

In [5]:
# Print the first 10 rows as a text table to sanity-check the column names and values.
fullSetiDataFrame.show(10)

+--------------------+-------------------+----------+------+-------+--------+----------+-------+----+--------------+--------+-----+----+------+--------+----+--------+---------+---------+------+--------+---------+----------+
|            UniqueId|               Time|    ActTyp| TgtId|catalog|RA2000Hr|Dec2000Deg|  Power| SNR|       FreqMHz|DriftHzs|WidHz| Pol|SigTyp|PPeriodS|NPul|IntTimeS|TscpAzDeg|TscpElDeg|BeamNo|SigClass|SigReason|CandReason|
+--------------------+-------------------+----------+------+-------+--------+----------+-------+----+--------------+--------+-----+----+------+--------+----+--------+---------+---------+------+--------+---------+----------+
| antisolar_14_36_0_1|2010-06-11 21:02:54|target1off|109471| habcat|  13.236|   -14.282|166.000|NULL|1424.974541161|  -0.315|0.662|both|   CwP|    NULL|NULL|      98|  180.330|   34.862|     3|    Cand|   PsPwrT|   SnMulBm|
| antisolar_14_29_0_2|2010-06-11 21:02:55|target1off|109539| habcat|  13.302|   -14.780|182.000|NULL|142

### The next cell is commented out, but it shows how to read/write Parquet files to avoid repeating the text dump parse/load in the cell above

In [6]:
# Uncomment the following line if you want to save the data as a Parquet file.
# Parquet is a columnar storage format, which makes it easier/faster to re-load the data for later use... see the commented line below.
#fullSetiDataFrame.write.parquet("signalDB.parquet")

# uncomment this line if the signalDB parquet already exists. You can load it directly into the dataframe and skip the text dump parse and load in the cell above
#fullSetiDataFrame = sqlContext.read.parquet("signalDB.parquet")

### Example query of the signalDB dataframe... this creates a new dataframe containing only those signal event records which are confirmed 'candidate' signals with a stored archive-CompAmp file

In [8]:
# Build a DataFrame holding only the confirmed candidate signals.
# Swap this SQL for any query (or sequence of queries) you want to run against
# the signal database to find matching records and patterns.

# Expose the full DataFrame to SQL under the table name "signaldb".
fullSetiDataFrame.registerTempTable("signaldb")

confirmedSignalsQuery = (
    "SELECT * FROM signaldb "
    "WHERE SigClass='Cand' "
    "AND CandReason IN ('PsPwrT', 'RConfrm', 'PsCohD', 'Confrm')"
)
confirmedSignalsDF = sqlContext.sql(confirmedSignalsQuery)

# Last expression of the cell: the number of matching rows is shown as output.
confirmedSignalsDF.count()

288802

In [None]:
# Uncomment the following line if you want to save the confirmed signal data as a Parquet file.
# This will allow you to directly load the Parquet file into a dataframe in the future and begin processing the confirmed candidate
# signals without repeating any of the preceding steps.

#confirmedSignalsDF.write.parquet("confirmedSignalDB.parquet")