
# **Running Pyspark in Colab**

Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

Run a local spark session to test your installation:

In [21]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
import findspark
findspark.init()



In [22]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [23]:
! pip install haversine



In [24]:
import urllib
import requests
urllib.request.urlretrieve("https://download.microsoft.com/download/F/4/8/F4894AA5-FDBC-481E-9285-D5F8C4C4F039/Geolife%20Trajectories%201.3.zip", "/tmp/geo_raw.zip")

('/tmp/geo_raw.zip', <http.client.HTTPMessage at 0x7f63012cc550>)

In [25]:
%%sh
unzip /tmp/geo_raw.zip

Archive:  /tmp/geo_raw.zip


replace Geolife Trajectories 1.3/Data/000/Trajectory/20081023025304.plt? [y]es, [n]o, [A]ll, [N]one, [r]ename:  NULL
(EOF or read error, treating as "[N]one" ...)


# **Directory and file preprocessing**

In [26]:
os.listdir(path="./Geolife Trajectories 1.3/Data/010")

from pathlib import Path
import os

mypath = "./Geolife Trajectories 1.3/Data/"

dir_with_labels = []

for directory in os.listdir(mypath):
  label_file = Path("./Geolife Trajectories 1.3/Data/"+directory+"/labels.txt")
  if label_file.is_file():
    dir_with_labels.append(directory)

dir_with_labels.sort()
print (dir_with_labels)
len(dir_with_labels)

['010', '020', '021', '052', '053', '056', '058', '059', '060', '062', '064', '065', '067', '068', '069', '073', '075', '076', '078', '080', '081', '082', '084', '085', '086', '087', '088', '089', '091', '092', '096', '097', '098', '100', '101', '102', '104', '105', '106', '107', '108', '110', '111', '112', '114', '115', '116', '117', '118', '124', '125', '126', '128', '129', '136', '138', '139', '141', '144', '147', '153', '154', '161', '163', '167', '170', '174', '175', '179']


69

In [29]:
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DateType, DoubleType, TimestampType
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from datetime import datetime
from pyspark.sql.functions import udf
import pyspark.sql.functions as func
from pyspark.sql.functions import lag   
from haversine import haversine, Unit

def timetotimestamp (string1) : 
  s = datetime.strptime(string1, "%Y-%m-%d %H:%M:%S")
  return datetime.timestamp(s)

def timetotimestampl (string1) : 
  s = datetime.strptime(string1, "%Y/%m/%d %H:%M:%S")
  return datetime.timestamp(s)

def distance (la1, lo1, la2, lo2) : 
  point1 = (la1, lo1)
  point2 = (la2, lo2)
  return haversine(point1, point2)

udfdistance = udf(lambda x1, y1, x2, y2:distance(x1, y1, x2, y2), DoubleType())
udftimetotimestamp = udf(lambda x:timetotimestamp(x), DoubleType())
udftimetotimestampl = udf(lambda x:timetotimestampl(x), DoubleType())

path = "./Geolife Trajectories 1.3/Data/"

