In [1]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse one that already
# exists, under the application name 'human activity recognition'.
spark = (
    SparkSession.builder
    .appName('human activity recognition')
    .getOrCreate()
)

In [2]:
# Evaluate the session object as the cell's last expression so the notebook
# displays it.
spark

In [3]:
# Load the raw accelerometer CSV, treating its first line as the header.
# No schema is supplied, so the columns arrive as strings (see the
# df1.dtypes output further down) and are cast explicitly later.
df1 = (
    spark.read
    .format("csv")
    .option('header', 'true')
    .load("Phones_accelerometer.csv")
)

In [4]:
# Preview the first 3 rows of the raw frame.
df1.show(3)

+-----+-------------+-------------------+----------+------------------+--------+----+------+--------+-----+
|Index| Arrival_Time|      Creation_Time|         x|                 y|       z|User| Model|  Device|   gt|
+-----+-------------+-------------------+----------+------------------+--------+----+------+--------+-----+
|    0|1424696633908|1424696631913248572| -5.958191|         0.6880646|8.135345|   a|nexus4|nexus4_1|stand|
|    1|1424696633909|1424696631918283972|  -5.95224|         0.6702118|8.136536|   a|nexus4|nexus4_1|stand|
|    2|1424696633918|1424696631923288855|-5.9950867|0.6535491999999999|8.204376|   a|nexus4|nexus4_1|stand|
+-----+-------------+-------------------+----------+------------------+--------+----+------+--------+-----+
only showing top 3 rows



In [5]:
# List the (column, type) pairs of the raw frame; the output below shows
# every column was read as a string.
df1.dtypes

[('Index', 'string'),
 ('Arrival_Time', 'string'),
 ('Creation_Time', 'string'),
 ('x', 'string'),
 ('y', 'string'),
 ('z', 'string'),
 ('User', 'string'),
 ('Model', 'string'),
 ('Device', 'string'),
 ('gt', 'string')]

In [6]:
from pyspark.sql.functions import col

# Keep only the columns used for modelling. The two timestamps are cast to
# double, the three accelerometer axes to float, and the activity label 'gt'
# is carried through unchanged as a string.
dataset1 = df1.select(
    *(
        col(column_name).cast(spark_type)
        for column_name, spark_type in (
            ('Arrival_Time', 'double'),
            ('Creation_Time', 'double'),
            ('x', 'float'),
            ('y', 'float'),
            ('z', 'float'),
        )
    ),
    col('gt'),
)
dataset1.show()

+-----------------+--------------------+----------+----------+---------+-----+
|     Arrival_Time|       Creation_Time|         x|         y|        z|   gt|
+-----------------+--------------------+----------+----------+---------+-----+
|1.424696633908E12|1.424696631913248...| -5.958191| 0.6880646| 8.135345|stand|
|1.424696633909E12|1.424696631918284...|  -5.95224| 0.6702118| 8.136536|stand|
|1.424696633918E12|1.424696631923288...|-5.9950867| 0.6535492| 8.204376|stand|
|1.424696633919E12|1.424696631928385...|-5.9427185| 0.6761627| 8.128204|stand|
|1.424696633929E12|1.424696631933420...| -5.991516|0.64164734| 8.135345|stand|
|1.424696633929E12|1.424696631938456...| -5.965332| 0.6297455| 8.128204|stand|
|1.424696633938E12|1.424696631943522...| -5.991516| 0.6356964|  8.16272|stand|
|1.424696633939E12|1.424696631948496...| -5.915344|0.63093567| 8.105591|stand|
|1.424696633951E12|1.424696631953592...| -5.984375| 0.6940155| 8.067505|stand|
|1.424696633952E12|1.424696631960428...| -5.937958|0

In [7]:
from pyspark.ml.feature import StringIndexer

