In [1]:
%fs ls /databricks-datasets/structured-streaming/events/

In [2]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

In [3]:
%scala
import org.apache.spark.sql.types._

// Directory holding the sample JSON event files.
val inputPath = "/databricks-datasets/structured-streaming/events/"

// The data format is known, so declare the schema explicitly instead of letting Spark infer it.
val jsonSchema = StructType(Seq(
  StructField("time", TimestampType),
  StructField("action", StringType)
))

// Static (batch) DataFrame over the JSON files.
val staticInputDF = spark.read.schema(jsonSchema).json(inputPath)

display(staticInputDF)

In [4]:
%scala
import org.apache.spark.sql.functions._

// Count events per action within 1-hour windows of the event time.
val staticCountsDF = staticInputDF
  .groupBy(col("action"), window(col("time"), "1 hour"))
  .count()

// Make the counts queryable from SQL cells as 'static_counts'.
staticCountsDF.createOrReplaceTempView("static_counts")

In [5]:
%sql select action, sum(count) as total_count from static_counts group by action

In [6]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

In [7]:
%scala
import org.apache.spark.sql.functions._

// Streaming counterpart of staticInputDF: the only change is `readStream` in place of `read`.
val streamingInputDF = spark.readStream
  .schema(jsonSchema)               // same JSON schema as the static read
  .option("maxFilesPerTrigger", 1)  // treat the files as a stream by picking one file at a time
  .json(inputPath)

// Same hourly per-action aggregation as staticCountsDF, applied to the streaming input.
val streamingCountsDF = streamingInputDF
  .groupBy(col("action"), window(col("time"), "1 hour"))
  .count()

// Check whether the aggregated DataFrame is a streaming DataFrame.
streamingCountsDF.isStreaming

In [8]:
%scala
// Use a single shuffle partition so the shuffles stay small.
spark.conf.set("spark.sql.shuffle.partitions", "1")

// Start the streaming aggregation and publish its results as the in-memory table "counts".
val query = streamingCountsDF.writeStream
  .queryName("counts")     // name of the in-memory table
  .outputMode("complete")  // all the counts should be in the table
  .format("memory")        // in-memory table sink (for testing only in Spark 2.0)
  .start()

In [9]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [10]:
%scala
Thread.sleep(5000)  // wait a bit more for more data to be computed

In [11]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [12]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [13]:
%sql select action, sum(count) as total_count from counts group by action order by action

In [14]:
spark

In [15]:
%fs ls /databricks-datasets/structured-streaming/events/

In [16]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

In [17]:
from pyspark.sql.types import *

# Directory holding the sample JSON event files.
inputPath = "/databricks-datasets/structured-streaming/events/"

# The data format is known, so declare the schema explicitly instead of letting Spark infer it.
jsonSchema = (
  StructType()
    .add("time", TimestampType(), True)
    .add("action", StringType(), True)
)

# Static (batch) DataFrame over the JSON files.
staticInputDF = spark.read.schema(jsonSchema).json(inputPath)

display(staticInputDF)

In [18]:
from pyspark.sql.functions import *      # for window() function

# Count events per action within 1-hour windows of the event time.
staticCountsDF = staticInputDF.groupBy(col("action"), window(col("time"), "1 hour")).count()
staticCountsDF.cache()

# Make the counts queryable from SQL cells as 'static_counts'.
staticCountsDF.createOrReplaceTempView("static_counts")



In [19]:
%sql select action, sum(count) as total_count from static_counts group by action

In [20]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

In [21]:
from pyspark.sql.functions import *

# Streaming counterpart of staticInputDF: `readStream` takes the place of `read`.
streamingInputDF = (
  spark.readStream
    .schema(jsonSchema)               # same JSON schema as the static read
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream by picking one file at a time
    .json(inputPath)
)

# Same hourly per-action aggregation as staticCountsDF, applied to the streaming input.
streamingCountsDF = (
  streamingInputDF
    .groupBy(col("action"), window(col("time"), "1 hour"))
    .count()
)

# Check whether the aggregated DataFrame is a streaming DataFrame.
streamingCountsDF.isStreaming

In [22]:
# Use two shuffle partitions so the shuffles stay small.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming aggregation and publish its results as the in-memory table "counts".
query = (
  streamingCountsDF.writeStream
    .queryName("counts")     # name of the in-memory table
    .outputMode("complete")  # all the counts should be in the table
    .format("memory")        # in-memory table sink (for testing only in Spark 2.0)
    .start()
)

