In [1]:
import findspark
# findspark.init() runs before the pyspark import below, presumably so the local
# Spark installation is importable from this kernel.
findspark.init()

from pyspark.sql import SparkSession

# Session configuration, applied one key at a time to the builder below.
# NOTE(review): with master "local[*]" the executors live inside the driver JVM,
# so spark.executor.memory is presumably ignored and spark.driver.memory is the
# setting that matters — confirm against the Spark configuration docs.
# NOTE(review): spark.jars.packages pulls in Spark NLP, which no visible cell uses.
SPARK_CONF = {
    "spark.executor.memory": "6g",
    "spark.driver.memory": "4g",
    "spark.network.timeout": "800s",
    "spark.executor.heartbeatInterval": "200s",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC",
    "spark.driver.extraJavaOptions": "-XX:+UseG1GC",
    "spark.memory.fraction": "0.8",
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3",
}

# create a SparkSession (getOrCreate reuses an existing session in this kernel)
builder = SparkSession.builder.appName("ReadJSON").master("local[*]")
for conf_key, conf_value in SPARK_CONF.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType, IntegerType, ArrayType
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import PipelineModel

import sys
import json

In [3]:
# Training data: one JSON object per line (JSON Lines).
# NOTE(review): absolute, machine-specific path — consider making it configurable.
json_training_file_path = r"C:\Users\Emma\Downloads\school\Big_Data\project\finaldata\combined_train.json"

# Chunk size as a BYTE budget (10 MiB), not a line count. It is meant to be
# compared against accumulated sys.getsizeof(line) values, i.e. Python string
# object sizes, which run slightly larger than the raw bytes on disk.
chunk_size = 10 * 1024 * 1024    # 10 MiB

# Schema for the review records: only these four fields are read, all as
# nullable strings. "overall" (the star rating) stays a string here and is
# converted to a numeric 0/1 label during preprocessing.
schema = StructType(
    [
        StructField("asin", StringType(), True),
        StructField("reviewerID", StringType(), True),
        StructField("overall", StringType(), True),
        StructField("reviewText", StringType(), True)
    ]
)

In [4]:
# functions
import re
import nltk
from nltk.corpus import stopwords

def _rating_to_label(rating):
    # Binarize the star rating: 4.0 and above is the positive class (1), else 0.
    return int(float(rating) >= 4.0)

targetUDF = F.udf(_rating_to_label, IntegerType())

# Fetch NLTK's stop-word corpus (it reports "already up-to-date" when cached) and
# keep the English list as a set for fast membership tests.
nltk.download('stopwords')
STOPWORDS = {word for word in stopwords.words("english")}
def remove_reg_stop(text: str, stop_words=None) -> str:
    r"""Normalize review text for bag-of-words features.

    Every character matched by the regex class ``[^\w]`` becomes a space, the
    result is lower-cased and split into words, and stop words are dropped.

    Args:
        text: Raw review text. ``None`` (a null column value) yields ``""``
            instead of raising ``TypeError`` inside ``re.sub``.
        stop_words: Collection of words to drop, compared after lower-casing.
            Defaults to the module-level ``STOPWORDS`` set (NLTK English).

    Returns:
        The remaining words joined by single spaces; ``""`` if none remain.
    """
    if text is None:
        return ""
    if stop_words is None:
        stop_words = STOPWORDS
    # After the substitution only word characters and plain spaces remain, so
    # str.split() is equivalent to split(' ') followed by dropping empty strings.
    words = re.sub(r'[^\w]', ' ', text).lower().split()
    return ' '.join(word for word in words if word not in stop_words)
# Spark UDF wrapper so remove_reg_stop can run on a string column.
cleanTextUDF = F.udf(f=remove_reg_stop, returnType=StringType())

