# Feature Extraction aus InfluxDB und Klassifikation

## Imports

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
from influxdb_client import InfluxDBClient
import matplotlib.pyplot as plt

## Daten Laden
Wier wollen die Daten direkt aus Influx Laden.
Dafür bauen wir zuerst eine Connection zur Influx Instanz auf

In [3]:
BUCKET = "extended_labels"
URL = "https://css21.teco.edu"
TOKEN = "***REMOVED***"
ORG = "css21"
client = InfluxDBClient(url=URL, token=TOKEN, org=ORG, verify_ssl=False)
query_api = client.query_api()

### Querying Influx
Der Query wird in Flux geschrieben

In [27]:
query = '''
    from(bucket: bucket)
      |> range(start: -60d, stop: now())
      |> filter(fn: (r) => r.label != "testing")
      |> filter(fn: (r) => r.label != "standing")
      |> pivot(rowKey: ["_time", "label", "subject"], columnKey: ["_field", "_measurement"], valueColumn: "_value")
'''
params = {
    "bucket": BUCKET
}
result = query_api.query_data_frame(query, params=params)
result = result.set_index("_time", drop=True)
result = result.drop(columns=["table", "result", "_start", "_stop", "browser", "mobile"])
result["label"] = pd.Categorical(result["label"])
result




Unnamed: 0_level_0,label,subject,alpha_devicemotion,beta_devicemotion,gamma_devicemotion,x_devicemotion,x0_devicemotion,y_devicemotion,y0_devicemotion,z_devicemotion,z0_devicemotion,alpha_deviceorientation,beta_deviceorientation,gamma_deviceorientation
_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2021-08-14 11:58:37.871000+00:00,driving,0c86078c3e,23.695926,-41.337541,-9.004711,0.774075,0.479989,-5.938367,0.633047,-9.519047,-2.245780,,,
2021-08-14 11:58:37.887000+00:00,driving,0c86078c3e,52.862054,-18.493067,-2.514026,0.685190,0.455864,-6.186018,0.483131,-9.409512,-2.223397,,,
2021-08-14 11:58:37.903000+00:00,driving,0c86078c3e,65.476283,4.852037,-1.505551,0.137966,-0.083518,-5.850979,0.967563,-8.309975,-1.265203,,,
2021-08-14 11:58:37.920000+00:00,driving,0c86078c3e,59.653137,3.318965,-8.634220,-0.124648,-0.371062,-6.084713,0.887691,-7.275830,-0.384150,,,
2021-08-14 11:58:37.937000+00:00,driving,0c86078c3e,51.026444,-1.898931,-12.700321,-0.046388,-0.306667,-6.291212,0.746649,-6.849212,-0.024907,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-08-21 11:42:30.367000+00:00,studying,hj44jjh740,,,,,,,,,,6.368002,52.924892,-9.198874
2021-08-21 11:42:30.383000+00:00,studying,hj44jjh740,,,,,,,,,,6.306407,53.026791,-9.471686
2021-08-21 11:42:30.400000+00:00,studying,hj44jjh740,,,,,,,,,,6.221469,52.874954,-9.370203
2021-08-21 11:42:30.417000+00:00,studying,hj44jjh740,,,,,,,,,,6.290653,52.814398,-9.351599


## Feature Extraction

In [28]:
import findspark
findspark.init("/opt/apache-spark/")

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [29]:
df = spark.createDataFrame(result.reset_index(
))

In [30]:
from pyspark.sql.functions import udf, col,  window, mean, sum as _sum, max as _max, min as _min, var_samp, to_timestamp
from pyspark.sql.types import IntegerType

df = df.withColumn("_time", to_timestamp("_time"))
w = window("_time", "2 seconds")

aggregate = ["alpha_devicemotion", "beta_devicemotion", "gamma_devicemotion", "x_devicemotion", "y_devicemotion", "z_devicemotion", "alpha_deviceorientation", "beta_deviceorientation", "gamma_deviceorientation"] 
funs = [mean, _sum, _max, var_samp, _min]

exprs = [f(col(c)) for f in funs for c in aggregate]

# Pyspark aggregations do not support nan in the sensors...
grouped = df.fillna(0).groupBy([w, "subject", "label"]).agg(*exprs)

## Time Based Features
hour = udf(lambda x: x.hour, IntegerType())
grouped = grouped.withColumn("hourOfDay", hour("window.start"))
                             
grouped = grouped.drop("window")
grouped