In [23]:
from time import sleep
sleep(5)  # wait a bit for computation to start

In [24]:
sleep(5)  # wait a bit more for more data to be computed

In [25]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [26]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [27]:
%sql select action, sum(count) as total_count from counts group by action order by action

In [28]:
%fs ls databricks-datasets/adult/adult.data

In [29]:
%sql DROP TABLE IF EXISTS adult

In [30]:
%sql

CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
USING com.databricks.spark.csv
OPTIONS (path "/databricks-datasets/adult/adult.data", header "true")

In [31]:
dataset = spark.table("adult")
cols = dataset.columns

In [32]:
display(dataset)

In [33]:
###One-Hot Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Categorical columns that get indexed and one-hot encoded.
categoricalColumns = [
  "workclass", "education", "marital_status", "occupation",
  "relationship", "race", "sex", "native_country",
]
stages = []  # Pipeline stages; only collected here, they all run later on
for categoricalCol in categoricalColumns:
  indexedCol = categoricalCol + "Index"
  # StringIndexer performs the category indexing.
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
  # OneHotEncoder converts the indexed categories into binary SparseVectors.
  encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
  stages.extend([stringIndexer, encoder])

In [34]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "income", outputCol = "label")
stages += [label_stringIdx]

In [35]:
# Transform all features into a single vector column using VectorAssembler.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
# A list comprehension (rather than map(...) + list) keeps this working on Python 3,
# where map() returns an iterator that cannot be concatenated with a list.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [36]:
# Assemble the collected stages into one Pipeline.
pipeline = Pipeline(stages=stages)
# fit() computes feature statistics as needed; transform() then applies the feature
# transformations to the data.
pipelineModel = pipeline.fit(dataset)

# Keep the label, the feature vector and the original columns.
selectedcols = ["label", "features"] + cols
dataset = pipelineModel.transform(dataset).select(selectedcols)
display(dataset)

In [37]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print trainingData.count()
print testData.count()

In [38]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [39]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [40]:
predictions.printSchema()

In [41]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [42]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [43]:
evaluator.getMetricName()

In [44]:
print lr.explainParams()

In [45]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [46]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)

In [47]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [48]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [49]:
print 'Model Intercept: ', cvModel.bestModel.intercept

In [50]:
# Read the fitted feature weights through `coefficients`; the older `weights` accessor is
# deprecated on Spark 2.x, which this notebook targets (it uses `spark` and temp views).
# toArray() gives a dense numpy array; each entry is converted to a plain float inside a
# 1-tuple so that createDataFrame can build a single-column DataFrame.
weights = [(float(w),) for w in cvModel.bestModel.coefficients.toArray()]
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

In [51]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [52]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [53]:
print "numNodes = ", dtModel.numNodes
print "depth = ", dtModel.depth

In [54]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [55]:
predictions.printSchema()

In [56]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [57]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [58]:
dt.getImpurity()

In [59]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2,6,10])
             .addGrid(dt.maxBins, [20,40,80])
             .build())

In [60]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

In [61]:
print "numNodes = ", cvModel.bestModel.numNodes
print "depth = ", cvModel.bestModel.depth

In [62]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [63]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [64]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [65]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [66]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [67]:
predictions.printSchema()

In [68]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [69]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [70]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [71]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)

In [72]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [73]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [74]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

In [75]:
bestModel = cvModel.bestModel

In [76]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [77]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [78]:
finalPredictions.createOrReplaceTempView("finalPredictions")

In [79]:
%sql
-- Prediction counts per occupation, read from the 'finalPredictions' temp view
-- (the model output column is named 'prediction').
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

In [80]:
%sql
-- Prediction counts per age, read from the 'finalPredictions' temp view
-- (the model output column is named 'prediction').
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

In [81]:
# We use the sqlContext.read method to read the data and set a few options:
#  'format': specifies the Spark CSV data source
#  'header': set to true to indicate that the first line of the CSV data file is a header
# The file is called 'hour.csv'.
# Compare the major version numerically: comparing version strings would sort '10.0'
# before '2.0' and pick the wrong branch.
if int(sc.version.split('.')[0]) >= 2:
  # Spark 2.0+ includes CSV as a native Spark SQL datasource.
  df = sqlContext.read.format('csv').option("header", 'true').load("/databricks-datasets/bikeSharing/data-001/hour.csv")
