In [74]:
!pip install pyspark



In [75]:
from pyspark.sql import SparkSession

In [91]:
spark=SparkSession.builder.appName("NLP").getOrCreate()

In [94]:
title_df=spark.read.csv('/content/drive/MyDrive/Udemy Courses Dataset/Course Title.csv',
                        inferSchema=True,header=True)

In [97]:
category_df=spark.read.csv('/content/drive/MyDrive/Udemy Courses Dataset/Course Category.csv',
                           inferSchema=True,header=True)

In [98]:
feature_df=spark.read.csv('/content/drive/MyDrive/Udemy Courses Dataset/Course Features.csv',
                          inferSchema=True,header=True)

In [99]:
from pyspark.sql.functions import monotonically_increasing_id

In [101]:
DF1 = title_df.withColumn("row_id", monotonically_increasing_id())
DF2 = category_df.withColumn("row_id", monotonically_increasing_id())
result_df = DF1.join(DF2, ("row_id"))

In [102]:
DF3=feature_df.withColumn("row_id", monotonically_increasing_id())
result_df = result_df.join(DF3, ("row_id")).drop("row_id")

In [103]:
result_df.show(5)

+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+
|               title| category|is_paid| price|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|
+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+
|Online Vegan Vege...|Lifestyle|   true| 24.99|           2231|      3.75|        134|          42|          37|              1268|
|The Lean Startup ...| Business|  false|   0.0|          26474|       4.5|        709|         112|           9|                88|
|How To Become a V...|Lifestyle|   true| 19.99|           1713|       4.4|         41|          13|          14|                82|
|How to Train a Puppy|Lifestyle|   true|199.99|           4988|       4.8|        395|          88|          36|              1511|
|Web Design from t...|   Design|   true|159.99|           1266|      4.75|  

In [104]:
# NLP Preprocessing
# Break sentences into words or tokens and remove stopwords, punctuations,
# digits, special characters, etc.
# Convert to lower case
# Create a Corpus and from Corpus create a CountVectorizer(document term matrix)
# or TFIDF Vectorizer

# Multiple processes involved, Pipeline must be used

In [105]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer

In [112]:
# Sequenctially define Pipeline Stages that need to be executed
tokenize=Tokenizer(inputCol='title',outputCol='wordtokens')
stopwords_remove=StopWordsRemover(inputCol='wordtokens',outputCol='cleanwords')
vectorizer=CountVectorizer(inputCol='cleanwords',outputCol='features')
idf=IDF(inputCol='features',outputCol='features_vector')
dummyencode=StringIndexer(inputCol='category',outputCol='label')

In [113]:
pipeline=Pipeline(stages=[tokenize,stopwords_remove,vectorizer,idf,dummyencode])

In [114]:
df=pipeline.fit(result_df)

In [117]:
df=df.transform(result_df)

In [118]:
df.show(5)

+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               title| category|is_paid| price|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|          wordtokens|          cleanwords|            features|     features_vector|label|
+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+-----+
|Online Vegan Vege...|Lifestyle|   true| 24.99|           2231|      3.75|        134|          42|          37|              1268|[online, vegan, v...|[online, vegan, v...|(102545,[39,536,6...|(102545,[39,536,6...|  8.0|
|The Lean Startup ...| Business|  false|   0.0|          26474|       4.5|        709|         112|           9|

In [119]:
(df_train,df_test)=df.randomSplit((0.7,0.3),seed=42)

In [120]:
from pyspark.ml.classification import LogisticRegression

In [121]:
logit=LogisticRegression(featuresCol="features_vector",labelCol='label')

In [122]:
logitmodel=logit.fit(df_train)

In [123]:
logitmodel.summary.accuracy

0.9622464476064267

In [129]:
logitpredict=logitmodel.transform(df_test)

In [130]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [131]:
evaluator=MulticlassClassificationEvaluator(metricName='accuracy')

In [132]:
evaluator.evaluate(logitpredict)

0.5784492255120164

In [133]:
df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  8.0| 7381|
|  0.0|23713|
|  7.0| 7984|
|  1.0|20754|
|  4.0|14726|
| 11.0| 4515|
|  3.0|17430|
|  2.0|17547|
| 10.0| 4637|
|  6.0| 8284|
|  5.0|11356|
|  9.0| 6728|
| 12.0| 2807|
+-----+-----+



In [134]:
from pyspark.ml.classification import DecisionTreeClassifier

In [135]:
tree=DecisionTreeClassifier(featuresCol='features_vector',labelCol='label')

In [136]:
treemodel=tree.fit(df_train)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored