In [None]:
# Azure ML run-history logger. Later cells call logger.log(name, value) to
# record the model parameters and evaluation metrics for this run.
import azureml.logging

logger = azureml.logging.get_azureml_logger()

In [None]:
# Former smoke-test cell: it only printed 'hello' and defined nothing used by
# later cells. It is safe to delete this cell entirely.

In [None]:
# Load the prepared data through the Azure ML data-preparation package.
from azureml.dataprep import package

# package.run executes the referenced .dprep package (dataflow at index 0).
# It returns a Spark DataFrame under PySpark and a pandas DataFrame otherwise.
# Later cells call Spark-only methods (na.fill, limit), so this notebook
# needs the PySpark environment.
DPREP_PACKAGE = 'dataPrepPkg.dprep'
df = package.run(DPREP_PACKAGE, dataflow_idx=0)

# Quick preview of the first five rows.
df.head(5)


In [None]:
# Turn on Azure Machine Learning run-history collection via the %azureml magic.
# Options are "on", "off", or "show"; history is off by default.
# NOTE(review): history is enabled only after the data-load cell above;
# confirm whether that cell is meant to be captured.
%azureml history on

In [None]:
# Replace missing values with 0 so every column can feed the model.
prepped = df.na.fill(value=0)

first_rows = prepped.limit(5)
display(first_rows)

Now that all of our data is prepped, we need to put the features into a single vector-typed column for Spark MLlib. This makes it easy to embed a prediction right in a DataFrame, and it makes very clear what is passed into the model and what isn't, without having to convert the data to a NumPy array or specify an R formula. It also makes it easy to add new features incrementally, simply by adding them to the vector. Below, rather than listing the feature columns explicitly, I create an exclusion list and remove everything that is NOT a feature.

In [None]:
# Columns that are identifiers or the label ("count"), never model inputs.
nonFeatureCols = ["zip", "zipcode", "count"]

# Keep the DataFrame's own column order. A set difference has no stable
# order: string hashing is randomized per Python process, so the feature
# vector layout (and the learned coefficient order) would change between
# kernel sessions.
featureCols = [col for col in prepped.columns if col not in nonFeatureCols]

Now I'm going to use the `VectorAssembler` in Apache Spark to assemble all of these feature columns into a single vector column. To do this I'll set the input columns and the output column, then use the assembler to transform the prepped data into the final dataset.

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack every feature column into one vector column named "features",
# which is the input column the regression model reads.
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Append the "features" column to the prepped data.
finalPrep = assembler.transform(prepped)

In [None]:
# Fixed seed so the 70/30 split, and every metric computed from it, is
# reproducible under Restart & Run All.
SPLIT_SEED = 42
training, test = finalPrep.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Cache both splits: each is scanned more than once below
# (count here, then fit / transform in later cells).
training.cache()
test.cache()

print(training.count())
print(test.count())

In [None]:
from pyspark.ml.regression import LinearRegression

# Unfitted estimator predicting `count` from the assembled `features` vector.
# The name lrModel is kept because later cells refer to it.
# NOTE(review): Spark ML's regParam defaults to 0.0, in which case the
# elasticNetParam mixing weight has no effect; set regParam > 0 if
# elastic-net regularization is actually intended.
lrModel = LinearRegression(labelCol="count", featuresCol="features", elasticNetParam=0.5)

params_text = lrModel.explainParams()
rule = "-" * 20

print("Printing out the model Parameters:")
print(rule)
print(params_text)
logger.log("modelParams", params_text)
print(rule)

In [None]:
# RDD-based metrics helper, used by the evaluation cell further down.
from pyspark.mllib.evaluation import RegressionMetrics

# Train on the training split only; the fitted model scores the test split later.
train_split = training
lrFitted = lrModel.fit(train_split)

In [None]:
# Score the held-out split. Keep the raw prediction, a rounded prediction,
# the true label, and an `equal` flag: 1 when the rounded prediction matches
# `count` exactly, else 0.
scored = lrFitted.transform(test)
holdout = scored.selectExpr(
    "prediction as raw_prediction",
    "double(round(prediction)) as prediction",
    "count",
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal""",
)
display(holdout.limit(5))

In [None]:
# Fraction of held-out rows whose rounded prediction exactly equals `count`.
exact_match_rate = holdout.selectExpr("sum(equal)/sum(1)")
display(exact_match_rate)

In [None]:
# RegressionMetrics takes an RDD of (prediction, observation) pairs of floats.
# `count` may come through as an integer column, so cast both values
# explicitly.
# NOTE(review): this evaluates the *rounded* `prediction` column; use
# `raw_prediction` instead to measure the unrounded model error.
prediction_and_observation = (
    holdout
    .select("prediction", "count")
    .rdd
    .map(lambda row: (float(row[0]), float(row[1])))
)
rm = RegressionMetrics(prediction_and_observation)

# (label printed, metric name logged to Azure ML run history, value)
regression_summary = [
    ("MSE", "MSE", rm.meanSquaredError),
    ("MAE", "MAE", rm.meanAbsoluteError),
    ("RMSE", "RMSE", rm.rootMeanSquaredError),
    ("R Squared", "R2", rm.r2),
    ("Explained Variance", "ExplainedVariance", rm.explainedVariance),
]
for label, log_name, value in regression_summary:
    print(label + ": " + str(value))
    # Logged as a string, matching how the MSE value was already logged.
    logger.log(log_name, str(value))

# Trailing blank line after the summary.
print()

In [None]:
# Turn Azure ML run-history collection back off now that evaluation is done.
%azureml history off