else:
  # Earlier Spark versions can use the Spark CSV package
  df = sqlContext.read.format('com.databricks.spark.csv').option("header", 'true').load("/databricks-datasets/bikeSharing/data-001/hour.csv")
# Calling cache on the DataFrame will make sure we persist it in memory the first time it is used.
# The following uses will be able to read from memory, instead of re-reading the data from disk.
df.cache()

In [82]:
display(df)

In [83]:
print "Our dataset has %d rows." % df.count()

In [84]:
df = df.drop("instant").drop("dteday").drop("casual").drop("registered")
display(df)


In [85]:
df.printSchema()

In [86]:
# The following call takes all columns (df.columns) and casts them using Spark SQL to a numeric type (DoubleType).
from pyspark.sql.functions import col  # for indicating a column using a string in the line below
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
df.printSchema()

In [87]:
# Split the dataset randomly into 70% for training and 30% for testing.
train, test = df.randomSplit([0.7, 0.3])
print "We have %d training examples and %d test examples." % (train.count(), test.count())

In [88]:
display(train.select("hr", "cnt"))


In [89]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('cnt')
# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [90]:
from pyspark.ml.regression import GBTRegressor
# Takes the "features" column and learns to predict "cnt"
gbt = GBTRegressor(labelCol="cnt")

In [91]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Hyperparameter grid to search:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
# The values are kept small for this example notebook.  In practice, to get the highest accuracy, you would likely want to try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = (
  ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)
# Evaluation metric: tells CrossValidator how well we are doing by comparing the true labels with predictions.
evaluator = RegressionEvaluator(
  metricName="rmse",
  labelCol=gbt.getLabelCol(),
  predictionCol=gbt.getPredictionCol(),
)
# The CrossValidator runs the model tuning over the grid.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator)

In [92]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [93]:
pipelineModel = pipeline.fit(train)

In [94]:
predictions = pipelineModel.transform(test)

In [95]:
display(predictions.select("cnt", "prediction", *featuresCols))

In [96]:
rmse = evaluator.evaluate(predictions)
print "RMSE on our test set: %g" % rmse

In [97]:
display(predictions.select("hr", "prediction"))

In [98]:
displayHTML(sc.wholeTextFiles("/databricks-datasets/Rdatasets/data-001/doc/ggplot2/diamonds.html").take(1)[0][1])

In [99]:
%r
data(diamonds)

In [100]:


displayHTML(sc.wholeTextFiles("/databricks-datasets/Rdatasets/data-001/doc/ggplot2/diamonds.html").take(1)[0][1])

# Load data into a Pandas dataframe
import io
import pandas
from pyspark.sql import *
# wholeTextFiles yields (path, content) pairs; take the content of the first pair.
localData = sc.wholeTextFiles("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").collect()[0][1]
# io.StringIO exists on both Python 2 and Python 3, unlike the Python-2-only cStringIO module.
# NOTE(review): on Python 2 this assumes localData is a unicode string — confirm.
output = io.StringIO(localData)
pandasData = pandas.read_csv(output)
pandasData = pandasData.iloc[:,1:] # remove line number


In [101]:
pandasData

In [102]:
import matplotlib.pyplot as plt
plt.clf()
plt.plot(pandasData['carat'], pandasData['price'], '.')
plt.xlabel('carat')
plt.ylabel('price')
display()

In [103]:
# Create this plot by calling display on the Spark DataFrame, clicking the plot icon, selecting Plot Options, and creating a Histogram of 'carat' values.
sparkDataframe = sqlContext.createDataFrame(pandasData)
display(sparkDataframe)

In [104]:
pandasData['cut'] = pandasData['cut'].replace({'Fair':0, 'Good':1, 'Very Good':2, 'Premium':3, 'Ideal':4})
pandasData['color'] = pandasData['color'].replace({'J':0, 'I':1, 'H':2, 'G':3, 'F':4, 'E':5, 'D':6})
pandasData['clarity'] = pandasData['clarity'].replace({'I1':0, 'SI1':1, 'SI2':2, 'VS1':3, 'VS2':4, 'VVS1':5, 'VVS2':6, 'IF':7})
pandasData

