In [None]:
pip install findspark
pip install spark-nlp==4.2.2


In [1]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [2]:
# Create the SparkContext for this notebook, or reuse the one that is already running.
# 'local[2]' runs Spark locally with two worker threads; use 'local[*]' to use every core.
try:
    sc = ps.SparkContext('local[2]')
    print("Just created a SparkContext")
except ValueError:
    # PySpark raises ValueError when a SparkContext is already active in this process.
    warnings.warn("SparkContext already exists in this scope")
    # Bind `sc` to the running context so the cell also works after a re-run.
    sc = ps.SparkContext.getOrCreate()

# Built outside the try block so `sqlContext` is defined on both paths.
sqlContext = SQLContext(sc)



In [3]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    """Thread wrapper that runs a Spark StreamingContext.

    ``run`` starts the context and then waits on ``awaitTermination``; when the
    thread is launched with ``start()`` that wait happens on this thread
    instead of the caller's.
    """

    def __init__(self, ssc):
        """Store the streaming context ``ssc`` that this thread will drive."""
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context, then block until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Print a notice and stop the streaming context gracefully.

        The stop call is made with ``stopSparkContext=False``, so the
        underlying SparkContext is not stopped along with it.
        """
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)

In [4]:
# Display the SparkContext in the cell output.
sc

In [5]:
# Display the SparkSession in the cell output.
# NOTE(review): in the visible notebook `spark` is first assigned in a later cell,
# so on a fresh kernel this line may raise NameError — TODO confirm and reorder.
spark

In [6]:
import sparknlp
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession with the Spark NLP jar requested from Maven.
# The jar version is read from the installed `sparknlp` Python package so the two
# stay in sync; a literal placeholder such as "<version>" is not a resolvable
# Maven coordinate.
# Note: `spark.jars.packages` is only applied when this call actually creates the
# session. If a session already exists, Spark reuses it and only runtime SQL
# configurations take effect.
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .config("spark.jars.packages", f"com.johnsnowlabs.nlp:spark-nlp_2.12:{sparknlp.version()}") \
    .getOrCreate()

23/05/19 19:18:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
# Report the version of the installed Spark NLP Python package.
print("Spark NLP version:", sparknlp.version())


Spark NLP version: 4.2.2


In [8]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create (or reuse) the SparkSession.
# A single builder call is used on purpose: calling a bare
# `SparkSession.builder.getOrCreate()` first would create the session without the
# Spark NLP package, and the configured builder below would then merely reuse it.
spark = SparkSession.builder \
    .appName("Spark NLP Example") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.2") \
    .getOrCreate()

# Explicit schema for the review JSON files (every field is nullable).
schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("app_id", StringType(), True),
    StructField("review_text", StringType(), True),
    StructField("label", IntegerType(), True)
])

# Glob that matches the JSON files under the data root directory.
# Set the REVIEWS_JSON_GLOB environment variable to point at another location;
# the default is the original local path.
root_path = os.environ.get(
    "REVIEWS_JSON_GLOB",
    '/Users/bijiben/Desktop/BDA/advancedanalytics/assignment3/data/*/*',
)

# Read all matching JSON files into one DataFrame.
df = spark.read.schema(schema).json(root_path)

# Show the loaded rows.
df.show()

23/05/19 19:18:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

+---------+-------+-----------------------------------+-----+
|review_id| app_id|                        review_text|label|
+---------+-------+-----------------------------------+-----+
|138086736|1742020|               Lemme get a Crunc...|    1|
|138083242|1293460|               [h1] Less of a ca...|    1|
|138125042|1321440|               Cassette Beasts i...|    1|
|138076943|1575830|               I'm really not su...|    0|
|138082413|1159690|               Voidtrain is a ne...|    1|
|137685823|1321440|               Playtime: 20 hour...|    1|
|137905799|1321440|仅代表个人观点，浅浅的打个分吧~\...|    1|
|123764026|2027560|               It's close to bar...|    1|
|138091114|1742020|               Soratomo, Robosa,...|    1|
|137542841|2176930|               https://steamcomm...|    1|
|138079463|1940340|               The good:\n- Rela...|    0|
|138082319|1940340|               Alright, so this ...|    0|
|138093108|2229260|               I played Eden Ete...|    1|
|137684311|1321440|    

In [9]:
# Inspect the Python type of `df`.
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
# Preview the first five rows.
df.show(5)

+---------+-------+--------------------+-----+
|review_id| app_id|         review_text|label|
+---------+-------+--------------------+-----+
|138086736|1742020|Lemme get a Crunc...|    1|
|138083242|1293460|[h1] Less of a ca...|    1|
|138125042|1321440|Cassette Beasts i...|    1|
|138076943|1575830|I'm really not su...|    0|
|138082413|1159690|Voidtrain is a ne...|    1|
+---------+-------+--------------------+-----+
only showing top 5 rows



In [11]:
# Drop rows that contain a null in any column, then drop exact duplicate rows,
# and report how many rows remain.
df = df.na.drop().dropDuplicates()
df.count()

                                                                                

481

In [12]:
# Seeded 80% / 10% / 10% random split into training, validation and test sets.
train_set, val_set, test_set = df.randomSplit(weights=[0.8, 0.1, 0.1], seed=2000)

In [13]:
# Inspect the Python type of the training split.
type(train_set)

pyspark.sql.dataframe.DataFrame

In [None]:
# Spark NLP preprocessing + CountVectorizer + IDF + Logistic Regression.
#
# Column flow through the pipeline:
#   review_text -> document -> token -> normalized -> cleaned_tokens -> stem
#   -> words (plain array of strings) -> cv -> features -> prediction
#
# Notes:
# - DocumentAssembler, Finisher and the text annotators are Spark NLP classes
#   (sparknlp.base / sparknlp.annotator); they are not part of pyspark.ml.feature.
# - Spark NLP's Tokenizer is imported under an alias so that it does not shadow
#   pyspark.ml.feature.Tokenizer, which other cells use.
# - Only stages whose input columns are produced by an earlier stage are included.
#   The dictionary-based Lemmatizer (needs an external lemma dictionary file) and
#   the TokenAssembler (needs a `sentence` column) are therefore not part of this
#   pipeline.
import sparknlp
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF
from sparknlp.annotator import Normalizer, Stemmer, StopWordsCleaner
from sparknlp.annotator import Tokenizer as NlpTokenizer
from sparknlp.base import DocumentAssembler, Finisher

spark = sparknlp.start()

# Wrap the raw review text into Spark NLP's document annotation.
documentAssembler = DocumentAssembler() \
    .setInputCol("review_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

# Split each document into tokens.
nlpTokenizer = NlpTokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Lowercase the tokens and strip characters matched by the cleanup pattern.
# The pattern is a raw string so the backslash escapes reach the regex engine as written.
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True) \
    .setCleanupPatterns([r"[^\w\d\s]"])

# Remove stop words from the normalized tokens.
stopwordsCleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleaned_tokens") \
    .setCaseSensitive(True)

# Reduce the remaining tokens to their stems.
stemmer = Stemmer() \
    .setInputCols(["cleaned_tokens"]) \
    .setOutputCol("stem")

# Convert the Spark NLP annotations into a plain array<string> column that the
# pyspark.ml feature stages can consume.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["words"]) \
    .setOutputAsArray(True)

cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol="cv")
idf = IDF(inputCol="cv", outputCol="features", minDocFreq=5)  # minDocFreq: remove sparse terms
lr = LogisticRegression(maxIter=100, regParam=0.01, elasticNetParam=1.0)

pipeline = Pipeline(stages=[
    documentAssembler,
    nlpTokenizer,
    normalizer,
    stopwordsCleaner,
    stemmer,
    finisher,
    cv,
    idf,
    lr,
])

# Fit on the training split and score the validation split.
CV_pipelineFit = pipeline.fit(train_set)
predictions = CV_pipelineFit.transform(val_set)

# Defined here so the cell does not depend on an evaluator created in another cell.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))



In [20]:
# HashingTF + IDF + Logistic Regression
# This cell builds only the feature pipeline (tokenize -> hashed term frequencies -> IDF);
# no classifier stage is part of `pipeline` here.
#Accuracy Score: 0.8666666666666667
#ROC-AUC: 0.8529411764705882
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, Tokenizer

tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol="tf")
# minDocFreq: remove sparse terms
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

# Fit the feature pipeline on the training split only, then apply it to both splits.
TF_pipelineFit = pipeline.fit(train_set)
train_df = TF_pipelineFit.transform(train_set)
val_df = TF_pipelineFit.transform(val_set)
train_df.show(5)



23/05/19 21:11:11 WARN DAGScheduler: Broadcasting large task binary with size 1092.9 KiB
+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+
|review_id| app_id|         review_text|label|               words|                  tf|            features|
+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+
|123764026|2027560|It's close to bar...|    1|[it's, close, to,...|(65536,[104,632,8...|(65536,[104,632,8...|
|123779352|2027560|If you're into bu...|    1|[if, you're, into...|(65536,[1696,1880...|(65536,[1696,1880...|
|123780055|2027560|Took the game bec...|    1|[took, the, game,...|(65536,[110,495,1...|(65536,[110,495,1...|
|123780785|2027560|Cute game with re...|    1|[cute, game, with...|(65536,[3584,7823...|(65536,[3584,7823...|
|123821893|2027560|This game is less...|    1|[this, game, is, ...|(65536,[1540,1880...|(65536,[1540,1880...|
+---------+-------+------------


                                                                                

23/05/19 22:00:08 WARN TransportChannelHandler: Exception in connection from /10.44.49.91:52330
java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptim

In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Fit a logistic regression on the featurized training set and score the
# featurized validation set.
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

# The cell's displayed value is the evaluator's metric for these predictions.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)



23/05/19 19:20:19 WARN DAGScheduler: Broadcasting large task binary with size 1095.5 KiB



[Stage 18:>                                                         (0 + 1) / 1]

                                                                                

23/05/19 19:20:20 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 19:20:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/19 19:20:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/05/19 19:20:20 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/19 19:20:20 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/05/19 19:20:20 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 19:20:20 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 19:20:20 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 19:20:21 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 19:20:21 WARN DAGScheduler: Broadcasting large task binary with size 1097.2 KiB
23/05/19 



23/05/19 19:20:30 WARN DAGScheduler: Broadcasting large task binary with size 1116.1 KiB



                                                                                

0.8666666666666667

In [None]:
# Accuracy: rows whose predicted label equals the true label, divided by the
# number of rows in the validation split.
accuracy = (
    predictions
    .where(predictions["label"] == predictions["prediction"])
    .count()
    / float(val_set.count())
)
accuracy

In [None]:
## CountVectorizer + IDF + Logistic Regression
## Cross-validation to select the hyperparameters
#Accuracy Score: 0.8529
#ROC-AUC: 0.8917
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, lr])

# Parameter grid searched by the cross-validation
# (2 regParam values x 3 elasticNetParam values).
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# One evaluator is shared by model selection and the final validation score,
# and is defined here so the cell does not rely on another cell's variable.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# 5-fold cross-validator; the seed fixes the fold assignment so re-runs are reproducible.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=2000)

# Fit on the training split; the result is a CrossValidatorModel.
CV_pipelineFit = crossval.fit(train_set)

# Score the validation split with the fitted model.
predictions = CV_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print ("Accuracy Score: {0:.4f}".format(accuracy))
print ("ROC-AUC: {0:.4f}".format(roc_auc))

In [None]:
# Earlier working version without any text preprocessing
# CountVectorizer + IDF + Logistic Regression
#Accuracy Score: 0.8529
#ROC-AUC: 0.8917
#
# Spark NLP is deliberately not imported here: this pipeline uses only pyspark.ml
# stages, and star-importing sparknlp.annotator would rebind `Tokenizer` to the
# Spark NLP annotator, which is not configured through inputCol/outputCol.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, SQLTransformer, Tokenizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
# Fixed hyperparameters (no grid search in this cell).
lr = LogisticRegression(maxIter=100, regParam=0.01,elasticNetParam=1.0)
pipeline = Pipeline(stages=[tokenizer, cv, idf, lr])

# Fit on the training split and score the validation split.
# Note: this rebinds CV_pipelineFit to a plain PipelineModel.
CV_pipelineFit = pipeline.fit(train_set)
predictions = CV_pipelineFit.transform(val_set)

# Defined here so the cell does not rely on an evaluator created in another cell.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)


print ("Accuracy Score: {0:.4f}".format(accuracy))
print ("ROC-AUC: {0:.4f}".format(roc_auc))

In [None]:
#Best Parameters:
#regParam: 0.01
#elasticNetParam: 1.0
# CV_pipelineFit is either a CrossValidatorModel (which exposes the winning
# pipeline as `.bestModel`) or a plain PipelineModel, depending on which cell
# fitted it last; fall back to the object itself in the latter case.
bestModel = getattr(CV_pipelineFit, "bestModel", CV_pipelineFit)
bestLRModel = bestModel.stages[-1]  # the logistic regression is the last pipeline stage

print("Best Parameters:")
print("regParam:", bestLRModel.getRegParam())
print("elasticNetParam:", bestLRModel.getElasticNetParam())

In [None]:
# Test-split evaluation: featurize with TF_pipelineFit, then classify with lrModel.
#Accuracy Score: 0.7407
#ROC-AUC: 0.7568
test_df = TF_pipelineFit.transform(test_set)
test_predictions = lrModel.transform(test_df)

# Accuracy = matching (label, prediction) rows / rows in the test split.
test_accuracy = (
    test_predictions
    .where(test_predictions["label"] == test_predictions["prediction"])
    .count()
    / float(test_set.count())
)
test_roc_auc = evaluator.evaluate(test_predictions)

# Report both metrics.
print("Accuracy Score: {0:.4f}".format(test_accuracy))
print("ROC-AUC: {0:.4f}".format(test_roc_auc))

In [None]:
## CountVectorizer + IDF + Logistic Regression
# Test-split evaluation of the model held in CV_pipelineFit.
#Accuracy Score: 0.7963
#ROC-AUC: 0.7591
test_predictions = CV_pipelineFit.transform(test_set)

# Accuracy = matching (label, prediction) rows / rows in the test split.
accuracy = (
    test_predictions
    .where(test_predictions["label"] == test_predictions["prediction"])
    .count()
    / float(test_set.count())
)
roc_auc = evaluator.evaluate(test_predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

In [None]:
# Show how many training rows carry each label value.
train_set.groupBy("label").count().show()

In [None]:
# Save the fitted model to disk.
# `write().overwrite()` replaces an existing directory of the same name, so
# re-running the notebook does not fail because the path already exists.
CV_pipelineFit.write().overwrite().save("CV_pipelineFit.model")
