## 1. Some Basics

**This is your Jupyter notebook. You can use it to connect to Spark on DDP and run your code.**

Notebooks are made up of cells, and you can run one cell at a time.

To see all keyboard shortcuts in Jupyter, open Help > Keyboard Shortcuts. Some useful ones:
1. Ctrl + Enter = Run Cell
2. Esc + A = Add Cell Above
3. Esc + B = Add Cell Below
4. Esc + L = Show Line Numbers
5. Ctrl + S = Save Notebook

Tips!
* Use Internet Explorer or Firefox; Chrome can sometimes be slow.
* Don't open too many notebooks at the same time.
* Remember to save as often as you can!




### 1.1 Some tips on how to edit text in Markdown cells


# This is a level 1 heading
## This is a level 2 heading
This is some plain text that forms a paragraph.
Add emphasis via **bold** and __bold__, or *italic* and _italic_.

Paragraphs must be separated by an empty line.

* Sometimes we want to include lists.
  * Which can be indented.

1. Lists can also be numbered.
2. For ordered lists.

[It is possible to include hyperlinks](https://www.example.com)

Inline code uses single backticks: `foo()`, and code blocks use triple backticks:

```
bar()
```

Code blocks can also be indented by 4 spaces:

    foo()

And finally, adding images is easy: ![Alt text](https://www.example.com/image.jpg)

Bookmark the [Hive operators and functions (UDF) reference](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF).

## 2 Loading data

[This is a sample dataset from Kaggle, "Rain in Australia"](https://www.kaggle.com/jsphyg/weather-dataset-rattle-package) 

Here we will:
1. Copy the file into [HDFS](https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html):
    * Open a terminal
    * `cd` to the directory containing the weatherAUS.csv file
    * Type: `hdfs dfs -put weatherAUS.csv`
    * Check that the file was placed correctly by typing: `hdfs dfs -ls -t`
2. Load the file from HDFS into a [Spark DataFrame](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame) using Spark's CSV reader
3. Analyze the schema

In [None]:
(spark.read                            # The DataFrameReader
        .option("delimiter", "\t")     # Tab delimiter (default is comma), which is wrong for this file
        .option("header", "true")      # Use first line of all files as header
        .csv('weatherAUS.csv')         # Creates a DataFrame from CSV after reading in the file
        .printSchema()                 # printSchema() prints by itself and returns None, so no print() is needed
)

(spark.read                            # The DataFrameReader
        .option("delimiter", "\t")     # Use tab delimiter (default is comma-separator)
        .option("header", "true")      # Use first line of all files as header
        .csv('weatherAUS.csv')         # Creates a DataFrame from CSV after reading in the file        
).show(2, False)
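To see why the tab delimiter in the cell above produces a single-column schema, here is a small sketch using Python's standard `csv` module instead of Spark. The header string is a shortened, illustrative version of the file's first line, not the full header.

```python
import csv
import io

# Shortened, illustrative header in the same comma-separated format as weatherAUS.csv
SAMPLE = "Date,Location,MinTemp,MaxTemp\n"


def header_fields(text, delimiter):
    """Parse the first line of `text` with the given delimiter and return its fields."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    return next(reader)


tab_fields = header_fields(SAMPLE, "\t")    # no tabs in the line, so it stays one field
comma_fields = header_fields(SAMPLE, ",")   # one field per column

print(len(tab_fields), tab_fields)      # 1 ['Date,Location,MinTemp,MaxTemp']
print(len(comma_fields), comma_fields)  # 4 ['Date', 'Location', 'MinTemp', 'MaxTemp']
```

Spark's CSV reader behaves the same way: with the wrong delimiter, each whole line lands in a single string column.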


### 2.1 Load Data Using Spark CSV inference and Correct Delimiter

In [None]:
(spark.read                            # The DataFrameReader
        .option("delimiter", ",")      # Use comma delimiter (also the default)
        .option("header", "true")      # Use first line of all files as header
        .option("inferSchema", "true") # Use Spark's built-in CSV schema inference
        .csv('weatherAUS.csv')         # Creates a DataFrame from CSV after reading in the file
        .printSchema()
)

In [None]:
rainDataset = (spark.read                            # The DataFrameReader
                    .option("delimiter", ",")        # Use comma delimiter (also the default)
                    .option("header", "true")        # Use first line of all files as header
                    .csv('weatherAUS.csv')           # Creates a DataFrame from CSV after reading in the file
              )
partitions = rainDataset.rdd.getNumPartitions()
print("Partitions: {0:,}".format( partitions ))
print("Records: {0:,}".format( rainDataset.count()))



### 2.2 Convert Dataset to Pandas and view contents

In [None]:
import pandas as pd
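pd.set_option('display.max_rows', 10)          # show at most 10 rows when printing a DataFrame
pd.set_option('display.max_colwidth', -1)      # no column-width limit (pandas < 1.0; use None on pandas >= 1.0)
pd.set_option('display.max_columns', 1000)

rainDataset.limit(10).toPandas()
# rainDataset.toPandas()  # avoid: this collects the entire dataset onto the driver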

### 2.3 Data Cleansing/Correcting


Here we will:
1. Use [when](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.when) to match 'NA'
2. Use [lit](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.lit) function to replace the NA with nulls
3. Cast the columns to the correct data types
4. Convert Yes/No to 1/0

Be sure to bookmark the [Spark SQL API docs](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html).
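As a plain-Python sketch of what these four steps do to a single row (not the Spark code itself), the function below applies them to a row given as a dict of strings. The numeric column subsets and the row values are hypothetical; one real difference is that Python's `float('abc')` raises an error, while Spark's `cast` returns null for unparseable strings.

```python
# Columns that keep 'NA' as an ordinary category (same list as the Spark cell below)
CATEGORICAL = {'RainTomorrow', 'Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday'}
# Small illustrative subsets of the numeric columns
DOUBLES = {'MinTemp', 'Rainfall'}
INTEGERS = {'Humidity9am'}


def clean_row(row):
    """Apply the four cleansing steps to one row given as {column: string value}."""
    cleaned = {}
    for name, value in row.items():
        if name not in CATEGORICAL and value.strip() == 'NA':
            cleaned[name] = None            # steps 1-2: 'NA' -> null
        elif name in DOUBLES:
            cleaned[name] = float(value)    # step 3: cast to double
        elif name in INTEGERS:
            cleaned[name] = int(value)      # step 3: cast to integer
        else:
            cleaned[name] = value
    # step 4: 'Yes' -> 1, anything else (including 'NA') -> 0
    cleaned['RainTomorrow'] = 1 if row.get('RainTomorrow') == 'Yes' else 0
    return cleaned


row = {'Location': 'Albury', 'MinTemp': '13.4', 'Rainfall': 'NA',
       'Humidity9am': '71', 'RainTomorrow': 'Yes'}
print(clean_row(row))
# {'Location': 'Albury', 'MinTemp': 13.4, 'Rainfall': None, 'Humidity9am': 71, 'RainTomorrow': 1}
```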

In [None]:
from pyspark.sql.functions import when, lit, col, trim

# Categorical columns keep 'NA' as its own category; in every other column 'NA' becomes null
categorical = ['RainTomorrow', 'Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday']

rainDataset_withNulls = rainDataset
for c in rainDataset.drop(*categorical).columns:
    rainDataset_withNulls = rainDataset_withNulls.withColumn(
        c, when(trim(col(c)) == 'NA', lit(None)).otherwise(col(c)))

In [None]:
rainDataset_withNulls.printSchema()

In [None]:
# Replace Yes/No with 1/0 (anything other than 'Yes', including 'NA', becomes 0)
rainDataset_withNulls = rainDataset_withNulls.withColumn('RainTomorrow', when(rainDataset_withNulls.RainTomorrow == 'Yes', 1).otherwise(0))

from pyspark.sql.types import DoubleType, FloatType, IntegerType

doubles = ['MinTemp', 'MaxTemp', 'Rainfall','Evaporation', 'Sunshine', 'Pressure9am', 'Pressure3pm', 'Temp9am', 'Temp3pm', 'RISK_MM']
integers = ['WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Cloud9am', 'Cloud3pm']

# The loop variable is `c`, not `col`, so the imported pyspark `col` function is not shadowed
for c in doubles:
    rainDataset_withNulls = rainDataset_withNulls.withColumn(c, rainDataset_withNulls[c].cast(DoubleType()))

for c in integers:
    rainDataset_withNulls = rainDataset_withNulls.withColumn(c, rainDataset_withNulls[c].cast(IntegerType()))

rainDataset_withNulls.printSchema()

### 2.4 Plotting Data

The package we use for plotting here is [matplotlib](https://matplotlib.org/).
Here we will:
1. Convert the Spark DataFrame to Pandas
2. Plot the time series for every location on the same axes
3. Add a bar chart to the same figure, [full tutorial here](http://jonathansoma.com/lede/algorithms-2017/classes/fuzziness-matplotlib/how-pandas-uses-matplotlib-plus-figures-axes-and-subplots/)


In [None]:
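import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.pyplot import figure
%matplotlib inline

#df = rainDataset_withNulls.filter("Location IN('Albury', 'MountGinini')").toPandas()
# Collects the whole dataset onto the driver: fine for this dataset's size, avoid it for large tables
df = rainDataset_withNulls.toPandas()
# Date is a string column; converting it lets every location be plotted against real dates
df['Date'] = pd.to_datetime(df['Date'])

fig = plt.figure(num=None, figsize=(20, 10), dpi=300, facecolor='w', edgecolor='k')
ax = fig.add_subplot(111)

# One line per location; the legend is hidden because there are dozens of locations
df.groupby('Location').plot(x='Date', y='MaxTemp', ax=ax, legend=False)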

In [None]:
fig = plt.figure(num=None, figsize=(20, 10), dpi=300, facecolor='w', edgecolor='k')

# Divide the figure into a 2x1 grid, and give me the first section
ax1 = fig.add_subplot(211)

# Divide the figure into a 2x1 grid, and give me the second section
ax2 = fig.add_subplot(212)

df.groupby('Location').plot(x='Date', y='MaxTemp', ax=ax1, legend=False)
df.groupby('Location')['Rainfall'].mean().sort_values().plot(kind='barh', ax=ax2)
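The bar chart shows the mean rainfall per location, sorted from smallest to largest. As a sketch of what `groupby('Location')['Rainfall'].mean().sort_values()` computes, here is a plain-Python version on made-up (Location, Rainfall) pairs. One difference: a location whose values are all missing is simply left out here, whereas pandas would return NaN for it.

```python
from collections import defaultdict


def mean_by_group(rows):
    """Average value per group, skipping missing (None) values, sorted ascending by mean.

    `rows` is an iterable of (group, value) pairs, e.g. (Location, Rainfall).
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for group, value in rows:
        if value is None:
            continue  # pandas' mean() also skips missing values
        totals[group] += value
        counts[group] += 1
    means = {group: totals[group] / counts[group] for group in totals}
    return sorted(means.items(), key=lambda item: item[1])


sample = [('Albury', 0.5), ('Albury', None), ('Albury', 1.5), ('Sydney', 3.0), ('Sydney', 5.0)]
print(mean_by_group(sample))  # [('Albury', 1.0), ('Sydney', 4.0)]
```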

## 3 Preparing Data for PySpark ML

In this section we will apply some transformations to the dataset to make it ready for classification.
Steps:
1. Create a [String Indexer](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer) for each string column; it converts the strings into numeric category indices, with the most frequent value getting index 0
2. Create a [Pipeline](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.Pipeline) instance and use it to run the String Indexers

Bookmark the [Spark ML API docs](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#) and always use the docs for the Spark version you are running. Currently we have 2.2 on DDP and 2.3 on AI Lab, and the APIs can vary quite a lot from version to version.
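A minimal plain-Python sketch of what a fitted StringIndexer does: labels are ordered by frequency (most frequent gets 0.0), and with `handleInvalid='keep'` any label not seen during fitting goes to one extra index equal to the number of known labels. Breaking ties alphabetically is an assumption of this sketch; it does not reproduce how Spark 2.2 orders labels with equal counts.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically here; that rule is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def transform(mapping, values):
    """Index values; unseen labels go to len(mapping), like handleInvalid='keep'."""
    unseen_index = float(len(mapping))
    return [mapping.get(value, unseen_index) for value in values]


mapping = fit_string_indexer(['No', 'Yes', 'No', 'NA', 'No', 'Yes'])
print(mapping)                                     # {'No': 0.0, 'Yes': 1.0, 'NA': 2.0}
print(transform(mapping, ['Yes', 'No', 'Maybe']))  # [1.0, 0.0, 3.0]
```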



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

cols = [item[0] for item in rainDataset_withNulls.drop('Date').dtypes if item[1].startswith('string')]

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid='keep')
    for c in cols
]

pipeline = Pipeline(stages=indexers)

rainDataset_indexed = pipeline.fit(rainDataset_withNulls).transform(rainDataset_withNulls).drop(*cols)

In [None]:
rainDataset_indexed.limit(10).toPandas()

In [None]:
#See the target distribution
rainDataset_indexed.groupby("RainTomorrow").count().toPandas()

In [None]:
from pyspark.sql.functions import date_format

# See the distribution of data over time ('yyyy' is the calendar year; 'YYYY' would be the week-based year)
rainDataset_indexed.groupby(date_format('Date', 'yyyy').alias('Year')).count().orderBy('Year').toPandas().plot.bar(x='Year')
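In the Java date patterns used by Spark 2.x, lowercase `yyyy` is the calendar year while uppercase `YYYY` is a week-based year, and the two disagree for a few days around New Year. Python has no `YYYY` pattern, but `date.isocalendar()` returns the ISO week-based year, which shows the same kind of mismatch. Java's week-year rules depend on the locale, so the exact boundary days may differ from ISO; this is an analogy, not Spark's behaviour.

```python
from datetime import date


def year_pair(d):
    """Return (calendar year, ISO week-based year) for a date."""
    return d.year, d.isocalendar()[0]


for d in [date(2008, 12, 29), date(2009, 12, 31), date(2010, 1, 1)]:
    calendar_year, week_year = year_pair(d)
    print(d.isoformat(), "calendar year:", calendar_year, "week-based year:", week_year)
# 2008-12-29 calendar year: 2008 week-based year: 2009
# 2009-12-31 calendar year: 2009 week-based year: 2009
# 2010-01-01 calendar year: 2010 week-based year: 2009
```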

### 3.1 Assembling the Feature Vector

In this section we will apply VectorAssembler to the data. This step is a prerequisite for almost all Spark ML classification algorithms, which expect the features in a single vector column:
1. Create a [Vector Assembler](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler), which combines the feature columns into a single vector column
2. Create a [Pipeline](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.Pipeline) instance and use it to run the Vector Assembler
3. Batch together the transformations we have done on the data so far
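Conceptually, filling nulls with 0 and then running VectorAssembler turns each row into one list of numbers, in the order of `inputCols`, with the label kept outside the vector. The plain-Python sketch below shows that per-row view; the column names are a hypothetical subset, and Spark may store the result as a sparse vector rather than a dense list.

```python
def assemble(row, feature_cols, fill_value=0.0):
    """Return one dense feature list for a row, filling missing values first.

    Mirrors na.fill(0) followed by VectorAssembler, in the order given by feature_cols.
    """
    return [fill_value if row[c] is None else float(row[c]) for c in feature_cols]


feature_cols = ['MinTemp', 'Rainfall', 'Location_indexed']   # hypothetical subset
row = {'MinTemp': 13.4, 'Rainfall': None, 'Location_indexed': 2.0, 'RainTomorrow': 0}
print(assemble(row, feature_cols))  # [13.4, 0.0, 2.0]
```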

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

#This is to create a list of columns we want as features
feature_cols = rainDataset_indexed.drop('Date', 'RainTomorrow', 'RISK_MM').columns

#This is to initialize the VectorAssembler
assembler_features = VectorAssembler(inputCols=feature_cols, outputCol='features')
tmp = [assembler_features]
pipeline = Pipeline(stages=tmp)

# Fill nulls with 0 for now, because VectorAssembler does not accept null values
rainDataset_indexed = rainDataset_indexed.na.fill(0)

rainDataset_vectorized = pipeline.fit(rainDataset_indexed).transform(rainDataset_indexed)

rainDataset_vectorized.limit(1).toPandas()

In [None]:
#Define the stages in the pipeline
pipeline_combined = Pipeline(stages=indexers + tmp)

# Fill nulls with 0 for now, because VectorAssembler does not accept null values
rainDataset_combined = rainDataset_withNulls.na.fill(0)

rainDataset_vectorized = (pipeline_combined.fit(rainDataset_combined).transform(rainDataset_combined)).drop(*cols)

rainDataset_vectorized.limit(1).toPandas()

### 3.2 Train/Test Splits, Upsampling

In this section we will use PySpark's sampling functions to create a balanced training set and a test set:
1. Use the DataFrame [sample](https://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html) function, which lets us draw a random fraction of the rows
2. Use upsampling (sampling with replacement) to balance the classes in the training set
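Spark's `sample(True, fraction)` is Poisson-based: each row is emitted a Poisson-distributed number of times with mean `fraction`, so a fraction above 1 grows the sample on average. To balance two classes, the minority class is upsampled by roughly (#majority rows) / (#minority rows). The standard-library sketch below mimics that behaviour; the class counts are hypothetical.

```python
import math
import random


def poisson(rng, lam):
    """Draw a Poisson(lam) count with Knuth's method (fine for small lam such as 3.46)."""
    threshold = math.exp(-lam)
    count = 0
    product = rng.random()
    while product > threshold:
        count += 1
        product *= rng.random()
    return count


def upsample(rows, fraction, seed=42):
    """Sample with replacement: each row is repeated Poisson(fraction) times,
    so the expected output size is fraction * len(rows)."""
    rng = random.Random(seed)
    out = []
    for row in rows:
        out.extend([row] * poisson(rng, fraction))
    return out


majority, minority = 7000, 2000      # hypothetical class counts
ratio = majority / minority          # 3.5
upsampled = upsample(list(range(minority)), ratio)
print(ratio, len(upsampled))         # the second number is close to 7000 on average
```

Because the output size is random, the two classes end up roughly, not exactly, the same size.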


In [None]:
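from functools import reduce   # reduce is not a builtin in Python 3
from random import randint
from pyspark.sql import DataFrame

def unionAll(*dfs):
    # Stack DataFrames with the same schema (union() replaces the deprecated unionAll())
    return reduce(DataFrame.union, dfs)


# Raining days upsampling ratio: roughly (#RainTomorrow=0) / (#RainTomorrow=1), so both classes end up about the same size
raining_days_up = 3.4607


# 70% of each class goes to training; the rainy-day sample is then upsampled with replacement
df_train = unionAll(rainDataset_vectorized.filter("RainTomorrow=0").sample(False, 0.7, seed=randint(100, 999)),
                    rainDataset_vectorized.filter("RainTomorrow=1").sample(False, 0.7, seed=randint(100, 999))
                                          .sample(True, raining_days_up, seed=randint(100, 999)))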


df_train.cache()
partitions = df_train.rdd.getNumPartitions()
print("Train Partitions: {0:,}".format(partitions ))
print("Train Records: {0:,}".format(df_train.count()))
print(df_train.groupby("RainTomorrow").count().toPandas())

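# Test set: every row of the full dataset that was not sampled into training (anti-join on all columns)
df_test = rainDataset_vectorized.join(df_train, df_train.columns, how='left_anti')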

df_test.cache()
partitions = df_test.rdd.getNumPartitions()
print("Test Partitions: {0:,}".format( partitions ))
print("Test Records: {0:,}".format( df_test.count()))
print(df_test.groupby("RainTomorrow").count().toPandas())
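A `left_anti` join keeps the left rows that have no match on the join columns. Because this join uses every column, exact duplicate rows behave as one: if any copy lands in the training set, all copies leave the test set. The plain-Python sketch below shows this on hypothetical tuples; it does not model Spark's rule that nulls in a join column never match.

```python
def left_anti(left, right):
    """Keep the rows of `left` that have no equal row in `right` (whole-row match)."""
    right_rows = set(right)
    return [row for row in left if row not in right_rows]


full = [('Albury', 1), ('Sydney', 0), ('Sydney', 0), ('Perth', 0)]
train = [('Sydney', 0)]           # only one copy of the duplicated row was sampled
print(left_anti(full, train))     # [('Albury', 1), ('Perth', 0)]
```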

## 4 Modeling

In this section we will fit a Random Forest model to our training dataset and use the test data to evaluate it:
1. Use [RandomForestClassifier](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier) to fit a model.
2. Score the test data to get a measure of model accuracy

In [None]:
from pyspark.ml.classification import RandomForestClassifier

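# Baseline random forest with a few shallow trees; the label column is renamed to "label" when fitting
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees = 10, maxDepth = 5, maxBins = 5)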

# Train model with Training Data
rfModel = rf.fit(df_train.withColumnRenamed("RainTomorrow", "label"))

# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(df_test)

#Print Confusion Matrix
predictions.groupby("RainTomorrow", "prediction").count().toPandas()
# 76.58% model accuracy, 50.26% true positive rate
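Accuracy and true positive rate can be read straight off the confusion-matrix counts produced by `groupby("RainTomorrow", "prediction").count()`. The sketch below does that arithmetic in plain Python; the counts are hypothetical, not the notebook's results.

```python
def confusion_metrics(counts):
    """Accuracy and true positive rate from {(label, prediction): count} pairs."""
    tp = counts.get((1, 1), 0)
    tn = counts.get((0, 0), 0)
    fp = counts.get((0, 1), 0)
    fn = counts.get((1, 0), 0)
    accuracy = (tp + tn) / float(tp + tn + fp + fn)
    tpr = tp / float(tp + fn)   # recall on rainy days
    return accuracy, tpr


# Hypothetical counts; Spark returns prediction as a double, and (1, 1.0) == (1, 1) in Python
counts = {(0, 0.0): 80, (0, 1.0): 20, (1, 0.0): 10, (1, 1.0): 30}
accuracy, tpr = confusion_metrics(counts)
print("accuracy = {0:.2%}, TPR = {1:.2%}".format(accuracy, tpr))  # accuracy = 78.57%, TPR = 75.00%
```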

### 4.1 Hyperparameter Tuning

In this section we will use grid search in PySpark to tune our hyperparameters:
1. Create an instance of [BinaryClassificationEvaluator](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.evaluation.BinaryClassificationEvaluator), which by default scores models by area under the ROC curve.
2. Build a parameter grid using [ParamGridBuilder](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder), adding the values we would like to test
3. Create a [CrossValidator](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator), which we will use to evaluate our models with k-fold cross-validation

Be sure to read through the examples for other models [here!](https://spark.apache.org/docs/2.2.0/ml-tuning.html)
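Grid search cost grows multiplicatively: every combination of values is one ParamMap, and CrossValidator fits one model per (ParamMap, fold) pair, then refits the best ParamMap on the whole training set. The sketch below counts those fits for a grid with 2 values for each of `maxDepth`, `maxBins` and `numTrees` and 3 folds, assuming that Spark 2.x CrossValidator behaviour.

```python
from itertools import product

# 2 values for each of 3 parameters, and 3 folds
grid = {'maxDepth': [5, 15], 'maxBins': [5, 10], 'numTrees': [25, 50]}
num_folds = 3

# Every combination of values is one ParamMap
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

# One fit per (ParamMap, fold), plus a final refit of the best ParamMap on all training data
fits = len(param_maps) * num_folds + 1
print(len(param_maps), fits)  # 8 25
```

Adding a third value to any one parameter raises the count to 12 ParamMaps and 37 fits, which is why large grids get slow quickly.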

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()

#Hyper Parameters
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [5, 15])
             .addGrid(rf.maxBins, [5, 10])
             .addGrid(rf.numTrees, [25, 50])
             .build())


print("Running HP Search") 

# Wrap the RandomForestClassifier (an Estimator) in a CrossValidator instance.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator;
# the ParamGridBuilder above builds the grid of parameters to search over.
# With 2 values for each of maxDepth, maxBins and numTrees, the grid has 2 x 2 x 2 = 8 ParamMaps,
# so with numFolds=3 the CrossValidator trains 8 x 3 = 24 models, then refits the best ParamMap on all the training data.
cv = CrossValidator(estimator=rf, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=3)

# Run cross-validation. This can take some time. TODO: time the runs, track them in the Spark UI, and find ways to make it faster
cvModel = cv.fit(df_train.withColumnRenamed("RainTomorrow", "label"))

# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(df_test)

#Get the best performing Model
bestModel = cvModel.bestModel
print("HP Search Complete!")

### 5.1 Diagnostics, Plots

In this section we will create a plot for Feature Importances:
1. Get the Feature Importances from the Model and plot them
2. Create a ROC Curve, get ROC & PR values on Test Data 


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.pyplot import figure

# Importances of the best model, sorted ascending (feature names are not shown in this plot)
beta = np.sort(bestModel.featureImportances.toArray())

figure(num=None, figsize=(8, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(beta)
plt.ylabel('Feature Importances')
plt.show()

In [None]:
# Make predictions on test data using the Transformer.transform() method.
predictions = bestModel.transform(df_test)

#Print Confusion Matrix
predictions.groupby("RainTomorrow", "prediction").count().toPandas()
# 83.16.74% accurate, 63.34% true positive rate. Is this good enough?

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = predictions.select(['probability', 'RainTomorrow'])
 
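## prepare score-label set
# Score = P(class 0, no rain) and label = 1 - RainTomorrow, so "no rain" is the positive class here.
# Area under ROC is the same as scoring P(class 1) against RainTomorrow; area under PR refers to "no rain".
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)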
 
metrics = metric(scoreAndLabels)
print("The ROC score is: ", metrics.areaUnderROC)
print("The PR score is: ", metrics.areaUnderPR)

from sklearn.metrics import roc_curve, auc
 
 
y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]
 
fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)
 
%matplotlib inline
figure(num=None, figsize=(8, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()
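The area under the ROC curve equals the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counting one half. The plain-Python sketch below computes it that way on a tiny made-up example and shows why scoring P(class 0) against flipped labels, as the cell above does, gives the same ROC area as scoring P(class 1) against the original labels. It is an O(n²) illustration, not a replacement for `BinaryClassificationMetrics` or `roc_curve`.

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative), ties counting 1/2."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


labels = [0, 0, 1, 1]            # made-up RainTomorrow values
p_rain = [0.1, 0.4, 0.35, 0.8]   # made-up P(class 1)

auc_rain = roc_auc(labels, p_rain)
# Score by P(class 0) = 1 - P(class 1) and flip the labels
auc_no_rain = roc_auc([1 - y for y in labels], [1 - p for p in p_rain])
print(auc_rain, auc_no_rain)     # 0.75 0.75
```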

In [None]:
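# Print each feature name next to its importance (same column order as the VectorAssembler inputs)
feature_names = rainDataset_indexed.drop('Date', 'RainTomorrow', 'RISK_MM').columns
for name, importance in zip(feature_names, bestModel.featureImportances.toArray()):
    print(str(name) + " :: " + str(importance))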
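The sorted importance plot earlier drops the feature names. Pairing names with importances before sorting keeps them, as in this plain-Python sketch; the names and values are hypothetical, and the pairing is only valid if the names are in the same order as the VectorAssembler's `inputCols`.

```python
def ranked_importances(names, importances):
    """Pair feature names with importances, most important first."""
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)


# Hypothetical values; in the notebook these would be the feature column names
# and bestModel.featureImportances.toArray()
names = ['MinTemp', 'Humidity3pm', 'Location_indexed']
importances = [0.2, 0.5, 0.3]
for name, importance in ranked_importances(names, importances):
    print("{0} :: {1:.3f}".format(name, importance))
# Humidity3pm :: 0.500
# Location_indexed :: 0.300
# MinTemp :: 0.200
```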

### 5.2 Save the Model and Save Results to HIVE/Parquet

In this section we will save the model to an HDFS directory and write the scores to HIVE/Parquet.

In [None]:
#View the predictions dataframe
predictions.limit(1).toPandas()

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Helper UDF to get the second element of the probability Vector, i.e. P(RainTomorrow = 1)
secondelement = udf(lambda v: float(v[1]), FloatType())

predictions.select('Date', 'Location_indexed', (secondelement('probability')).alias('probability')).limit(10).toPandas()

In [None]:
#Save the results in a HIVE table
predictions.select('Date', 'Location_indexed', (secondelement('probability')).alias('probability')).createOrReplaceTempView("p901shm_ddp_intro_rain_austrialla_results")

HIVE_SQL = "DROP TABLE IF EXISTS ddp_cvm.p901shm_ddp_intro_rain_austrialla_results"
spark.sql(HIVE_SQL)

HIVE_SQL = "CREATE TABLE ddp_cvm.p901shm_ddp_intro_rain_austrialla_results AS SELECT * FROM p901shm_ddp_intro_rain_austrialla_results"
spark.sql(HIVE_SQL)

In [None]:
#Save the results as a Parquet File
predictions.select('Date', 'Location_indexed', (secondelement('probability')).alias('probability')).write.parquet("p901shm_ddp_intro_rain_austrialla_results.parquet")

In [None]:
#Some HIVE related Tips

#Read A table as a dataframe:
#spark.table("ddp_cvm.p901shm_ddp_intro_rain_austrialla_results").printSchema()
df_ddp = spark.table("ddp_cvm.p901shm_ddp_intro_rain_austrialla_results")

## 6 Next Steps!

1. Use [OneHotEncoding](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder) on the StringIndexer output instead of feeding the raw category indices to the model
2. Use [Imputer](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer) instead of just filling in missing values with 0s. Improving the quality of your training dataset is key. You can try imputation for each location separately
3. Try [time series](https://towardsdatascience.com/an-end-to-end-project-on-time-series-analysis-and-forecasting-with-python-4835e6bf050b) analysis by following this tutorial
4. Follow [this](https://towardsdatascience.com/tuning-hyperparameters-part-i-successivehalving-c6c602865619) tutorial on hyperparameter tuning
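Spark's Imputer fills a column with a single mean or median computed over the whole DataFrame, so imputing per location as suggested in item 2 needs extra work, for example running Imputer on each location's rows or computing group means with a `groupBy`. The plain-Python sketch below shows the group-mean idea on hypothetical rows; it is not an Imputer API.

```python
from collections import defaultdict


def impute_by_group(rows, group_key, value_key):
    """Replace missing `value_key` entries with the mean of the same group (e.g. Location).

    Rows whose whole group is missing stay None. Returns new dicts; the input is unchanged.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        if row[value_key] is not None:
            totals[row[group_key]] += row[value_key]
            counts[row[group_key]] += 1
    means = {group: totals[group] / counts[group] for group in totals}

    imputed = []
    for row in rows:
        new_row = dict(row)
        if new_row[value_key] is None:
            new_row[value_key] = means.get(row[group_key])
        imputed.append(new_row)
    return imputed


rows = [{'Location': 'Albury', 'Sunshine': 4.0},
        {'Location': 'Albury', 'Sunshine': None},
        {'Location': 'Albury', 'Sunshine': 8.0},
        {'Location': 'Perth', 'Sunshine': None}]
result = impute_by_group(rows, 'Location', 'Sunshine')
print([r['Sunshine'] for r in result])  # [4.0, 6.0, 8.0, None]
```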