 <hr />
 Download the dataset from the GitHub repository or from the link below.
 <hr />

https://drive.google.com/file/d/1-TfybHeSWpP9UJLxeNDIFT1CPfxn7XWT/view

 <hr />
 Before running the notebook, make sure PySpark is installed and working. Install the pyspark and findspark packages with pip (pip install pyspark findspark), then import findspark as shown in the cell below.
<hr />

In [1]:
import findspark
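
<hr />
A quick way to confirm that both packages are importable before going further is sketched below. The helper name missing_packages is hypothetical and is not part of findspark or PySpark; it only reports which names cannot be found on the current import path.
<hr />

```python
import importlib.util


def missing_packages(names):
    """Return the names that cannot be found on the current import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Hypothetical check for the two packages this notebook relies on.
# An empty list means both can be imported.
print(missing_packages(["pyspark", "findspark"]))
```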

<hr />
The following cell adds PySpark to sys.path at runtime, which is needed when PySpark is not on the Python path by default. It also prints the location of the Spark installation.
<hr />

In [2]:
print(findspark.find())
findspark.init()

/opt/spark
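
<hr />
As a rough illustration of what happens behind findspark.init(): it prepends Spark's python directory and the bundled py4j archive to sys.path. The function below only computes those two entries for a given installation directory. The name spark_path_entries is hypothetical, and the real library does more (for example, setting environment variables), so treat this as a sketch rather than the library's implementation.
<hr />

```python
import glob
import os


def spark_path_entries(spark_home):
    """Return the sys.path entries PySpark needs under a Spark install dir."""
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip archive under python/lib.
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j archive found under {spark_python}")
    return [spark_python, py4j_zips[0]]
```

<hr />
With the path printed above, sys.path[:0] = spark_path_entries("/opt/spark") would have a similar effect on the import path, assuming the installation has the usual layout.
<hr />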


<hr />
Create a Spark Session
<hr />

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Pipeline") \
    .master('local[2]') \
    .getOrCreate()

22/11/29 15:09:13 WARN Utils: Your hostname, siyad-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/11/29 15:09:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/29 15:09:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/29 15:09:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


<hr />
Read the dataset into a dataframe.
<hr />

In [5]:
df = spark.read.csv("/home/siyad/my_ML_Projects/hpe_inter/Data/udemy_dataset.csv",header=True,inferSchema=True)


<hr />
Display the dataset.
<hr />

In [6]:
df.show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------
 _c0                 | 0                                                                                 
 course_id           | 1070968                                                                           
 course_title        | Ultimate Investment Banking Course                                                
 url                 | https://www.udemy.com/ultimate-investment-banking-course/                         
 is_paid             | True                                                                              
 price               | 200                                                                               
 num_subscribers     | 2147                                                                              
 num_reviews         | 23                                                                                
 num_lectures        | 51                     

22/11/29 15:12:38 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , course_id, course_title, url, is_paid, price, num_subscribers, num_reviews, num_lectures, level, content_duration, published_timestamp, subject, clean_course_title
 Schema: _c0, course_id, course_title, url, is_paid, price, num_subscribers, num_reviews, num_lectures, level, content_duration, published_timestamp, subject, clean_course_title
Expected: _c0 but found: 
CSV file: file:///home/siyad/my_ML_Projects/hpe_inter/Data/udemy_dataset.csv


<hr />
Select the required input columns used for prediction.
<hr />

In [7]:
df = df.select('course_title','subject')
df.show(truncate=False)

+------------------------------------------------------------+----------------+
|course_title                                                |subject         |
+------------------------------------------------------------+----------------+
|Ultimate Investment Banking Course                          |Business Finance|
|Complete GST Course & Certification - Grow Your CA Practice |Business Finance|
|Financial Modeling for Business Analysts and Consultants    |Business Finance|
|Beginner to Pro - Financial Analysis in Excel 2017          |Business Finance|
|How To Maximize Your Profits Trading Options                |Business Finance|
|Trading Penny Stocks: A Guide for All Levels In 2017        |Business Finance|
|Investing And Trading For Beginners: Mastering Price Charts |Business Finance|
|Trading Stock Chart Patterns For Immediate, Explosive Gains |Business Finance|
|Options Trading 3 : Advanced Stock Profit and Success Method|Business Finance|
|The Only Investment Strategy You Need F

<hr />
Determine the count of records in the dataset.
<hr />

In [None]:
df.count()

<hr />
Drop the rows with Null values.
<hr />

In [None]:
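# Count rows with a null subject without collecting the data to the driver
print(df.filter(df['subject'].isNull()).count())
# Drop those rows, then confirm the new row count
df = df.dropna(subset=['subject'])
df.count()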

<hr />
Split the dataset into training and test sets.
<hr />

In [None]:
(trainDF,testDF) = df.randomSplit((0.7,0.3),seed=42)
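
<hr />
A weighted random split of this kind assigns each row to a split by an independent random draw, so the resulting sizes are close to 70/30 but not guaranteed to match it exactly. The pure-Python sketch below illustrates the idea. random_split is a hypothetical helper, not Spark's implementation, and it will not reproduce the same rows that Spark selects for seed 42.
<hr />

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by an independent weighted random draw."""
    total = sum(weights)
    # Cumulative upper bounds in [0, 1], e.g. (0.7, 0.3) -> [0.7, 1.0].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), (0.7, 0.3), seed=42)
print(len(train), len(test))  # roughly 700 and 300
```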

<hr />
Import the pyspark modules required for pre-processing the data. <br>
1. Tokenizer : Splits each sentence into tokens <br>
2. StopWordsRemover : Removes stop words from the tokens <br>
3. CountVectorizer : Extracts a vocabulary from the dataset and produces a model that converts each document into a vector of term counts <br>
4. IDF : Computes the Inverse Document Frequency (IDF) from the dataset and uses it to rescale the term counts <br>
5. StringIndexer : A label indexer that maps a string column of labels to a column of numeric label indices <br>
<hr />

In [None]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.ml.feature import StringIndexer
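
<hr />
To make the first four steps concrete, here is a minimal pure-Python sketch of the same chain on three titles from the dataset. It assumes whitespace tokenization with lowercasing and the smoothed formula idf = log((n_docs + 1) / (doc_freq + 1)). The stop-word list is a tiny made-up subset, the alphabetical tie-break in the vocabulary is a choice made here, and all function names are hypothetical.
<hr />

```python
import math
from collections import Counter

# Tiny illustrative stop-word list; real default lists are much longer.
STOP_WORDS = {"the", "for", "and", "in", "to", "a", "of"}


def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()


def remove_stop_words(tokens):
    """Drop tokens that appear in the stop-word list."""
    return [t for t in tokens if t not in STOP_WORDS]


def build_vocabulary(docs):
    """Order terms by total count, most frequent first (ties alphabetical)."""
    counts = Counter(t for doc in docs for t in doc)
    return [term for term, _ in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))]


def count_vector(doc, vocab):
    """Count how often each vocabulary term occurs in one document."""
    counts = Counter(doc)
    return [counts[term] for term in vocab]


def idf_weights(vectors):
    """Smoothed IDF per term: log((n_docs + 1) / (doc_freq + 1))."""
    n_docs = len(vectors)
    weights = []
    for j in range(len(vectors[0])):
        doc_freq = sum(1 for v in vectors if v[j] > 0)
        weights.append(math.log((n_docs + 1) / (doc_freq + 1)))
    return weights


titles = [
    "Ultimate Investment Banking Course",
    "Financial Modeling for Business Analysts and Consultants",
    "Trading Penny Stocks: A Guide for All Levels In 2017",
]
docs = [remove_stop_words(tokenize(t)) for t in titles]
vocab = build_vocabulary(docs)
raw = [count_vector(d, vocab) for d in docs]
idf = idf_weights(raw)
# Rescale each raw count by the IDF weight of its term.
tfidf = [[tf * w for tf, w in zip(vec, idf)] for vec in raw]
```

<hr />
Note that whitespace tokenization keeps punctuation attached, so "Stocks:" becomes the token "stocks:" rather than "stocks".
<hr />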

<hr />
Initialize the Estimators and Transformers.
<hr />

In [None]:
tokenizer = Tokenizer(inputCol='course_title',outputCol='mytokens')
stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')

In [None]:
labelEncoder = StringIndexer(inputCol='subject',outputCol='label')
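
<hr />
The label indexing step can be pictured as follows: distinct labels are ordered by how often they occur, and the most frequent label gets index 0. This sketch assumes that frequency-descending order with an alphabetical tie-break. fit_label_index is a hypothetical helper, and "Subject B" and "Subject C" are made-up labels used only for illustration.
<hr />

```python
from collections import Counter


def fit_label_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Tie-break alphabetically so the mapping is deterministic.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# "Business Finance" comes from the dataset; the other labels are placeholders.
subjects = [
    "Business Finance", "Subject B", "Business Finance",
    "Subject C", "Subject B", "Business Finance",
]
mapping = fit_label_index(subjects)
print(mapping)
```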

<hr />
Import the pyspark modules required for training the model.
<hr />

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression(featuresCol='vectorizedFeatures',labelCol='label')

<hr />
Create a Pipeline.
<hr />

In [None]:
from pyspark.ml import Pipeline 

In [None]:
pipeline = Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,labelEncoder,lr])

<hr />
Call fit to run the pipeline on the training data and produce the trained model.
<hr />

In [None]:
lr_model = pipeline.fit(trainDF)
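
<hr />
Conceptually, fitting a pipeline walks through the stages in order: an Estimator is fit on the current data to produce a Transformer, each Transformer is applied, and its output feeds the next stage. The toy classes below are hypothetical stand-ins that sketch this control flow in plain Python; they are not the PySpark classes.
<hr />

```python
class Lowercase:
    """A transformer: it only has transform()."""

    def transform(self, rows):
        return [r.lower() for r in rows]


class VocabularyModel:
    """The fitted result of VocabularyEstimator; turns text into count vectors."""

    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        return [[r.split().count(w) for w in self.vocab] for r in rows]


class VocabularyEstimator:
    """An estimator: fit() learns a vocabulary and returns a transformer."""

    def fit(self, rows):
        vocab = sorted({word for r in rows for word in r.split()})
        return VocabularyModel(vocab)


def fit_pipeline(stages, rows):
    """Fit the stages in order and return the list of fitted transformers."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)  # the output feeds the next stage
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted transformer in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


fitted = fit_pipeline([Lowercase(), VocabularyEstimator()], ["Spark ML", "ML Pipeline"])
print(transform_pipeline(fitted, ["Spark Pipeline"]))
```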

<hr />
Display the Stages of the pipeline.
<hr />

In [None]:
lr_model.stages

<hr />
Use the fitted pipeline to generate predictions for the test data. Only the course_title column is passed in, as it would be for unlabeled data.
<hr />

In [None]:
predictions = lr_model.transform(testDF.select('course_title'))

<hr />
Display the predictions.
<hr />

In [None]:
predictions.show(vertical=True)

In [None]:
predictions = lr_model.transform(testDF)
predictions.show(vertical=True)
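
<hr />
Because the second set of predictions keeps the label column, the two columns can be compared directly. The sketch below computes plain accuracy from (label, prediction) pairs. The accuracy helper is hypothetical, and the example pairs are made up; they are not results from this notebook. In the notebook the pairs could, for instance, be obtained with predictions.select('label', 'prediction').collect().
<hr />

```python
def accuracy(pairs):
    """Return the fraction of (label, prediction) pairs that agree."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no predictions to score")
    return sum(1 for label, pred in pairs if label == pred) / len(pairs)


# Hypothetical (label, prediction) pairs, not results from this notebook.
example = [(0.0, 0.0), (1.0, 1.0), (2.0, 1.0), (0.0, 0.0)]
print(accuracy(example))  # 0.75
```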

In [None]:
spark.stop()