def preProc(df):
    """Turn a raw review DataFrame into model input.

    Steps:
      1. drop duplicate (reviewerID, asin) pairs;
      2. drop rows whose reviewText or overall is null;
      3. add ``label`` (1 when overall >= 4.0, else 0) via targetUDF;
      4. add ``cleanText`` (normalized, stop-word-free text) via cleanTextUDF.

    NOTE(review): which row of a duplicate pair dropDuplicates keeps is not
    specified, so re-evaluating the same lazy DataFrame may keep different rows.
    This may explain the small AUC drift between evaluations of the same model.

    Args:
        df: DataFrame with at least reviewerID, asin, overall and reviewText.

    Returns:
        DataFrame with columns reviewText, cleanText and label.
    """
    df = df.dropDuplicates(["reviewerID", "asin"])
    # targetUDF calls float(overall), which raises TypeError on null, so rows
    # without a rating are dropped alongside rows without text.
    df = df.filter(df.reviewText.isNotNull() & df.overall.isNotNull())
    df = df.withColumn("label", targetUDF(df["overall"]))
    df = df.withColumn("cleanText", cleanTextUDF(df["reviewText"]))
    return df.select("reviewText", "cleanText", "label")

def get_tfidf_pipeline(num_features=100000, min_doc_freq=2, max_iter=20):
    """Build an unfitted Tokenizer -> HashingTF -> IDF -> LogisticRegression pipeline.

    Args:
        num_features: Number of hash buckets for HashingTF. NOTE(review): the
            Spark docs advise a power of two so terms spread evenly over the
            columns; the default keeps the original 100000.
        min_doc_freq: IDF's minDocFreq — terms found in fewer documents than this
            get no IDF weight.
        max_iter: Maximum number of LogisticRegression iterations.

    Returns:
        A pyspark.ml.Pipeline. It reads ``cleanText`` and writes ``tokens``,
        ``rawFeatures`` and ``features``. The regression uses Spark's default
        ``label``/``features`` columns.
    """
    tokenizer = Tokenizer(inputCol="cleanText", outputCol="tokens")
    # numFeatures and minDocFreq are integer params; pass ints rather than the
    # float literals 1e5 / 2.0 and rely on Spark's coercion.
    hashing_tf = HashingTF(inputCol="tokens", outputCol="rawFeatures", numFeatures=num_features)
    idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=min_doc_freq)
    lr = LogisticRegression(maxIter=max_iter)
    return Pipeline(stages=[tokenizer, hashing_tf, idf, lr])

# Scores the logistic regression's rawPrediction column; areaUnderROC is the metric
# the "AUC SCORE" prints report (and Spark's default for this evaluator).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Emma\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
# load testing data

# Read the held-out reviews with the shared schema (lazily), then apply the same
# preprocessing as the training data.
raw_test_reviews = spark.read.schema(schema).json(r"C:\Users\Emma\Downloads\school\Big_Data\project\finaldata\combined_test.json")
df_test = preProc(raw_test_reviews)

In [6]:
%%time

json_lines=[]
cur_size = 0
cur_run = 0
model_path = 'bow_model'
with open(json_training_file_path, 'r') as file:
    for line in file:
        
        if (cur_size + sys.getsizeof(line)) >= chunk_size:
            print("Processing {}mb".format(cur_size/(1024*1024)))

            # pre-process
            df = spark.createDataFrame(json_lines, schema=schema)
            df = preProc(df)

            # laod and train
            if cur_run > 0:
                load_model = PipelineModel.load(model_path)
                stages_steps = load_model.stages
                model = Pipeline(stages = stages_steps).fit(df)
            else:
                model = get_tfidf_pipeline().fit(df)
            cur_run+=1

            # save
            model.write().overwrite().save(model_path)

            # get AUC score on test data
            predictions = model.transform(df_test)
            score = evaluator.evaluate(predictions)
            print("Model {} AUC SCORE: {}\n".format(cur_run, score))

            # reset and free memory
            cur_size = 0
            json_lines = []
            del df
            spark.catalog.clearCache()

        cur_size += sys.getsizeof(line)
        json_lines.append(json.loads(line))
        
    

Processing 9.999785423278809mb
Model 1 AUC SCORE: 0.6834598473054144

Processing 9.99919605255127mb
Model 2 AUC SCORE: 0.6834636207209295

Processing 9.999604225158691mb
Model 3 AUC SCORE: 0.6834650312015079

Processing 9.99938678741455mb
Model 4 AUC SCORE: 0.6834640961975539

Processing 9.999322891235352mb
Model 5 AUC SCORE: 0.6834614827648688

Processing 9.999980926513672mb
Model 6 AUC SCORE: 0.683465126709638