In [105]:
# Split data into a labels dataframe and a features dataframe
labels = pandasData['price'].values
featureNames = ['carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z']
features = pandasData[featureNames].values

In [106]:
# Scale each feature (column) to unit norm (normalize() rescales to unit norm; it does not standardize to unit variance)
from sklearn.preprocessing import normalize
features = normalize(features, axis=0)
features

In [107]:
# Hold out 30% of the data for testing.  We will use the rest for training.
from sklearn.cross_validation import train_test_split
trainingLabels, testLabels, trainingFeatures, testFeatures = train_test_split(labels, features, test_size=0.3)
ntrain, ntest = len(trainingLabels), len(testLabels)
print 'Split data randomly into 2 sets: %d training and %d test instances.' % (ntrain, ntest)

In [108]:
# Train a baseline model with fixed hyperparameters, and print out the intercept and coefficients.
from sklearn import linear_model
origAlpha = 0.5 # "alpha" is the regularization hyperparameter
origClf = linear_model.Ridge(alpha=origAlpha)
# Fit on the training split only: fitting on the full `features`/`labels` would leak the
# held-out test rows into the model that is later scored against them.
origClf.fit(trainingFeatures, trainingLabels)
print('Trained model with fixed alpha = %g' % origAlpha)
print('  Model intercept: %g' % origClf.intercept_)
print('  Model coefficients:')
for name, coef in zip(featureNames, origClf.coef_):
  print('    %g\t%s' % (coef, name))

In [109]:
# Score the initial model.  It does not do that well.
origScore = origClf.score(trainingFeatures, trainingLabels)
origScore

In [110]:
# We use scikit-learn's cross_validation module, which helps split our data randomly into k equal-size parts ("folds").
from sklearn import cross_validation
numFolds = 3 # You may want to use more (10 or so) in practice
kf = cross_validation.KFold(ntrain, n_folds=numFolds)



In [111]:
# "alphas" is a list of hyperparameter values to test
alphas = [0.0, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]
# One (alpha, fold) task per hyperparameter value and fold index, alpha-major order.
tasks = [(alpha, fold) for alpha in alphas for fold in range(numFolds)]

In [112]:
# Create an RDD of tasks.  We set the number of partitions equal to the number of tasks to ensure maximum parallelism.
tasksRDD = sc.parallelize(tasks, numSlices = len(tasks))

In [113]:
trainingFeaturesBroadcast = sc.broadcast(trainingFeatures)
trainingLabelsBroadcast = sc.broadcast(trainingLabels)

In [114]:
def trainOneModel(alpha, fold):
  """
  Train and score the model for a single task (one alpha value + one fold index).

  alpha: regularization hyperparameter passed to linear_model.Ridge.
  fold: zero-based position of the wanted split in the folds 'kf'.
  Return: model, score on the fold's validation data, alpha, fold.
  """
  # Walk the folds up to the requested position; when 'kf' has no split at that
  # position, both index lists stay empty.
  trainIndex, valIndex = next(
    (split for position, split in enumerate(kf) if position == fold),
    ([], []))
  # The full training data comes from the broadcast variables.
  allFeatures = trainingFeaturesBroadcast.value
  allLabels = trainingLabelsBroadcast.value
  foldTrainX, foldValX = allFeatures[trainIndex], allFeatures[valIndex]
  foldTrainY, foldValY = allLabels[trainIndex], allLabels[valIndex]
  # Fit on the fold's training rows and score on its validation rows.
  clf = linear_model.Ridge(alpha=alpha)
  clf.fit(foldTrainX, foldTrainY)
  return clf, clf.score(foldValX, foldValY), alpha, fold

In [115]:
# LEARN!  We now map our tasks RDD and apply the training function to each task.
# After we call an action ("count") on the results, the actual training is executed.
trainedModelAndScores = tasksRDD.map(lambda alpha_fold: trainOneModel(alpha_fold[0], alpha_fold[1]))
trainedModelAndScores.cache()
trainedModelAndScores.count()

In [116]:
# Since we are done with our broadcast variables, we can clean them up.
# (This will happen automatically, but we can make it happen earlier by explicitly unpersisting the broadcast variables.)
trainingFeaturesBroadcast.unpersist()
trainingLabelsBroadcast.unpersist()

