# Stock NLP exercise (Distributed ML using Spark):
We will use the news data and try to predict its effect on the Dow Jones Industrial Average (Market Index)

### Configure relevant libraries:
For this exercise we will use 2 OSS libraries:
- SynapseML from Microsoft: https://microsoft.github.io/SynapseML/ 
- SparkNLP from John Snow labs: https://nlp.johnsnowlabs.com/ 

Run code below to configure them for the session.

In [1]:
%%configure -f
{
  "name": "StockNLP",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.4,com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.1",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
      "spark.yarn.user.classpath.first": "true"
  }
}

StatementMeta(, 9, -1, Finished, Available)

### Getting the data

We will work with a pre-prepared dataset in parquet format.
Place the folder ``DatasetStockNews`` to your data lake storage (associated/linked to Synapse) and configure the path accordingly using ``data_lake_account_name`` and ``file_system_name`` below. 

### Configure attributes

Configure the following attributes as per your environment

- ``data_lake_account_name``: Your data lake storage account name
- ``file_system_name``: The blob container name within your storage account, containing the `DatasetStockNews`
- ``subscription_id``: Azure subscription ID
- ``resource_group``: the resource group name
- ``workspace_name``: Workspace name for the associated Azure Machine Learning workspace
- ``workspace_region``: Azure region string for the associated Azure Machine Learning workspace
- ``cog_service_subscription_key``: The API key to call cognitive services. If you do not have one already, for the purpose of this exercise, [please create one](https://docs.microsoft.com/en-us/azure/cognitive-services/cognitive-services-apis-create-account?tabs=multiservice%2Cwindows)
- ``cog_service_location``: The azure location string for the cognitive service resource (e.g: eastus)

In [2]:
from synapse.ml.cognitive import *
from pyspark.sql.functions import regexp_replace

data_lake_account_name = 'hlthmlqhke6c66zwp74'
file_system_name = 'raw'

subscription_id = "e4e90092-1fce-4464-9b56-78f904fb17d1" 
resource_group = "healthml-rs-rg" 
workspace_name = "ml-hlthml" 
workspace_region = "eastus"
cog_service_subscription_key = "3b0156112b5747d8b8491f1475f8a73d"
cog_service_location = "eastus"

StatementMeta(smallspark02, 9, 1, Finished, Available)

# Dataset Description:

There are two channels of data provided in this dataset:

- News data: Crawled historical news headlines from Reddit WorldNews Channel (/r/worldnews). They are ranked by reddit users' votes, and only the top 25 headlines are considered for a single date.
(Range: 2008-06-08 to 2016-07-01)

- Stock data: Dow Jones Industrial Average (DJIA) is used to "prove the concept".
(Range: 2008-08-08 to 2016-07-01)

There are three data files in .csv format:

- RedditNews.csv: two columns
The first column is the "date", and second column is the "news headlines".
All news are ranked from top to bottom based on how hot they are.
Hence, there are 25 lines for each date.

- DJIA_table.csv:
Downloaded directly from Yahoo Finance: check out the web page for more info.

- CombinedNewsDJIA.csv:
Combined dataset with 27 columns.
The first column is "Date", the second is "Label", and the following ones are news headlines ranging from "Top1" to "Top25".

- Combined_news_djia.parquet: combined dataset with 27 columns.
The first column is "Date", the second is "Label", and the following ones are news headlines ranging from "Top1" to "Top25". **We will be using this one for the excercise.**

Dataset citation, Kaggle source: https://www.kaggle.com/aaron7sun/stocknews 

### Read dataset into a dataframe:

In [3]:
df_stocknews = spark.read.format("parquet").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetStockNews/Combined_news_djia.parquet",header=True)
# view the column names, quickly
df_stocknews

StatementMeta(smallspark02, 9, 2, Finished, Available)

DataFrame[Date: string, Label: bigint, Top1: string, Top2: string, Top3: string, Top4: string, Top5: string, Top6: string, Top7: string, Top8: string, Top9: string, Top10: string, Top11: string, Top12: string, Top13: string, Top14: string, Top15: string, Top16: string, Top17: string, Top18: string, Top19: string, Top20: string, Top21: string, Top22: string, Top23: string, Top24: string, Top25: string]

### Clean up the dataframe with missing/"NA" values and do a quick exploratory check
note how the ``display()`` function can be helpful here, explore visual distribution of ``Label`` column by clicking on ``unique`` value and then selecting chart type

In [4]:
df_stocknews = df_stocknews.dropna(how="any")
display(df_stocknews, summary=True)

StatementMeta(smallspark02, 9, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7a4749c5-288e-409f-940f-fd727624e689)

### Feature engineering: 
We will do the following for feature engineering
- clean the news headlines, remove special charecters
- gather document embedding for all news headlines (all top 25), using ``SparkNLP`` pretrained model. We will use a ``SparkNLP pipeline`` for this.
- use Azure Cognitive services to gather overall sentiment of news headlines, using ``SynapseML``. We will do this for top 2 headlines for each day (in the interest of saving time)

Define feature engineering functions:


In [5]:
from synapse.ml.cognitive import *
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import explode, col, when
from pyspark.ml.feature import VectorAssembler

# function for cleaning the data
def clean_data(col_name):
    regexp = "[^a-zA-Z]"
    return regexp_replace(col_name, regexp, " ")

# function for document embedding a column in a dataframe
def doc_embedding(col_name):
    documentAssembler = DocumentAssembler() \
        .setInputCol(col_name) \
        .setOutputCol("document")

    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("token")

    embeddings = Doc2VecModel.pretrained() \
        .setInputCols(["token"]) \
        .setOutputCol("embeddings")
 
    embeddingsFinisher = EmbeddingsFinisher() \
        .setInputCols(["embeddings"]) \
        .setOutputCols("finished_embeddings") \
        .setOutputAsVector(True)  \
        .setCleanAnnotations(False)

    pipeline = Pipeline().setStages([
        documentAssembler,
        tokenizer,
        embeddings,
        embeddingsFinisher
    ])
    return pipeline



# get sentiments of headlines using Azure Cognitive Services (Text Analytics API)
def sentiment_analysis(dataframe, col_name):
  sentiment = (TextSentiment(concurrency=40)
              .setSubscriptionKey(cog_service_subscription_key)
              .setLocation(cog_service_location)
              .setTextCol(col_name)
              .setOutputCol("sentiment")
              .setErrorCol("error"))

  return sentiment.transform(dataframe)


StatementMeta(smallspark02, 9, 4, Finished, Available)

Apply feature engineering functions on the relevant columns of dataframe, drop un-necessary columns when done. 

As a future exercise, study the code below and think of a more efficient way of doing this:

In [6]:
# Aplly clean up of news headlines
for col_name in df_stocknews.columns:
    # do not clean date and label columns
    if not col_name in ["Date", "Label"]:
        df_stocknews = df_stocknews.withColumn(col_name, clean_data(col_name))


# Gather document vectors for all top 25 news headlines for each day
for col_name in df_stocknews.columns:
    # exclude date and label columns
    if col_name.startswith("Top"):
        pipeline = doc_embedding(col_name)
        df_stocknews = pipeline.fit(df_stocknews).transform(df_stocknews)
        emb_col_name = "emb_"+col_name
        df_stocknews = df_stocknews.select("*", explode(df_stocknews.finished_embeddings).alias(emb_col_name))

# clean up the un-needed columns from the pipeline
df_stocknews = df_stocknews.drop("document", "token", "embeddings", "finished_embeddings")

# Gather news sentiment for all top 2 news headlines for each day
for col_name in df_stocknews.columns:
    # exclude date and label columns
    # if col_name.startswith("Top") :
    if col_name in ["Top1", "Top2"] :
        df_stocknews = sentiment_analysis(df_stocknews, col_name)
        sent_col_name = "sent_"+col_name
        df_stocknews = df_stocknews.select("*", col("sentiment").getItem(0).getItem("sentiment").alias("temp_sentiment"))
        df_stocknews= df_stocknews.withColumn(sent_col_name, when(df_stocknews.temp_sentiment == "positive",1.0).when(df_stocknews.temp_sentiment == "negative",2.0).otherwise(3.0))
        df_stocknews = df_stocknews.drop("temp_sentiment")

# create one feature vector from sentiment of all top healines
assembler = VectorAssembler(inputCols=[x for x in df_stocknews.columns if x.startswith("sent_")], outputCol="feature_sentiment")
df_stocknews = assembler.transform(df_stocknews)
df_stocknews = df_stocknews.drop("error", "sentiment", *[x for x in df_stocknews.columns if x.startswith("sent_")])

StatementMeta(smallspark02, 9, 5, Finished, Available)

doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[OK!]
doc2vec_gigaword_300 download started

### Split data into training and testing subset(s)

In [7]:
df_train = df_stocknews[df_stocknews['Date'] < '20150101']
df_test = df_stocknews[df_stocknews['Date'] > '20141231']

StatementMeta(smallspark02, 9, 6, Finished, Available)

### Configure MLFlow to track experiments on Azure Machine Learning 

we defined the Azure Machine Learning workspace name, resource group before.

In [8]:
import mlflow
from azureml.core import Workspace

ws = Workspace(workspace_name = workspace_name,
              subscription_id = subscription_id,
              resource_group = resource_group)
ws.write_config()   

mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

StatementMeta(smallspark02, 9, 7, Finished, Available)

### Train a Random Forest Classifier using ``SynapseML``, track using Azure Machine Learning and finally test the trained model 

(Exploration task) After completion go to Azure machine learning studio and navigate to Experiments tab (from navigation pane on the left) and look at the ``synapse_stockNLPSynapseML`` experiment. Navigate inside to check your runs, 
look at metrices and params that are logged by MLFlow, check the model that is saved after training. 

(Tip) Go grab a coffee, this can take some time..

In [9]:
from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.classification import RandomForestClassifier
from synapse.ml.train import ComputeModelStatistics

df_train_prep = df_train.select([x for x in df_train.columns if x.startswith("emb_") or x in ["Label", "feature_sentiment"]])
df_test_prep = df_test.select([x for x in df_test.columns if x.startswith("emb_") or x in ["Label", "feature_sentiment"]])

experiment_name = 'synapse_stockNLPSynapseML'
mlflow.set_experiment(experiment_name)

params = {"max_depth": 20, "min_info_gain": 0.0, "min_instance_per_node": 1, "num_trees": 30, "subsampling_rate": 1.0, "seed": 0}

with mlflow.start_run():
    mlflow.log_params(params)
    # train a random forest classifier
    randomForestClassifier = (TrainClassifier()
        .setModel(RandomForestClassifier()
            .setMaxDepth(params.get("max_depth"))
            .setMinInfoGain(params.get("min_info_gain"))
            .setMinInstancesPerNode(params.get("min_instance_per_node"))
            .setNumTrees(params.get("num_trees"))
            .setSubsamplingRate(params.get("subsampling_rate"))
            .setSeed(params.get("seed")))
            .setLabelCol("Label"))
    model = randomForestClassifier.fit(df_train_prep)
    mlflow.spark.log_model(model, "RandomForest")
    
    # predict and log metric
    prediction = model.transform(df_test_prep)
    metrics = ComputeModelStatistics().transform(prediction)
    metrics.select('AUC').show()
    mlflow.log_metrics({"AUC": metrics.select('AUC').collect()[0].AUC})

StatementMeta(smallspark02, 9, 8, Finished, Available)

+------------------+
|               AUC|
+------------------+
|0.9324876792114695|
+------------------+

### Prepare the predictions on test dataset for visualisation

In [10]:
bi_df = df_stocknews[df_stocknews['Date'] > '20141231']
bi_df = bi_df.drop(*["Label", "feature_sentiment"])
bi_df = bi_df.join(prediction, bi_df.emb_Top1 ==  prediction.emb_Top1,"inner")
bi_df = bi_df.drop(*[x for x in bi_df.columns if x.startswith("emb")])
bi_df


StatementMeta(smallspark02, 9, 9, Finished, Available)

DataFrame[Date: string, Top1: string, Top2: string, Top3: string, Top4: string, Top5: string, Top6: string, Top7: string, Top8: string, Top9: string, Top10: string, Top11: string, Top12: string, Top13: string, Top14: string, Top15: string, Top16: string, Top17: string, Top18: string, Top19: string, Top20: string, Top21: string, Top22: string, Top23: string, Top24: string, Top25: string, Label: bigint, feature_sentiment: vector, scores: vector, scored_probabilities: vector, scored_labels: double]

### Display the dataframe

In [11]:
display(bi_df)


StatementMeta(smallspark02, 9, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, b67856c2-35dd-45c8-b6a0-0d93f637c713)

In [12]:
bi_df.createOrReplaceTempView("biview")

StatementMeta(smallspark02, 9, 11, Finished, Available)

### Create a SPARK database and store the prediction dataframe as a table

In [None]:
# bi_df.createOrReplaceTempView("biview")
#load the cleaned data to a spark database 
try:
   spark.sql("CREATE DATABASE stockdb")
except:
   print("Database already exists")
bi_df.write.mode("overwrite").saveAsTable("stockdb.newspred")

StatementMeta(, , , Cancelled, )

### Read back from the SPARK database and write as a prediction table to the SQL datawarehouse

Please configure the following variables as per your environment:
- `dedicatedSQLEndpoint`: The dedicated sql endpoint string (get this from the properties in Azure portal of your Synapse)
- `dbName`: the database name in dedicated SQL pool, where you have permissions
- `dbSchemaName`: the schema to use when creating the table
- `tableName`: the table name to write to the SQl database

make sure that your user has sufficients writes to create tables in the dedicated SQL pool (you must atleast have the `db_exporter role`).

Here are some helpful commands, execute the ones you need (if you need)
```sql
-- if you need to create a LOGIN
-- CREATE LOGIN [<someone@somewhere.com>] FROM EXTERNAL PROVIDER;
-- If you need to create a user
CREATE USER [<someone@somewhere.com>] FROM EXTERNAL PROVIDER;
-- Grant the DB exporter role
EXEC sp_addrolemember 'db_exporter',[<someone@somewhere.com>]
-- If you need to grant a db owner role
-- EXEC sp_addrolemember 'db_owner',[<someone@somewhere.com>m]
```

In [20]:
%%spark
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

var df = spark.sqlContext.sql("SELECT * FROM `stockdb`.`newspred`")
df = df.drop("feature_sentiment", "scores", "scored_probabilities")

val dedicatedSQLEndpoint = "synapse-ws-hlthml.sql.azuresynapse.net"
val dbName = "healthmlsql01"
val dbSchemaName = "dbo"
val tableName = "newspred"

df.write.option(Constants.SERVER, s"$dedicatedSQLEndpoint").synapsesql(s"$dbName.$dbSchemaName.$tableName")

StatementMeta(smallspark02, 9, 20, Finished, Available)

import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
df: org.apache.spark.sql.DataFrame = [Date: string, Top1: string ... 29 more fields]
df: org.apache.spark.sql.DataFrame = [Date: string, Top1: string ... 26 more fields]
dedicatedSQLEndpoint: String = synapse-ws-hlthml.sql.azuresynapse.net
dbName: String = healthmlsql01
dbSchemaName: String = dbo
tableName: String = newspred