def get_labeled_dataset_from_directory(DirectoryName):
  dir_path = path + DirectoryName
  traject_path = dir_path+"/Trajectory/"
  os.listdir(path = traject_path)[0]

  first_file = traject_path + '*.plt'

  file_location = first_file
  file_type = "csv"

  # Loading raw GPS Data

  customSchema = StructType([
      StructField("latitude", DoubleType(), True),
      StructField("longitude", DoubleType(), True),
      StructField("zero", IntegerType(), True),
      StructField("altitude", DoubleType(), True),
      StructField("datetype", StringType(), True),
      StructField("date", StringType(), True),
      StructField("time", StringType(), True),
  ])


  infer_schema = "false"
  first_row_is_header = "false"
  delimiter = ","

  df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .schema(customSchema) \
    .load(file_location)

  df = df.na.drop(how='any').drop('zero')

  df = df.withColumn('timedate', concat(df.date.cast("string"), lit(" "), df.time.cast("string"))).drop('datetype', 'date', 'time').withColumn("user", lit(DirectoryName))

  # Loading labels

  first_labels = dir_path +'/labels.txt'
  file_type = "csv"

  customSchema = StructType([
      StructField("start_time", StringType(), True),
      StructField("end_time", StringType(), True),
      StructField("mode", StringType(), True),
  ])

  infer_schema = "false"
  first_row_is_header = "true"
  delimiter = "	"

  dfl = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .schema(customSchema) \
    .load(first_labels)
  
  # Files preprocessing

  w = Window().orderBy(lit('A'))
  dfl = dfl.withColumn("traject_id", row_number().over(w))

  df = df.withColumn("date_timestamp", udftimetotimestamp('timedate'))

  dfl = dfl.withColumn("start_time_conv", udftimetotimestampl('start_time'))
  dfl = dfl.withColumn("end_time_conv", udftimetotimestampl('end_time'))

  # Feature engineering

  cond = [(df.date_timestamp >= dfl.start_time_conv) & (df.date_timestamp <= dfl.end_time_conv)]
  out_df = df.join(dfl, cond, 'inner').select(df.latitude, df.longitude,
                                            df.altitude,
                                            df.timedate,
                                            df.date_timestamp,  
                                            dfl.mode,
                                            df.user,
                                            dfl.traject_id, 
                                            )

  out_df = out_df.withColumn('prev_period_lat', lag('latitude', 10).over(Window.orderBy("traject_id").partitionBy("traject_id"))) \
                  .withColumn('prev_period_long', lag('longitude', 10).over(Window.orderBy("traject_id").partitionBy("traject_id"))) \
                  .withColumn('prev_period_alt', lag('altitude', 10).over(Window.orderBy("traject_id").partitionBy("traject_id"))) \
                  .withColumn('prev_period_td', lag('date_timestamp', 10).over(Window.orderBy("traject_id").partitionBy("traject_id"))).na.drop() 

  out_df = out_df.withColumn('dist', udfdistance('prev_period_lat', 'prev_period_long', 'latitude', 'longitude')) \
        .withColumn('delta_t', col('date_timestamp') - col('prev_period_td')) \
        .withColumn('velocity', col('dist') / col('delta_t')) \
        .drop('latitude', 'longitude', 'timedate', 'date_timestamp', 'prev_period_lat', 'prev_period_long', 'prev_period_alt', 'prev_period_td', 'delta_t')

  return out_df


In [30]:
for idx,f in enumerate(dir_with_labels):
    print('Joing user '+f+' directory to dataset')  
    if idx == 0:
        df = get_labeled_dataset_from_directory(f)
        joi = df
    else:
        df = get_labeled_dataset_from_directory(f)
        joi = joi.unionAll(df)
        

Joing user 010 directory to dataset
Joing user 020 directory to dataset
Joing user 021 directory to dataset
Joing user 052 directory to dataset
Joing user 053 directory to dataset


In [31]:
joi.show(40)

+--------+------+----+----------+-------------------+--------------------+
|altitude|  mode|user|traject_id|               dist|            velocity|
+--------+------+----+----------+-------------------+--------------------+
|   978.0|subway| 010|       148|0.10866102050591181|0.009878274591446528|
|   978.0|subway| 010|       148|0.11376691417546496|0.010342446743224086|
|   978.0|subway| 010|       148|0.12341268598109312|0.011219335089190284|
|   978.0|subway| 010|       148|0.13727788872104948|0.011439824060087457|
|   978.0|subway| 010|       148|0.13498635592544087|0.012271486902312806|
|   978.0|subway| 010|       148|0.13567719663309105|0.012334290603008278|
|   978.0|subway| 010|       148|0.13244519113651754|0.012040471921501594|
|   978.0|subway| 010|       148|0.13072088615835287|0.011883716923486625|
|   978.0|subway| 010|       148| 0.1284384829270006|0.011676225720636419|
|   978.0|subway| 010|       148|0.12458055719272818|0.011325505199338925|
|   978.0|subway| 010|   

# **Data split and normalization**

In [32]:
trainDF, testDF = joi.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # Cache because accessing training data multiple times
print(testDF.count())

679422
170127


In [33]:
from pyspark.ml.feature import StringIndexer

