<div  align='center'><img src='https://s3.amazonaws.com/weclouddata/images/logos/wcd_logo_new_2.png' width='15%'></div >
<p style="font-size:30px;text-align:center"><b>Lab Model training</b></p>
<p style="font-size:20px;text-align:center"><b><font color='#F39A54'>Data Engineering Diploma</font></b></p>
<p style="font-size:15px;text-align:center">Content developed by: WeCloudData Academy</p>

In this Lab, we are going to use the ML model we trained previously to make sentiment analysis to conclude the topic of each post, and summarize which topics are the most popular topics. The business reason why we train this model is because we are going to create a predict model to give a post a topic immediately when it was created. In this way, the post will be categorized to the right place for other users.

Before you start this lab, please finish the **[Workshop] Mount Azure Storage to Azure Databricks** first, it is the prerequisite of the lab. 

In the model training process, you are going to read the posts those come from the `Posts` files we ingest everyday from the public `posts_today` container. 

The `Posts` files are in parquet format. If you recall our previous job in data factory, they are recieved everyday and stored in a folder (in this example, we call it `Landing`). And in the `Landing` folder, the posts parquet files are saved in the sub-folder, in this example we call it `Posts`.

<img src='https://s3.amazonaws.com/weclouddata/images/data_engineer/ml_prd1.jpg' width='70%'>

In the databricks, it is mounted to '/mnt/deBDProject'

In [0]:
display(dbutils.fs.ls("/mnt/deBDProject/Landing/Posts"))

path,name,size,modificationTime
dbfs:/mnt/deBDProject/Landing/Posts/part-00000-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00000-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,24045,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00001-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00001-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,28084,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00002-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00002-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,8943,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00003-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00003-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,7878,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00004-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00004-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,11480,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00005-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00005-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,16962,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00006-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00006-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,10579,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00007-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00007-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,7855,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00008-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00008-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,11209,1692303253000
dbfs:/mnt/deBDProject/Landing/Posts/part-00009-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,part-00009-ce256a14-5edb-42c2-9936-39585c708fff-c000.parquet,13490,1692303253000


## 1. Preparation

#### 1.1 Spark preparation

In [0]:
# import necessary libaries
from pyspark.sql.functions import *

In [0]:
# Creating Spark Session
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("ML Model")
         .getOrCreate())

sc = spark.sparkContext

#### 1.2 Prepare a UDF (User Defined Function)

We need to create UDF to embed the ML model we trained in the previous workshop. This model will be used for Posts data sentiment analysis.

In [0]:
# User defined function
def predictions_udf(df, ml_model, stringindexer):
    from pyspark.sql.functions import col, regexp_replace, lower, trim
    from pyspark.ml import PipelineModel

    # Filter out empty body text
    df = df.filter("Body is not null")
    # Making sure the naming of the columns are consistent with the model
    df = df.select(col("Body").alias("text"), col("Tags"))
    # Preprocessing of the feature column
    cleaned = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 

    # Load in the saved pipeline model
    model = PipelineModel.load(ml_model)

    # Making the prediction
    prediction = model.transform(df)

    predicted = prediction.select(col('text'), col('Tags'), col('prediction'))

    # Decoding the indexer
    from pyspark.ml.feature import StringIndexerModel, IndexToString

    # Load in the StringIndexer that was saved
    indexer = StringIndexerModel.load(stringindexer)

    # Initialize the IndexToString converter
    i2s = IndexToString(inputCol = 'prediction', outputCol = 'decoded', labels = indexer.labels)
    converted = i2s.transform(predicted)

    # Display the important columns
    return converted

#### 1.3 Load Posts files and ML model

If you can remember our last workshop for machine learning model training, our trained model was saved to `/mnt/deBDProject/model`. Yours name might be different.

In [0]:
display(dbutils.fs.ls("/mnt/deBDProject/model"))

