In [1]:
#!pip install nb_black
#%load_ext nb_black

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SparkContext, SparkConf

spark = SparkSession.builder.getOrCreate()
conf = SparkConf().setAppName('Spark Lab2')
sc = spark.sparkContext # conf=conf)


In [2]:
spark
sc

In [4]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql import functions as F

#LOAD

df = spark.read.load("gs://colmbo_bdpp_bucket1/datasetB1.csv", format="csv", sep="|", header="true")


df.printSchema()
df.show(2)
df.count()
df = df.select("text", "label")

# SPLIT TRAIN/TEST

train, test = df.randomSplit([0.7, 0.3], seed=8984)

print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

train.first()

# CLEAN


# drop null values
train.dropna()
test.dropna()

print(f"Train set length: {train.count()} records")


# drop numbers
train.withColumn("text", F.regexp_replace(F.col("text"), "\d+", ""))

#train.withColumn("text", F.regexp_replace(F.col("text"), "\!+", ""))
train.select('text').replace(".", "")

train.head(2)

root
 |-- text: string (nullable = true)
 |-- label: string (nullable = true)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Story of a man wh...|    0|
|Airport 77 starts...|    0|
+--------------------+-----+
only showing top 2 rows

Train set length: 34932 records
Test set length: 15070 records
Train set length: 34932 records


[Row(text='     With their no holds bar cruel offensive humor sure enough to offended anyone you would sure think this would be a laugh riot! wrong Worest movie since Open water Dont be to surprised if you completely miss this movie upon release date as Im sure it wont do very good at all at the box office This movie had a lot of Potential but fell to little to short No enough character development awkward actors and The upside of this movie was nudity Boobs Amazing If I had to see this movie again I myself would go POSTAl    ', label='0'),
 Row(text='   And thats a bad thing because at least if this had been a Troma film it would have had wanton violence and a greater sense of anarchic abandon that might have brought my rating up a bitSo what we have instead is a very tame (rated PG) barely lukewarm low budget (Roger Corman produced it with an unknown director who has subsequently remained unknown) Gremlins (1984)Critters (1986)-wannabe with almost exclusively flat humor little of the

### TRAIN

In [5]:
# pipeline
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    Tokenizer,
    RegexTokenizer,
    StopWordsRemover,
    CountVectorizer,
    IDF,
    StringIndexer,
    StandardScaler,
)

from pyspark.ml.classification import LogisticRegression, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib import linalg as mllib_linalg
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

import time



start = time.time()


regexTokenizer = RegexTokenizer(
    inputCol="text", outputCol="words", pattern="\\W", minTokenLength=2
)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
StopWordsRemover.loadDefaultStopWords("english")

cv = CountVectorizer(
    inputCol="filtered", outputCol="counts", minDF=2.0 , vocabSize=1500)

idf = IDF(
    inputCol="counts", outputCol="features", minDocFreq=10
)  # minDocFreq: remove sparse terms

label_stringIdx = StringIndexer(inputCol="label", outputCol="index")
label_stringIdx.setHandleInvalid("skip")

#scaler = StandardScaler(
   # inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True
#)

pipeline = Pipeline(
    stages=[regexTokenizer, remover, cv, idf, label_stringIdx]
)  #


pipelineFit = pipeline.fit(train)


# CREATE PARTITION
pre_processed = pipelineFit.transform(train).select(["features", "index"])

#pre_processed.printSchema()


# create RDD of Label points
rdd_label_point = pre_processed.rdd.map(lambda line:LabeledPoint(line[1],[line[0]]))

#rdd_label_point.first()



# Build the model SVM
modelSVM = SVMWithSGD.train(rdd_label_point, iterations=10,regParam=0.1)

#modelSVM.weights

latency = time.time() - start

print(latency, 'sec')
print("DEFAULT :",rdd_label_point.getNumPartitions(), " partitions", rdd_label_point.partitioner)

542.6542656421661 sec
DEFAULT : 4  partitions None


### TEST

In [6]:
start = time.time()

# Evaluating the model on test data

pre_processed_test = pipelineFit.transform(test).select(["features", "index"])

# create RDD of Label points

lpoint = pre_processed_test.rdd.map(lambda row: LabeledPoint(row["index"], mllib_linalg.DenseVector(row["features"])))

labelsAndPreds = lpoint.map(lambda p: (p.label, modelSVM.predict(p.features)))

latency = time.time() - start

print(latency, 'sec')

#labelsAndPreds.take(10)

0.31357240676879883 sec


### EVALUATION

In [43]:
testAccuracy = labelsAndPreds.filter(lambda lp: lp[0] == lp[1]).count() / float(lpoint.count())
print("Test Accuracy = " + str(100*testAccuracy) + "%")

Test Accuracy = 81.43038059054855%
