In [1]:
# Name of the TSV file that is uploaded to the Livy session and then read on the Spark side.
input_file = "gtex_rnaseq_prep_profiles.tsv"
# Maximum number of data rows the generated Spark code reads from the file.
n = 1000
# Base URL passed to livy.getHostUrl to locate the Livy host.
pivot_url = "http://34.205.23.216:9191"

In [2]:
import livy, pprint, requests, time, json

In [3]:
# Look up the Livy host URL via the pivot service.
# NOTE(review): livy is a local helper module; exact semantics of getHostUrl are not shown here — confirm.
host = livy.getHostUrl(pivot_url)

In [4]:
# Open a session on the Livy host. The returned URL is the base for the upload,
# statement execution and close calls made later in this notebook.
session_url = livy.openSession(host)

..........................

In [5]:
# Per-session endpoint that receives the input file in the next cell.
upload_file_url = session_url + "/upload-file"

In [6]:
# Upload the input file to the Livy session so the Spark code can resolve it by name.
# The file is opened in a `with` block so the handle is closed as soon as the request
# finishes (the bare open() previously leaked it for the life of the kernel).
with open(input_file, "rb") as upload_fh:
    upload_response = requests.post(upload_file_url, files={"file": upload_fh})
# Fail here, not later inside the Spark job, if the server rejected the upload.
upload_response.raise_for_status()
upload_response

<Response [200]>

In [7]:
def code1(input_file, n):
    """Build the PySpark source that loads the uploaded TSV into dense vectors.

    The returned text is meant to be executed remotely (it is sent through
    livy.execStatement by the cells below). When run, it reads the file that
    was uploaded to the session, skips the first ``dataheaderindex`` columns
    of every line, converts the remaining cells to floats and leaves the
    result in a list named ``data`` (one ``Vectors.dense`` per input row).

    Args:
        input_file: Name of the uploaded file; resolved remotely with
            ``SparkFiles.get``.
        n: Maximum number of data rows to read.

    Returns:
        str: Python source code. Both arguments are embedded with
        ``json.dumps`` so they appear as quoted/number literals.
    """
    # Backslashes that must survive into the generated source are doubled here
    # ("\\t", "\\r\\n", "\\""), so the generated code contains the escape
    # sequences themselves and no raw tab characters.
    return '''
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.ml.linalg import Vectors, Matrices
from pyspark.ml.stat import Correlation
import math

input_file = ''' + json.dumps(input_file) + '''
n = ''' + json.dumps(n) + '''
path = SparkFiles.get(input_file)

# Number of leading (non-numeric) columns to skip on every line.
dataheaderindex = 2
max_rows = n

def to_float(i):
    # Build a converter for the cells of row i; unparsable cells become 0.0.
    def f(jx):
        j, x = jx
        try:
            return float(x)
        except (TypeError, ValueError):
            # Report the offending value first, then its (row, column) position.
            print("WARNING: non number \\"{0}\\" at ({1}, {2}) is set to 0".format(x, i, j))
            return 0.0
    return f


with open(path) as inp:
    # Only strip the line terminator: strip() would also remove leading tabs
    # and shift every column when the first cell is empty.
    headers = inp.readline().rstrip("\\r\\n").split("\\t")
    ncols = len(headers) - dataheaderindex
    nrows = 0
    data = []
    for line in inp:
        if nrows >= max_rows:
            break
        row = line.rstrip("\\r\\n").split("\\t")
        row2 = row[dataheaderindex:]
        if len(row2) != ncols:
            print("WARNING: number of cols doesn't match at row {0} truncated with default value 0".format(nrows))
            # Pad short rows with "0" and drop any surplus cells.
            row2 += ["0"] * (ncols - len(row2))
            row2 = row2[:ncols]
        feature = list(map(to_float(nrows), enumerate(row2)))
        data.append(Vectors.dense(*feature))
        nrows += 1
'''

In [8]:
def code2(input_file, n):
    """Return PySpark source that computes pairwise Tanimoto coefficients.

    The text produced by ``code1(input_file, n)`` (which defines ``data``) is
    followed by a snippet that, for every row ``i``, evaluates
    ``AB / (AA[i] + AA[j] - AB)`` against each earlier row ``j < i`` and hands
    the collected lists to the ``%table`` magic.
    """
    loader_src = code1(input_file, n)
    tanimoto_src = '''
rdd = sc.parallelize(data)

rddi = rdd.zipWithIndex()

def dot_self(v):
    return v.dot(v)
    
AA = rdd.map(dot_self).collect()

def tanimoto(veci):
    vec, i = veci
    data2 = [ABij / (AA[i] + AA[j] - ABij) for j, ABij in enumerate(map(lambda vec2 : vec.dot(vec2), data[:i]))]
    return data2
  
tanimotodf = rddi.map(tanimoto).collect()
%table tanimotodf
'''
    return loader_src + tanimoto_src

