# Challenge 3: PySpark Demo

In this Challenge we'll show you how to use Spark to run intensive computation jobs in Jupyter Notebook. 

#### Suggested time on this lab: 120-150 min

So far you should have installed PySpark in local mode, configured your environment variables, and started Jupyter Notebook with PySpark. If you haven't completed any of the previous steps, please go back and complete them because otherwise you won't be able to complete this challenge.

If everything works so far, you should be able to create a `SparkContext` instance in Jupyter Notebook. Try to execute the next cell where the code is already written for you to create a new `SparkContext` instance.

In [1]:
# Create a `SparkContext` instance or reuse an existing one
from pyspark import SparkContext 
sc = SparkContext.getOrCreate()

No errors? Congrats! If you do get an error, review Challenge 2 and search for the error message online to see what went wrong.

`SparkContext` is the entry point to the Spark execution environment: through it you can use the [Spark Python APIs](https://spark.apache.org/docs/latest/api/python/index.html). We will show you an example of how to call the Spark API using the `SparkContext` instance.

But before we start the real work, we'd like you to get familiar with the concept of [benchmarking](https://en.wikipedia.org/wiki/Benchmark_(computing)). Benchmarking is a common technique in software engineering and data engineering to evaluate the efficiency of your code. The idea is to measure the execution time of your program and find out whether the code can be improved to get the job done in less time. In machine learning you'll often be doing repeated sampling and complex queries that could run for hours, days, or even weeks. It is important that you improve your code efficiency whenever you can. Otherwise you'll waste a lot of time and computing power executing inefficient code.

We will benchmark a code snippet that estimates Pi (`π`) by repeated sampling. We will first benchmark this without using Spark. The idea of the Pi estimation is to randomly generate many points with `x` and `y` coordinates between 0 and 1. These points will fall in a square whose side length is 1 (the upper-right quadrant of the square in the image below, which combines the red and green areas). We will count the sample points that fall into the 1/4 circle sector (the area in red) and divide by the total number of points, which allows us to calculate Pi.

![pi.png](pi.png)

Below are the formulas for the area of the full circle (*A0*) and the area of the square that encloses it (*A1*). A point drawn uniformly from the square falls inside the circle with probability *A0 / A1*, and by symmetry the same ratio holds for the 1/4 circle sector inside the unit square.

```
A0 = π * r^2
A1 = (2r)^2
```

From the formulas above, you can deduce `π = 4 * A0 / A1`, which means Pi equals 4 times the probability of a point falling into the red area.
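
Spelled out for the quarter circle, and assuming the points are drawn uniformly from the unit square (so *r* = 1), the reasoning can be written as:

```latex
\begin{aligned}
P(\text{point in red area}) &= \frac{\tfrac{1}{4}\pi r^2}{r^2} = \frac{\pi}{4} = \frac{A_0}{A_1}, \\
\pi &= 4 \cdot P(\text{point in red area}) \approx 4 \cdot \frac{\text{points inside the sector}}{\text{points sampled}}.
\end{aligned}
```

The last step is the approximation the code relies on: the fraction of sampled points that land in the red area stands in for the true probability.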

Because this lab focuses on Spark, we will provide you the code snippet for estimating Pi based on the mathematical concept discussed above. Read the code carefully and make sure you fully understand what the code does.

In [10]:
import numpy as np
from timeit import default_timer as timer

def inside(p):
    """
    Generate a random point and check if the point is within the circle with radius=1.
    
    Returns:
        (bool) whether the generated point is within the circle area
    """
    x, y = np.random.random(), np.random.random()
    return x*x + y*y < 1

def estimate_pi(num_samples):
    """
    Estimate the value of Pi by means of repeated sampling. Benchmark the repeated sampling time cost.
    
    Params:
        num_samples (int): the number of sample points to generate
    
    Returns:
        (float) estimated value of Pi
    """
    print("Executing Spark job...")
    start = timer()
    dots = list(filter(inside,list(range(num_samples)))) 
    count = len(dots)
    end = timer()
    return(4.0 * count / num_samples)


In the cell below, call `estimate_pi()` with `50000`, `500000`, and `5000000` sample sizes. See what you get.

In [11]:
# Your code here
estimate_pi(50000)

Executing Spark job...


3.14928

In [12]:
estimate_pi(500000)

Executing Spark job...


3.141688

In [13]:
estimate_pi(500000)

Executing Spark job...


3.140136

As you see, the execution time increases when you increase the sample size. In addition, the accuracy of the Pi value tends to improve as you increase the sample size. But if you keep increasing the sample size you have to wait a long time for the script to finish execution. What can you do if you really need to run a very large sample size?
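
The `estimate_pi` function given earlier records `start` and `end` but does not report them. As a minimal sketch of how the elapsed time could be returned alongside the estimate, the hypothetical helper `timed_estimate_pi` below uses Python's standard `random` module instead of NumPy and takes an optional `seed` (an assumption added here so runs can be repeated). Timings depend on your machine, so none are shown.

```python
import random
from timeit import default_timer as timer


def timed_estimate_pi(num_samples, seed=None):
    """Return (pi_estimate, elapsed_seconds) for one sampling run."""
    rng = random.Random(seed)  # local generator so a run can be reproduced
    start = timer()
    count = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point lies inside the quarter circle
            count += 1
    elapsed = timer() - start
    return 4.0 * count / num_samples, elapsed


for n in (50000, 500000):
    estimate, seconds = timed_estimate_pi(n, seed=0)
    print(f"{n:>8} samples: pi ~ {estimate:.5f} in {seconds:.3f} s")
```

Returning the elapsed time instead of printing it inside the function makes it easy to compare runs of different sizes side by side.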

There are several options. You can **use a computer with higher processing speed**. This allows you to execute the sampling jobs faster. But this is usually not the best option because a faster CPU has only a limited impact when you need to execute millions of sampling jobs one after another.

The second option is to **run multiple sampling jobs in parallel**. In case you don't know yet, Python executes code sequentially by default, which means it finishes one job before starting the next. If you wait for millions of sampling jobs to finish in a single queue it will take you a lot of time. Therefore, most modern programming languages including Python and R support [parallel programming or parallel processing](https://wiki.python.org/moin/ParallelProcessing) so that you can save time by running several tasks at the same time. Say your sampling jobs would take 1,000 seconds to finish in a single queue; with 4 parallel processing queues you could finish them in 250 seconds. Note that this time calculation is theoretical. In reality with 4 parallel queues the finishing time is usually more than 250 seconds because Python has to spend some overhead time to [spawn](https://en.wikipedia.org/wiki/Spawn_(computing)) the parallel processes.
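
The arithmetic in the example above can be sketched as a small function. `ideal_parallel_seconds` is a hypothetical helper, and the 15 seconds of overhead in the second call is a made-up number used only to show how overhead pushes the result above the theoretical value.

```python
def ideal_parallel_seconds(serial_seconds, workers, overhead_seconds=0.0):
    """Best-case wall-clock time if the work splits evenly across workers."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return serial_seconds / workers + overhead_seconds


print(ideal_parallel_seconds(1000, 4))        # 250.0
print(ideal_parallel_seconds(1000, 4, 15.0))  # 265.0
```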

**How many processes can truly run in parallel depends on how many CPU cores you have** on your computer. If you have 8 CPU cores, at most 8 processes can execute at the same instant. What if you start more processes than you have CPU cores? The extra processes have to share cores and take turns, so they add scheduling overhead without adding speed. Therefore, to benefit from parallel programming you need a multi-core computer, and the number of parallel processes should not exceed your number of CPU cores.

Then you may ask, **should I allocate all my CPU cores to parallel programming?** Remember that besides executing your parallel scripts, your computer's CPUs also need to run the operating system and other system processes. Therefore, **you typically reserve 1-2 CPU cores for the OS and system processes and allocate the rest to parallel programming**. However, with Apache Spark you don't need to manually allocate the CPU cores because Spark will automatically do that for you depending on how many cores it has access to. Remember in Challenge 2 we asked you to use the following command to start Spark:

```
$SPARK_PATH/bin/pyspark --master "local[*]"
```

The `*` tells Spark to decide by itself how many CPU cores to use: it starts as many worker threads as your machine has logical cores. If you start Spark with `local[2]`, it means you allocate 2 CPU cores to Spark.
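
If you would rather hold back a core or two for the operating system, you can work out the number yourself and pass it in place of `*`. The sketch below is one way to build such a `local[K]` string; `local_master` and its parameters are hypothetical names introduced here, and `os.cpu_count()` reports logical cores.

```python
import os


def local_master(reserved_cores=0, total_cores=None):
    """Build a `local[K]` master string that leaves some cores unused."""
    if total_cores is None:
        total_cores = os.cpu_count() or 1  # cpu_count() can return None
    usable = max(1, total_cores - reserved_cores)  # always keep at least one
    return f"local[{usable}]"


print(local_master(reserved_cores=2, total_cores=8))  # local[6]
```

The resulting string can be used as the value of `--master` in the command above.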

Another note is **your computer memory may also set a bottleneck on the performance of parallel programming**. When your computer runs parallel jobs, its memory consumption is multiplied. If a single process consumes 256 MB of memory, four parallel processes will consume 1,024 MB. This is in addition to the memory consumption of Apache Spark, which keeps your dataset in memory to improve processing speed. Therefore, in order to use Spark successfully in local mode, you really need a high-end computer with multiple cores and plenty of memory (8 GB or above). If your computer runs out of memory, your parallel scripts will be shut down.

So far we have discussed the multi-core and memory concerns of running Spark in local mode. These concerns are similar if you run Spark in cluster mode. In cluster mode each node in the cluster has its own memory and one or multiple cores. Spark is smart enough to automatically allocate the resources in cluster mode too.

Now let's move on to the real stuff. How do you run parallel programming with Apache Spark? **That is achieved by invoking Spark's `parallelize` method** ([documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections)). In the next cell, we have a function called `estimate_pi_parallel` for you to complete. You will rewrite the `estimate_pi` function we gave you earlier and use `SparkContext.parallelize` to spawn parallel processes to create the random point samples then use the samples to estimate Pi. 

If you are stuck, you can reference [this Jupyter Notebook example](https://github.com/mGalarnyk/Installations_Mac_Ubuntu_Windows/blob/master/Spark/Estimating%20PI.ipynb) (but don't simply copy and paste). **Make sure to benchmark your function so that we know if Spark helps improve the code executing time.**

*Hint: You can re-use the `inside` function we gave you earlier in your code.*

In [18]:
def estimate_pi_parallel(num_samples):
    """
    Estimate the value of Pi by means of repeated sampling using Spark's `parallelize` method. 
    Benchmark the repeated sampling time cost.
    
    Params:
        num_samples (int): the number of sample points to generate
    
    Returns:
        (float) estimated value of Pi
    """
    start = timer()

    count = sc.parallelize(range(0, num_samples)).filter(inside).count()    
    end = timer()
    return 4.0 * count/num_samples

Next, test your `estimate_pi_parallel` function with `5000000` and `50000000` sample sizes. Run it more than once for each sample size to see how the execution time varies.

In [20]:
# Your code here
estimate_pi_parallel(5000000)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 25, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 262, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 262, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
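
The traceback above ends with a version mismatch rather than a result: the worker processes started Python 2.7 while the driver (this notebook) runs Python 3.7. One common way to resolve it, sketched here under the assumption that the interpreter running the notebook is the one Spark should use for its workers, is to set `PYSPARK_PYTHON` before the `SparkContext` is created. If a context already exists, it has to be stopped and recreated (or the notebook restarted) before the setting takes effect. `PYSPARK_DRIVER_PYTHON`, the other variable named in the message, is normally set in the shell that launches PySpark, as in Challenge 2.

```python
import os
import sys

# Assumption: the interpreter running this notebook is the one the
# Spark workers should use as well.
os.environ["PYSPARK_PYTHON"] = sys.executable

print(os.environ["PYSPARK_PYTHON"])
```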


You should have noticed the execution time is significantly shorter with Spark's `parallelize` method than without using Spark. However, if you find the execution time is actually longer, it could be because your code is not efficient or Spark doesn't have access to multiple CPU cores. If it's the latter case, don't worry. You can still experience the power of parallel programming in Challenge 4 when we run Spark in the cluster mode.

You may have also noticed that the first execution of your function takes significantly longer than the second and third (you need to execute the commands consecutively). This is because the first time you execute the function, Spark has to acquire the memory and CPUs it needs from the system. After the execution is finished Spark does not release the memory immediately. So when you execute the script consecutively, Spark does not need to acquire the hardware resources again.
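
One way to observe this warm-up effect is to time several consecutive calls and compare them. The helper below is a sketch; `time_runs` is a hypothetical name, and `sum` is only a stand-in workload so the example runs without Spark.

```python
from timeit import default_timer as timer


def time_runs(func, *args, repeats=3):
    """Call func(*args) `repeats` times in a row; return each elapsed time in seconds."""
    timings = []
    for _ in range(repeats):
        start = timer()
        func(*args)
        timings.append(timer() - start)
    return timings


# Stand-in workload; in the notebook you could pass estimate_pi_parallel instead.
print(time_runs(sum, range(100000), repeats=3))
```

With a Spark job as the workload, the first entry in the returned list would be expected to be the largest.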

Spark runs on the Java Virtual Machine (JVM); its core is written mostly in Scala, and you are interacting with it via a Python wrapper. JVM applications typically hold on to a minimum amount of memory even when they are idle. When memory-demanding processes are being executed, the JVM will *burst* its memory consumption by acquiring more from the system.

# Data Wrangling with PySpark Dataframes

Next, you will practice data wrangling with the Pandas-like dataframes that PySpark provides. You should have already used the equivalent of these functions with Pandas in the previous labs. We just want to show you the Spark way of doing similar things. Note that the syntax in PySpark is different from that in Pandas. Use Google to find the examples you need.

#### Import `SparkSession` from `pyspark.sql`. Create a new instance of SparkSession.

In [22]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('patients').getOrCreate()

#### Read a CSV dataset from a previous lab using `spark.read.csv()` and assign the returned dataframe into a variable called `patients`.

The dataset path is `../lab-sklearn-and-unsupervised-learning/patient-admission-dataset-for-learning-data-mining.csv`. 

*Hint: Use `spark.read.csv()` to read from the dataset.*

In [24]:
# Use `spark.read.csv()`; header=True takes column names from the first row
patients = spark.read.csv('../lab-sklearn-and-unsupervised-learning/patient-admission-dataset-for-learning-data-mining.csv', header=True)

#### Print a summary of `patients` using `printSchema()`.

*Hint: Use `printSchema`.*

In [25]:
# Your code here
patients.printSchema()

root
 |-- id: string (nullable = true)
 |-- patient_name: string (nullable = true)
 |-- patient_email: string (nullable = true)
 |-- doctor_phone: string (nullable = true)
 |-- patient_gender: string (nullable = true)
 |-- patient_dob: string (nullable = true)
 |-- patient_diabetic: string (nullable = true)
 |-- patient_allergic: string (nullable = true)
 |-- patient_weight_kg: string (nullable = true)
 |-- patient_height_sm: string (nullable = true)
 |-- patient_nhs_number: string (nullable = true)
 |-- doctor_name: string (nullable = true)
 |-- appointment_date: string (nullable = true)
 |-- patient_show: string (nullable = true)
 |-- is_regular_visit: string (nullable = true)
 |-- prescribed_medicines: string (nullable = true)
 |-- diagnosis: string (nullable = true)



#### Count the missing values in the `patients["diagnosis"]`. Your output should be `488`.

In [26]:
# Your code here
patients.filter((patients["diagnosis"].isNull())).count()

488

#### Print the `diagnosis` column in `patients`.

*Hint: First select the column (`.select()`) then use the `show()` method*.

In [27]:
# Your code here
patients.select("diagnosis")

DataFrame[diagnosis: string]

#### Fill the missing values in column `diagnosis` with a string `no diagnosis`.

*Hint: Use `.fillna()`.*

In [28]:
# Your code here
patients = patients.fillna({'diagnosis':'no diagnosis'})

#### Print the `diagnosis` column again to confirm null values are replaced.

In [34]:
# Your code here
patients.filter((patients["diagnosis"].isNull())).count()

0

#### Count the missing values in column `doctor_name`. You should see output `58`.

In [35]:
# Your code here
patients.filter((patients["doctor_name"].isNull())).count()

58

#### Drop the rows in `patients` where column `doctor_name` is missing data. Remember to assign the converted dataset back to `patients`.

*Hint: Use `.dropna()`.*

In [36]:
# Your code here
patients = patients.na.drop(subset=["doctor_name"])

#### Now count the missing values in `doctor_name` again. You should see output `0`.

In [37]:
# Your code here
patients.filter((patients["doctor_name"].isNull())).count()

0

#### Convert the boolean columns `patient_show`, `is_regular_visit`, `patient_diabetic`, and `patient_allergic` to int.

*Hint: [Here](https://stackoverflow.com/questions/33354571/casting-a-new-derived-column-in-a-dataframe-from-boolean-to-integer) is an example.*

In [42]:
# Your code here
patients = patients.withColumn('patient_show',(patients["patient_show"]=="true").cast('integer'))
patients = patients.withColumn('is_regular_visit',(patients["is_regular_visit"]=="true").cast('integer'))
patients = patients.withColumn('patient_diabetic',(patients["patient_diabetic"]=="true").cast('integer'))
patients = patients.withColumn('patient_allergic',(patients["patient_allergic"]=="true").cast('integer'))

#### Add a new column called `diagnosis_int`. The value in this column should be `0` if the corresponding row of the `diagnosis` column is `no diagnosis`. Otherwise the value should be `1`.

*Hint: The way to add a derived column is similar to converting a column's values as you've done in the previous question.*

In [44]:
# Your code here
# 0 when the diagnosis is the 'no diagnosis' placeholder filled in earlier, otherwise 1
patients = patients.withColumn('diagnosis_int', (patients["diagnosis"] != "no diagnosis").cast('integer'))

#### Add a new column called `gender_int`. The value in this column should be `0` if the corresponding row of the `patient_gender` column is `Male`. Otherwise the value should be `1`.

In [45]:
# Your code here
# 0 when the gender is 'Male', otherwise 1 (rows with a missing gender stay null)
patients = patients.withColumn('gender_int', (patients["patient_gender"] != "Male").cast('integer'))

Due to the time limitation for this lab, we won't go through all steps of data wrangling like we did in the Unsupervised Learning with Scikit-Learn Lab. We'll simply keep the numerical columns we have so far and drop the rest so that we'll have time to practice MLlib.

#### Drop the following columns from `patients`: 

```
['id', 'patient_name', 'patient_email', 'patient_nhs_number', 'doctor_phone', 'patient_dob', 'doctor_name', 'appointment_date', 'prescribed_medicines', 'diagnosis', 'patient_gender']
```

In [48]:
# Your code here

patients = patients.drop('id', 'patient_name', 'patient_email', 'patient_nhs_number', 'doctor_phone', 'patient_dob', 
         'doctor_name', 'appointment_date', 'prescribed_medicines', 'diagnosis', 'patient_gender')

In [55]:
patients = patients.withColumn("patient_weight_kg",(patients["patient_weight_kg"]).cast('integer'))
patients = patients.withColumn("patient_height_sm",(patients["patient_height_sm"]).cast('integer'))

#### Call `printSchema()` again for `patients`. You should see all fields remaining are integer types.

In [56]:
# Your code here
patients.printSchema()

root
 |-- patient_diabetic: integer (nullable = true)
 |-- patient_allergic: integer (nullable = true)
 |-- patient_weight_kg: integer (nullable = true)
 |-- patient_height_sm: integer (nullable = true)
 |-- patient_show: integer (nullable = true)
 |-- is_regular_visit: integer (nullable = true)
 |-- diagnosis_int: integer (nullable = false)
 |-- gender_int: integer (nullable = true)



#### You now get the feeling that PySpark has similar features to Pandas for data wrangling. However, data wrangling with PySpark is not as convenient as in Pandas because PySpark is specialized for data engineering rather than data processing. The real value of Spark is in computing really big datasets with complex and time-consuming algorithms. Many data scientists/engineers use Pandas for data wrangling, then import the data into Spark for computation.

# KMeans Analysis Example

Finally we will show you an example of clustering the cleaned data with KMeans. Because you have done the same thing in the Unsupervised Learning with Scikit-Learn lab, we won't ask you to figure it out with PySpark's MLlib. We are providing the code for you to reference.

In order to perform KMeans analysis in PySpark, **we need to first use the `VectorAssembler` to combine the existing numeric columns into a single vector per row**. We will assign the generated vectors into a new column called `features`.

In [57]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["patient_diabetic", "patient_allergic", "patient_weight_kg", "patient_height_sm", "patient_show", "is_regular_visit", "diagnosis_int", "gender_int"],
    outputCol="features")

output = assembler.transform(patients)
output.select("features").show(truncate=False)

Py4JJavaError: An error occurred while calling o447.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 41, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<patient_diabetic_double_VectorAssembler_0f39d3d329c3:double,patient_allergic_double_VectorAssembler_0f39d3d329c3:double,patient_weight_kg_double_VectorAssembler_0f39d3d329c3:double,patient_height_sm_double_VectorAssembler_0f39d3d329c3:double,patient_show_double_VectorAssembler_0f39d3d329c3:double,is_regular_visit_double_VectorAssembler_0f39d3d329c3:double,diagnosis_int_double_VectorAssembler_0f39d3d329c3:double,gender_int_double_VectorAssembler_0f39d3d329c3:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<patient_diabetic_double_VectorAssembler_0f39d3d329c3:double,patient_allergic_double_VectorAssembler_0f39d3d329c3:double,patient_weight_kg_double_VectorAssembler_0f39d3d329c3:double,patient_height_sm_double_VectorAssembler_0f39d3d329c3:double,patient_show_double_VectorAssembler_0f39d3d329c3:double,is_regular_visit_double_VectorAssembler_0f39d3d329c3:double,diagnosis_int_double_VectorAssembler_0f39d3d329c3:double,gender_int_double_VectorAssembler_0f39d3d329c3:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 21 more
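
The last `Caused by` entry in the trace above names the root cause: at least one input column of the `VectorAssembler` contains a null, and the message suggests removing nulls from the dataset. The sketch below illustrates that idea in plain Python so it runs without a Spark session. The sample rows and the helper names `drop_null_rows` and `assemble` are hypothetical; only the column names are taken from the trace. In PySpark the analogous step would likely be dropping incomplete rows with `DataFrame.na.drop(subset=...)` before assembling, or passing `handleInvalid="skip"` to `VectorAssembler`.

```python
# Hypothetical sample rows; only the column names come from the trace above.
rows = [
    {"patient_weight_kg": 70.0, "patient_height_sm": 175.0, "gender_int": 1},
    {"patient_weight_kg": None, "patient_height_sm": 160.0, "gender_int": 0},
    {"patient_weight_kg": 82.5, "patient_height_sm": 181.0, "gender_int": 1},
    {"patient_weight_kg": 55.0, "patient_height_sm": None, "gender_int": 0},
]
feature_cols = ["patient_weight_kg", "patient_height_sm", "gender_int"]


def drop_null_rows(rows, cols):
    """Keep only the rows in which every listed column has a value."""
    return [r for r in rows if all(r[c] is not None for c in cols)]


def assemble(rows, cols):
    """Turn each row into a list of floats, one entry per feature column."""
    return [[float(r[c]) for c in cols] for r in rows]


clean_rows = drop_null_rows(rows, feature_cols)
features = assemble(clean_rows, feature_cols)

print(len(rows), "rows before,", len(clean_rows), "rows after")
print(features)
```

Dropping rows discards data, so it is worth checking how many rows are lost before deciding between this approach and filling the missing values instead.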


Then we import the `KMeans` class from PySpark's ML library (`pyspark.ml.clustering`) and fit a KMeans model to the new dataset, which now contains the `features` column. We call `KMeans().setK(4)` to request 4 clusters and `setSeed(1)` to make the result reproducible. Finally we print the within-set sum of squared errors and the center of each cluster.

In [58]:
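from pyspark.ml.clustering import KMeans

# Request 4 clusters; a fixed seed makes the initialization reproducible
kmeans = KMeans().setK(4).setSeed(1)

# Fit on the DataFrame that contains the assembled `features` column
model = kmeans.fit(output)

# Sum of squared distances from each point to its nearest cluster center.
# Note: computeCost is deprecated in Spark 2.4 and removed in Spark 3.0.
wssse = model.computeCost(output)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Print the coordinates of each cluster center
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)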

Py4JJavaError: An error occurred while calling o493.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 42, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<patient_diabetic_double_VectorAssembler_0f39d3d329c3:double,patient_allergic_double_VectorAssembler_0f39d3d329c3:double,patient_weight_kg_double_VectorAssembler_0f39d3d329c3:double,patient_height_sm_double_VectorAssembler_0f39d3d329c3:double,patient_show_double_VectorAssembler_0f39d3d329c3:double,is_regular_visit_double_VectorAssembler_0f39d3d329c3:double,diagnosis_int_double_VectorAssembler_0f39d3d329c3:double,gender_int_double_VectorAssembler_0f39d3d329c3:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:220)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:572)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:561)
	at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:386)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:282)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:251)
	at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:362)
	at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<patient_diabetic_double_VectorAssembler_0f39d3d329c3:double,patient_allergic_double_VectorAssembler_0f39d3d329c3:double,patient_weight_kg_double_VectorAssembler_0f39d3d329c3:double,patient_height_sm_double_VectorAssembler_0f39d3d329c3:double,patient_show_double_VectorAssembler_0f39d3d329c3:double,is_regular_visit_double_VectorAssembler_0f39d3d329c3:double,diagnosis_int_double_VectorAssembler_0f39d3d329c3:double,gender_int_double_VectorAssembler_0f39d3d329c3:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:220)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 29 more
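
For reference, the value that `model.computeCost(output)` is meant to report in the cell above is the sum of squared distances from each point to its nearest cluster center. The sketch below computes that quantity by hand in plain Python. The points, the centers, and the helper names `squared_distance` and `wssse` are made up for illustration and are not part of the lab's dataset or of the Spark API.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def wssse(points, centers):
    """Sum, over all points, of the squared distance to the nearest center."""
    return sum(min(squared_distance(p, c) for c in centers) for p in points)


# Hypothetical data: two tight groups of two points each
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
centers = [(0.0, 1.0), (10.0, 11.0)]

total = wssse(points, centers)
print("Within Set Sum of Squared Errors = " + str(total))
# prints "Within Set Sum of Squared Errors = 4.0"
```

A lower value means the points sit closer to their assigned centers. The value generally shrinks as the number of clusters grows, so it is more useful for comparing runs with the same `k` than as an absolute score.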