labelToIndex = StringIndexer(inputCol="mode", outputCol="label")


In [34]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["dist", "velocity", "altitude"]
assemblerInputs =  numericCols
vecAssembler = VectorAssembler(outputCol="features")
vecAssembler.setInputCols(numericCols)

VectorAssembler_a989124c4aa4

In [35]:
vecAssembler.transform(trainDF).head().features

DenseVector([0.0679, 0.0062, 308.0])

#**Running Logistic Model as Multiclass Classification Tool**


In [36]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)

In [37]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[labelToIndex, vecAssembler.setParams(handleInvalid="skip"), lr])

# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

In [38]:
predDF.select("features", "label", "prediction", "probability").show()

+--------------------+-----+----------+--------------------+
|            features|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|[0.07162719933783...|  5.0|       0.0|[0.38682993777625...|
|[0.06603794854226...|  5.0|       0.0|[0.38366131875853...|
|[0.07942294193602...|  5.0|       0.0|[0.39134779716143...|
|[0.08825395529509...|  5.0|       0.0|[0.39647273211695...|
|[0.06821512220740...|  5.0|       0.0|[0.38499499577376...|
|[0.09301689024807...|  5.0|       0.0|[0.39929287283732...|
|[0.07328422018879...|  5.0|       0.0|[0.38795568684189...|
|[0.07299244202743...|  5.0|       0.0|[0.38781421890033...|
|[0.10908240488803...|  5.0|       0.0|[0.40348655275383...|
|[3.57454218482925...|  5.0|       0.0|[0.41581968665548...|
|[3.58084491115412...|  5.0|       0.0|[0.41573094432987...|
|[0.05244109256144...|  5.0|       0.0|[0.37615721966628...|
|[0.09228724782318...|  5.0|       0.0|[0.39903713531497...|
|[0.09686793682240...|  

In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="f1")
print(f"F1-score: {mcEvaluator.evaluate(predDF)}")


Accuracy: 0.4496052948679516
F1-score: 0.2789548383863846


#**Running Random Forest as Multiclass Classification Tool**

In [41]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

In [42]:
# Define the pipeline based on the stages created in previous steps.
pipeline_rf = Pipeline(stages=[labelToIndex, vecAssembler.setParams(handleInvalid="skip"), rf])

# Define the pipeline model.
pipelineModel_rf = pipeline_rf.fit(trainDF)

# Apply the pipeline model to the test dataset.
predDF_rf = pipelineModel_rf.transform(testDF)

predDF_rf.select("features", "label", "prediction", "probability").show()

+--------------------+-----+----------+--------------------+
|            features|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|[0.07162719933783...|  5.0|       2.0|[0.22012773090000...|
|[0.06603794854226...|  5.0|       2.0|[0.22012773090000...|
|[0.07942294193602...|  5.0|       2.0|[0.22012773090000...|
|[0.08825395529509...|  5.0|       2.0|[0.22012773090000...|
|[0.06821512220740...|  5.0|       2.0|[0.22012773090000...|
|[0.09301689024807...|  5.0|       2.0|[0.22012773090000...|
|[0.07328422018879...|  5.0|       2.0|[0.22012773090000...|
|[0.07299244202743...|  5.0|       2.0|[0.22012773090000...|
|[0.10908240488803...|  5.0|       2.0|[0.29514278590641...|
|[3.57454218482925...|  5.0|       0.0|[0.40395871763533...|
|[3.58084491115412...|  5.0|       0.0|[0.40395871763533...|
|[0.05244109256144...|  5.0|       2.0|[0.19797181662221...|
|[0.09228724782318...|  5.0|       2.0|[0.22012773090000...|
|[0.09686793682240...|  

In [43]:
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF_rf)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="f1")
print(f"F1-score: {mcEvaluator.evaluate(predDF_rf)}")


Accuracy: 0.7061548137567817
F1-score: 0.6457514952340625


# **Model hyperparameters tuning**

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

In [None]:
# Create a 5-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)

In [None]:
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="f1")
print(f"F1-score: {mcEvaluator.evaluate(cvPredDF)}")