Processing 9.999693870544434mb
Model 7 AUC SCORE: 0.6834647022707571

Processing 9.999773025512695mb
Model 8 AUC SCORE: 0.6834614902704195

Processing 9.999876976013184mb
Model 9 AUC SCORE: 0.6834643093551881

Processing 9.999785423278809mb
Model 10 AUC SCORE: 0.6834670363093247

Processing 9.999602317810059mb
Model 11 AUC SCORE: 0.683455783049817

Processing 9.99984073638916mb
Model 12 AUC SCORE: 0.6834614369810107

Processing 9.998191833496094mb
Model 13 AUC SCORE: 0.6834575995806776

Processing 9.999659538269043mb
Model 14 AUC SCORE: 0.683465255617468

Processing 9.99989318

In [7]:
# Sanity check: the trained object should be a fitted PipelineModel.
model.__class__

pyspark.ml.pipeline.PipelineModel

In [9]:
# Re-score the in-memory model on the held-out test set and report its AUC.
predictions = model.transform(df_test)
score = evaluator.evaluate(predictions)
report_template = "Model {} AUC SCORE: {}\n"
print(report_template.format(cur_run, score))


Model 691 AUC SCORE: 0.6852539797955663



In [13]:
# test the model with self defined input

def createInput(lst):
    """Build a prediction-ready DataFrame from raw review strings.

    Produces ``reviewText`` plus the ``cleanText`` column that the pipeline's
    Tokenizer reads. No ``label`` is added, so the result is for
    ``model.transform`` only, not for training.

    Args:
        lst: Iterable of review strings.

    Returns:
        DataFrame with string columns reviewText and cleanText.
    """
    # Explicit schema instead of inference: inference fails on an empty list
    # (and on all-None input), while this always yields a string column.
    df = spark.createDataFrame([(review,) for review in lst], "reviewText: string")
    df = df.withColumn("cleanText", cleanTextUDF(df["reviewText"]))
    return df

tmp_reviews = [
    "This is a great product.",
    "I love this item!",
    "Not satisfied with the quality.",
    "bad",
    "nice item however something wrong"
]

In [14]:
# Score the hand-written reviews and show each cleaned text next to its predicted
# class (1 = positive, i.e. the rating >= 4 class used for training labels).
shown_columns = ["cleanText", "prediction"]
df_test2 = createInput(tmp_reviews)
pred_test = model.transform(df_test2)
pred_test.select(*shown_columns).show()

+--------------------+----------+
|           cleanText|prediction|
+--------------------+----------+
|       great product|       1.0|
|           love item|       1.0|
|   satisfied quality|       1.0|
|                 bad|       0.0|
|nice item however...|       1.0|
+--------------------+----------+



In [6]:
%%time

# re run without predicting in loop

json_lines=[]
cur_size = 0
cur_run = 0
model_path = 'bow_model2'
with open(json_training_file_path, 'r') as file:
    for line in file:
        
        if (cur_size + sys.getsizeof(line)) >= chunk_size:
            print("Processing {}mb".format(cur_size/(1024*1024)))

            # pre-process
            df = spark.createDataFrame(json_lines, schema=schema)
            df = preProc(df)

            # laod and train
            if cur_run > 0:
                load_model = PipelineModel.load(model_path)
                stages_steps = load_model.stages
                model = Pipeline(stages = stages_steps).fit(df)
            else:
                model = get_tfidf_pipeline().fit(df)
            cur_run+=1

            # save
            model.write().overwrite().save(model_path)

            # get AUC score on test data
            #predictions = model.transform(df_test)
            #score = evaluator.evaluate(predictions)
            #print("Model {} AUC SCORE: {}\n".format(cur_run, score))

            # reset and free memory
            cur_size = 0
            json_lines = []
            del df
            spark.catalog.clearCache()

        cur_size += sys.getsizeof(line)
        json_lines.append(json.loads(line))