In [9]:
# Run the Tanimoto job on the Livy session and report the elapsed seconds.
# perf_counter() is a monotonic clock intended for measuring intervals, so the
# result cannot be skewed by system clock adjustments the way time.time() can.
t0 = time.perf_counter()
r = livy.execStatement(host, session_url, code2(input_file, n))
t1 = time.perf_counter()
print()  # end the line of progress output (presumably printed by livy.execStatement)
print(t1 - t0)

............
15.550307512283325


In [10]:
# Show the first ten rows of the table returned by the Tanimoto statement.
tanimoto_table = r.json()["output"]["data"]["application/vnd.livy.table.v1+json"]
pprint.pprint(tanimoto_table["data"][:10])

[[],
 ['0.0905296261004'],
 ['0.546902733666', '0.0463593007012'],
 ['0.217650225637', '0.199209918696', '0.141772602675'],
 ['0.0585268213177', '0.10894499485', '0.0400457811145', '0.381359762029'],
 ['0.171528863768',
  '0.0293593661712',
  '0.329402532159',
  '0.0721594474811',
  '0.0232641885395'],
 ['0.131745067722',
  '0.0135852397835',
  '0.276427379125',
  '0.0321172057187',
  '0.00956200637227',
  '0.0727902976352'],
 ['0.701740729271',
  '0.0967625559077',
  '0.653769713455',
  '0.268487354256',
  '0.0725326912849',
  '0.311162330404',
  '0.141826486645'],
 ['0.64478131582',
  '0.0769402841855',
  '0.480598491193',
  '0.3052366563',
  '0.0795799835284',
  '0.241119380994',
  '0.0999408762708',
  '0.708298387545'],
 ['0.509372123107',
  '0.149857244534',
  '0.36286006184',
  '0.552544568935',
  '0.148303858523',
  '0.192931521568',
  '0.0736253279493',
  '0.660676333988',
  '0.639326407225']]


In [11]:
def code3(input_file, n):
    """Return PySpark source that prints Pearson and Spearman correlations.

    The text produced by ``code1(input_file, n)`` (which defines ``data``) is
    followed by a snippet that transposes ``data`` with ``zip(*data)``, puts
    each transposed tuple into a one-column ``features`` DataFrame and prints
    the result of ``Correlation.corr`` for both methods.
    """
    correlation_src = '''
data2 = list(map(lambda x : [Vectors.dense(*x)], zip(*data)))

df = spark.createDataFrame(data2, ["features"])

r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\\n" + str(r1))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\\n" + str(r2))
'''
    loader_src = code1(input_file, n)
    return loader_src + correlation_src

In [12]:
# Run the correlation job on the Livy session and report the elapsed seconds.
# perf_counter() is a monotonic clock intended for measuring intervals, so the
# result cannot be skewed by system clock adjustments the way time.time() can.
t0 = time.perf_counter()
r = livy.execStatement(host, session_url, code3(input_file, n))
t1 = time.perf_counter()
print()  # end the line of progress output (presumably printed by livy.execStatement)
print(t1 - t0)

..............
14.964808464050293


In [13]:
# Show the data payload (the printed correlation matrices) of the last statement.
correlation_output = r.json()["output"]
pprint.pprint(correlation_output["data"])

               'Pearson correlation matrix:\n'
               'Row(pearson(features)=DenseMatrix(1000, 1000, [1.0, 0.2447, '
               '0.2068, 0.4914, 0.3069, -0.0747, 0.1278, 0.3957, ..., 0.1628, '
               '0.7181, 0.1993, 0.5056, 0.0839, 0.276, 0.8454, 1.0], False))\n'
               'Spearman correlation matrix:\n'
               'Row(spearman(features)=DenseMatrix(1000, 1000, [1.0, 0.223, '
               '0.3006, 0.4801, 0.3286, 0.3539, 0.37, 0.5528, ..., 0.111, '
               '0.6925, 0.5141, 0.4888, 0.2645, 0.1221, 0.7179, 1.0], False))'}


In [14]:
# Close the Livy session opened earlier in the notebook.
# NOTE(review): this only runs if every cell above succeeded; close the session manually after a failure.
livy.closeSession(session_url)