# Import packages

In [1]:
#!pip install numpy # if necessary 

In [2]:
import warnings

warnings.filterwarnings('ignore')

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Connection to Spark instance

In [3]:
conf = pyspark.SparkConf().setMaster('spark://172.18.0.22:7077')
spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("Python") \
    .getOrCreate()
print('Submitted application!')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/29 19:49:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Submitted application!


# Read csv files from 
Please download the six files "ts_data_block_$i$.csv", $i={1,...,6}$, from GoogleDrive and upload it to this container before.

In [4]:
# Create an empty RDD
emp_RDD = spark.sparkContext.emptyRDD()
columns = StructType([StructField('datetime', DateType(), False),
                       StructField('pulse', FloatType(), False),
                       StructField('category', StringType(), False),
                       StructField('ts_number', StringType(), False)])

df = spark.createDataFrame(data=emp_RDD, schema=columns)

for i in range(1, 3):

    df_temp = spark.read.format("csv") \
        .option('header', True) \
        .option('multiLine', True) \
        .option('inferSchema', True) \
        .load(f"file:////data/ts_data_{i}.csv")
    
    print(f'Read data block number {i}.')
    
    df = df.union(df_temp)

print('All data blocks read and concatenated.')

[Stage 1:>                                                          (0 + 1) / 1]

All data blocks read and concatenated.


                                                                                

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
df.show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+-------------------+-----+-----------+---------+
|           datetime|pulse|   category|ts_number|
+-------------------+-----+-----------+---------+
|2021-01-01 00:00:00| 52.0|pro_athlete|      1_1|
|2021-01-02 00:00:00| 57.0|pro_athlete|      1_1|
|2021-01-03 00:00:00| 54.0|pro_athlete|      1_1|
|2021-01-04 00:00:00| 52.0|pro_athlete|      1_1|
|2021-01-05 00:00:00| 50.0|pro_athlete|      1_1|
+-------------------+-----+-----------+---------+
only showing top 5 rows



                                                                                

In [7]:
# print(f'Dataframe consists of {df.count()} rows.')

# Applying Machine Learning Classification Model on time series data.

## Feature Engineering

### Group by time series data by ts_number and category and apply descriptive statistic measures on the column 'pulse' (analogue to pandas' describe() method)

In [8]:
# Define function for calculation descriptive statistics.
def groupby_describe(df, groupby_col, stat_col):
    out = df.groupby(groupby_col).agg(
        F.mean(stat_col).alias("mean"),
        F.stddev(stat_col).alias("std"),
        F.min(stat_col).alias("min"),
        F.expr(f"percentile({stat_col}, array(0.25))")[0].alias("low_quart"),
        F.expr(f"percentile({stat_col}, array(0.5))")[0].alias("median"),
        F.expr(f"percentile({stat_col}, array(0.75))")[0].alias("up_quart"),
        F.max(stat_col).alias("max"),
    )
    return out

df_stats = groupby_describe(df, ['ts_number', 'category'], 'pulse')

In [9]:
df_stats.show(5)

                                                                                

+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+
|ts_number|   category|             mean|                std| min|low_quart|median|up_quart| max|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+
|      1_1|pro_athlete|51.51111111111111| 4.2033813153100485|42.0|     49.0|  51.0|    54.0|61.0|
|     1_10|non_athlete|             80.0|                0.0|80.0|     80.0|  80.0|    80.0|80.0|
|    1_100|pro_athlete|47.75555555555555|   1.24801965476376|45.0|     47.0|  48.0|   48.75|51.0|
|    1_101|    athlete|72.81111111111112| 0.4211926470661242|72.0|     73.0|  73.0|    73.0|74.0|
|    1_102|non_athlete|80.05555555555556|0.23034469096157306|80.0|     80.0|  80.0|    80.0|81.0|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+
only showing top 5 rows



In [10]:
#print(f'Dataframe consists of {df_stats.count()} rows.')

### Encoding "category" column and store as Target:

In [11]:
catEncoder = StringIndexer(inputCol='category', outputCol='Target').fit(df_stats)
df_stats = catEncoder.transform(df_stats)

                                                                                

In [12]:
df_stats.show(5)

                                                                                

+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+
|ts_number|   category|             mean|                std| min|low_quart|median|up_quart| max|Target|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+
|      1_1|pro_athlete|51.51111111111111| 4.2033813153100485|42.0|     49.0|  51.0|    54.0|61.0|   1.0|
|     1_10|non_athlete|             80.0|                0.0|80.0|     80.0|  80.0|    80.0|80.0|   2.0|
|    1_100|pro_athlete|47.75555555555555|   1.24801965476376|45.0|     47.0|  48.0|   48.75|51.0|   1.0|
|    1_101|    athlete|72.81111111111112| 0.4211926470661242|72.0|     73.0|  73.0|    73.0|74.0|   0.0|
|    1_102|non_athlete|80.05555555555556|0.23034469096157306|80.0|     80.0|  80.0|    80.0|81.0|   2.0|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+
only showing top 5 rows



