In [2]:

# In[1]:
# Read Text Data
from pyspark.sql import SparkSession
import pandas as pd
from pyspark import SparkContext
spark = SparkSession.builder.appName('text mining').getOrCreate()
 


In [6]:
data = spark.read.csv("farm-ads.csv", inferSchema=False, sep=',')
data = data.withColumnRenamed('_c0','label1').withColumnRenamed('_c1','text')
#data.show(truncate=False)
data.show(truncate=True)

+------+--------------------+
|label1|                text|
+------+--------------------+
| class|               words|
|    -1|ad-rheumatoid ad-...|
|    -1|ad-siemen ad-wate...|
|    -1|ad-symptom ad-mus...|
|    1 |d-animal ad-anima...|
|    -1|ad-dr ad-enrico a...|
|    -1|ad-ulcerative ad-...|
|    -1|ad-wellcentive ad...|
|    1 |d-free ad-raw ad-...|
|    -1|ad-north ad-shore...|
|    1 |d-world ad-finest...|
|    1 |d-vet ad-online a...|
|    -1|ad-gum ad-disease...|
|    1 |d-rabbit ad-guine...|
|    c |ac cd rom mpn rrp...|
|    -1|ad-colitis ad-sym...|
|    -1|ad-disease ad-sig...|
|    1 |d-pygmy ad-goat a...|
|    1 |d-feed ad-supplem...|
|    -1|ad-www ad-musclew...|
+------+--------------------+
only showing top 20 rows



In [7]:
# In[2]:
# Count number of Words in each Text

from pyspark.sql.functions import length
data = data.withColumn('length', length(data['text']))
data.show()

+------+--------------------+------+
|label1|                text|length|
+------+--------------------+------+
| class|               words|     5|
|    -1|ad-rheumatoid ad-...|   252|
|    -1|ad-siemen ad-wate...|   252|
|    -1|ad-symptom ad-mus...|   106|
|    1 |d-animal ad-anima...|    77|
|    -1|ad-dr ad-enrico a...|   158|
|    -1|ad-ulcerative ad-...|   252|
|    -1|ad-wellcentive ad...|   252|
|    1 |d-free ad-raw ad-...|  1037|
|    -1|ad-north ad-shore...|   252|
|    1 |d-world ad-finest...|  1522|
|    1 |d-vet ad-online a...|  6839|
|    -1|ad-gum ad-disease...|   252|
|    1 |d-rabbit ad-guine...| 32756|
|    c |ac cd rom mpn rrp...| 14589|
|    -1|ad-colitis ad-sym...|   252|
|    -1|ad-disease ad-sig...|   252|
|    1 |d-pygmy ad-goat a...|  3420|
|    1 |d-feed ad-supplem...|  1557|
|    -1|ad-www ad-musclew...|   252|
+------+--------------------+------+
only showing top 20 rows



In [8]:
# In[3]:
# Compare the lenght difference between ham and spam
data.groupby('label1').mean().show()
data.groupby('label1').sum().show()

+------+------------------+
|label1|       avg(length)|
+------+------------------+
|    -1|234.36697722567288|
|     a|            9447.0|
|    vi|           10376.0|
|    cl|             424.0|
|    y |            9218.0|
|    b |           12907.0|
|    t | 7220.333333333333|
|    ic|            8579.0|
|    re|           16519.0|
|    pl|            8919.0|
|    ho|            8347.0|
|    n |            1703.0|
|    ia|           30435.0|
|    c | 9920.666666666666|
|    me|           20374.0|
|    p |            7321.0|
|    pe|           28409.0|
|    ar|            8863.0|
|     e|           16367.0|
|    fe|           15327.0|
+------+------------------+
only showing top 20 rows

+------+-----------+
|label1|sum(length)|
+------+-----------+
|    -1|     452797|
|     a|       9447|
|    vi|      10376|
|    cl|        424|
|    y |       9218|
|    b |      12907|
|    t |      21661|
|    ic|       8579|
|    re|      16519|
|    pl|       8919|
|    ho|       8347|
|    n |

In [9]:
# In[4]:
# Treat TF-IDF features for each text
# TF: Term Frequency
# IDF: Inverse Document Frequency
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, VectorAssembler

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
num = StringIndexer(inputCol='label1',outputCol='label')
final_feature = VectorAssembler(inputCols=['tf_idf', 'length'],outputCol='features')

from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[num,tokenizer,stopremove,count_vec,idf,final_feature])
clean_data = data_prep_pipe.fit(data).transform(data)

clean_data.show()
clean_data.take(1)
clean_data.take(1)[0][-1]

+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label1|                text|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
| class|               words|     5| 23.0|             [words]|             [words]|(43297,[33248],[1...|(43297,[33248],[7...|(43298,[33248,432...|
|    -1|ad-rheumatoid ad-...|   252|  1.0|[ad-rheumatoid, a...|[ad-rheumatoid, a...|(43297,[245,594,7...|(43297,[245,594,7...|(43298,[245,594,7...|
|    -1|ad-siemen ad-wate...|   252|  1.0|[ad-siemen, ad-wa...|[ad-siemen, ad-wa...|(43297,[43,253,35...|(43297,[43,253,35...|(43298,[43,253,35...|
|    -1|ad-symptom ad-mus...|   106|  1.0|[ad-symptom, ad-m...|[ad-symptom, ad-m...|(43297,[49,137,45...|(43297,

SparseVector(43298, {33248: 7.643, 43297: 5.0})

In [10]:
# In[5]: 
# ## Split data into training and test datasets
training, test = clean_data.randomSplit([0.6, 0.4], seed=12345)

# Build Logistic Regression Model
from pyspark.ml.classification import LogisticRegression

log_reg = LogisticRegression(featuresCol='features', labelCol='label')
logr_model = log_reg.fit(training)

results = logr_model.transform(test)
results.select('label','prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



In [11]:
# In[10]:
# #### Confusion Matrix
from sklearn.metrics import confusion_matrix
y_true = results.select("label")
y_true = y_true.toPandas()

y_pred = results.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)
print("Prediction Accuracy is ", (cnf_matrix[0,0]+cnf_matrix[1,1])/sum(sum(cnf_matrix)) )

[[809  58   0   0   0   0   0   1   1   0   0   0   0   0   0   0   0]
 [ 32 777   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   1   0   0   0   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0   0   0   0   0   0   1   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   1   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   1]
 [  1   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0]
 [  1 