In [1]:
# !apt-get update
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !ls
# !mkdir test
# !rm -r test
# !wget -q https://apache.osuosl.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !ls
# !tar xf spark-3.2.1-bin-hadoop3.2.tgz
# !pwd
# !ls /content/

# # Set up Spark
# !pip install -q findspark
# !pip install py4j
# !pip install koalas

# !export JAVA_HOME=$(/usr/lib/jvm/java-8-openjdk-amd64 -v 1.8)
# ! echo $JAVA_HOME
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
# import findspark
# findspark.init("spark-3.2.1-bin-hadoop3.2")# SPARK_HOME

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
# import pyspark.sql.functions as F
from pyspark.sql.functions import col,size,count,when,isnan, udf

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
  input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
  output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)
  
  @keyword_only
  def __init__(self, input_col: str = "input", output_col: str = "output"):
    super(CustomTransformer, self).__init__()
    self._setDefault(input_col=None, output_col=None)
    kwargs = self._input_kwargs
    self.set_params(**kwargs)
    
  @keyword_only
  def set_params(self, input_col: str = "input", output_col: str = "output"):
    kwargs = self._input_kwargs
    self._set(**kwargs)
    
  def get_input_col(self):
    return self.getOrDefault(self.input_col)
  
  def get_output_col(self):
    return self.getOrDefault(self.output_col)
  
  def _transform(self, df: DataFrame):
    input_col = self.get_input_col()
    output_col = self.get_output_col()
    # The custom action: concatenate the integer form of the doubles from the Vector
    transform_udf = udf(lambda x: '/'.join([str(int(y)) for y in x]), StringType())
    return df.withColumn(output_col, transform_udf(input_col))

In [4]:
class label_transformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
  input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
  output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)
  
  @keyword_only
  def __init__(self, input_col: str = "input", output_col: str = "output"):
    super(label_transformer, self).__init__()
    self._setDefault(input_col=None, output_col=None)
    kwargs = self._input_kwargs
    self.set_params(**kwargs)
    
  @keyword_only
  def set_params(self, input_col: str = "input", output_col: str = "output"):
    kwargs = self._input_kwargs
    self._set(**kwargs)
    
  def get_input_col(self):
    return self.getOrDefault(self.input_col)
  
  def get_output_col(self):
    return self.getOrDefault(self.output_col)
  
  def _transform(self, df: DataFrame):
    input_col = self.get_input_col()
    output_col = self.get_output_col()
    # The custom action
    df1 = df.withColumn(output_col, \
                           (when(col(input_col).like("%my dog%"), 1) \
                           .when(col(input_col).like("%I have a dog%"), 1) \
                           .when(col(input_col).like("%my cat%"), 1) \
                           .when(col(input_col).like("%I have a cat%"), 1) \
                           .when(col(input_col).like("%my puppy%"), 1) \
                           .when(col(input_col).like("%my pup%"), 1) \
                           .when(col(input_col).like("%my kitty%"), 1) \
                           .when(col(input_col).like("%my pussy%"), 1) \
                           .otherwise(0)))
    return df1

In [5]:
# from google.colab import drive
# drive.mount('/content/drive')
data_path = "test_2022-04-20.csv"
# data_path = "drive/My Drive/20210401/TDI_youtube_comment/animals_comments.csv"
test_review = spark.read.format("csv").option("header", "true").load(data_path)
# test_review.show(30)

In [6]:
from nltk.stem.porter import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *

class stem_transformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
  input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
  output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)
  
  @keyword_only
  def __init__(self, input_col: str = "input", output_col: str = "output"):
    super(stem_transformer, self).__init__()
    self._setDefault(input_col=None, output_col=None)
    kwargs = self._input_kwargs
    self.set_params(**kwargs)
    
  @keyword_only
  def set_params(self, input_col: str = "input", output_col: str = "output"):
    kwargs = self._input_kwargs
    self._set(**kwargs)
    
  def get_input_col(self):
    return self.getOrDefault(self.input_col)
  
  def get_output_col(self):
    return self.getOrDefault(self.output_col)
  
  def _transform(self, df: DataFrame):
    input_col = self.get_input_col()
    output_col = self.get_output_col()

    # Instantiate stemmer object
    stemmer = PorterStemmer()

    # Create stemmer python function
    def stem(in_vec):
        out_vec = []
        for t in in_vec:
            t_stem = stemmer.stem(t)
            if len(t_stem) > 1:
                out_vec.append(t_stem)       
        return out_vec

    # Create user defined function for stemming with return type Array<String>
    stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))

    # Create new column with vectors containing the stemmed tokens 
    df1 = df.withColumn(output_col, stemmer_udf(input_col))
    return df1

In [7]:
# from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

# persistedModel = LogisticRegressionModel.load("drive/My Drive/20210401/TDI_youtube_comment/lr_best_model")
# predictions = persistedModel.transform(df_model)
# predictions.show(10)

In [9]:
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
persistedModel = GBTClassificationModel.load("gbt_best_model")
# persistedModel = GBTClassificationModel.load("drive/My Drive/20210401/TDI_youtube_comment/gbt_best_model")
# predictions = persistedModel.transform(df_model)
# predictions.show(10)

In [10]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec

custom_transformer = label_transformer(input_col="comment", output_col="label")
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")
remover = StopWordsRemover(inputCol="text", outputCol="vector_no_stopw")
custom_transformer1 = stem_transformer(input_col="vector_no_stopw", output_col="vector_stemmed")
word2Vec = Word2Vec(vectorSize=50, minCount=1, inputCol="vector_stemmed", outputCol="wordVector")

In [11]:
pipeline = Pipeline(stages=[custom_transformer, regexTokenizer,remover, custom_transformer1, word2Vec, persistedModel])
model = pipeline.fit(test_review)
results = model.transform(test_review)
results.show()

+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        creator_name|              userid|             comment|label|                text|     vector_no_stopw|      vector_stemmed|          wordVector|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|         Shiki Amabe|Ugh2nz-8o6JNLXgCoAEC|i'm 100% a cat pe...|    0|[i, m, 100, a, ca...|[m, 100, cat, per...|[100, cat, person...|[0.01140446082146...|[1.32590267922033...|[0.93412217565278...|       0.0|
|           Animalist|UgjRhSL--aI5NHgCoAEC|Nothing sounds cu...|    0|[nothing, sounds,...|[nothing, sounds,...|[noth, sound, cut...|[0.00233010696247...|[1.32590267922

In [12]:
Num_Pos_Label = results.filter(F.col('prediction') == 1.0).count() 
print(Num_Pos_Label)

NameError: name 'F' is not defined

In [None]:
final_result = results.select(['creator_name', 'userid']).where((results["label"] == 1) | (results["prediction"] == 1.0))
final_result.show()