In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install findspark


In [None]:
# findspark makes the installed Spark importable before pyspark is used.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("HMP Dataset Example")
    .getOrCreate()
)



In [None]:
!rm -Rf HMP_Dataset
!git clone https://github.com/wchill/HMP_Dataset

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType

# Column layout used when reading each data file: three nullable integer
# columns named x, y and z, in that order.
schema = StructType(
    [StructField(axis, IntegerType(), True) for axis in ("x", "y", "z")]
)

In [None]:
import os
import fnmatch
from functools import reduce

from pyspark.sql.functions import lit

d = 'HMP_Dataset/'

# Keep only sub-directories that hold data, skipping hidden entries (names
# starting with '.', e.g. the .git folder created by the clone).
# Sorted so the load order -- and therefore the union and the random split
# downstream -- does not depend on os.listdir's arbitrary ordering.
file_list_filtered = sorted(
    s for s in os.listdir(d)
    if os.path.isdir(os.path.join(d, s)) and not fnmatch.fnmatch(s, '.*')
)

# Read every data file into its own Spark DataFrame, tag each row with the
# file name ("source") and the folder name ("class"), then union them into `df`.
frames = []
for category in file_list_filtered:
    category_dir = os.path.join(d, category)
    for data_file in sorted(os.listdir(category_dir)):
        data_path = os.path.join(category_dir, data_file)
        # Skip hidden files and anything that is not a regular file.
        if fnmatch.fnmatch(data_file, '.*') or not os.path.isfile(data_path):
            continue
        # Files are space-delimited with no header row; columns come from `schema`.
        temp_df = (
            spark.read
            .option("header", "false")
            .option("delimiter", " ")
            .csv(data_path, schema=schema)
            .withColumn("source", lit(data_file))
            .withColumn("class", lit(category))
        )
        frames.append(temp_df)

if not frames:
    raise FileNotFoundError(f"no data files found under {d!r}")

df = reduce(lambda left, right: left.union(right), frames)
print(f"loaded {len(frames)} files from {len(file_list_filtered)} class folders")

In [None]:
# 80/20 train/test split. A fixed seed makes the split (and every metric
# computed from it) reproducible for the same input data and partitioning.
SPLIT_SEED = 42
df_train, df_test = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
from pyspark.ml.feature import (
    Normalizer,
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml.linalg import Vectors

# "class" string -> numeric "label" index
indexer = StringIndexer(inputCol="class", outputCol="label")
# "label" index -> one-hot "labelVec"
encoder = OneHotEncoder(inputCol="label", outputCol="labelVec")
# x, y, z columns -> a single "features" vector
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
# rescale each "features" vector to unit L1 norm (p=1.0)
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [None]:
from pyspark.ml.classification import GBTClassifier, OneVsRest

# GBTClassifier only supports binary labels (0/1), but StringIndexer assigns one
# label per class folder, so fitting a bare GBT on this data fails.
# OneVsRest trains one binary GBT per class and, at prediction time, picks the
# class whose model scores highest.
gbt = OneVsRest(
    classifier=GBTClassifier(maxIter=10),
    labelCol="label",
    featuresCol="features_norm",
)

In [None]:
from pyspark.ml import Pipeline

# Stages run in order: index label -> one-hot encode -> assemble x/y/z
# -> L1-normalize -> classify.
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    vectorAssembler,
    normalizer,
    gbt,
])

In [None]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(dataset=df_train)

In [None]:
# Score the training split with the fitted pipeline model.
prediction = model.transform(dataset=df_train)

In [None]:
# Do NOT refit the pipeline on df_test: the model evaluated on the test split
# must be the one trained on df_train only. Refitting here would leak the test
# data into training and make the reported accuracy optimistically biased.
# Instead, show the fitted classifier stage for inspection.
model.stages[-1]

In [None]:
# Score the held-out test split with the fitted pipeline model.
prediction = model.transform(dataset=df_test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: fraction of rows whose "prediction" matches the "label" column.
binEval = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)

In [None]:
# Accuracy of the most recently scored DataFrame, `prediction`.
# (The evaluator is named `binEval`; `BinEval` raised NameError.)
binEval.evaluate(prediction)