path,name,size,modificationTime
dbfs:/mnt/deBDProject/model/metadata/,metadata/,0,1695932277000
dbfs:/mnt/deBDProject/model/stages/,stages/,0,1695932278000


Let's load the Posts files and the ml model

In [0]:
posts = spark.read.parquet("/mnt/deBDProject/Landing/Posts/*")
ml_model = "/mnt/deBDProject/model"
stringindexer = "/mnt/deBDProject/stringindexer"

#### 1.4 Run model to do `Sentiment Analysis`

In [0]:
# # Producing the sentiment analysis
result = predictions_udf(posts,ml_model, stringindexer)
display(result)

text,Tags,prediction,decoded
"""Take a look at Camera Programming Topics for iOS. """,,59.0,hibernate
"Doc was developed using the InterViews UI toolkit. I believe that doc source is part of the InterViews distribution. Doc was used to typeset Paul's thesis. (Paul Calder was my lecturer at Flinders University) If you look at the InterViews code you might be surprised. It was developed before modern C++ existed. For example, there are no templates. And there are no comments in the code. To my understanding, Lexi never existed. It was created as an example for the book by GoF.",,754.0,nlp
"For debugging purposes, how do I echo the query generated by PDO so I can see what it is about the execute?",,0.0,c#
"""There is also another project which may be worth looking at by Rob Reynolds; RoundHousE http://code.google.com/p/roundhouse/ The wiki is at https://github.com/chucknorris/roundhouse/wiki """,,20.0,c
"I've built a function which will prepare SQL statement and execute it with given parameters. So here how it looks like: function go($statement) { $q = self::$connection->prepare($statement, array(PDO::ATTR_CURSOR => PDO::CURSOR_FWDONLY)); for($i = 1; $i < func_num_args(); $i++) { $arg_to_pass = func_get_arg($i); $q->bindParam($i, $arg_to_pass, PDO::PARAM_INT); } $q->execute(); } But when I call it, it gives me the following error: Fatal error: Uncaught exception 'PDOException' with message 'SQLSTATE[42000]: Syntax error or access violation: 1064 You have an error in your SQL syntax; However, this two variants are working perfectly: function go($statement) { $q = self::$connection->prepare($statement, array(PDO::ATTR_CURSOR => PDO::CURSOR_FWDONLY)); for($i = 1; $i < func_num_args(); $i++) { $q->bindValue($i, func_get_arg($i), PDO::PARAM_INT); } $q->execute(); } (This one is stupid, but just for test) function go($statement) { $q = self::$connection->prepare($statement, array(PDO::ATTR_CURSOR => PDO::CURSOR_FWDONLY)); $arg_to_pass = func_get_arg(1); $q->bindParam(1, $arg_to_pass, PDO::PARAM_INT); $arg_to_pass2 = func_get_arg(2); $q->bindParam(2, $arg_to_pass2, PDO::PARAM_INT); $q->execute(); } So why bindParam doesn't work inside a loop?",,700.0,map-files
"The problem is that the databinding has not yet completed inside your constructur so any changes to the grid are removed (I'm not actually 100% sure why they are removed, since the rows and cells are there, but this is how it works). The correct place to put this sort of formatting is within the DataBindingComplete event handler - that event is raised after databinding has finished but before the grid is drawn. public Report1(DataSet dsReport1, string sDateRep) { InitializeComponent(); sDate = sDateRep; dsReportGrid = dsReport1; orgDataset(); dataGridView1.DataSource = dsReportGrid.Tables[0]; Controls.Add(dataGridView1); dataGridView1.Visible = true; dataGridView1.DataBindingComplete += dataGridView1_DataBindingComplete; } void dataGridView1_DataBindingComplete(object sender, DataGridViewBindingCompleteEventArgs e) { dataGridView1.Rows[2].Cells[1].Style.ForeColor = Color.Red; }",,16.0,wpf
"I have changed the deployment provider in the manifest of my WinForms application. It is a signed manifest. I also incremented the pubish version of the manifest. However it seems that I have to uninstall the application on the client and re-install it, because it doesn't pick up the update automatically like normally. Is this something that you would have expected or is this behavior incorrect? As it turned out only a reinstall was needed, not an uninstall.",,0.0,c#
"For example, here are two ways to set an integer variable (say C++): int x = 0xFF; int y = 255; Which statement would compile faster to set the actual bits to the integer value? Edited: *compile changed from execute. I assumed the conversion to binary was at execution time, but seems to be at compile time given @muntoo's answer",,20.0,c
"While learning iPhone programming, every Xcode template I've seen includes an AppName-Prefix.pch file with the following contents: #ifdef __OBJC__ #import <Foundation/Foundation.h> #import <UIKit/UIKit.h> #endif My understanding is that this file's contents are prefixed to each of the source code files before compilation. Yet each of the other files also imports UIKit, which seems superfluous. For example, main.m begins... #import <UIKit/UIKit.h> int main(int argc, char *argv[]) { ... Cocoa applications in Mac OS X do the same thing, importing Cocoa.h in both the prefix file and the header files. Why have both? I removed the #import directives from all of the source files except the prefix file, and it compiled and ran correctly.",,1.0,java
"""You may want to check NerdDinner which is done by high profile programmers and there is a step-by-step book as well. It also uses patterns such as Repository, ViewModel, DataValidation, etc. Source code: http://nerddinner.codeplex.com/  Free book: http://www.wrox.com/WileyCDA/Section/id-321793.html There is a branch for ASP.Net MVC 3 in the source code of NerdDinner. :) """,,12.0,asp.net