# Encode the string activity label 'gt' as the numeric column 'gt_index'.
# handleInvalid='keep' asks the indexer to retain rows whose label it treats
# as invalid instead of failing on them (see the StringIndexer docs).
gt_indexer = StringIndexer(
    inputCol='gt',
    outputCol='gt_index',
    handleInvalid='keep',
)
dataset1 = gt_indexer.fit(dataset1).transform(dataset1)

In [8]:
# Drop the original string label; the numeric 'gt_index' column added by the
# StringIndexer cell is used as the label from here on.
dataset1 = dataset1.drop('gt')

In [9]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single 'features' vector consumed by the models.
# NOTE(review): both timestamp columns are included as predictors; confirm
# that is intended rather than using only the x/y/z readings.
required_features = ['Arrival_Time', 'Creation_Time', 'x', 'y', 'z']

# Append the assembled vector as a new 'features' column; the input columns
# and 'gt_index' are kept alongside it.
assembler = VectorAssembler(
    inputCols=required_features,
    outputCol='features',
)
transformed_data = assembler.transform(dataset1)

In [10]:
# Preview the frame with the assembled 'features' vector column appended.
transformed_data.show()

+-----------------+--------------------+----------+----------+---------+--------+--------------------+
|     Arrival_Time|       Creation_Time|         x|         y|        z|gt_index|            features|
+-----------------+--------------------+----------+----------+---------+--------+--------------------+
|1.424696633908E12|1.424696631913248...| -5.958191| 0.6880646| 8.135345|     2.0|[1.424696633908E1...|
|1.424696633909E12|1.424696631918284...|  -5.95224| 0.6702118| 8.136536|     2.0|[1.424696633909E1...|
|1.424696633918E12|1.424696631923288...|-5.9950867| 0.6535492| 8.204376|     2.0|[1.424696633918E1...|
|1.424696633919E12|1.424696631928385...|-5.9427185| 0.6761627| 8.128204|     2.0|[1.424696633919E1...|
|1.424696633929E12|1.424696631933420...| -5.991516|0.64164734| 8.135345|     2.0|[1.424696633929E1...|
|1.424696633929E12|1.424696631938456...| -5.965332| 0.6297455| 8.128204|     2.0|[1.424696633929E1...|
|1.424696633938E12|1.424696631943522...| -5.991516| 0.6356964|  8.16272| 

In [11]:
# Number of rows in the raw, un-cast input frame.
df1.count()

13062475

In [12]:
# Preview the modelling frame: the numeric columns plus 'gt_index', without
# the assembled 'features' vector.
dataset1.show()

+-----------------+--------------------+----------+----------+---------+--------+
|     Arrival_Time|       Creation_Time|         x|         y|        z|gt_index|
+-----------------+--------------------+----------+----------+---------+--------+
|1.424696633908E12|1.424696631913248...| -5.958191| 0.6880646| 8.135345|     2.0|
|1.424696633909E12|1.424696631918284...|  -5.95224| 0.6702118| 8.136536|     2.0|
|1.424696633918E12|1.424696631923288...|-5.9950867| 0.6535492| 8.204376|     2.0|
|1.424696633919E12|1.424696631928385...|-5.9427185| 0.6761627| 8.128204|     2.0|
|1.424696633929E12|1.424696631933420...| -5.991516|0.64164734| 8.135345|     2.0|
|1.424696633929E12|1.424696631938456...| -5.965332| 0.6297455| 8.128204|     2.0|
|1.424696633938E12|1.424696631943522...| -5.991516| 0.6356964|  8.16272|     2.0|
|1.424696633939E12|1.424696631948496...| -5.915344|0.63093567| 8.105591|     2.0|
|1.424696633951E12|1.424696631953592...| -5.984375| 0.6940155| 8.067505|     2.0|
|1.424696633952E

In [13]:
import time

from pyspark.ml.classification import GBTClassifier, OneVsRest, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fixed seed so the train/test split and the tree ensembles are reproducible
# across kernel restarts.
SEED = 42

# 70/30 train/test split of the assembled data.
training_data, test_data = transformed_data.randomSplit([0.7, 0.3], seed=SEED)

# A single evaluator is shared by every experiment: accuracy of 'prediction'
# against the indexed label 'gt_index'.
evaluator = MulticlassClassificationEvaluator(
    labelCol='gt_index',
    predictionCol='prediction',
    metricName='accuracy')


def fit_and_report(name, estimator, train_df, test_df, metric_evaluator):
    """Fit one estimator, score the held-out data, and print the results.

    Only the call to ``estimator.fit`` is timed, so the reported figure is the
    training time and does not include scoring or evaluating the test set.

    Args:
        name: Label printed before the results to identify the experiment.
        estimator: Object exposing ``fit(train_df)`` whose result exposes
            ``transform(test_df)``.
        train_df: Data passed to ``fit``.
        test_df: Data passed to the fitted model's ``transform``.
        metric_evaluator: Object exposing ``evaluate(predictions)``.

    Returns:
        Tuple ``(fitted_model, predictions, accuracy, training_time)``.
    """
    print(name)
    fit_start = time.perf_counter()
    fitted_model = estimator.fit(train_df)
    fit_seconds = time.perf_counter() - fit_start

    scored = fitted_model.transform(test_df)
    score = metric_evaluator.evaluate(scored)
    print('Test Accuracy = ', score)
    print("The time taken to train the data is: %0.3f seconds" % fit_seconds)
    return fitted_model, scored, score, fit_seconds


# NOTE(review): per the Spark docs the default maxDepth is 5, so the first two
# runs appear to use the same configuration; confirm both are needed.
rf_default = RandomForestClassifier(labelCol='gt_index',
                                    featuresCol='features',
                                    seed=SEED)
rf_depth5 = RandomForestClassifier(labelCol='gt_index',
                                   featuresCol='features',
                                   maxDepth=5,
                                   seed=SEED)
rf = RandomForestClassifier(labelCol='gt_index',
                            featuresCol='features',
                            maxDepth=10,
                            seed=SEED)

# Spark's GBTClassifier only handles binary labels, while 'gt_index' has more
# than two classes (e.g. 'stand' is indexed as 2.0). Fitting it directly fails,
# so it is wrapped in OneVsRest, which trains one binary GBT per class.
gbt = GBTClassifier(labelCol='gt_index',
                    featuresCol='features',
                    seed=SEED)
gbt_one_vs_rest = OneVsRest(classifier=gbt,
                            labelCol='gt_index',
                            featuresCol='features',
                            predictionCol='prediction')

experiments = [
    ('RandomForest (default maxDepth)', rf_default),
    ('RandomForest (maxDepth=5)', rf_depth5),
    ('RandomForest (maxDepth=10)', rf),
    ('GBT (one-vs-rest)', gbt_one_vs_rest),
]

# After the loop, model / predictions / accuracy / training_time hold the
# results of the last experiment, as they did in the copy-pasted version.
for experiment_name, estimator in experiments:
    model, predictions, accuracy, training_time = fit_and_report(
        experiment_name, estimator, training_data, test_data, evaluator)
    print()

In [14]:
import time
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Multinomial-capable baseline on the same features/label as the tree models,
# limited to 10 optimizer iterations.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'gt_index', maxIter=10)

# Time only the fit. Previously the timer also covered scoring and evaluating
# the test set, yet the message below reports the value as training time.
start_time = time.perf_counter()
lrModel = lr.fit(training_data)
end_time = time.perf_counter()
training_time = end_time - start_time

# Score the held-out split and evaluate accuracy against the indexed label.
predictions = lrModel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol='gt_index', 
    predictionCol='prediction', 
    metricName='accuracy')

accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)
print("The time taken to train the data is: %0.3f seconds" %training_time)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3397, in run_code
    exec(code_obj, self.user_global_ns, self

ConnectionRefusedError: [Errno 111] Connection refused