### Transform features to a vector

In [13]:
print("Transform features to vector and store as 'features':")
required_features = ['mean', 'std', 'min', 'low_quart', 'median', 'up_quart', 'max', 'Target']

vec_assembler = VectorAssembler(inputCols=required_features, outputCol='features')
df_stats_vec = vec_assembler.transform(df_stats)

Transform features to vector and store as 'features':


In [14]:
df_stats_vec.show(5)

[Stage 17:>                                                         (0 + 1) / 1]

+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+--------------------+
|ts_number|   category|             mean|                std| min|low_quart|median|up_quart| max|Target|            features|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+--------------------+
|      1_1|pro_athlete|51.51111111111111| 4.2033813153100485|42.0|     49.0|  51.0|    54.0|61.0|   1.0|[51.5111111111111...|
|     1_10|non_athlete|             80.0|                0.0|80.0|     80.0|  80.0|    80.0|80.0|   2.0|[80.0,0.0,80.0,80...|
|    1_100|pro_athlete|47.75555555555555|   1.24801965476376|45.0|     47.0|  48.0|   48.75|51.0|   1.0|[47.7555555555555...|
|    1_101|    athlete|72.81111111111112| 0.4211926470661242|72.0|     73.0|  73.0|    73.0|74.0|   0.0|[72.8111111111111...|
|    1_102|non_athlete|80.05555555555556|0.23034469096157306|80.0|     80.0|  80.0|    80.0|81.0|   2.0|[80.0555555555

                                                                                

### Split data set into training (70%) and test data set (30%)

In [15]:
train_df, test_df = df_stats_vec.randomSplit([0.7, 0.3], seed=12345)

In [16]:
train_df.show(5)

[Stage 20:>                                                         (0 + 1) / 1]

+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+--------------------+
|ts_number|   category|             mean|                std| min|low_quart|median|up_quart| max|Target|            features|
+---------+-----------+-----------------+-------------------+----+---------+------+--------+----+------+--------------------+
|      1_1|pro_athlete|51.51111111111111| 4.2033813153100485|42.0|     49.0|  51.0|    54.0|61.0|   1.0|[51.5111111111111...|
|     1_10|non_athlete|             80.0|                0.0|80.0|     80.0|  80.0|    80.0|80.0|   2.0|[80.0,0.0,80.0,80...|
|    1_100|pro_athlete|47.75555555555555|   1.24801965476376|45.0|     47.0|  48.0|   48.75|51.0|   1.0|[47.7555555555555...|
|    1_101|    athlete|72.81111111111112| 0.4211926470661242|72.0|     73.0|  73.0|    73.0|74.0|   0.0|[72.8111111111111...|
|    1_102|non_athlete|80.05555555555556|0.23034469096157306|80.0|     80.0|  80.0|    80.0|81.0|   2.0|[80.0555555555

                                                                                

In [18]:
#print(f"Number of train dataset: {train_df.count()}")
#print(f"Number of test  dataset: {test_df.count()}")

## Apply Multinomial Logistic Regression as Classification Model

### Apply Logistic Regression model based on training data set and predict category on test data set

In [19]:
lr = LogisticRegression(featuresCol='features', labelCol='Target')

In [20]:
lr_model = lr.fit(train_df)

ERROR:root:Exception while sending command.                         (0 + 1) / 1]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.8/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side 

Py4JError: An error occurred while calling o147.fit

### Provide prediction based on  trained model.

In [None]:
y_pred = lr_model.transform(test_df)

### Show prediction vs. true values on test data set.

In [None]:
df_target_vs_prediction = y_pred.select('Target', 'prediction')

In [None]:
df_target_vs_prediction.show(10)

## Model Evaluation

### Model evaluation measures

### Confusion matrix

In [None]:
metrics = MulticlassMetrics(df_target_vs_prediction.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

### Evaluation measures

In [None]:
multi_evaluator_acc = MulticlassClassificationEvaluator(labelCol='Target', metricName='accuracy')
print(f'Prediction Accuracy: {multi_evaluator_acc.evaluate(y_pred)}')
multi_evaluator_prec = MulticlassClassificationEvaluator(labelCol='Target', metricName='precisionByLabel')
print(f'Prediction Precision: {multi_evaluator_prec.evaluate(y_pred)}')
multi_evaluator_rec = MulticlassClassificationEvaluator(labelCol='Target', metricName='recallByLabel')
print(f'Prediction Recall: {multi_evaluator_rec.evaluate(y_pred)}')
multi_evaluator_f1 = MulticlassClassificationEvaluator(labelCol='Target', metricName='f1')
print(f'Prediction F1-Score: {multi_evaluator_f1.evaluate(y_pred)}')