In [117]:
# Bring (score, alpha, fold) for every trained task back to the driver.
allScores = trainedModelAndScores.map(lambda result: result[1:4]).collect()
# Sum the scores per alpha, then divide by the number of folds to get the average.
avgScores = dict.fromkeys(alphas, 0.0)
for foldScore, foldAlpha, _ in allScores:
  avgScores[foldAlpha] += foldScore
for candidate in alphas:
  avgScores[candidate] /= numFolds
avgScores

In [118]:
# Find the alpha with the highest average score.
# max() replaces the former "-1" sentinels: if every average score were <= -1 the sentinel
# loop would leave bestAlpha at -1, which is not one of the candidate alphas.
# On ties max() returns the first candidate, the same choice the strict ">" loop made.
bestAlpha = max(alphas, key=lambda candidate: avgScores[candidate])
bestScore = avgScores[bestAlpha]
print('Found best alpha: %g, which gives score: %g' % (bestAlpha, bestScore))

In [119]:
# Use Databricks' display() function to plot the scores vs. alpha.  We use a namedtuple to tell Databricks names for the columns (alpha and the score).
import numpy
from collections import namedtuple
# One row per alpha: log(alpha) (offset slightly so alpha = 0 has a finite log) and its average score.
Score = namedtuple('Score', 'log_alpha score')
# Build a real list (not map(...)) so display() also receives a list on Python 3,
# where map() returns a lazy iterator.
display([Score(float(numpy.log(alpha + 0.00000001)), float(avgScores[alpha])) for alpha in avgScores])

In [120]:
# Use bestAlpha, and train a final model.
tunedClf = linear_model.Ridge(alpha=bestAlpha)
tunedClf.fit(trainingFeatures, trainingLabels)

In [121]:
origTrainingScore, origTestScore = origClf.score(trainingFeatures, trainingLabels), origClf.score(testFeatures, testLabels)
tunedTrainingScore, tunedTestScore = tunedClf.score(trainingFeatures, trainingLabels), tunedClf.score(testFeatures, testLabels)
print 'Compare original model (without hyperparameter tuning) and final model (with tuning) on test data\n'
print 'Model   \tAlpha\tTraining   \tTest'
print 'Original\t%g\t%g\t%g' % (origAlpha, origTrainingScore, origTestScore)
print 'Tuned   \t%g\t%g\t%g' % (bestAlpha, tunedTrainingScore, tunedTestScore)

In [122]:
print 'Tuned model with best alpha = %g' % bestAlpha
print '  Model intercept: %g' % tunedClf.intercept_
print '  Model coefficients:'
for i in range(len(featureNames)):
  print '    %g\t%s' % (tunedClf.coef_[i], featureNames[i])

In [123]:
# Convert the scikit-learn model into an equivalent MLlib model
from pyspark.mllib.regression import LinearRegressionModel
mllibModel = LinearRegressionModel(tunedClf.coef_, tunedClf.intercept_)
mllibModel

In [124]:
# Demonstrate that the models compute the same predictions
sklearnPredictions = tunedClf.predict(testFeatures)
# A list comprehension (not map(...)) so numpy.array receives a sequence on Python 3 as well;
# wrapping a lazy map object would produce a 0-d object array there.
mllibPredictions = numpy.array([mllibModel.predict(x) for x in testFeatures])
differences = sklearnPredictions - mllibPredictions
sumSquaredDifferences = sum(differences * differences)
print('Total difference between scikit-learn and MLlib model predictions: %g' % sumSquaredDifferences)

In [125]:
import org.apache.spark.h2o._
val h2oConf = new H2OConf(sc).set("spark.ui.enabled", "false")
val h2oContext = H2OContext.getOrCreate(sc, h2oConf)

In [126]:
%fs ls /databricks-datasets/sms_spam_collection/data-001

In [127]:
%sql 
DROP TABLE IF EXISTS smsData;
CREATE TABLE smsData(hamorspam string, msg string)
USING com.databricks.spark.csv
OPTIONS (path "dbfs:/databricks-datasets/sms_spam_collection/data-001/smsData.csv", delimiter "\t", inferSchema "true")

In [128]:
spark.table("smsData").printSchema

In [129]:
%sql select * from smsData

In [130]:
%sql select  hamorspam, count(1) cnt from smsdata group by hamorspam

