In [1]:
# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet

# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

df.createOrReplaceTempView('df')

df.show(5)

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200214032343-0000
KERNEL_ID = 02823108-e91d-4d83-8dce-d9e23e8dc688
--2020-02-14 03:23:45--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 192.30.253.112
Connecting to github.com (github.com)|192.30.253.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet [following]
--2020-02-14 03:23:46--  https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.8.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.8.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Saving to: 'hmp.parquet'


2020-02-14 03:23:46 (27.7 MB/s) - 'hmp.parquet' saved [932997/932997]

+---+---+---+--------------------+-

In [2]:
df_energy = spark.sql("""
    select sqrt(sum(x*x) + sum(y*y) + sum(z*z)) as label, class from df group by class
""")

In [3]:
df_energy.createOrReplaceTempView('df_energy')

In [19]:
df_join = spark.sql("""
    
    select x, y, z, label, df.class from df inner join df_energy on df.class=df_energy.class

""")
df_join.show(5)

+---+---+---+-----------------+-----------+
|  x|  y|  z|            label|      class|
+---+---+---+-----------------+-----------+
| 22| 49| 35|11785.39634462923|Brush_teeth|
| 22| 49| 35|11785.39634462923|Brush_teeth|
| 22| 52| 35|11785.39634462923|Brush_teeth|
| 22| 52| 35|11785.39634462923|Brush_teeth|
| 21| 52| 34|11785.39634462923|Brush_teeth|
+---+---+---+-----------------+-----------+
only showing top 5 rows



In [5]:
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=['x','y','z'], outputCol='features')
normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)

In [6]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [7]:
pipeline = Pipeline(stages=[vectorAssembler, normalizer, lr])
model = pipeline.fit(df_join)
prediction=model.transform(df_join)

model.stages[2].summary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f8aa420a3c8>

In [14]:
model.stages[2].summary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f8a802ef668>

In [15]:
model.stages[2].summary.r2

0.03259100556263628

In [25]:
df_join = df_join.drop('label')
df_join.show(5)

+---+---+---+-----------+
|  x|  y|  z|      class|
+---+---+---+-----------+
| 22| 49| 35|Brush_teeth|
| 22| 49| 35|Brush_teeth|
| 22| 52| 35|Brush_teeth|
| 22| 52| 35|Brush_teeth|
| 21| 52| 34|Brush_teeth|
+---+---+---+-----------+
only showing top 5 rows



In [26]:
(df_train, df_test) = df_join.randomSplit([0.8,0.2])


In [27]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [28]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexer = StringIndexer(inputCol='class', outputCol='label')
encoder = OneHotEncoder(inputCol='class_index', outputCol='class_encoded')

pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)

In [29]:
prediction = model.transform(df_train)

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluation = MulticlassClassificationEvaluator().setMetricName('accuracy').setLabelCol('label').setPredictionCol('prediction')

In [31]:
evaluation.evaluate(prediction)

0.20685364701145537

In [32]:
evaluation.evaluate(model.transform(df_test))

0.2055918091809181