DataFrame[subject: string, label: string, avg(alpha_devicemotion): double, avg(beta_devicemotion): double, avg(gamma_devicemotion): double, avg(x_devicemotion): double, avg(y_devicemotion): double, avg(z_devicemotion): double, avg(alpha_deviceorientation): double, avg(beta_deviceorientation): double, avg(gamma_deviceorientation): double, sum(alpha_devicemotion): double, sum(beta_devicemotion): double, sum(gamma_devicemotion): double, sum(x_devicemotion): double, sum(y_devicemotion): double, sum(z_devicemotion): double, sum(alpha_deviceorientation): double, sum(beta_deviceorientation): double, sum(gamma_deviceorientation): double, max(alpha_devicemotion): double, max(beta_devicemotion): double, max(gamma_devicemotion): double, max(x_devicemotion): double, max(y_devicemotion): double, max(z_devicemotion): double, max(alpha_deviceorientation): double, max(beta_deviceorientation): double, max(gamma_deviceorientation): double, var_samp(alpha_devicemotion): double, var_samp(beta_devicemotion

In [26]:
from pyspark.sql.functions import first

counts = df.groupBy([w, "subject", "label"]).agg(first("label"))
counts = counts.groupBy("label").count()

print(counts.collect())


21/09/01 15:27:06 WARN TaskSetManager: Stage 1006 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.


[Row(label='sleeping', count=118), Row(label='standing', count=13), Row(label='jogging', count=4), Row(label='studying', count=1003), Row(label='driving', count=226)]




## Trainings

Let's build a pipeline and traing it using spark.

In [8]:
from pyspark.sql.types import StringType, DoubleType

num_cols = [f.name for f in grouped.schema.fields if isinstance(f.dataType, DoubleType) or isinstance(f.dataType, IntegerType)]

num_cols

['avg(alpha_devicemotion)',
 'avg(beta_devicemotion)',
 'avg(gamma_devicemotion)',
 'avg(x_devicemotion)',
 'avg(y_devicemotion)',
 'avg(z_devicemotion)',
 'avg(alpha_deviceorientation)',
 'avg(beta_deviceorientation)',
 'avg(gamma_deviceorientation)',
 'sum(alpha_devicemotion)',
 'sum(beta_devicemotion)',
 'sum(gamma_devicemotion)',
 'sum(x_devicemotion)',
 'sum(y_devicemotion)',
 'sum(z_devicemotion)',
 'sum(alpha_deviceorientation)',
 'sum(beta_deviceorientation)',
 'sum(gamma_deviceorientation)',
 'max(alpha_devicemotion)',
 'max(beta_devicemotion)',
 'max(gamma_devicemotion)',
 'max(x_devicemotion)',
 'max(y_devicemotion)',
 'max(z_devicemotion)',
 'max(alpha_deviceorientation)',
 'max(beta_deviceorientation)',
 'max(gamma_deviceorientation)',
 'var_samp(alpha_devicemotion)',
 'var_samp(beta_devicemotion)',
 'var_samp(gamma_devicemotion)',
 'var_samp(x_devicemotion)',
 'var_samp(y_devicemotion)',
 'var_samp(z_devicemotion)',
 'var_samp(alpha_deviceorientation)',
 'var_samp(beta_de

### Train Test Split by Subjects

In [9]:
from pyspark.sql.functions import rand, floor
from pyspark.sql.window import Window

num_folds = 3
seed = 2

subjects = grouped.select("subject").distinct()
subjects = subjects.withColumn("fold_id", floor(rand(seed) * num_folds))

data = grouped.join(subjects, "subject").drop("subject")

In [16]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.feature import StringIndexer,  VectorIndexer, VectorAssembler, IndexToString
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
indexedData = labelIndexer.transform(data)

featureAssembler = VectorAssembler(inputCols = num_cols, outputCol = "features")

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features", maxDepth=10)

# Chain indexers and tree in a Pipeline
indexRevert = IndexToString(inputCol="prediction", outputCol="predLabel", labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureAssembler, dt])

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")


grid = ParamGridBuilder().addGrid(dt.maxDepth, [15]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, foldCol="fold_id", evaluator=evaluator, numFolds=num_folds)

# Train model.  This also runs the indexers.
model = cv.fit(indexedData)
print(model.avgMetrics[0])

21/09/01 15:03:02 WARN TaskSetManager: Stage 539 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:16 WARN TaskSetManager: Stage 544 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:18 WARN TaskSetManager: Stage 546 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:36 WARN TaskSetManager: Stage 559 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:38 WARN TaskSetManager: Stage 561 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:49 WARN TaskSetManager: Stage 574 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:03:51 WARN TaskSetManager: Stage 576 contains a task of very large size (10541 KiB). The maximum recommended task size is 10

0.7625257678829107




In [17]:
# Make predictions.
predictions = model.transform(indexedData)
predictions = indexRevert.transform(predictions)

# Select example rows to display.
predictions.select("predLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))


21/09/01 15:11:54 WARN TaskSetManager: Stage 971 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:11:56 WARN TaskSetManager: Stage 973 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.


+---------+--------+--------------------+
|predLabel|   label|            features|
+---------+--------+--------------------+
| studying|studying|[0.00496896817960...|
| studying|studying|[0.00360367478508...|
| studying|studying|[-0.0053737443219...|
| studying|studying|[0.00459792159916...|
| studying|studying|[0.00755677238757...|
+---------+--------+--------------------+
only showing top 5 rows



21/09/01 15:12:06 WARN TaskSetManager: Stage 987 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.
21/09/01 15:12:07 WARN TaskSetManager: Stage 986 contains a task of very large size (10541 KiB). The maximum recommended task size is 1000 KiB.


Test Error = 0 




In [20]:
from onnxmltools import convert_sparkml, utils
from onnxmltools.convert.sparkml.utils import buildInitialTypesSimple
from onnx import checker, version_converter

initial_types = buildInitialTypesSimple(data.drop("indexedLabel", "label", "fold_id"))
onnx_model = convert_sparkml(model.bestModel, 'Context Activity Predictor', initial_types, spark_session = spark)

onnx_model = version_converter.convert_version(onnx_model, 7)

utils.save_model(onnx_model, 'ks_context.onnx')

model = utils.load_model('ks_context.onnx')
checker.check_model(model)


The maximum opset needed by this model is only 4.


In [21]:
print(labelIndexer.labels)

['studying', 'driving', 'sleeping', 'standing', 'jogging']