In [131]:
%scala
// One SMS record: a row id, the ham/spam column and the message text.
case class sms(id : Long, hamorspam : String, msg : String)
// Pair every table row with its index and use that index as the id.
val smsData = spark.table("smsData").rdd.zipWithIndex
  .map { case (row, index) => sms(index, row.getString(0), row.getString(1)) }
  .toDF()

In [132]:
%scala
// SQL UDF that replaces every character listed in `ignoredChars` with a space.
spark.udf.register("removeSpecialChars", (s : String) => {
    val ignoredChars = Set(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'','!','0', '1')
    // Option(s) guards against a null message, which previously caused a NullPointerException;
    // a null input now yields a null output. The single map pass is equivalent to the former
    // chain of per-character replace calls because the replacement ' ' is not itself ignored.
    Option(s).map(text => text.map(ch => if (ignoredChars(ch)) ' ' else ch)).orNull
 })
 
 // Add the cleaned message text as an extra column next to the original columns.
 val smsDataNoSpecialChars = smsData.selectExpr("*", "removeSpecialChars(msg) as msg_no_special_chars")

In [133]:
%scala
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, IDF, VectorAssembler, HashingTF, StringIndexer}
import org.apache.spark.ml.Pipeline


 

// Split the cleaned message text into words.
val tok = new Tokenizer().setInputCol("msg_no_special_chars").setOutputCol("words")
// Drop stop words from the token list.
val sw = new StopWordsRemover().setInputCol("words").setOutputCol("filtered_words")
// Hash the remaining words into a 10000-dimensional term-frequency vector.
val tf = new HashingTF().setInputCol("filtered_words").setOutputCol("tf").setNumFeatures(10000)
// Rescale the term frequencies by inverse document frequency.
val idf = new IDF().setInputCol("tf").setOutputCol("tf_idf")
// Index the ham/spam column into the "label" column.
val labeler = new StringIndexer().setInputCol("hamorspam").setOutputCol("label")
// Expose the TF-IDF vector under the column name "features".
val assembler = new VectorAssembler().setInputCols(Array("tf_idf")).setOutputCol("features")

// Chain the stages in execution order.
val pipeline = new Pipeline().setStages(Array(tok, sw, tf, idf, labeler, assembler))

In [134]:
%scala
val sparkDataPrepModel = pipeline.fit(smsDataNoSpecialChars)
val sparkPreparedData = sparkDataPrepModel.transform(smsDataNoSpecialChars)

In [135]:
%scala
display(sparkPreparedData)

In [136]:
%scala
import h2oContext._
import h2oContext.implicits._
import hex.deeplearning.{DeepLearningModel, DeepLearning}
import hex.deeplearning.DeepLearningModel.DeepLearningParameters

In [137]:
%scala
val Array(trainDf, testDf) = sparkPreparedData.randomSplit(Array(0.7, 0.3))

In [138]:
%scala
val train = h2oContext.asH2OFrame(trainDf.select("id", "label", "features"))
val valid = h2oContext.asH2OFrame(testDf.select("id", "label", "features"))

In [139]:
%scala
// Deep learning hyperparameters and the frames used for training and validation.
val dlParams = new DeepLearningParameters()
dlParams._train = train
dlParams._valid = valid
// The response is the "label" column; the earlier assignment of 'target was dead code
// (immediately overwritten) and named a column the frames do not contain.
dlParams._response_column = "label"
// The row id carries no signal, so it is excluded from the model inputs.
dlParams._ignored_columns = Array("id")
dlParams._epochs = 10
dlParams._l1 = 0.001
dlParams._hidden = Array[Int](200, 200)

// Build and train the model; `get` returns the trained model.
val dl = new DeepLearning(dlParams)
val dlModel = dl.trainModel.get

In [140]:
%scala
dlModel

In [141]:
%scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}
import org.apache.spark.sql.types.{DataTypes, StructType}

/**
 * An example demonstrating how to write a custom Transformer in a 3rd-party application.
 * This example intentionally avoids using any private Spark APIs.
 *
 * The transformer splits each string of the input column on commas and emits one output
 * row per piece; all other input columns are dropped.
 *
 * @param uid  All types inheriting from `Identifiable` require a `uid`.
 *             This includes Transformers, Estimators, and Models.
 */
class MyFlatMapTransformer(override val uid: String) extends Transformer {

