# This section reads the data from the BlackRock Aladdin API
## The first step is to identify which stocks to read. I would have built out a full function where all that needed to be passed in is the stock ticker name

In [1]:
import csv
from pyspark.sql.types import *
from StringIO import StringIO



Creating SparkContext as 'sc'


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1474704881191_0007,pyspark,idle,Link,Link,✔


Creating HiveContext as 'sqlContext'
SparkContext and HiveContext created. Executing user code ...


### Make API call to Aladdin

In [14]:
from urllib2 import Request, urlopen, URLError

def aladdinCall(stockToken):
    """Fetch the raw performance JSON for one identifier from the BlackRock tools endpoint.

    Args:
        stockToken: identifier (for example a ticker such as 'AAPL') substituted
            into the ``identifiers`` query parameter of the request URL.

    Returns:
        The response body exactly as read from the server, or None when the
        request fails with a URLError (the error is printed, not raised).
    """
    # NOTE(review): stockToken is interpolated as-is; a value containing '&', '#'
    # or spaces would corrupt the query string -- confirm callers only pass plain tickers.
    request = Request("https://www.blackrock.com/tools/json/performance?identifiers=%s&outputFormat=json&useCache=true" % stockToken)
    try:
        response = urlopen(request)
        try:
            return response.read()
        finally:
            # Always release the connection, even if reading the body fails.
            response.close()
    except URLError as e:
        # Best-effort call: report the problem and signal failure with None.
        print('Error: %s' % (e,))
        return None

### Parse the data  

In [139]:
import json
from pyspark.sql import Row
from collections import OrderedDict

def convert_to_row(d):
    """Wrap ``d`` in a Spark ``Row``, passing it as the single positional value."""
    # NOTE(review): if d is a dict of column values, Row(**d) may have been
    # intended -- confirm before relying on this helper.
    wrapped = Row(d)
    return wrapped

# Fetch the raw performance payload for Apple.
raw_data = aladdinCall('AAPL')

# aladdinCall prints the error and returns None when the request fails; stop here
# with a clear message instead of letting json.loads raise an opaque TypeError.
if raw_data is None:
    raise RuntimeError("Aladdin API call failed for 'AAPL'; there is no data to parse")

# Decode the JSON text into nested Python dicts/lists.
stockJson = json.loads(raw_data)


### Convert Data to Data Frame

In [78]:
# Field names noted from a returnsMap record (presumably its keys -- confirm against the API):
# sinceStartDateAnnualized, oneMonth, level, sinceStartDate, asOfDate, drawdown, oneDay, @type

# Every entry under resultMap -> RETURNS[0] -> returnsMap is a candidate data point.
returns_map = stockJson["resultMap"]["RETURNS"][0]['returnsMap']

dataPoints = []
for record in returns_map.values():
    values = list(record.values())
    populated = [value for value in values if value != '']
    # Discard the record when any field is blank or when more than seven
    # non-blank values remain; otherwise keep its values in iteration order.
    if len(populated) != len(values) or len(populated) > 7:
        continue
    dataPoints.append(populated)

# NOTE(review): values are taken positionally in dict iteration order, and records
# with fewer than seven values also pass the check above -- confirm both line up
# with the seven column names given to toDF below.
rdd = sc.parallelize(dataPoints)

df = rdd.toDF(['sinceStartDateAnnualized', 'oneMonth', 'level', 'sinceStartDate', 'asOfDate',
               'drawnDown', 'oneDay'])

### Clean Data

In [186]:
from pyspark.ml.feature import VectorAssembler

# Sort rows by the sinceStartDate column.
df = df.orderBy('sinceStartDate')

# Remove every row that holds a null in any column. The previous `df.drop('any')`
# asked Spark to drop a *column* named 'any' (none exists), so nothing was cleaned.
df = df.dropna(how='any')

# Start Machine Learning 
## We will train the linear regression model with level as the label; the factors we can use to determine it are sinceStartDate, asOfDate, and sinceStartDateAnnualized
### The fields cannot contain any null values, and we have to use a map-reduce function to create a list of labeled points to store the data in

In [272]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.ml.regression import LinearRegression

# Register df as a temporary table so the model columns can be selected with SQL.
sqlContext.registerDataFrameAsTable(df, "table1")

# f2 (level) is used as the regression label; f1, f3 and f4 are the features.
# The result is no longer collected to the driver here: the collected rows were
# discarded, so the call only cost a full job and driver memory.
df2 = sqlContext.sql("SELECT sinceStartDate AS f1, level as f2, asOfDate as f3, sinceStartDateAnnualized as f4 from table1")
df2 = df2.orderBy('f1')

# pointReduce calls float() on all four columns, so a null in any of them
# (f1 included) has to be excluded before the rows are converted.
df3 = df2.filter(
    df2.f1.isNotNull() & df2.f2.isNotNull() & df2.f3.isNotNull() & df2.f4.isNotNull()
)

def pointReduce(row):
    """Turn one row of values into a LabeledPoint.

    Every value is cast to float. The value at index 1 becomes the label; the
    value at index 0 followed by everything from index 2 onward becomes the
    feature list.
    """
    values = list(map(float, row))
    label = values[1]
    features = values[:1] + values[2:]
    return LabeledPoint(label, features)

# Map over the underlying RDD explicitly: DataFrame.map only exists on Spark 1.x,
# while DataFrame.rdd.map behaves the same there and also works on Spark 2.x+.
data = df3.rdd.map(pointReduce)


#### Here we actually train the linear regression model

In [277]:
# Fit a linear regression with stochastic gradient descent on the LabeledPoint RDD.
# NOTE(review): two iterations at a 1e-8 step on features that are only cast to
# float (no scaling) -- the prediction and MSE recorded in this notebook are
# astronomically large, so these settings and the feature scaling deserve a second look.
sgd_settings = {'iterations': 2, 'step': .00000001}
model = LinearRegressionWithSGD.train(data, **sgd_settings)


In [278]:
# Score one hand-written feature vector and show the predicted label.
# pointReduce builds feature lists as [f1, f3, f4], so the values follow that order.
sample_features = [20040402.0, 0.0, 182.666001102]
temp = model.predict(sample_features)
print(temp)

-76782202546.7

# Livy REST endpoint to my model

In [235]:
# Evaluate the model on the same data it was trained on (there is no hold-out set).
valuesAndPreds = data.map(lambda p: (p.label, model.predict(p.features)))

# Index into each (label, prediction) pair instead of using a tuple-unpacking
# lambda parameter, which is a syntax error on Python 3.
squaredErrors = valuesAndPreds.map(lambda pair: (pair[0] - pair[1]) ** 2)

# Mean squared error = sum of squared residuals / number of points.
MSE = squaredErrors.reduce(lambda x, y: x + y) / squaredErrors.count()
print("Mean Squared Error = " + str(MSE))


#Expose the algorithm here

Mean Squared Error = 1.61657348901e+281