In [1]:
import findspark
findspark.init()
import pyspark
import random
'''
 The building block of the Spark API is its RDD API. In the RDD API, there are two types of operations: 
 transformations, which define a new dataset based on previous ones, and actions, which kick off a job 
 to execute on a cluster. On top of Spark’s RDD API, high level APIs are provided, e.g. DataFrame API and 
 Machine Learning API. These high level APIs provide a concise way to conduct certain data operations. 

1> PI ESTIMATION
This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) 
and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.

'''
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

3.14168416


In [11]:
'''
2> WORD COUNT
In this example, we use a few transformations to build a dataset of (String, Int) pairs called counts and then save 
it to a file.
'''
sc = pyspark.SparkContext(appName="WordCount")
text_file = sc.textFile("UIMASummerSchool2003.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("Spark_Wordcount_output.txt")
sc.stop()

In [90]:
'''
MLlib, Spark’s Machine Learning (ML) library, provides many distributed ML algorithms. These algorithms cover tasks 
such as feature extraction, classification, regression, clustering, recommendation, and more. MLlib also provides 
tools such as ML Pipelines for building workflows, CrossValidator for tuning parameters, and model persistence for 
saving and loading models.

3> PREDICTION WITH LOGISTIC REGRESSION
In this example, we take a dataset of labels and feature vectors. We learn to predict the labels from feature vectors 
using the Logistic Regression algorithm.
'''
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors


sc = pyspark.SparkContext(appName="LogisticRegression")

diabetes_train = load_diabetes()
sqlContext = SQLContext(sc)

# Prepare test data
# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
sc.stop()

+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-7.4926373274095...|[5.56861309905893...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[5.86563908503424...|[0.99717280516679...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-6.8135956206001...|[0.00109752926021...|       1.0|
+-----+--------------+--------------------+--------------------+----------+



In [89]:
sc.stop()

In [118]:
print(diabetes.data)

[[ 0.03807591  0.05068012  0.06169621 ... -0.00259226  0.01990842
  -0.01764613]
 [-0.00188202 -0.04464164 -0.05147406 ... -0.03949338 -0.06832974
  -0.09220405]
 [ 0.08529891  0.05068012  0.04445121 ... -0.00259226  0.00286377
  -0.02593034]
 ...
 [ 0.04170844  0.05068012 -0.01590626 ... -0.01107952 -0.04687948
   0.01549073]
 [-0.04547248 -0.04464164  0.03906215 ...  0.02655962  0.04452837
  -0.02593034]
 [-0.04547248 -0.04464164 -0.0730303  ... -0.03949338 -0.00421986
   0.00306441]]


In [28]:
'''
4> Predicting Diabetes using LinearRegression from MLib (Machine Learning library from Spark) 

This Diabetes dataset downloaded from Sklearn has ten baseline variables, age, sex, body mass index, average blood 
pressure, and six blood serum measurements were obtained for each of n = 442 diabetes patients, as well as the 
response of interest, a quantitative measure of disease progression one year after baseline.

A fasting blood sugar level less than 100 mg/dL (5.6 mmol/L) is normal. A fasting blood sugar level from 100 to 
125 mg/dL (5.6 to 6.9 mmol/L) is considered prediabetes. If it's 126 mg/dL (7 mmol/L) or higher on two separate 
tests, you have diabetes. Oral glucose tolerance test.
'''
import findspark
findspark.init()
import pyspark
import random

from sklearn import datasets
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

# Import and clean data. Pyspark uses its own type system and unfortunately it doesn't deal with numpy well. 
# It works with python types though. So you need to manually convert the numpy.float64 to float.

diabetes = datasets.load_diabetes()
diabetes_features= []

# Spark uses breeze under the hood for high performance Linear Algebra in Scala. In Spark, MLlib and other 
# ML algorithms depends on org.apache.spark.mllib.libalg.Vector type which is rather dense or sparse.

for feature_list in diabetes.data:
    temp= [float(i) for i in feature_list]
    diabetes_features.append(Vectors.dense(temp))
    
diabetes_target = [float(i) for i in diabetes.target]
features_and_predictions = list(zip(diabetes_target, diabetes_features))

sc = pyspark.SparkContext(appName="LinearRegression_Diabetes")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(features_and_predictions, ["label", "features"])

# Only max iterations is set. We will set parameters for the algorithm after ParamGridSearch
lr = LinearRegression(maxIter=10)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()


# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
LR_model = tvs.fit(df)

# Make predictions on test data. LR_model is the model with combination
# of parameters that performed best.

LR_model.transform(df)\
    .select("features","label", "prediction").show()

Dataframe = LR_model.transform(df)\
    .select("label", "prediction")

# Metrics object needs to have an RDD of (prediction, observation) pairs.
# Convert the dataframe object to an RDD

valuesAndPreds = Dataframe.rdd.map(tuple)

# Instantiate metrics object
metrics = RegressionMetrics(valuesAndPreds)

# Squared Error
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)

# R-squared
print("R-squared = %s" % metrics.r2)

# Mean absolute error
print("MAE = %s" % metrics.meanAbsoluteError)

# Explained variance
print("Explained variance = %s" % metrics.explainedVariance)

sc.stop()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[0.03807590643342...|151.0|206.07345904776457|
|[-0.0018820165277...| 75.0| 68.11130074493124|
|[0.08529890629667...|141.0| 176.8404128347199|
|[-0.0890629393522...|206.0|166.85823029221248|
|[0.00538306037424...|135.0|128.45256889870683|
|[-0.0926954778032...| 97.0|106.33594406619089|
|[-0.0454724779400...|138.0| 73.98067035208025|
|[0.06350367559056...| 63.0|118.92115653222103|
|[0.04170844488444...|110.0|158.82436723778153|
|[-0.0709002470971...|310.0| 213.5851645958205|
|[-0.0963280162542...|101.0| 97.14573768173105|
|[0.02717829108036...| 69.0| 95.26108457593739|
|[0.01628067572730...|179.0|115.05862144632293|
|[0.00538306037424...|185.0|164.62484148553483|
|[0.04534098333546...|118.0|103.05032974805697|
|[-0.0527375548420...|171.0|177.10280850293867|
|[-0.0055145549788...|166.0|211.70553577361468|
|[0.0707687524926,...|144.0|182.82798585

In [10]:
print(LR_model)

TrainValidationSplitModel_3b046625878d


In [55]:
from pyspark.mllib.evaluation import RegressionMetrics

# Metrics object needs to have an RDD of (prediction, observation) pairs.


valuesAndPreds = 

# Instantiate metrics object
metrics = RegressionMetrics(valuesAndPreds)

# Squared Error
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)

# R-squared
print("R-squared = %s" % metrics.r2)

# Mean absolute error
print("MAE = %s" % metrics.meanAbsoluteError)

# Explained variance
print("Explained variance = %s" % metrics.explainedVariance)




AttributeError: 'TrainValidationSplitModel' object has no attribute 'metricName'

In [27]:
sc.stop()

In [47]:
print(diabetes.data)
print(len(temporary))

[[ 0.03807591  0.05068012  0.06169621 ... -0.00259226  0.01990842
  -0.01764613]
 [-0.00188202 -0.04464164 -0.05147406 ... -0.03949338 -0.06832974
  -0.09220405]
 [ 0.08529891  0.05068012  0.04445121 ... -0.00259226  0.00286377
  -0.02593034]
 ...
 [ 0.04170844  0.05068012 -0.01590626 ... -0.01107952 -0.04687948
   0.01549073]
 [-0.04547248 -0.04464164  0.03906215 ...  0.02655962  0.04452837
  -0.02593034]
 [-0.04547248 -0.04464164 -0.0730303  ... -0.03949338 -0.00421986
   0.00306441]]
442


In [28]:
'''
!!!! COPY !!!!

4> Predicting Diabetes using LinearRegression from MLib (Machine Learning library from Spark) 

This Diabetes dataset downloaded from Sklearn has ten baseline variables, age, sex, body mass index, average blood pressure, and six blood serum measurements were 
obtained for each of n = 442 diabetes patients, as well as the response of interest, a quantitative measure of disease 
progression one year after baseline.

A fasting blood sugar level less than 100 mg/dL (5.6 mmol/L) is normal. A fasting blood sugar level from 100 to 
125 mg/dL (5.6 to 6.9 mmol/L) is considered prediabetes. If it's 126 mg/dL (7 mmol/L) or higher on two separate 
tests, you have diabetes. Oral glucose tolerance test.
'''

from pyspark.ml.regression import LinearRegression
from sklearn import datasets

# Import and clean data. Pyspark uses its own type system and unfortunately it doesn't deal with numpy well. 
# It works with python types though. So you need to manually convert the numpy.float64 to float.

diabetes = datasets.load_diabetes()
diabetes_data= []

# Spark uses breeze under the hood for high performance Linear Algebra in Scala. In Spark, MLlib and other 
# ML algorithms depends on org.apache.spark.mllib.libalg.Vector type which is rather dense or sparse.

for feature_list in diabetes.data[:]:
    temp= [float(i) for i in feature_list]
    diabetes_data.append(Vectors.dense(temp))
    
diabetes_target = [float(i) for i in diabetes.target]
features_and_predictions = list(zip(diabetes_target, diabetes_train))


sc = pyspark.SparkContext(appName="LinearRegression_Diabetes")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(features_and_predictions, ["label", "features"])

# Only max iterations is set. We will set parameters for the algorithm after ParamGridSearch
lr = LinearRegression(maxIter=10)


# Fit the model to the data.
lrModel = lr.fit(df)


# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
sc.stop()