Processing 9.999756813049316mb
Processing 9.999835014343262mb
Processing 9.999883651733398mb
Processing 9.999629974365234mb
Processing 9.999536514282227mb
Processing 9.999847412109375mb
Processing 9.999750137329102mb
Processing 9.999750137329102mb
Processing 9.99991512298584mb
Processing 9.999649047851562mb
Processing 9.99925422668457mb
Processing 9.999994277954102mb
Processing 9.999865531921387mb
Processing 9.999866485595703mb
Processing 9.999281883239746mb
Processing 9.999675750732422mb
Processing 9.999894142150879mb
Processing 9.999741554260254mb
Processing 9.999774932861328mb
Processing 9.999953269958496mb
Processing 9.999385833740234mb
Processing 9.999629020690918mb
Processing 9.999855041503906mb
Processing 9.99897289276123mb
Processing 9.999502182006836mb
Processing 9.999951362609863mb
Processing 9.998663902282715mb
Processing 9.998926162719727mb
Processing 9.999608039855957mb
Processing 9.999892234802246mb
Processing 9.999893188476562mb
Processing 9.999704360961914mb
Processing 

In [6]:
# test set accuracy: reload the pipeline saved under 'bow_model' and score the
# held-out test reviews with it (lazily — nothing runs until an action).
model = PipelineModel.load(path="bow_model")
predictions = model.transform(df_test)

In [7]:
# Evaluating triggers the full prediction job and returns the AUC as a float.
score = evaluator.evaluate(dataset=predictions)
print("AUC SCORE: {}".format(score))

AUC SCORE: 0.6832754838543467


In [22]:
# Keep only what the accuracy counts need: the predicted and true class.
predictedAndLabels = predictions.select("prediction", "label")

In [18]:
# Count total / correct / wrong predictions in ONE aggregation job.
# - Three separate .count() actions each re-ran the whole uncached prediction
#   pipeline (JSON read, Python cleaning UDF, model).
# - Since preProc's dropDuplicates may keep different rows per run, the three
#   counts could also disagree.
is_match = predictedAndLabels["label"] == predictedAndLabels["prediction"]
is_error = predictedAndLabels["label"] != predictedAndLabels["prediction"]
counts_row = predictedAndLabels.agg(
    F.count(F.lit(1)).alias("total_count"),
    # Comparisons involving null are null and sum() skips them, matching the
    # filter(...).count() semantics; coalesce maps an empty input's null sum to 0.
    F.coalesce(F.sum(is_match.cast("int")), F.lit(0)).alias("match_count"),
    F.coalesce(F.sum(is_error.cast("int")), F.lit(0)).alias("error_count"),
).first()
total_count = counts_row["total_count"]
match_count = counts_row["match_count"]
error_count = counts_row["error_count"]

In [21]:
# Report the raw counts, then accuracy = correct predictions / all predictions.
for summary_line in (
    "total_count:{}".format(total_count),
    "match_count:{}".format(match_count),
    "error_count:{}".format(error_count),
):
    print(summary_line)
print("test accuracy: {}".format(match_count / total_count))

total_count:2714927
match_count:2019406
error_count:695521
test accuracy: 0.7438159479057816


In [None]:
# Confusion-matrix metrics via the RDD-based MLlib API.
# NOTE(review): this cell was recorded as failing with "connection refused".
# - MulticlassMetrics takes an RDD of (prediction, label) float pairs, so every
#   row is sent through Python workers here.
# - A DataFrame-only alternative is
#   predictions.groupBy("label", "prediction").count(); consider it if the
#   error persists (root cause not confirmed).

from pyspark.mllib.evaluation import MulticlassMetrics


def _to_prediction_label_pair(row):
    """Convert a (prediction, label) Row into the float tuple MulticlassMetrics expects."""
    prediction, label = row[0], row[1]
    return (float(prediction), float(label))


predictedAndLabels = (
    predictions
    .select(["prediction", "label"])
    .rdd
    .map(_to_prediction_label_pair)
)
metrics = MulticlassMetrics(predictedAndLabels)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# confusionMatrix().toArray() yields floats. Cast to int so the counts print and
# annotate as whole numbers: with the old fmt='g', any count of a million or
# more was drawn in scientific notation (e.g. 1.5e+06).
confusion_matrix = metrics.confusionMatrix().toArray().astype(int)

print(confusion_matrix)

# Plotting confusion matrix: MLlib orders rows by true label and columns by
# predicted label, both ascending (0, 1).
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)  # 'd' = integer counts
ax.set(xlabel='Predicted Labels', ylabel='True Labels', title='Confusion Matrix')
plt.show()