This notebook was created by Shashank Taneja for learning the basics of PySpark. The prerequisites are a good understanding of the pandas and scikit-learn packages.

## Initiating the spark context

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create the SparkSession, or reuse one that already exists (stored in sc here)
sc = SparkSession.builder.getOrCreate()

In [None]:
# Print the tables in the catalog.
# listTables() returns the tables registered in the current database as a list.
print(sc.catalog.listTables())

## Using SQL to query the Spark cluster

In [None]:
query = "SELECT * FROM data LIMIT 10"

# Get the first 10 rows of the data table
spark_df = sc.sql(query)

# Show the results
spark_df.show()

Convert the results to a pandas DataFrame

In [None]:
pandas_df = spark_df.toPandas()

_____
The `.createDataFrame()` method takes a pandas DataFrame and returns a Spark DataFrame.

In [None]:
spark_df = sc.createDataFrame(pandas_df)

**spark_df is stored only locally**. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts.

For example, a SQL query (using the `.sql()` method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the `.createTempView()` Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.
_______

There is also the method `.createOrReplaceTempView()`. This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.

In [None]:
# Register the Spark DataFrame created above as a temporary table named 'temp'
spark_df.createOrReplaceTempView('temp')

In [None]:
# Examine the tables in the catalog to see the new table
print(sc.catalog.listTables())

____

Let's look at performing column-wise operations. In Spark you can do this using the `.withColumn()` method, which takes two arguments. First, a string with the name of your new column, and second the new column itself.

Updating a Spark DataFrame is somewhat different from working in pandas because the **Spark DataFrame is immutable**. This means that it can't be changed, and so columns can't be updated in place. Thus, all these methods return a new DataFrame. To overwrite the original DataFrame, you must reassign the returned DataFrame to the same variable, like so:

In [None]:
df = df.withColumn("newCol", df.oldCol + 1)

The above code creates a DataFrame with the same columns as df plus a new column, newCol, where every entry is equal to the corresponding entry from oldCol, plus one. _To overwrite an existing column, just pass the name of the column as the first argument!_
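
The idea of "return a new object instead of changing the old one" can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark code: the helper `with_column` and the tuple-of-dicts "DataFrame" are hypothetical stand-ins chosen only to show the behaviour.

```python
# Plain-Python illustration (NOT Spark): a "DataFrame" is a tuple of row dicts.
def with_column(rows, name, fn):
    """Return a NEW tuple of rows where column `name` is set to fn(row)."""
    return tuple({**row, name: fn(row)} for row in rows)

df = ({"oldCol": 1}, {"oldCol": 2})

# The original rows are never modified; a new object is returned.
df_new = with_column(df, "newCol", lambda row: row["oldCol"] + 1)

print(df)      # still only has oldCol
print(df_new)  # has oldCol and newCol
```

Reassigning the result to the original name (`df = with_column(df, ...)`) is what "overwriting" means in this setting.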
____

### Rename a column

In [None]:
df = df.withColumnRenamed("column", "new_name")

___
### Filtering Data

Let's take a look at the `.filter()` method. As you might suspect, this is the Spark counterpart of SQL's `WHERE` clause. The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:

In [None]:
df.filter("column_name > 120").show()
df.filter(df.column_name > 120).show()

Notice that in the first case, we pass a string to `.filter()`. In SQL, we would write this filtering task as `SELECT * FROM df WHERE column_name > 120`. Spark's `.filter()` can accept any expression that could go in the `WHERE` clause of a SQL query (in this case, `"column_name > 120"`), as long as it is passed as a string. Notice that in this case, we do not reference the name of the table in the string, just as we wouldn't in the `WHERE` clause of the SQL query.

In the second case, we actually pass a column of boolean values to .filter(). Remember that df.column_name > 120 returns a column of boolean values that has True in place of those records in df.column_name that are over 120, and False otherwise.
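
To make the "column of boolean values" concrete, here is a small plain-Python sketch (not Spark code; the sample rows are made up for illustration). It builds the True/False mask first and then keeps only the rows where the mask is True, which is conceptually what the second form of `.filter()` does.

```python
# Plain-Python illustration (NOT Spark); the rows below are made-up sample data.
rows = [{"column_name": 90}, {"column_name": 150}, {"column_name": 120}]

# Step 1: the comparison yields one boolean per row.
mask = [row["column_name"] > 120 for row in rows]

# Step 2: keep only the rows whose mask entry is True.
filtered = [row for row, keep in zip(rows, mask) if keep]

print(mask)
print(filtered)
```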
___

## Selecting

The Spark variant of SQL's `SELECT` is the `.select()` method. This method takes multiple arguments, one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the `df.colName` syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside `.withColumn()`.

The difference between `.select()` and `.withColumn()` methods is that `.select()` returns only the columns you specify, while `.withColumn()` returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use `.select()` and not `.withColumn()`.

In [None]:
df1 = df.select("column1", "column2", "column3")

In [None]:
df2 = df.select(df.column1, df.column2, df.column3)

___
## Selecting II

You can also use the `.alias()` method to rename a column you're selecting. So if you wanted to `.select()` the column `new_name` (which isn't in your DataFrame), you could do:

`df.select((df.column/60).alias("new_name"))`

The equivalent Spark DataFrame method `.selectExpr()` takes SQL expressions as a string:
`df.selectExpr("column/60 as new_name")`

with the SQL `AS` keyword being equivalent to the `.alias()` method. To select multiple columns, you can pass multiple strings.
***

## Aggregating

All of the common aggregation methods, like `.min()`, `.max()`, and `.count()`, are `GroupedData` methods. `GroupedData` objects are created by calling the `.groupBy()` DataFrame method. For example, to find the minimum value of a column, `col`, in a DataFrame, `df`, you could do:

`df.groupBy().min("col").show()`

This creates a `GroupedData` object (so you can use the `.min()` method), then finds the minimum value in col, and returns it as a DataFrame.


When you pass the name of one or more columns in your DataFrame to the `.groupBy()` method, the aggregation methods behave like when you use a `GROUP BY` statement in a SQL query!
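
As a rough picture of what grouping followed by an aggregation computes, here is a plain-Python sketch (not Spark code; the rows are made-up sample data). Rows are bucketed by the grouping columns, and the aggregate is then computed once per bucket.

```python
from collections import defaultdict

# Plain-Python illustration (NOT Spark); made-up sample rows.
rows = [
    {"column1": "a", "column2": "x", "column3": 1.0},
    {"column1": "a", "column2": "x", "column3": 3.0},
    {"column1": "b", "column2": "y", "column3": 5.0},
]

# Bucket the column3 values by the (column1, column2) pair, like GROUP BY.
groups = defaultdict(list)
for row in rows:
    groups[(row["column1"], row["column2"])].append(row["column3"])

# One aggregate value per group, like .count() and .avg("column3").
counts = {key: len(values) for key, values in groups.items()}
averages = {key: sum(values) / len(values) for key, values in groups.items()}

print(counts)
print(averages)
```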

In [None]:
#Another method
# Group by column_name
by_column = df.groupBy("column_name")

# Count of each discrete value in the column 'column_name' 
by_column.count().show()

In [None]:
two_column_grouping = df.groupBy('column1','column2')

# Average column3 by column1 and column2
two_column_grouping.avg('column3').show()

#### `pyspark.sql.functions` submodule

This submodule contains many useful functions for computing things like standard deviations. 

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Standard deviation of column3 within each (column1, column2) group
two_column_grouping.agg(F.stddev("column3")).show()

___
## Joining

A join will combine two different tables along a column that they share. This column is called the key. 


In PySpark, joins are performed using the DataFrame method `.join()`. This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, `on`, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, `how`, specifies the kind of join to perform.
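
The semantics of a left outer join can be sketched in plain Python. This is an illustration only, not how Spark executes joins: the helper `left_outer_join` and the sample rows are hypothetical. Every row of the left table is kept; where the key has no match in the right table, the right-hand columns are filled with `None`.

```python
# Plain-Python illustration (NOT Spark); helper and data are hypothetical.
def left_outer_join(left, right, key):
    """Join two lists of row dicts on `key`, keeping every left row."""
    # Index the right table by key value for quick lookup.
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    right_cols = {col for row in right for col in row if col != key}

    joined = []
    for left_row in left:
        matches = index.get(left_row[key])
        if matches:
            for right_row in matches:
                joined.append({**left_row, **right_row})
        else:
            # No match: keep the left row and fill the right columns with None.
            joined.append({**left_row, **{col: None for col in right_cols}})
    return joined

df1 = [{"common_key": 1, "a": "x"}, {"common_key": 2, "a": "y"}]
df2 = [{"common_key": 1, "b": "p"}]
merged = left_outer_join(df1, df2, "common_key")
print(merged)
```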

In [None]:
df_merged = df1.join(df2, on = 'common_key', how = 'leftouter')

___
## Machine Learning Pipelines

At the core of the `pyspark.ml` module are the `Transformer` and `Estimator` classes. Almost every other class in the module behaves similarly to these two basic classes.

`Transformer` classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.

`Estimator` classes all implement a `.fit()` method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a `StringIndexerModel` for including categorical data saved as strings in your models, or a `RandomForestClassificationModel` or `RandomForestRegressionModel`, which use the random forest algorithm for classification or regression.

___
### Data types

It's important to know that Spark's machine learning routines only handle numeric data. That means all of the columns you feed to a model must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.
___
To remedy this, you can use the `.cast()` method in combination with the `.withColumn()` method. It's important to note that `.cast()` works on columns, while `.withColumn()` works on DataFrames.

The only argument you need to pass to `.cast()` is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

You can put this call to `.cast()` inside a call to `.withColumn()` to overwrite the already existing column, just as in the `.withColumn()` example earlier in this notebook.

In [None]:
dataframe = dataframe.withColumn("col", dataframe.col.cast("new_type"))

##### Making a Boolean

In [None]:
dataframe = dataframe.withColumn("bool_col", dataframe.col>0)

___
#### Strings and factors
As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the categorical features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this built into the `pyspark.ml.feature` submodule. You can create what are called 'one-hot vectors' to represent the categorical features. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).
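
The two steps described in this section (index the strings, then one-hot encode the index) can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: the helpers `fit_string_index` and `one_hot` are hypothetical, the index is assumed to be ordered by descending frequency (with ties broken alphabetically), and the last level is assumed to be dropped from the vector, which is why a vector can contain "at most one" element equal to one.

```python
from collections import Counter

# Plain-Python illustration (NOT Spark); helpers are hypothetical.
def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (assumed ordering)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, size, drop_last=True):
    """Encode an index as a vector; with drop_last the final level is all zeros."""
    length = size - 1 if drop_last else size
    return [1.0 if i == index else 0.0 for i in range(length)]

values = ["red", "blue", "red", "green", "blue", "red"]
mapping = fit_string_index(values)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}

print(mapping)
print(encoded)
```

With three levels and the last one dropped, each vector has two elements, and the least frequent level is represented by the all-zero vector.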

The first step to encoding your categorical feature is to create a `StringIndexer`. Members of this class are `Estimators` that take a DataFrame with a column of strings and map each unique string to a number. Then, the `Estimator` returns a `Transformer` that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a `OneHotEncoder`. This works exactly the same way as the `StringIndexer` by creating an `Estimator` and then a `Transformer`. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a `StringIndexer` and a `OneHotEncoder`, and the `Pipeline` will take care of the rest.

In [None]:
# Import the feature classes
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
column_name_indexer = StringIndexer(inputCol="column_name", outputCol="column_name_index")

# Create a OneHotEncoder
column_name_encoder = OneHotEncoder(inputCol="column_name_index", outputCol="column_name_fact")

___
#### Assemble a vector

The last step in the `Pipeline` is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the `pyspark.ml.feature` submodule contains a class called `VectorAssembler`. This `Transformer` takes all of the columns you specify and combines them into a new vector column.
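
Conceptually, assembling a vector means concatenating the selected values of a row into one flat list of numbers. The sketch below is plain Python, not Spark code; the helper `assemble` and the sample row are hypothetical. Columns that already hold a vector (such as a one-hot encoded column) are flattened into the result.

```python
# Plain-Python illustration (NOT Spark); helper and row are hypothetical.
def assemble(row, input_cols):
    """Concatenate the values of input_cols into a single flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # flatten vector columns
        else:
            features.append(float(value))
    return features

row = {"column1": 1, "column2": 0.5, "column_name_fact": [0.0, 1.0]}
features = assemble(row, ["column1", "column2", "column_name_fact"])
print(features)
```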

In [None]:
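from pyspark.ml.feature import VectorAssembler  # import the assembler class
vec_assembler = VectorAssembler(inputCols=["column1", "column2", "column3", "column4", "column5"], outputCol="features")  # "features" is the default input column name of pyspark.ml estimators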

___
#### Create the pipeline

You're finally ready to create a Pipeline!

`Pipeline` is a class in the `pyspark.ml` module that combines all the `Estimators` and `Transformers` that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
name_pipe = Pipeline(stages=[column_name_indexer, column_name_encoder, vec_assembler])

#### Test vs Train

In Spark it's important to make sure you split the data after all the transformations. This is because operations like `StringIndexer` don't always produce the same index even when given the same list of strings.
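
One way such a mismatch can arise is when the index is fitted separately on different subsets of the data. The plain-Python sketch below is an illustration only, not Spark code: the helper `fit_string_index` is hypothetical and assumes a frequency-based ordering (most frequent string gets index 0, ties broken alphabetically). The same string ends up with a different index depending on which rows the indexer saw.

```python
from collections import Counter

# Plain-Python illustration (NOT Spark); helper is hypothetical.
def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (assumed ordering)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}

full_data = ["a", "a", "a", "b", "b", "c"]
train_only = ["b", "b", "c", "a"]  # a subset in which "b" is the most frequent

index_full = fit_string_index(full_data)
index_train = fit_string_index(train_only)

print(index_full)
print(index_train)
```

Fitting the transformations once on the full data, and splitting afterwards, guarantees that training and test rows share one mapping.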

In [None]:
# Fit and transform the data
piped_data = name_pipe.fit(model_data).transform(model_data)

In [None]:
# Split the data into training and test sets (roughly 60% training, 40% test)
training, test = piped_data.randomSplit([.6, .4])

___
### Modeling 

##### We'll take logistic regression as an example

#### Create the modeler

The Estimator we'll be using is a `LogisticRegression` from the `pyspark.ml.classification` submodule.

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

#### Cross validation

By default, PySpark's `CrossValidator` uses three folds (`numFolds=3`). You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, `elasticNetParam` and `regParam`, and using the cross validation error to compare all the different models so you can choose the best one!

The `pyspark.ml.evaluation` submodule has classes for evaluating different kinds of models. Our model is a binary classification model, so we'll be using the `BinaryClassificationEvaluator` from the `pyspark.ml.evaluation` module.

This evaluator calculates the area under the ROC curve (AUC). This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number.
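
One way to read this metric: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The plain-Python sketch below computes it that way on made-up labels and scores. It is an illustration of the metric, not the evaluator's actual implementation, and the function name `area_under_roc` is hypothetical.

```python
# Plain-Python illustration (NOT Spark); function name and data are hypothetical.
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)  # 3 of the 4 positive/negative pairs are ordered correctly -> 0.75
```

A value of 1.0 means every positive is scored above every negative, while 0.5 is what random scores would give on average.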

In [None]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

Next, we need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule `pyspark.ml.tuning` includes a class called `ParamGridBuilder` that does just that. 

You'll need to use the `.addGrid()` and `.build()` methods to create a grid that you can use for cross validation. The `.addGrid()` method takes a model parameter and a list of values that we want to try. The `.build()` method takes no arguments; it just returns the grid that you'll use later.

This is similar to creating a grid using `GridSearchCV()` in scikit-learn.
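
A parameter grid is simply every combination of the candidate values. The plain-Python sketch below builds such a grid with `itertools.product`. It is an illustration only, not Spark code; the dictionary layout is an assumption made for readability. The candidate values mirror the ones used in this notebook: ten values of `regParam` from 0.00 to 0.09 and two values of `elasticNetParam`.

```python
from itertools import product

# Plain-Python illustration (NOT Spark); the dict layout is an assumption.
reg_params = [round(i * 0.01, 2) for i in range(10)]  # 0.0, 0.01, ..., 0.09
elastic_net_params = [0, 1]

# Every combination of the two hyperparameters becomes one candidate model.
param_grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

print(len(param_grid))  # 10 * 2 = 20 combinations
print(param_grid[:3])
```

Each candidate is fitted once per fold, so the cost of the search grows with the product of the list lengths and the number of folds.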

In [None]:
# Import NumPy (needed for np.arange below) and the tuning submodule
import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

#### Make the validator

The submodule `pyspark.ml.tuning` also has a class called `CrossValidator` for performing cross validation. This `Estimator` takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.
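
The mechanics of k-fold cross validation can be sketched in plain Python. This is an illustration only, not Spark's implementation: the helper `k_fold_indices` is hypothetical, and it assigns rows to folds in a deterministic round-robin fashion for readability, whereas a real cross-validator typically assigns rows to folds at random. Each fold serves once as the validation set while the remaining folds are used for training.

```python
# Plain-Python illustration (NOT Spark); helper is hypothetical.
def k_fold_indices(n_rows, num_folds=3):
    """Return a list of (training_indices, validation_indices), one per fold."""
    # Round-robin assignment of row indices to folds (real tools shuffle instead).
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    splits = []
    for i, validation in enumerate(folds):
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((training, validation))
    return splits

splits = k_fold_indices(6, num_folds=3)
for training, validation in splits:
    print("train:", training, "validate:", validation)
```

For every point in the parameter grid, a model is fitted on each training part and scored on the matching validation part; the scores are averaged across folds to pick the best combination.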

In [None]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

#### Fit the model

In [None]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

In [None]:
# Alternative: fit a single logistic regression without cross validation.
# Note that this overwrites best_lr from the previous cell.

# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr to verify that it's an object of the LogisticRegressionModel class.
print(best_lr)

#### Evaluate the model

It's finally time to test the model on the test data! We can use the same evaluator we created for cross validation.

In [None]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

___