# Introduction to Pyspark Tutorial

Authors: Yusen He & Martin Pollack

## Setting up Pyspark

First we have to install the PySpark package in our environment before we can import it.

To do so, we step outside of Python for a moment and run a terminal command that installs the package. If you're not familiar with a computer's terminal, it is a text interface where you type commands directly to your operating system.

In a Jupyter notebook, you can run a terminal command from a code cell by typing `!` as its first character. Below we run `pip install pyspark` this way to install the PySpark package.

In [106]:
!pip install pyspark



Now we can import objects and functions from the PySpark package as we normally do.

We can then start using the package by creating a Spark session. Creating a session starts the Spark driver, the process that coordinates work across the cluster and returns results to us. Under the hood, the session wraps a SparkContext, which is the driver's connection to the cluster.

Note that in this example the Spark cluster contains only one node: the computer you are using. A production cluster would normally have many nodes.

In [107]:
from pyspark.sql import SparkSession

# Start a Spark session named 'Dataframe', or reuse one that already exists
SparkConnection = SparkSession.builder.appName('Dataframe').getOrCreate()
SparkConnection

Next we load our data into Spark.

To do so, we use the `read` property of our `SparkConnection`, which returns a reader object. We set the `header` option to `'true'` so that the first line of the file is used as the column names, and then pass the path to a `csv` file.

An important concept in Spark is the "schema" of the data, meaning the name and data type of each column. Setting `inferSchema=True` tells Spark to read through the data and make its best guess at each column's type.

We can see that it does a good job: `Name` is read as a string, and the other columns as integers.

In [108]:
df_pyspark = SparkConnection.read.option('header','true').csv('salary.csv',inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)
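Spark's real inference handles more types (such as doubles, booleans and timestamps), but the core idea fits in a few lines: for each column, try progressively more general types and keep the narrowest one that every value parses as. The plain-Python sketch below is a simplified illustration, not Spark's algorithm; `infer_type` and `infer_schema` are hypothetical helpers, and the rows are assumed to arrive as raw text.

```python
def infer_type(values):
    """Return the narrowest type that every non-empty value parses as."""
    present = [v for v in values if v != ""]  # treat empty fields as missing
    if not present:
        return "string"  # nothing to go on
    for type_name, parse in (("integer", int), ("double", float)):
        try:
            for v in present:
                parse(v)
        except ValueError:
            continue  # at least one value failed; try a wider type
        return type_name
    return "string"  # fall back to the most general type


def infer_schema(header, rows):
    """Infer one type per column from rows of raw CSV strings."""
    columns = zip(*rows)  # transpose rows into columns
    return {name: infer_type(list(col)) for name, col in zip(header, columns)}


header = ["Name", "age", "Experience", "Salary"]
rows = [  # first rows of salary.csv, still as raw text
    ["Krish", "31", "10", "30000"],
    ["Sudhanshu", "30", "8", "25000"],
]
for column, dtype in infer_schema(header, rows).items():
    print(column, dtype)
```

Trying `int` before `float` is what makes `age` come out as an integer rather than a double, mirroring the schema Spark printed above.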



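`df_pyspark` now holds a Spark DataFrame. It does not contain the data itself; it is a reference to the data in the cluster together with a description of the operations to apply to it. Spark only does the work when we ask for a result.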

To actually see the data, we can use the `show()` method.

In [109]:
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+
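Because `df_pyspark` is a description of a computation rather than the data itself, methods such as `filter()` and `drop()` only record what should happen, and Spark does the actual work when an action such as `show()` asks for results. This is called lazy evaluation. Each of those methods also returns a new DataFrame instead of modifying the original, which is why `drop()` in the next section leaves `df_pyspark` unchanged. The toy class below illustrates both ideas in plain Python; `ToyFrame` and its methods are hypothetical and are not how PySpark is implemented.

```python
class ToyFrame:
    """Toy lazily evaluated table: a data source plus a list of pending steps."""

    def __init__(self, load, steps=()):
        self._load = load            # function that produces the raw rows
        self._steps = tuple(steps)   # transformations recorded so far

    def _with(self, step):
        # Every transformation returns a NEW frame; self is never modified.
        return ToyFrame(self._load, self._steps + (step,))

    def filter(self, predicate):
        return self._with(("filter", predicate))

    def drop(self, column):
        return self._with(("drop", column))

    def collect(self):
        # The "action": only now is the source read and each step applied.
        rows = self._load()
        for kind, arg in self._steps:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            elif kind == "drop":
                rows = [{k: v for k, v in r.items() if k != arg} for r in rows]
        return rows


reads = []  # one entry per time the source is actually read


def load_rows():
    reads.append("read")
    return [
        {"Name": "Krish", "age": 31, "Salary": 30000},
        {"Name": "Harsha", "age": 21, "Salary": 15000},
    ]


df = ToyFrame(load_rows)
low_paid = df.filter(lambda r: r["Salary"] <= 20000).drop("Name")
print(len(reads))          # 0: defining the steps read no data
print(low_paid.collect())  # the action runs the whole pipeline
print(df.collect())        # the original frame still has its Name column
```

As in this toy, Spark repeats the work for every action unless you call `cache()` on a DataFrame you plan to reuse.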



## Manipulating Data in Pyspark

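Getting rid of an unneeded column is easy: just use the `drop()` method.

But notice that the column is not permanently dropped. Spark DataFrames are immutable, so `drop()` returns a new DataFrame and leaves `df_pyspark` unchanged. To keep data without the `Name` column, assign the result to a variable, for example `df_no_name = df_pyspark.drop('Name')`.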

In [111]:
# Drop the Name column; this returns a new DataFrame
df_pyspark.drop('Name').show()

# The original DataFrame still has its Name column
df_pyspark.show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



Alternatively, we can specify the columns we want to keep with the `select()` method. Sounds a lot like SQL, doesn't it?

In [None]:
df_pyspark.select(["age", "Salary"]).show()

Selecting certain rows is just as easy with the `filter()` method.

One way is to write the condition as a SQL expression inside a string.

In [None]:
df_pyspark.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



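Instead of writing our condition in a string, we can also pass a "boolean column", a column of true/false values built from a comparison such as `df_pyspark['Salary'] <= 20000`.

Multiple conditions can be combined with "and" logic using the `&` operator, exactly as in pandas (`|` gives "or" and `~` gives "not"). Wrap each condition in parentheses, because `&` has higher precedence than comparison operators like `<=`.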

In [None]:
df_pyspark.filter((df_pyspark['Salary']<=20000) & (df_pyspark['Salary']>=18000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+
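The parentheses around each condition are required. Python gives `&` higher precedence than comparisons, so `df_pyspark['Salary']<=20000 & df_pyspark['Salary']>=18000` would be parsed as a chained comparison involving `20000 & df_pyspark['Salary']`. The plain-Python sketch below shows both parses with the standard `ast` module and then runs both versions on ordinary integers. With plain integers the mis-parsed version silently returns the wrong rows; with PySpark columns it usually fails with an error instead.

```python
import ast

# Parse the condition without and with parentheses (nothing is evaluated here).
bad_tree = ast.parse("salary <= 20000 & salary >= 18000", mode="eval").body
good_tree = ast.parse("(salary <= 20000) & (salary >= 18000)", mode="eval").body

# Without parentheses: a chained comparison, salary <= (20000 & salary) >= 18000
print(type(bad_tree).__name__, type(bad_tree.comparators[0]).__name__)
# With parentheses: a bitwise AND of two comparisons, as intended
print(type(good_tree).__name__, type(good_tree.op).__name__)

# Apply both versions to the Salary values of the six complete rows.
salaries = [30000, 25000, 20000, 20000, 15000, 18000]
good = [s for s in salaries if (s <= 20000) & (s >= 18000)]
bad = [s for s in salaries if s <= 20000 & s >= 18000]
print(good)  # the same three salaries the filter above returns
print(bad)   # the 18000 row is silently lost
```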



We can also chain these methods together.

For example, below we filter the rows and then select certain columns.

In [None]:
df_pyspark.filter("Salary<=20000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



Sometimes we want to drop all rows with missing values. This is done with `.na.drop()`, which by default removes every row that contains at least one null value.

In [112]:
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+
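`na.drop()` also accepts `how`, `thresh`, and `subset` arguments. With the default `how='any'` a row is dropped if any of its values is null; `how='all'` drops a row only if every value is null; `thresh=n` keeps rows with at least `n` non-null values and overrides `how`; and `subset` restricts the check to the listed columns. The plain-Python function below mimics those rules on the nine rows of our data as they appear in this notebook's outputs, with `None` standing in for Spark's null. `drop_na` is a hypothetical helper for illustration, not PySpark's implementation.

```python
# The nine rows of salary.csv; None stands in for Spark's null.
ROWS = [
    {"Name": "Krish", "age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "age": 30, "Experience": 8, "Salary": 25000},
    {"Name": "Sunny", "age": 29, "Experience": 4, "Salary": 20000},
    {"Name": "Paul", "age": 24, "Experience": 3, "Salary": 20000},
    {"Name": "Harsha", "age": 21, "Experience": 1, "Salary": 15000},
    {"Name": "Shubham", "age": 23, "Experience": 2, "Salary": 18000},
    {"Name": "Mahesh", "age": None, "Experience": None, "Salary": 40000},
    {"Name": None, "age": 34, "Experience": 10, "Salary": 38000},
    {"Name": None, "age": 36, "Experience": None, "Salary": None},
]


def drop_na(rows, how="any", thresh=None, subset=None):
    """Plain-Python mimic of the rules behind df.na.drop()."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:      # thresh takes precedence over how
            keep = non_null >= thresh
        elif how == "any":          # drop if ANY checked value is null
            keep = non_null == len(cols)
        else:                       # how == "all": drop only if ALL are null
            keep = non_null > 0
        if keep:
            kept.append(row)
    return kept


print(len(drop_na(ROWS)))                     # default, like na.drop()
print(len(drop_na(ROWS, how="all")))          # no row is entirely null
print(len(drop_na(ROWS, thresh=3)))           # need 3+ non-null values
print(len(drop_na(ROWS, subset=["Salary"])))  # only Salary is checked
```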



Or maybe a missing value really means 0, or some other known value, in our dataset.

To substitute a specific value for missing values, use `na.fill()`, passing first the value to put in and then the list of columns where the substitution should happen. Note that the `Name` column still contains nulls below, because it is not in that list.

In [113]:
df_pyspark.na.fill(0,['age','Experience','Salary']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|         0| 40000|
|     null| 34|        10| 38000|
|     null| 36|         0|     0|
+---------+---+----------+------+
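`na.fill()` also accepts a dictionary mapping column names to fill values, and it only fills columns whose type matches the value: a number fills numeric columns, a string fills string columns, and mismatched columns are skipped. The plain-Python function below mimics those rules for numbers and strings only (booleans and other types are left out for brevity); `fill_na` and `SCHEMA` are hypothetical names, and `None` stands in for Spark's null.

```python
# Column types of salary.csv, as reported by printSchema() above.
SCHEMA = {"Name": str, "age": int, "Experience": int, "Salary": int}

# The three rows with missing values; None stands in for Spark's null.
rows = [
    {"Name": "Mahesh", "age": None, "Experience": None, "Salary": 40000},
    {"Name": None, "age": 34, "Experience": 10, "Salary": 38000},
    {"Name": None, "age": 36, "Experience": None, "Salary": None},
]


def fill_na(rows, value, subset=None, schema=SCHEMA):
    """Plain-Python mimic of df.na.fill(value, subset) for numbers and strings."""
    if isinstance(value, dict):
        targets = dict(value)  # per-column fill values; subset is not used
    else:
        is_number = isinstance(value, (int, float))
        columns = subset if subset is not None else list(schema)
        # Only columns whose type matches the fill value are filled.
        targets = {c: value for c in columns
                   if (schema[c] in (int, float)) == is_number}
    # Build new rows; the input rows are left untouched.
    return [
        {c: targets[c] if v is None and c in targets else v for c, v in row.items()}
        for row in rows
    ]


for row in fill_na(rows, 0, ["age", "Experience", "Salary"]):
    print(row)
print(fill_na(rows, "unknown")[1])  # a string fills only the Name column
```

The first loop reproduces the last three rows of the `na.fill()` output above.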



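Our later steps cannot handle missing values (for example, `VectorAssembler` raises an error on nulls by default), so here we drop those rows and overwrite `df_pyspark` for the rest of the tutorial.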

In [None]:
df_pyspark = df_pyspark.na.drop()

## Pyspark Groupby and Aggregate Functions

Like in pandas and SQL, we may want to look at summary statistics by various groups.

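To do this in PySpark, we first call the `groupBy()` method with a column name. Then we choose how to aggregate each group, that is, which summary statistic we are interested in. Called with no arguments, aggregation methods such as `sum()` and `mean()` are applied to every numeric column.

Here we look at sums.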

In [118]:
### Group by Name and sum every numeric column
df_pyspark.groupBy('Name').sum().show()

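This returns one row per distinct `Name`, with `sum(age)`, `sum(Experience)` and `sum(Salary)` columns. Because every name appears only once in our cleaned data, each sum equals that person's own value. Spark does not guarantee the order of rows after a `groupBy()`; add `.orderBy('Name')` before `.show()` if you want a sorted result.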



But we can also look at group means.

In [120]:
df_pyspark.groupBy('Experience').mean().show()

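The result has one row per distinct `Experience` value and an `avg(...)` column for each numeric column; `mean()` is an alias for `avg()`, which is why the columns are named `avg`. In our cleaned data every `Experience` value is unique, so each group holds a single row and each average is just that row's value.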



Or look at how many observations are in each group.

In [121]:
df_pyspark.groupBy('Experience').count().show()

This returns an `Experience` column and a `count` column. Since each of the six remaining `Experience` values is unique, every count is 1.
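A `groupBy()` followed by an aggregation is the classic split-apply-combine pattern: split the rows by key, apply an aggregate to each group, and combine the results into one row per key. The plain-Python sketch below illustrates that pattern on the six cleaned rows; it is not Spark's distributed implementation, and `group_agg` is a hypothetical helper. Because `Name` and `Experience` are unique in this small dataset, the example groups by `Salary`, the one column with a repeated value (20000), so that one group actually contains more than one row.

```python
from collections import defaultdict

# The six rows left after na.drop(): (Name, age, Experience, Salary).
ROWS = [
    ("Krish", 31, 10, 30000),
    ("Sudhanshu", 30, 8, 25000),
    ("Sunny", 29, 4, 20000),
    ("Paul", 24, 3, 20000),
    ("Harsha", 21, 1, 15000),
    ("Shubham", 23, 2, 18000),
]


def group_agg(rows, key_index, value_index):
    """Split rows by a key column, then compute count, sum and mean per group."""
    groups = defaultdict(list)                  # split
    for row in rows:
        groups[row[key_index]].append(row[value_index])
    return {                                    # apply + combine
        key: {"count": len(vals), "sum": sum(vals), "avg": sum(vals) / len(vals)}
        for key, vals in groups.items()
    }


# Count, sum and mean of age for each distinct Salary value.
for salary, stats in sorted(group_agg(ROWS, key_index=3, value_index=1).items()):
    print(salary, stats)
```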



## SQL-like queries in PySpark

Spark DataFrames are part of the "Spark SQL" module. When we call DataFrame methods, Spark translates them into the same kind of query plan that a SQL query produces and optimizes both with the same engine, so the two styles are interchangeable.

But we can also directly interact with our Spark data using SQL.

First we create a "temporary view" and give it a name. A temporary view registers the DataFrame under a table name that SQL queries can refer to, as if it were a table in a relational database. It does not copy the data, and it only exists for the lifetime of the current Spark session.

In [None]:
df_pyspark.createOrReplaceTempView("SalaryTable")

Now we can just query from our SalaryTable!

In [None]:
sql_df = SparkConnection.sql("SELECT * FROM SalaryTable")
sql_df.show()

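Spark SQL supports most standard SQL, including joins, subqueries, aggregations, and window functions. For example, here we find the person with the highest salary.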

In [None]:
sql_highest_salary_df = SparkConnection.sql("SELECT Name, Salary FROM SalaryTable ORDER BY Salary DESC LIMIT 1")
sql_highest_salary_df.show()
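To see how the `ORDER BY ... LIMIT 1` pattern behaves, here is a sketch that uses Python's built-in `sqlite3` module as a stand-in for Spark SQL: the SQL is the same, but it runs on SQLite, not Spark. Creating and filling the table plays the role of `createOrReplaceTempView()`. `ORDER BY` sorts in ascending order unless you add `DESC`, so `LIMIT 1` on an ascending sort returns the lowest salary, not the highest.

```python
import sqlite3

rows = [
    ("Krish", 30000), ("Sudhanshu", 25000), ("Sunny", 20000),
    ("Paul", 20000), ("Harsha", 15000), ("Shubham", 18000),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE SalaryTable (Name TEXT, Salary INTEGER)")
conn.executemany("INSERT INTO SalaryTable VALUES (?, ?)", rows)

lowest = conn.execute(
    "SELECT Name, Salary FROM SalaryTable ORDER BY Salary LIMIT 1"
).fetchone()
highest = conn.execute(
    "SELECT Name, Salary FROM SalaryTable ORDER BY Salary DESC LIMIT 1"
).fetchone()
print("ascending: ", lowest)   # default sort order is ascending
print("descending:", highest)
conn.close()
```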

## Machine Learning with Pyspark

### Prepare Dataset

To set up our data to do machine learning, we need to create one column with our outcome variable, and one column that for each row has a list, or a "vector," of potential predictors.

We create our column of vectors with the `VectorAssembler` object. We tell the object what predictors we want put into the vector with `inputCols` and the name of the resulting column with `outputCol`.

Then we call the `transform()` method on our `VectorAssembler` object, passing our DataFrame as the argument. This returns a new DataFrame with the vector column appended, which we save to a new variable called `output`.

In [123]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")
# Append the "Independent Features" vector column to df_pyspark
output = featureassembler.transform(df_pyspark)
# Display the transformed DataFrame
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+
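Conceptually, `VectorAssembler` walks through each row and packs the chosen columns, in the order given by `inputCols`, into one vector of floating-point numbers. The plain-Python sketch below shows that transformation; `assemble` is a hypothetical helper, and real Spark vectors are `DenseVector` or `SparseVector` objects rather than lists. It also mimics one behavior worth knowing: with the default `handleInvalid='error'`, `VectorAssembler` raises an error when an input value is null.

```python
def assemble(rows, input_cols, output_col):
    """Return new rows with a feature-vector column built from input_cols."""
    out = []
    for row in rows:
        values = [row[c] for c in input_cols]
        if any(v is None for v in values):
            # Mirrors VectorAssembler's default handleInvalid='error'.
            raise ValueError(f"null value in one of {input_cols}: {row}")
        out.append({**row, output_col: [float(v) for v in values]})
    return out


rows = [
    {"Name": "Krish", "age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "age": 30, "Experience": 8, "Salary": 25000},
]
output = assemble(rows, ["age", "Experience"], "Independent Features")
for row in output:
    print(row["Name"], row["Independent Features"])
```

The printed vectors match the first two rows of the `output.show()` table above.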



Now to do machine learning we only need our "Independent Features", or predictors, and "Salary," or outcome, columns.

In [124]:
finalized_data = output.select("Independent Features","Salary")
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



We can then create our training and testing datasets.

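This is done with the `randomSplit()` method of our DataFrame. We pass the proportion of data we want in each dataset, here 60% for training and 40% for testing. The split is random, so the actual sizes only approximate these proportions and change between runs; pass a `seed` argument to make the split reproducible.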

In [None]:
train_data,test_data = finalized_data.randomSplit([0.6,0.4])
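The proportions passed to `randomSplit()` are targets, not guarantees. Spark normalizes the weights and assigns rows to splits at random, so with only six rows the sizes can be far from 60/40. The plain-Python sketch below imitates the idea by drawing one uniform random number per row and assigning the row according to the cumulative weights. It is a simplification of Spark's sampling, not its actual code, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split using a uniform draw and cumulative weights."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:                 # normalize, e.g. [0.6, 0.4] -> [0.6, 1.0]
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)         # same seed -> same assignment
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()              # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


salaries = [30000, 25000, 20000, 20000, 15000, 18000]
for seed in range(5):
    train, test = random_split(salaries, [0.6, 0.4], seed=seed)
    print(f"seed={seed}: {len(train)} train rows, {len(test)} test rows")
```

Running the loop shows the train/test sizes changing with the seed. In the run shown in this tutorial, the test set ended up with five of the six rows.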

### Linear Regression in Pyspark

PySpark has lots of machine learning algorithms in its `ml` sub-package. You'll soon see that the process for doing machine learning in Spark is very similar to what we did in scikit-learn.

We start by importing the `LinearRegression` object from this sub-package.

As in scikit-learn, we first configure the model, for example which column holds our features and which holds our label (outcome). Then we call the `fit()` method on the training data.

In [125]:
from pyspark.ml.regression import LinearRegression

LRregressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
# fit() returns a trained LinearRegressionModel, stored here under the same name
LRregressor = LRregressor.fit(train_data)

We can then use our model to make predictions on the testing dataset. The `evaluate()` method returns a summary object, and its `predictions` field is a DataFrame holding the model's predictions in a new `prediction` column.

In [126]:
### Prediction
LR_pred = LRregressor.evaluate(test_data)
LR_pred.predictions.show()



+--------------------+------+----------+
|Independent Features|Salary|prediction|
+--------------------+------+----------+
|          [23.0,2.0]| 18000|   15000.0|
|          [24.0,3.0]| 20000|   15000.0|
|          [29.0,4.0]| 20000|   15000.0|
|          [30.0,8.0]| 25000|   15000.0|
|         [31.0,10.0]| 30000|   15000.0|
+--------------------+------+----------+



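Notice that every prediction is 15000.0. The test set above contains five of our six rows, so the model was trained on a single row (the one with a salary of 15000), and with a single training row the fitted model simply predicts that row's salary for every input. With this little data, treat the numbers below as a demonstration of the API rather than a meaningful model.

The same summary object also exposes evaluation metrics, for example `meanAbsoluteError`, `meanSquaredError`, `rootMeanSquaredError`, and `r2`.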

In [127]:
print('The MAE is:',LR_pred.meanAbsoluteError)
print('The MSE is:',LR_pred.meanSquaredError)

The MAE is: 7600.0
The MSE is: 76799999.99999999
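Both metrics are simple averages over the test rows: MAE is the mean of |actual - predicted| and MSE is the mean of (actual - predicted) squared. The plain-Python sketch below recomputes them from the five test rows and predictions in the output above; the tiny difference from Spark's printed MSE comes from floating-point rounding.

```python
actual = [18000, 20000, 20000, 25000, 30000]
predicted = [15000.0, 15000.0, 15000.0, 15000.0, 15000.0]

errors = [a - p for a, p in zip(actual, predicted)]
mae = sum(abs(e) for e in errors) / len(errors)  # mean absolute error
mse = sum(e * e for e in errors) / len(errors)   # mean squared error
rmse = mse ** 0.5                                # root mean squared error

print("MAE:", mae)
print("MSE:", mse)
print("RMSE:", rmse)
```

Squaring makes MSE much more sensitive to the single 15000 error on the 30000 salary than MAE is.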


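### Gradient-Boosted Trees Regression in Pyspark

But of course we can fit other models too. Next we make a gradient-boosted trees model. This is the same family of algorithms that XGBoost implements, but Spark ships its own implementation rather than XGBoost itself.

Below we import Spark's gradient-boosted trees regressor, `GBTRegressor`.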

In [128]:
from pyspark.ml.regression import GBTRegressor

Now it's your turn!

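Create a `GBTRegressor` model and fit it to your training dataset.

Use your model to make predictions on the testing dataset and find the mean absolute error. Hint: for any fitted Spark ML model, `transform()` adds a `prediction` column to a DataFrame, and `RegressionEvaluator` from `pyspark.ml.evaluation` (with `labelCol='Salary'` and `metricName='mae'`) computes the error from that column.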