  // Transformer Params
  // Defining a Param requires 3 elements:
  //  - Param definition
  //  - Param getter method
  //  - Param setter method
  // (The getter and setter are technically not required, but they are nice standards to follow.)

  /**
   * Param for input column name.
   */
  final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column name")

  final def getInputCol: String = $(inputCol)

  final def setInputCol(value: String): MyFlatMapTransformer = set(inputCol, value)

  /**
   * Param for output column name.
   */
  final val outputCol: Param[String] = new Param[String](this, "outputCol", "output column name")

  final def getOutputCol: String = $(outputCol)

  final def setOutputCol(value: String): MyFlatMapTransformer = set(outputCol, value)

  // (Optional) You can set defaults for Param values if you like.
  setDefault(inputCol -> "myInputCol", outputCol -> "myOutputCol")

  // Transformer requires 3 methods:
  //  - transform
  //  - transformSchema
  //  - copy

  // Our flatMap will split strings by commas.
  // A null input yields an empty sequence (so it contributes no output rows) instead of
  // throwing a NullPointerException.
  private val myFlatMapFunction: String => Seq[String] = { input: String =>
    Option(input).map(_.split(",").toSeq).getOrElse(Seq.empty[String])
  }

  /**
   * This method implements the main transformation.
   * Its required semantics are fully defined by the method API: take a Dataset or DataFrame,
   * and return a DataFrame.
   *
   * Most Transformers are 1-to-1 row mappings which add one or more new columns and do not
   * remove any columns.  However, this restriction is not required.  This example does a flatMap,
   * so we could either (a) drop other columns or (b) keep other columns, making copies of values
   * in each row as it expands to multiple rows in the flatMap.  We do (a) for simplicity.
   *
   * @param dataset  Input data containing the column named by `inputCol`.
   * @return A DataFrame with the single column named by `outputCol`, one row per
   *         comma-separated piece.
   */
  override def transform(dataset: Dataset[_]): DataFrame = {
    val flatMapUdf = udf(myFlatMapFunction)
    dataset.select(explode(flatMapUdf(col($(inputCol)))).as($(outputCol)))
  }

  /**
   * Check transform validity and derive the output schema from the input schema.
   *
   * We check validity for interactions between parameters during `transformSchema` and
   * raise an exception if any parameter value is invalid. Parameter value checks which
   * do not depend on other parameters are handled by `Param.validate()`.
   *
   * Typical implementation should first conduct verification on schema change and parameter
   * validity, including complex parameter interaction checks.
   *
   * @param schema  Schema of the input data.
   * @return A schema holding only the non-nullable string column named by `outputCol`.
   */
  override def transformSchema(schema: StructType): StructType = {
    // Validate input type.
    // Input type validation is technically optional, but it is a good practice since it catches
    // schema errors early on.
    val actualDataType = schema($(inputCol)).dataType
    require(actualDataType.equals(DataTypes.StringType),
      s"Column ${$(inputCol)} must be StringType but was actually $actualDataType.")

    // Compute output type.
    // This is important to do correctly when plugging this Transformer into a Pipeline,
    // where downstream Pipeline stages may expect to use this Transformer's output as their input.
    DataTypes.createStructType(
      Array(
        DataTypes.createStructField($(outputCol), DataTypes.StringType, false)
      )
    )
  }

  /**
   * Creates a copy of this instance.
   * Requirements:
   *  - The copy must have the same UID.
   *  - The copy must have the same Params, with some possibly overwritten by the `extra`
   *    argument.
   *  - This should do a deep copy of any data members which are mutable.  That said,
   *    Transformers should generally be immutable (except for Params), so the `defaultCopy`
   *    method often suffices.
   * @param extra  Param values which will overwrite Params in the copy.
   */
  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}

In [142]:
%scala
// Three sample rows: a comma-separated string and an id.
val data = spark
  .createDataFrame(Seq(("hi,there", 1), ("a,b,c", 2), ("no", 3)))
  .toDF("myInputCol", "id")
// The transformer keeps its default column names, so it reads "myInputCol".
val myTransformer = new MyFlatMapTransformer("myFlatMapper")
println(s"Original data has ${data.count()} rows.")

In [143]:
%scala
display(data)

In [144]:
%scala
val output = myTransformer.transform(data)
println(s"Output data has ${output.count()} rows.")

In [145]:
%scala
display(output)