## Task 1 - SQL

### Build SparkSession:

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Mllib_df').getOrCreate()


### Read the json file:

In [2]:
filepath = 'DataFrames_sample.json'
df = spark.read.json(filepath)


### Display the schema:


In [3]:
df.printSchema()

root
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- HDD: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- W: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Year: long (nullable = true)



### Get all the data when "Model" equal "MacBook Pro":




In [4]:
df.filter(df.Model == 'MacBook Pro').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Create TempView:

In [5]:
df.createOrReplaceTempView('df_tmp_view')

### Display "RAM"column and count "RAM" column:

In [6]:
ram = spark.sql('SELECT RAM FROM df_tmp_view')
ram.show()
ram.groupBy('RAM').count().sort('count').show()

+----+
| RAM|
+----+
|16GB|
| 8GB|
| 8GB|
|64GB|
+----+

+----+-----+
| RAM|count|
+----+-----+
|64GB|    1|
|16GB|    1|
| 8GB|    2|
+----+-----+



### Get all columns when "Year" column equal "2015"  

In [7]:
df.filter(df.Year == 2015).show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all when "Model" start with "M":

In [8]:
spark.sql('SELECT * FROM df_tmp_view WHERE Model LIKE "M%"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all data when "Model" column equal "MacBook Pro"

In [9]:
df.filter(df.Model == 'MacBook Pro').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all data with Multiple Conditions when "RAM" column equal "8GB" and "Model" column is "Macbook".

In [10]:
spark.sql('SELECT * FROM df_tmp_view WHERE Model = "MacBook" AND RAM = "8GB"').show()

+----+----+---------+---+-------+---+----------+-----+------+----+
|   D|   H|      HDD| Id|  Model|RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-------+---+----------+-----+------+----+
|7.74|0.52|256GB SSD|  2|MacBook|8GB|       12"|11.04|  2.03|2016|
+----+----+---------+---+-------+---+----------+-----+------+----+



### Get all data with Multiple Conditions when "D" greater than or equal "8" and "Model" column is "iMac".

In [11]:
spark.sql('SELECT * FROM df_tmp_view where D >= 8 and Model ==  "iMac"').show()

+---+----+-------+---+-----+----+----------+----+------+----+
|  D|   H|    HDD| Id|Model| RAM|ScreenSize|   W|Weight|Year|
+---+----+-------+---+-----+----+----------+----+------+----+
|8.0|20.3|1TB SSD|  4| iMac|64GB|       27"|25.6|  20.8|2017|
+---+----+-------+---+-----+----+----------+----+------+----+



## Task 2


### Read "test1" dataset:

In [12]:
filepath = 'test1.csv'
test1_df = spark.read.option("header", True).csv(filepath)
test1_df.createOrReplaceTempView('test1_df_tmp_view')
test1_df.printSchema()


root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



### Display Salary of the people less than or equal to 20000

In [13]:
spark.sql('SELECT * FROM test1_df_tmp_view WHERE Salary <= 20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Display Salary of the people less than or equal to 20000 and greater than or equal 15000

In [14]:
spark.sql('SELECT * FROM test1_df_tmp_view WHERE Salary <= 20000 AND Salary >= 15000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



## Task 3 

### Read "test3" dataset:

In [15]:
filepath = 'test3.csv'
test3_df = spark.read.option("header", True).csv(filepath)
test3_df.createOrReplaceTempView('test3_df_tmp_view')

### Display dataset

In [16]:
test3_df.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



### Display schema

In [17]:
test3_df.printSchema()


root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: string (nullable = true)



### Group by "Name" column and using sum function on "Salary" column

In [18]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

test3_df = test3_df.withColumn("salary", col("salary").cast("integer"))
test3_df.groupBy("Name").sum("salary").show()


+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



### Group by "Name" column and using avg function on "Salary" column

In [19]:
test3_df.groupBy("Name").avg("salary").show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



### Group by "Departments" column and using sum function on "Salary" column

In [20]:
test3_df.groupBy("Departments").sum("salary").show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



### Group by "Departments" column and using mean function on "Salary" column:

In [21]:
test3_df.groupBy("Departments").mean("salary").show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



Group by "Departments" column and using count function on "Departments" column:

In [22]:
test3_df.groupBy("Departments").count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



### Apply agg to using sum function get the total of salaries

In [23]:
test3_df.agg(sum("salary")).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



## Task 4

You've been flown to their headquarters in Ulsan, South Korea, to assist them in accurately estimating the number of crew members a ship will need.


They're currently building new ships for certain customers, and they'd like you to create a model and utilize it to estimate how many crew members the ships will require.


Metadata:
1. Measurements of ship size 
2. capacity 
3. crew 
4. age for 158 cruise ships.

It is saved in a csv file for you called "ITI_data.csv". our task is to develop a regression model that will assist in predicting the number of crew members required for future ships. The client also indicated that they have found that particular cruise lines will differ in acceptable crew counts, thus this is most likely an important factor to consider when conducting your investigation.

In [24]:
filepath = 'ITI_data.csv'
ITI_data_df = spark.read.option("header", True).csv(filepath)
ITI_data_df.printSchema()
ITI_data_df.show(5)

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Tonnage: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- length: string (nullable = true)
 |-- cabins: string (nullable = true)
 |-- passenger_density: string (nullable = true)
 |-- crew: string (nullable = true)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|   

In [25]:
# convert datatypes
ITI_data_df = ITI_data_df.withColumn("Age",ITI_data_df.Age.cast('int'))
ITI_data_df = ITI_data_df.withColumn("Tonnage",ITI_data_df.Tonnage.cast('float'))
ITI_data_df = ITI_data_df.withColumn("passengers",ITI_data_df.passengers.cast('float'))
ITI_data_df = ITI_data_df.withColumn("length",ITI_data_df.length.cast('float'))
ITI_data_df = ITI_data_df.withColumn("cabins",ITI_data_df.cabins.cast('float'))
ITI_data_df = ITI_data_df.withColumn("passenger_density",ITI_data_df.passenger_density.cast('float'))
ITI_data_df = ITI_data_df.withColumn("crew",ITI_data_df.crew.cast('float'))

ITI_data_df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: float (nullable = true)
 |-- passengers: float (nullable = true)
 |-- length: float (nullable = true)
 |-- cabins: float (nullable = true)
 |-- passenger_density: float (nullable = true)
 |-- crew: float (nullable = true)



In [26]:
# Split data into train & test
(train, test) = ITI_data_df.randomSplit([.8, .2])


### OneHotEncoder 


In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [28]:
# first, convert string to number using StringIndexer
category_indexers = []
for col in ['Ship_name', 'Cruise_line']:
    category_indexers.append(StringIndexer(inputCol=col, outputCol=f"{col}_index"))
    # Fits a model to the input dataset with optional parameters.
    # ITI_data_df = category_indexer.fit(ITI_data_df).transform(ITI_data_df)
    # # drop original column
    # ITI_data_df = ITI_data_df.drop(col)

ITI_data_df.show(5)

+-----------+-----------+---+-------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+-------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6| 30.277|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6| 30.277|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26| 47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|  110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|101.353|     26.42|  8.92| 13.21|            38.36|10.0|
+-----------+-----------+---+-------+----------+------+------+-----------------+----+
only showing top 5 rows



In [29]:
# Second, OneHot Encoding
oh_encoders = []
for col in ['Ship_name_index', 'Cruise_line_index']:
    oh_encoders.append(OneHotEncoder(inputCol=col, outputCol=f"{col}_oh_encoded"))
    #Fits a model to the input dataset with optional parameters.
    # ITI_data_df = oh_encoder.fit(ITI_data_df).transfgory_indexer.fit(ITI_data_df).transform(ITI_data_df)
    # # drop original column
    # ITI_data_df = ITI_orm(ITI_data_df)
    # # drop original column
    # ITI_data_df = ITI_data_df.drop(col)

ITI_data_df.show(5)


+-----------+-----------+---+-------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+-------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6| 30.277|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6| 30.277|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26| 47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|  110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|101.353|     26.42|  8.92| 13.21|            38.36|10.0|
+-----------+-----------+---+-------+----------+------+------+-----------------+----+
only showing top 5 rows



### Use VectorAssembler to merge all columns into one column:

In [30]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['Ship_name_index_oh_encoded', 'Cruise_line_index_oh_encoded', 'Age',
               'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density'],
    outputCol="features"
)


### Create a Linear Regression Model 

In [31]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='crew', featuresCol='features')


### Creating a Pipeline

In [32]:
from pyspark.ml import Pipeline

# create pipeline
stages = category_indexers + oh_encoders + [assembler] + [lr]
pipeline = Pipeline(stages=stages)
# train LR mode
pipeline_model = pipeline.fit(train)
# get train predictions
train_predictions = pipeline_model.transform(train)
train_predictions.select('features', 'crew', 'prediction').show(5)

# get test predictions 
test_predictions = pipeline_model.transform(test)


+--------------------+-----+------------------+
|            features| crew|        prediction|
+--------------------+-----+------------------+
|(140,[9,115,134,1...|11.85|11.847046008201607|
|(140,[10,120,134,...|  4.0| 3.997169214523341|
|(140,[11,123,134,...| 0.59|0.5894338529130927|
|(140,[12,122,134,...|  7.0| 6.993477192790334|
|(140,[13,124,134,...|  5.2| 5.204257160163007|
+--------------------+-----+------------------+
only showing top 5 rows



### Model Evaluation

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_eval = RegressionEvaluator(labelCol="crew", predictionCol="prediction", metricName="rmse")

# training RMSE
print("Training RMSE: %.3f" % lr_eval.evaluate(train_predictions))

# testing RMSE
print("Testing RMSE: %.3f" % lr_eval.evaluate(test_predictions))

Training RMSE: 0.091


Py4JJavaError: An error occurred while calling o616.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 81.0 failed 1 times, most recent failure: Lost task 0.0 in stage 81.0 (TID 1235, 192.168.1.17, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3307/0x0000000841334840: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Unseen label: Amsterdam. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2194)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1220)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1196)
	at org.apache.spark.mllib.stat.Statistics$.colStats(Statistics.scala:58)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:70)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:62)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:106)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:115)
	at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:110)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3307/0x0000000841334840: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label: Amsterdam. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 31 more
