In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Spark ML 2 quiz") \
        .config("spark.ui.port", "4050") \
        .master("local[8]") \
        .config("spark.driver.memory","10G")\
        .getOrCreate()

sc = spark.sparkContext

22/10/27 17:34:15 WARN Utils: Your hostname, orange resolves to a loopback address: 127.0.1.1; using 166.104.246.51 instead (on interface enp15s0)
22/10/27 17:34:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/27 17:34:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/27 17:34:17 WARN Utils: Service 'SparkUI' could not bind on port 4050. Attempting port 4051.


In [20]:
from pyspark.sql.types import *

schema = StructType([
    StructField('text', StringType(), True),
    StructField('label', IntegerType(), True),
])

df = spark.read.format('csv')\
               .option('header', 'true')\
               .option("quote", "\"")\
               .option("escape", "\"")\
               .schema(schema)\
               .load('../../data/imdb-review-sentiment.csv')
df.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I grew up (b. 196...|    0|
|When I put this m...|    0|
|Why do people who...|    0|
|Even though I hav...|    0|
|Im a die hard Dad...|    1|
+--------------------+-----+
only showing top 5 rows



                                                                                

[Stage 42:>                                                         (0 + 1) / 1]

In [21]:
df.summary().show()

[Stage 42:>                                                         (0 + 1) / 1]

+-------+--------------------+------------------+
|summary|                text|             label|
+-------+--------------------+------------------+
|  count|               40000|             40000|
|   mean|                null|          0.499525|
| stddev|                null|0.5000060244893185|
|    min|!!!! MILD SPOILER...|                 0|
|    25%|                null|                 0|
|    50%|                null|                 0|
|    75%|                null|                 1|
|    max|ý thýnk uzak ýs t...|                 1|
+-------+--------------------+------------------+



                                                                                

[Stage 42:>                                                         (0 + 1) / 1]

In [22]:
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [23]:
def make_df():  
    schema = StructType([
        StructField('text', StringType(), True),
        StructField('label', IntegerType(), True),
    ])

    df = spark.read.format('csv')\
                .option('header', 'true')\
                .option("quote", "\"")\
                .option("escape", "\"")\
                .schema(schema)\
                .load('../../data/imdb-review-sentiment.csv')
    return df

In [24]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import Tokenizer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, Word2Vec, VectorAssembler
from pyspark.ml.classification import LinearSVC

class RemoveStopWordsAndSpecialCharacters(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None) -> None:
        super().__init__()
        self.stopwords = Param(self, 'stopwords', '')
        self._setDefault(stopwords=set())
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def setStopwords(self, value):
        self._paramMap[self.stopwords] = value
        return self
    
    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        stopwords = self.getStopwords()
        def f(s):
            return [''.join(e for e in token if e.isalnum()) 
                    for token in s if token not in stopwords]
        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [29]:
stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

tokenizer = Tokenizer(inputCol='text', outputCol='words')
cleaning = RemoveStopWordsAndSpecialCharacters(inputCol='words', outputCol='clean_words', stopwords=stopwords)
# hashingTF = HashingTF(inputCol='clean_words', outputCol='tf')
w2v = Word2Vec(vectorSize=2, inputCol='clean_words', outputCol='w2v', minCount=1, maxIter=1, numPartitions=5)
asm = VectorAssembler(inputCols=[w2v.getOutputCol()], outputCol='features')
svm = LinearSVC(labelCol='label', maxIter=5)

In [31]:
mypipeline = Pipeline(stages=[tokenizer, cleaning, w2v, asm, svm])
df = make_df()
df = mypipeline.fit(df).transform(df)
df.rdd.map(lambda row: 1 if row['label'] == row['prediction'] else 0).reduce(lambda x, y: x + y) / df.count()

                                                                                

0.60945

In [28]:
for size in [5, 10, 20, 40]:
    df = make_df()
    w2v = Word2Vec(vectorSize=size, inputCol='clean_words', outputCol='w2v', minCount=1, maxIter=1, numPartitions=5)
    mypipeline = Pipeline(stages=[tokenizer, cleaning, w2v, asm, svm])
    df = mypipeline.fit(df).transform(df)
    print(df.rdd.map(lambda row: 1 if row['label'] == row['prediction'] else 0).reduce(lambda x, y: x + y) / df.count())

                                                                                

0.625375


                                                                                

0.6431


                                                                                

0.7028




0.788875


                                                                                

In [10]:
import plotly.express as px
fig = px.line(x=[2, 5, 10, 20, 40], y=[0.60945, 0.625375, 0.6431, 0.7028, 0.788875], markers=True)
fig.update_traces(textposition="bottom right")
fig

In [None]:
sc.stop()