#### 1.5 Summarize which topics are the most popular

In [0]:
# change the column name 
topics = result.withColumnRenamed('decoded', 'topic').select('topic')

# Aggregate the topics and calculate the total qty of each topic
topic_qty = topics.groupBy(col("topic")).agg(count('topic').alias('qty')).orderBy(desc('qty'))
topic_qty.show()

+-----------+---+
|      topic|qty|
+-----------+---+
|         c#|396|
|       java|225|
|  hibernate|173|
| javascript|143|
|     jquery|123|
|        php|116|
|    android| 91|
|     python| 78|
|        c++| 69|
|     iphone| 56|
|objective-c| 51|
|      mysql| 48|
|    asp.net| 39|
|          c| 34|
|        wpf| 31|
|       ruby| 26|
|       .net| 24|
|        css| 23|
| sql-server| 23|
|        ios| 23|
+-----------+---+
only showing top 20 rows



#### 1.6 Save the result file to the `BI` folder


Since spark is a distribution system, if you don't anything, the file you saved will be a folder with a couple of files. The files in the folder will be like this.
<img src='https://s3.amazonaws.com/weclouddata/images/data_engineer/ml_prd2.jpg' width='40%'>

In order to save a single file, we need a function to move the csv file move out of the folder, rename it, and delete the folder. leave the single csv file alone. 

In [0]:
# define this function

def crt_sgl_file(result_path):
        # write the result to a folder container several files
        path = "/mnt/deBDProject/BI/ml_result"
        topic_qty.write.option("delimiter", ",").option("header", "true").mode("overwrite").csv(path)

        # list the folder, find the csv file 
        filenames = dbutils.fs.ls(path)
        name = ''
        for filename in filenames:
            if filename.name.endswith('csv'):
                org_name = filename.name

        # copy the csv file to the path you want to save, in this example, we use  "/mnt/deBDProject/BI/ml_result.csv"
        dbutils.fs.cp(path + '/'+ org_name, result_path)

        # delete the folder
        dbutils.fs.rm(path, True)

        print('single file created')

In [0]:
# run the function
result_path = "/mnt/deBDProject/BI/ml_result.csv"

crt_sgl_file(result_path)

single file created
