In [1]:
from pyspark.sql.types import StructType, StructField, StringType, LongType
import os

# Explicit schema for the tweet JSON records, so Spark does not have to
# infer it by scanning the files.
jsonSchema = StructType([
    StructField('label', StringType(), True),
    StructField('tweet_id', LongType(), True),
    StructField('tweet_text', StringType(), True)
])

# Location (glob) of the tweet JSON files. Set the TWEETS_DATA_PATH
# environment variable to point elsewhere instead of editing a
# machine-specific absolute path in the notebook.
DATA_PATH = os.environ.get(
    "TWEETS_DATA_PATH",
    "/Users/Pavel/Documents/KULeuven/Courses/AdvancedAnalyticsinBigDataWorld/spark/data/*",
)
df = spark.read.format("json").schema(jsonSchema).load(DATA_PATH)

In [2]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd

class RegexReplacerWritable(
    Transformer, DefaultParamsReadable, DefaultParamsWritable,
):
    """Clean the free-text ``tweet_text`` column before tokenisation.

    The column is overwritten in place; every other column passes through
    untouched. The transformer has no parameters, but it mixes in
    ``DefaultParamsReadable``/``DefaultParamsWritable`` so a pipeline that
    contains it can be saved and loaded.
    """

    @keyword_only
    def __init__(self):
        super(RegexReplacerWritable, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this RegexReplacerWritable (it currently has none).
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with ``tweet_text`` normalised.

        Steps, in order:
          1. null text becomes "" so the downstream Tokenizer never sees null;
          2. lower-case;
          3. remove every character that is not an ASCII letter, an
             apostrophe or whitespace (digits and punctuation go);
          4. replace everything from "http" to the end of that word by a
             space -- URLs have already lost their ":", "/" and "." in
             step 3, so a link is a single word at this point;
          5. collapse whitespace runs to one space and trim both ends.
        """
        text = f.lower(f.coalesce(f.col("tweet_text"), f.lit("")))
        # Whitespace must survive this step: deleting newlines/tabs outright
        # would glue the neighbouring words together ("a\nb" -> "ab").
        text = f.regexp_replace(text, "[^\\sa-zA-Z']", "")
        text = f.regexp_replace(text, "http.*?\\b", " ")
        text = f.trim(f.regexp_replace(text, "\\s+", " "))
        return df.withColumn("tweet_text", text)

In [3]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd



class UDLemmatization(
    Transformer, DefaultParamsReadable, DefaultParamsWritable,
):
    """Lemmatise the tokens of the ``words`` array column with NLTK WordNet.

    Adds an ``array<string>`` column ``lemmatized`` holding the lemma of each
    token in ``words`` (``WordNetLemmatizer.lemmatize`` with its default part
    of speech). All existing columns are kept unchanged, and the work stays
    distributed: no ``toPandas()`` round trip through the driver and no
    re-inference of the schema.

    NOTE(review): the lemmatisation now runs inside a Spark UDF, so nltk and
    the WordNet corpus must be available wherever the executors run (in
    local mode that is the driver machine).
    """

    @keyword_only
    def __init__(self):
        super(UDLemmatization, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this UDLemmatization (it currently has none).
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with an extra ``lemmatized`` column derived from ``words``."""
        from pyspark.sql.types import ArrayType, StringType

        def lemmatize_words(words):
            # Null token lists stay null instead of crashing the job.
            if words is None:
                return None
            # Imported here so the name resolves wherever the UDF executes,
            # rather than depending on a notebook-global import.
            from nltk.stem import WordNetLemmatizer
            lemmatizer = WordNetLemmatizer()
            return [lemmatizer.lemmatize(word) for word in words]

        lemmatize_udf = f.udf(lemmatize_words, ArrayType(StringType()))
        return df.withColumn("lemmatized", lemmatize_udf(f.col("words")))
    
    

# Attach the class to the __main__ module explicitly; Spark ML's
# DefaultParamsReader presumably looks custom stages up by
# "<module>.<class>" name when a saved pipeline is loaded.
m = __import__("__main__")
m.UDLemmatization = UDLemmatization

In [4]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd



class UDShortWordsRemover(
    Transformer, DefaultParamsReadable, DefaultParamsWritable,
):
    """Remove short tokens from the ``filtered`` array column.

    Writes the tokens whose length is at least ``MIN_LENGTH`` to a new
    column ``filtered2``, drops ``filtered``, and removes every row whose
    ``filtered2`` ends up empty. Note that this row filter also applies when
    the fitted pipeline transforms new data, so such rows get no prediction.
    """

    # Tokens shorter than this many characters are discarded.
    MIN_LENGTH = 3

    @keyword_only
    def __init__(self):
        super(UDShortWordsRemover, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this UDShortWordsRemover (it currently has none).
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with short tokens removed and empty-token rows dropped."""
        # SQL higher-order filter keeps only tokens with length >= MIN_LENGTH.
        keep_long_tokens = "filter(filtered, x -> length(x) >= {})".format(self.MIN_LENGTH)
        return (
            df.withColumn("filtered2", f.expr(keep_long_tokens))
            .where(f.size(f.col("filtered2")) > 0)
            .drop("filtered")
        )

## Pipeline preparation

In [13]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, StringIndexer, IDF, HashingTF, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import nltk
from nltk.stem import WordNetLemmatizer



# 1. Regex Filter replacer (cleans "tweet_text" in place)
regexrep = RegexReplacerWritable()

# 2. Tokenizer - splitting words ("tweet_text" -> "words")
tokenizer = Tokenizer(inputCol="tweet_text", outputCol="words")

# 3. Lemmatizer user defined ("words" -> "lemmatized")
lemmatizerUD = UDLemmatization()

# 4. Stop Words Remover ("lemmatized" -> "filtered").
#    It must read the lemmatizer's output: reading "words" here would make
#    stage 3 a no-op, since nothing else consumes "lemmatized".
#    NOTE(review): WordNet's default noun lemmatisation can turn some stop
#    words into non-stop-word forms (e.g. "was" -> "wa"); consider removing
#    stop words before lemmatising if that matters for the model.
stopwordList = ["u", "ur", "amp", "q"]
stopwordList.extend(StopWordsRemover().getStopWords())
remover = StopWordsRemover(inputCol="lemmatized", outputCol="filtered", stopWords=stopwordList)

# 5. Short Words len < 3 user defined remover ("filtered" -> "filtered2")
shortWordsremover = UDShortWordsRemover()

# 6. Count Vectorizer ("filtered2" -> "features")
cv = CountVectorizer(inputCol="filtered2", outputCol="features")

# 7. IDF ("features" -> "tf_idf_features")
idf = IDF(inputCol="features", outputCol="tf_idf_features")

# 8. String Indexer ("label" -> "labelIndex")
label_stringIdx = StringIndexer(inputCol="label", outputCol="labelIndex")

# 9. Logistic Regression
lr = LogisticRegression(labelCol="labelIndex", featuresCol="tf_idf_features", maxIter=20, regParam=0.3, elasticNetParam=0)

# 10. Index to String, for now labels only, not prediction - TODO
converter = IndexToString(inputCol="labelIndex", outputCol="labelOriginal")

# Create the pipeline; stages run in this order.
pipeline = Pipeline(stages=[regexrep, tokenizer, lemmatizerUD, remover, shortWordsremover, cv, idf, label_stringIdx, lr, converter])

In [14]:
# Fit every stage of the pipeline on the loaded tweets.
pipelineFit = pipeline.fit(dataset=df)

In [15]:
# Persist the fitted pipeline to 'lr_model', replacing any earlier save there.
pipelineFit.write().overwrite().save(path='lr_model')