In [0]:
# Load in one of the tables
df = spark.sql("SELECT * FROM default.reviews_train").sample(0.1) 
#print((df.count(), len(df.columns)))

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.types import IntegerType, DoubleType, FloatType, DecimalType, LongType, ByteType, ShortType
import numpy as np

# Select only numeric columns for the correlation matrix
numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType, FloatType, DecimalType, LongType, ByteType, ShortType))]

vector_col = "features"
assembler = VectorAssembler(inputCols=numeric_cols, outputCol=vector_col)
df_vector = assembler.transform(df)

# Compute the correlation matrix for the vector column
correlation_matrix = Correlation.corr(df_vector, vector_col).head()[0]

# Print the correlation matrix
print(str(correlation_matrix))

matrix_array = correlation_matrix.toArray().tolist()
# Set NumPy print options to suppress scientific notation
np.set_printoptions(suppress=True, precision=3)

# Convert 2D array into a DataFrame with feature names as column names
corr_df = spark.createDataFrame(matrix_array, numeric_cols)

# Show the correlation matrix with feature names
corr_df.show()

# Additionally, print the feature names for reference
print("Feature names:", numeric_cols)

DenseMatrix([[ 1.00000000e+00, -4.54607863e-04,  2.41603588e-01,
              -5.08534712e-02],
             [-4.54607863e-04,  1.00000000e+00,  8.58500773e-02,
              -1.94423826e-01],
             [ 2.41603588e-01,  8.58500773e-02,  1.00000000e+00,
              -4.27551613e-01],
             [-5.08534712e-02, -1.94423826e-01, -4.27551613e-01,
               1.00000000e+00]])
+--------------------+--------------------+-------------------+--------------------+
|            reviewID|             overall|     unixReviewTime|               label|
+--------------------+--------------------+-------------------+--------------------+
|                 1.0|-4.54607863177575...| 0.2416035882754715|-0.05085347115892889|
|-4.54607863177575...|                 1.0|0.08585007725218924|-0.19442382556024307|
|  0.2416035882754715| 0.08585007725218924|                1.0| -0.4275516132256553|
|-0.05085347115892889|-0.19442382556024307|-0.4275516132256553|                 1.0|
+---------------

In [0]:
"""print("Before duplication removal: ", df.count())
df = df.dropDuplicates(['reviewerID', 'asin'])
print("After duplication removal: ", df.count()) """

'print("Before duplication removal: ", df.count())\ndf = df.dropDuplicates([\'reviewerID\', \'asin\'])\nprint("After duplication removal: ", df.count()) '

In [0]:
# Correct usage
dtypes_series = df.dtypes
dtypes_series

[('reviewID', 'int'),
 ('overall', 'double'),
 ('verified', 'boolean'),
 ('reviewTime', 'string'),
 ('reviewerID', 'string'),
 ('asin', 'string'),
 ('reviewerName', 'string'),
 ('reviewText', 'string'),
 ('summary', 'string'),
 ('unixReviewTime', 'int'),
 ('label', 'int')]

In [0]:
# For our intitial modeling efforts, we are not going to use the following features
drop_list = ['image', 'style', "unixReviewTime"]
df = df.select([column for column in df.columns if column not in drop_list])
df = df.na.drop(subset=["reviewText", "label"])
#display(df)
print((df.count(), len(df.columns)))

(314342, 10)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Count occurrences") \
    .getOrCreate()

# Group by 'reviewerID' and 'asin', and count occurrences
grouped_counts = df.groupBy('reviewerID', 'asin').agg(count('*').alias('count'))

# Convert to Pandas DataFrame for printing (only recommended for small result sets)
grouped_counts_pandas = grouped_counts.toPandas()
#display(grouped_counts_pandas)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import split, col
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DoubleType
import re
import nltk
from pyspark.sql.types import IntegerType
nltk.download('punkt')
from nltk.tokenize import sent_tokenize
from pyspark.sql.functions import col, regexp_replace

# Initialize Spark Session
spark = SparkSession.builder.appName("SentenceCount").getOrCreate()

# Download NLTK punkt tokenizer models
nltk.download('punkt')

# Define a Python function that uses NLTK for sentence tokenization
def nltk_sentence_count(text):
    if text and isinstance(text, str):
        sentences = sent_tokenize(text)
        return len(sentences)
    return 0


# Custom UDF to estimate the number of syllables
def syllable_count_estimation(text):
    count = 0
    vowels = "aeiouy"
    text = text.lower()
    if text == "":
        return 0
    if text[0] in vowels:
        count += 1
    for index in range(1, len(text)):
        if text[index] in vowels and text[index - 1] not in vowels:
            count += 1
    if text.endswith("e"):
        count -= 1
    if count == 0:
        count += 1
    return count

# Register the Python function as a PySpark UDF
sentence_count_udf = udf(nltk_sentence_count, IntegerType())


syllable_count_udf = udf(syllable_count_estimation, IntegerType())


# Calculating sentence, syllable, and word counts
df = df.withColumn("sentence_count", sentence_count_udf(col("reviewText")))
df = df.withColumn("syllable_count", syllable_count_udf(col("reviewText")))
df = df.withColumn("word_count", udf(lambda text: len(re.findall(r'\w+', text)), IntegerType())(col("reviewText")))

df = df.withColumn("sentence_count_summary", sentence_count_udf(col("summary")))

# Assuming your DataFrame is named df
df = (df
      .withColumn("reviewTime", split(col("reviewTime"), " "))
      .withColumn("reviewtime_month", col("reviewTime")[0])
      .withColumn("reviewtime_year", col("reviewTime")[2]))

df = (df
      .withColumn("reviewtime_day", regexp_replace(col("reviewTime")[1], ",", ""))
      .withColumn("reviewtime_day", regexp_replace(col("reviewtime_day"), " ", "")))


# Download the NLTK data (if not already downloaded)
nltk.download('words')

# Set of English words from NLTK
english_words = set(nltk.corpus.words.words())

# Function to count misspelled words
def count_misspelled_words(text):
    # Tokenize the text into words
    words = nltk.word_tokenize(text)
    
    # Find misspelled words
    misspelled = [word for word in words if word.lower() not in english_words]
    
    # Return the count of misspelled words
    return len(misspelled)

# Register the UDF
count_misspelled_udf = udf(count_misspelled_words, IntegerType())

# Apply the UDF to create a new column "misspelledWordsCount"
df = df.withColumn("misspelledWordsCount", count_misspelled_udf(col("reviewText")))


column_to_drop = "reviewTime"
df = df.drop(column_to_drop)

# Show the first few rows of the DataFrame
#df.show()


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Package words is already up-to-date!


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Define a function to count characters in a string
def count_characters(text):
    if text:
        return len(text)
    else:
        return 0

# Register the function as a UDF
count_characters_udf = udf(count_characters, IntegerType())

# Apply the UDF to the "summary" column
df = df.withColumn("character_count_summary", count_characters_udf(col("summary")))

# Apply the UDF to the "reviewText" column
df = df.withColumn("character_count_reviewText", count_characters_udf(col("reviewText")))

#display(df)

In [0]:
from pyspark.sql.functions import col

# Assuming you have a DataFrame named 'df' with a column 'your_string_column' of type string
df = df.withColumn("reviewtime_month", col("reviewtime_month").cast("int"))
df = df.withColumn("reviewtime_year", col("reviewtime_year").cast("int"))
df = df.withColumn("reviewtime_day", col("reviewtime_day").cast("int"))


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF

# We'll tokenize the text using a simple RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="reviewerID", outputCol="words", pattern="\\W")


# Remove standard Stopwords
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")


# Vectorize the sentences using simple BOW method. Other methods are possible:
# https://spark.apache.org/docs/2.2.0/ml-features.html#feature-extractors
countVectors = CountVectorizer(inputCol="filtered", outputCol="reviewerID_features", vocabSize=10000, minDF=5)

# Generate Inverse Document Frequency weighting

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])


pipelineFit = pipeline.fit(df)
df = pipelineFit.transform(df)
#df.show(5)


Downloading artifacts:   0%|          | 0/13 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF

# We'll tokenize the text using a simple RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="asin", outputCol="words2", pattern="\\W")


# Remove standard Stopwords
stopwordsRemover = StopWordsRemover(inputCol="words2", outputCol="filtered2")


# Vectorize the sentences using simple BOW method. Other methods are possible:
# https://spark.apache.org/docs/2.2.0/ml-features.html#feature-extractors
countVectors = CountVectorizer(inputCol="filtered2", outputCol="asin_features", vocabSize=10000, minDF=5)


# Generate Inverse Document Frequency weighting

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

pipelineFit2 = pipeline.fit(df)
df = pipelineFit2.transform(df)
#df.show(5)

Downloading artifacts:   0%|          | 0/13 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
contractions_dict  = {
    "don't": "do not",
    "aren't": "are not",
    "can't": "cannot",
    "couldn't": "could not",
    "won't": "will not",
    "a’ight": "alright",
    "ain’t_am": "am not",
    "ain’t_is": "is not",
    "ain’t_are": "are not",
    "ain’t_has": "has not",
    "ain’t_have": "have not",
    "ain’t_did": "did not",
    "amn’t": "am not",
    "’n’": "and",
    "arencha": "are not you",
    "aren’t": "are not",
    "’bout": "about",
    "can’t": "cannot",
    "cap’n": "captain",
    "’cause": "because",
    "cuz": "because",
    "’cept": "except",
    "c’mon": "come on",
    "could’ve": "could have",
    "couldn’t": "could not",
    "couldn’t’ve": "could not have",
    "cuppa": "cup of",
    "daren’t": "dare not",
    "daresn’t": "dare not",
    "dasn’t": "dare not",
    "didn’t": "did not",
    "doesn't": "does not",
    "don’t": "do not",
    "dunno": "do not know",
    "d’ye": "do you",
    "d’ya": "did you",
    "e’en": "even",
    "e’er": "ever",
    "’em": "them",
    "everybody’s": "everybody is",
    "everyone’s": "everyone is",
    "everything's": "everything is",
    "finna": "fixing to",
    "fo’c’sle": "forecastle",
    "’gainst": "against",
    "g’day": "good day",
    "gimme": "give me",
    "giv’n": "given",
    "gi’z": "give us",
    "gonna": "going to",
    "gon’t": "go not",
    "gotta": "got to",
    "hadn’t": "had not",
    "had’ve": "had have",
    "hasn’t": "has not",
    "haven’t": "have not",
    "he’d": "he had / he would",
    "he'll": "he shall / he will",
    "helluva": "hell of a",
    "yes'nt": "yes not / no",
    "he’s": "he has / he is",
    "here’s": "here is",
    "how’d": "how did / how would",
    "howdy": "how do you do / how do you fare",
    "how’ll": "how will / how shall",
    "how’re": "how are",
    "how’s": "how has / how is / how does",
    "I’d": "I had / I would",
    "I’d’ve": "I would have",
    "I’d’nt": "I would not",
    "I’d’nt’ve": "I would not have",
    "If’n": "If and when",
    "I’ll": "I shall",
    "I'll": "I will",
    "I’m": "I am",
    "Imma": "I am about to / I am going to",
    "I’m’o": "I am going to",
    "innit": "isn’t it / ain’t it",
    "Ion": "I do not / I don't",
    "I’ve": "I have",
    "isn’t": "is not",
    "it’d": "it would",
    "it’ll": "it shall / it will",
    "it’s": "it has",
    "it's": "it is",
    "Idunno": "I do not know",
    "kinda": "kind of",
    "lemme": "let me",
    "let’s": "let us",
    "loven’t": "love not",
    "ma’am": "madam",
    "mayn’t": "may not",
    "may’ve": "may have",
    "methinks": "I think",
    "mightn’t": "might not",
    "might’ve": "might have",
    "mine’s": "mine is",
    "mustn’t": "must not",
    "mustn’t’ve": "must not have",
    "must’ve": "must have",
    "’neath": "beneath",
    "needn’t": "need not"
}

In [0]:
import re

def expand_contractions(text):
    if not isinstance(text, str):
        return text  # Handle cases where text is not a string
    
    contractions_pattern = re.compile('({})'.format('|'.join(contractions_dict.keys())), flags=re.IGNORECASE|re.DOTALL)
    
    def expand_match(contraction):
        match = contraction.group(0)
        # Use .get() with a default fallback to match to handle unmatched cases
        expanded_contraction = contractions_dict.get(match.lower(), match)
        return expanded_contraction
    
    return contractions_pattern.sub(expand_match, text)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

expand_contractions_udf = udf(expand_contractions, StringType())
df = df.withColumn('reviewText_expanded', expand_contractions_udf(df['reviewText']))

df.select("reviewText", "reviewText_expanded")

#display(df)


DataFrame[reviewText: string, reviewText_expanded: string]

In [0]:
# set seed for reproducibility
(trainingData, testingData) = df.randomSplit([0.8, 0.2], seed = 47)
print("Training Dataset Count:  " + str(trainingData.count()))
print("Test Dataset Count:     " + str(testingData.count()))

Training Dataset Count:  251744
Test Dataset Count:     62598


In [0]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.pipeline import Transformer
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.ml import PipelineModel
from nltk.stem import PorterStemmer
import re

# Define a custom transformer to remove punctuation and numbers
class CleanTextTransformer(Transformer):
    def __init__(self, inputCol="reviewText", outputCol="cleanedReviewText"):
        super(CleanTextTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df: DataFrame) -> DataFrame:
        # Define a UDF to clean the text
        clean_text_udf = F.udf(lambda text: re.sub(r'[^a-zA-Z\s]', '', text) if text else text, F.StringType())

        # Apply the UDF to the specified column
        df = df.withColumn(self.outputCol, clean_text_udf(df[self.inputCol]))

        return df

# Define a custom transformer to perform stemming using NLTK
class NLTKStemming(Transformer):
    def __init__(self, inputCol="filtered", outputCol="stemmed"):
        super(NLTKStemming, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df: DataFrame) -> DataFrame:
        # Define a UDF to apply stemming using NLTK
        stemmer = PorterStemmer()
        stem_udf = F.udf(lambda tokens: [stemmer.stem(token) for token in tokens], F.ArrayType(F.StringType()))

        # Apply the UDF to the specified column
        df = df.withColumn(self.outputCol, stem_udf(df[self.inputCol]))

        return df

# Define a custom transformer to prune rare words
class PruneRareWordsTransformer(Transformer):
    def __init__(self, inputCol="stemmed", outputCol="prunedStemmed", minDF=5):
        super(PruneRareWordsTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.minDF = minDF

    def _transform(self, df: DataFrame) -> DataFrame:
        # Calculate document frequency
        doc_freq = df.select(F.col(self.inputCol)).rdd.flatMap(lambda x: set(x[0])).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

        # Get terms to keep based on minDF
        terms_to_keep = set([term for term, freq in doc_freq.collect() if freq >= self.minDF])

        # Define a UDF to prune rare words
        prune_udf = F.udf(lambda tokens: [token for token in tokens if token in terms_to_keep], F.ArrayType(F.StringType()))

        # Apply the UDF to the specified column
        df = df.withColumn(self.outputCol, prune_udf(df[self.inputCol]))

        return df

# Create instances of transformers
regexTokenizer = RegexTokenizer(inputCol="cleanedReviewText", outputCol="words1", pattern="\\W")
stopwordsRemover = StopWordsRemover(inputCol="words1", outputCol="filtered1")
nltkStemming = NLTKStemming(inputCol="filtered1", outputCol="stemmed")
pruneRareWordsTransformer = PruneRareWordsTransformer(inputCol="stemmed", outputCol="prunedStemmed", minDF=5)
countVectors = CountVectorizer(inputCol="prunedStemmed", outputCol="rawfeatures", vocabSize=15000)

# Assemble the features
assembler = VectorAssembler(inputCols=["verified", "reviewtime_year", "word_count", "reviewtime_month", "reviewtime_day", "overall", "sentence_count", "sentence_count_summary", "syllable_count", "misspelledWordsCount", "character_count_summary", "character_count_reviewText", "reviewerID_features", "asin_features", "rawfeatures"], outputCol="features")

ml_alg = LogisticRegression(maxIter=20, regParam=0.1, elasticNetParam=0.0)

# Polynomial Expansion
poly_expansion = PolynomialExpansion(inputCol="features", outputCol="poly_features", degree=2)

# Create the pipeline with the custom text cleaning, stemming, pruning rare words transformers, count vectors, polynomial expansion, and logistic regression
cleanTextTransformer = CleanTextTransformer(inputCol="reviewText_expanded", outputCol="cleanedReviewText")
pipeline1 = Pipeline(stages=[cleanTextTransformer, regexTokenizer, stopwordsRemover, nltkStemming, pruneRareWordsTransformer, countVectors, assembler, poly_expansion, ml_alg])


In [0]:
pipelineFit1 = pipeline1.fit(trainingData)
trainingDataTransformed = pipelineFit1.transform(trainingData)
#display(trainingDataTransformed)

from pyspark.sql.types import DoubleType, IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Assuming 'df' is your initial DataFrame before applying the pipeline
# Apply the pipeline to your DataFrame

# Identify the numerical columns in the transformed DataFrame
# This includes both original numerical features and features created by PolynomialExpansion and other transformations
numerical_cols = [c[0] for c in trainingDataTransformed.dtypes if c[1] in ['int', 'double', 'float']]

# If the polynomial features are already in a vector form, just use that vector column
# Otherwise, create a new vector column that includes all the numerical features
vector_col = "num_features"
assembler = VectorAssembler(inputCols=numerical_cols, outputCol=vector_col)
df_vector = assembler.transform(trainingDataTransformed).select(vector_col)

# Compute the correlation matrix for the vector column
correlation_matrix = Correlation.corr(df_vector, vector_col).head()[0]

# Print the correlation matrix
print(str(correlation_matrix))

matrix_array = correlation_matrix.toArray().tolist()
# Set NumPy print options to suppress scientific notation
np.set_printoptions(suppress=True, precision=3)

# Convert 2D array into a DataFrame with feature names as column names
corr_df = spark.createDataFrame(matrix_array, numeric_cols)

# Show the correlation matrix with feature names
corr_df.show()

# Additionally, print the feature names for reference
print("Feature names:", numeric_cols)


DenseMatrix([[ 1.   , -0.   , -0.051, -0.091, -0.099, -0.098,  0.008,  0.002,
               0.24 , -0.001, -0.098, -0.02 , -0.099, -0.094],
             [-0.   ,  1.   , -0.195, -0.109, -0.107, -0.112, -0.008, -0.01 ,
               0.086, -0.003, -0.098, -0.095, -0.108, -0.168],
             [-0.051, -0.195,  1.   ,  0.361,  0.37 ,  0.372,  0.025,  0.007,
              -0.426,  0.005,  0.357,  0.154,  0.371,  0.541],
             [-0.091, -0.109,  0.361,  1.   ,  0.938,  0.946,  0.07 ,  0.003,
              -0.337,  0.005,  0.926,  0.237,  0.941,  0.541],
             [-0.099, -0.107,  0.37 ,  0.938,  1.   ,  0.997,  0.04 ,  0.003,
              -0.355,  0.005,  0.972,  0.254,  0.999,  0.566],
             [-0.098, -0.112,  0.372,  0.946,  0.997,  1.   ,  0.043,  0.003,
              -0.355,  0.005,  0.971,  0.257,  0.998,  0.567],
             [ 0.008, -0.008,  0.025,  0.07 ,  0.04 ,  0.043,  1.   ,  0.007,
               0.014,  0.   ,  0.045,  0.3  ,  0.041,  0.026],
             

In [0]:
"""# Logistic Regression
ml_alg = LogisticRegression(maxIter=20, regParam=0.1, elasticNetParam=0.0)
lrModel = ml_alg.fit(trainingDataTransformed) """

'# Logistic Regression\nml_alg = LogisticRegression(maxIter=20, regParam=0.1, elasticNetParam=0.0)\nlrModel = ml_alg.fit(trainingDataTransformed) '

In [0]:
"""# Extract the feature coefficients and feature names
# After fitting the model, extract the coefficients
coefficients = lrModel.coefficients.toArray()

# Get the feature names from the VectorAssembler stage (if metadata is available)
# Note: This assumes your VectorAssembler is named 'assembler' and it is the last stage before the classifier
feature_names = assembler.getInputCols()

# Combine feature names and coefficients
feature_coefficients = list(zip(feature_names, coefficients))

# Convert to a DataFrame for easy viewing (if you are using a Jupyter notebook or have pandas installed)
import pandas as pd
feature_coefficients_df = pd.DataFrame(feature_coefficients, columns=["Feature", "Coefficient"])

# Sort the features by the absolute value of their coefficient to determine "importance"
feature_coefficients_df['abs(Coefficient)'] = feature_coefficients_df['Coefficient'].abs()
sorted_features = feature_coefficients_df.sort_values('abs(Coefficient)', ascending=False)

# Display the sorted features
display(sorted_features) """

'# Extract the feature coefficients and feature names\n# After fitting the model, extract the coefficients\ncoefficients = lrModel.coefficients.toArray()\n\n# Get the feature names from the VectorAssembler stage (if metadata is available)\n# Note: This assumes your VectorAssembler is named \'assembler\' and it is the last stage before the classifier\nfeature_names = assembler.getInputCols()\n\n# Combine feature names and coefficients\nfeature_coefficients = list(zip(feature_names, coefficients))\n\n# Convert to a DataFrame for easy viewing (if you are using a Jupyter notebook or have pandas installed)\nimport pandas as pd\nfeature_coefficients_df = pd.DataFrame(feature_coefficients, columns=["Feature", "Coefficient"])\n\n# Sort the features by the absolute value of their coefficient to determine "importance"\nfeature_coefficients_df[\'abs(Coefficient)\'] = feature_coefficients_df[\'Coefficient\'].abs()\nsorted_features = feature_coefficients_df.sort_values(\'abs(Coefficient)\', ascendi

In [0]:
"""import matplotlib.pyplot as plt

# Extract the feature coefficients and feature names
# After fitting the model, extract the coefficients
coefficients = lrModel.coefficients.toArray()

# Get the feature names from the VectorAssembler stage (if metadata is available)
# Note: This assumes your VectorAssembler is named 'assembler' and it is the last stage before the classifier
feature_names = assembler.getInputCols()

# Combine feature names and coefficients
feature_coefficients = list(zip(feature_names, coefficients))

# Convert to a DataFrame for easy viewing (if you are using a Jupyter notebook or have pandas installed)
import pandas as pd
feature_coefficients_df = pd.DataFrame(feature_coefficients, columns=["Feature", "Coefficient"])

# Sort the features by the absolute value of their coefficient to determine "importance"
feature_coefficients_df['abs(Coefficient)'] = feature_coefficients_df['Coefficient'].abs()
sorted_features = feature_coefficients_df.sort_values('abs(Coefficient)', ascending=False)

# Plot the sorted features
plt.figure(figsize=(10, 6))
plt.barh(sorted_features["Feature"], sorted_features["Coefficient"], color='skyblue')
plt.xlabel('Coefficient Value')
plt.ylabel('Feature')
plt.title('Sorted Feature Coefficients')
plt.gca().invert_yaxis()  # Invert y-axis to have highest coefficient at the top
plt.show() """


'import matplotlib.pyplot as plt\n\n# Extract the feature coefficients and feature names\n# After fitting the model, extract the coefficients\ncoefficients = lrModel.coefficients.toArray()\n\n# Get the feature names from the VectorAssembler stage (if metadata is available)\n# Note: This assumes your VectorAssembler is named \'assembler\' and it is the last stage before the classifier\nfeature_names = assembler.getInputCols()\n\n# Combine feature names and coefficients\nfeature_coefficients = list(zip(feature_names, coefficients))\n\n# Convert to a DataFrame for easy viewing (if you are using a Jupyter notebook or have pandas installed)\nimport pandas as pd\nfeature_coefficients_df = pd.DataFrame(feature_coefficients, columns=["Feature", "Coefficient"])\n\n# Sort the features by the absolute value of their coefficient to determine "importance"\nfeature_coefficients_df[\'abs(Coefficient)\'] = feature_coefficients_df[\'Coefficient\'].abs()\nsorted_features = feature_coefficients_df.sort_v

In [0]:
predictions = pipelineFit1.transform(testingData)
predictions.groupBy("prediction").count().show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0|574428|
|       1.0| 52448|
+----------+------+



In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

#acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
#pre_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
#rec_evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall")
#pr_evaluator  = BinaryClassificationEvaluator(metricName="areaUnderPR")
auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

#print("Test Accuracy       = %g" % (acc_evaluator.evaluate(predictions)))
#print("Test Precision      = %g" % (pre_evaluator.evaluate(predictions)))
#print("Test Recall         = %g" % (rec_evaluator.evaluate(predictions)))
#print("Test areaUnderPR    = %g" % (pr_evaluator.evaluate( predictions)))
print("Test areaUnderROC   = %g" % (auc_evaluator.evaluate(predictions)))

# 0.864462
# 0.866361
# 0.867055
# 0.867874
# 0.86762
# 0.867881
# 0.872519
# 0.874313
# 0.871431 - with IDF on all pipelines (brought down score)
# 0.887075 - removing IDF and adding more contractions to the list
# 0.887166 - including reviewtext and summary character count
# 0.88219 - running on 100% of the dataset 
# 0.883325 - Included hyper parameter tuning

Test areaUnderROC   = 0.883325


In [0]:
test_df = spark.sql("select * from default.reviews_test")
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|reviewID|overall|verified| reviewTime|    reviewerID|      asin|reviewerName|          reviewText|             summary|unixReviewTime|
+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|80000001|    4.0|   false|07 27, 2015|A1JGAP0185YJI6|0700026657|      travis|I played it a whi...|But in spite of t...|    1437955200|
|80000002|    5.0|    true| 03 3, 2014|A1WK5I4874S3O2|0700026657|  WhiteSkull|I bought this gam...|A very good game ...|    1393804800|
|80000003|    5.0|    true|01 12, 2013|A1YDQQJDRHM0FJ|0001713353|       Leila|I am very happy w...|One of our famili...|    1357948800|
|80000004|    5.0|    true|11 20, 2011|A2E6AHFDJ3JBAZ|0681795107|    robosolo|I purchased two o...|Insulated stainle...|    1321747200|
|80000005|    5.0|   false|06 28, 2011|A38NXTZUF

In [0]:
# For our intitial modeling efforts, we are not going to use the following features
drop_list = ['image', 'style', 'reviewerName', "unixReviewTime"]
test_df = test_df.select([column for column in test_df.columns if column not in drop_list])
test_df = test_df.na.drop(subset=["reviewText"])
display(test_df)  
print((test_df.count(), len(test_df.columns)))

reviewID,overall,verified,reviewTime,reviewerID,asin,reviewText,summary
80000001,4.0,False,"07 27, 2015",A1JGAP0185YJI6,0700026657,"I played it a while but it was alright. The steam was a bit of trouble. The more they move these game to steam the more of a hard time I have activating and playing a game. But in spite of that it was fun, I liked it. Now I am looking forward to anno 2205 I really want to play my way to the moon.","But in spite of that it was fun, I liked it"
80000002,5.0,True,"03 3, 2014",A1WK5I4874S3O2,0700026657,"I bought this game thinking it would be pretty cool and that i might play it for a week or two and be done. Boy was I wrong! From the moment I finally got the gamed Fired up (the other commentors on this are right, it takes forever and u are forced to create an account) I watched as it booted up I could tell right off the bat that ALOT of thought went into making this game. If you have ever played Sim city, then this game is a must try as you will easily navigate thru it and its multi layers. I have been playing htis now for a month straight, and I am STILL discovering layers of complexity in the game. There are a few things in the game that could used tweaked, but all in all this is a 5 star game.",A very good game balance of skill with depth of choices
80000003,5.0,True,"01 12, 2013",A1YDQQJDRHM0FJ,0001713353,I am very happy with the book!!! It is one of my children's favorite books and I was so pleased I could order it here!!!,One of our families favorite books!!!
80000004,5.0,True,"11 20, 2011",A2E6AHFDJ3JBAZ,0681795107,"I purchased two of these cups for camping but I like them so much I use one daily. Unlike other thermos-type cups, this one actually keeps the beverage hot for quite some time, especially if you place the cap on it. The cap itself is really unique in the way it works. After filling with a hot liquid the cap fits rather loosely in place. But, after a few seconds heat expands it and it wedges itself in place very securely. You can still pry it off but not easily when it's hot which is just what you'd want. Also, that cap has a neat little sliding anti-splash tab on the top that actually works. The cup is wide enough to be stable sitting on a flat surface - with or without liquid in it - yet fits easily into a car tray cup holder for travel. After using the cup for a few days I decided that I didn't want the nylon handle. I simply used a heat gun (on low) to gently heat it up and just pulled it down and off of the cup. That handle is secured to the cup with some glue and a slight ridge running around its inner circumference. It was easy to clean off the remaining glue from the stainless steel and I could - if I wanted to - re-glue and reattach the handle. This is a great insulated stainless steel cup with or without a handle and well worth the price.",Insulated stainless steel cup that actually works
80000005,5.0,False,"06 28, 2011",A38NXTZUFB1O2K,0700099867,"I'm not quite finished with the game's DiRT Tour mode, but I believe I've experienced the bulk of what the game has to offer. And I'm happy to say that the game is indeed awesome. Great cars, great tracks and racing modes, excellent gameplay and graphics. The highlight has been racing in the snow on various tracks in Aspen. There are also some really cool obstacle courses on pavement, which are new to the series. At first, I felt that the game was plagued with the same difficulty spike DiRT 2 had, where you can win any race without trying on casual difficulty, then lose big time on medium. Then I found the custom difficulty settings where you can choose from 5 levels of challenge, then pick and choose from all the other assists and handling options. Basically, you can let the game play itself by just holding the throttle and steering, or take gradual steps to reach what you feel works best for you. Love this feature! Sadly, there is no option to increase the 360 controller's dead zone or adjust the sensitivity. I'm doing ok as is, but some tweaking would be ideal. Much like the previous two games, the handling is a bit flighty. But that's speeding on loose ground for you. Still, I feel things have tightened up somewhat since DiRT 2. Still no cigar, but it's a step forward. The game runs as smooth as silk on my 560ti wih all the settings maxed. No issues whatsoever. If you enjoy racing games, this is not one you should ignore. It's the best DiRT so far.",Best in the series!
80000006,1.0,True,"06 28, 2014",A1INA0F5CWW3J4,0700099867,1st shipment received a book instead of the game. 2nd shipment got a FAKE one. Game arrived with a wrong key inside on sealed box. I got in contact with codemasters and send them pictures of the DVD and the content. They said nothing they can do its a fake DVD. Returned it good bye.!,Wrong key
80000007,2.0,True,"07 11, 2013",A1BHRNLW2L8KLD,0700099867,"The game itself is great, but Games for Windows Live is such a frustrating piece of garbage. I wish it would just die and stop bringing down my PC gaming experience.",Games for Windows Live...
80000008,4.0,False,"12 6, 2012",A3J9C2WMW0TZYB,0700099867,"I really enjoyed the Dirt series of racing games. I actually started with the Colin McRae Rally 3 demo, which came before Dirt. Dirt 1 was a nice game with lots of different types of races in varied environments, including Pike's Peak. You got to race semi trucks, cargo trucks, trophy trucks, SUVs, and rally cars in Dirt 1. The advancement was like a pyramid, unlocking levels as you go. I have a Toyota Tundra, so it is easy to like a game that has a Toyota Tundra to race in it. Dirt 2 made it more radical with nice music and an attitude. The races were longer and became more X-Games like. I enjoyed the Baja races very much. But, I did not like the Microsoft Live part of it where it takes a long time to get initiated, patched, etc before you can actually play for the first time. Once you get past this, things get better. The choice of vehicles were more limited, but the liveries try to make up for it. Dirt 3 keeps the same attitude and cool music, but also has the same Microsoft Live treatment: sign in, patch , etc. before the first game time. Once you get past this, it does get better. At first, I could not get used to the way the game downshifts automatically in the turns to automatically slow you down. I moved the difficulty up a notch to intermediate and it was much better and more of what I was expecting. I use a gamepad to play on the PC. I was pleasantly surprised that my 9800 GTX 512 MB plays this game rather well at 1680x1050 (no AA). The races are largely rally with some interesting winter-time truck races in the snow. The cars are not quite as varied as I would like. I also hate that there is in-game content that can be purchased additionally from Microsoft Live. Come on! I already paid for the game and there are some levels that are pay to play, so you won't really finish the season until you pony up more funds to Microsoft Live. Many of the cars are available to purchase too. That is cheesy and not appreciated. You don't need to purchase these to advance though. The gymkhana events are interesting and challenging. Being a hoonigan takes practice, patience, and many banged up cars. The Ken Block videos on youtube really inspired me to practice as I had no idea what gymkhana was before Dirt 3. I do miss the Baja style of offroading from Dirt 2. Dirt 3 seems to have a focus on other things, but it is still fun overall. For $12, I got my money's worth. Maybe I will get Dirt Showdown for some demolition derby excitement, or maybe not.","Dirt, Dirt 2, and Dirt 3"
80000009,5.0,True,"05 14, 2010",A1FZV5FWLJKYU7,1581174292,"I originally only knew this story/song in French! Now my baby can also learn it in English. She's only 3 months, but she enjoys us reading it in a sing-song way and doing sound effects for the animals. It's short enough to keep her attention and let us finish this one. Very cute!",Fun for my baby girl
80000010,4.0,False,"03 19, 2012",A20DRRKAN5Z9Q,0700099867,"In today's game market it's easy to miss racing games that aren't part of the license-heavy Forza or Gran Turismo franchises. Dirt 3 for PC is in an even worse boat than most since it's been a giveaway for a couple years now with every DX11 compliant ATI video card. As a result a lot of people who get it send it straight to eBay and most people who do play it have not bought it new. This is a shame because Dirt 3 is a fun little game. There's a great assortment of multinational tracks, and a wide array of classic and modern rally cars to choose from. As a rally game, Dirt 3 ditches the comfortable confines of tracks and sends you headlong across snow and gravel in a world where keeping control of your car is a greater skill than making it around the turns. Dirt 3 wears a veneer of noisy, fun arcade-style game play, but under the hood it calls for quick reactions, a good understanding of road surfaces, and nerves of steel. Sadly, it also calls for a gamepad or wheel- you will almost certainly find the keyboard too all-or-nothing for serious racing. PROS: -Visually stunning engine that makes full use of DirectX 11 effects -Soundtrack is an appealing mix of classic prog rock and modern music that does a good job of being vaguely European and not too intrusive -Successfully models an arcade-like complexity of play with a surprisingly deep simulation of track conditions and vehicle handling -In-car camera gives a stomach-churning stream of nonstop thrills as you launch over crests and 2-wheel hairpins -Probably the most realistic depiction of drifting/powersliding to appear in a video game (it's frightening, nauseating, and usually a bad idea) -3D support! CONS: -Lower difficulties aren't much fun, highest difficulty is brutally hard -Small fanbase of hardcore racing nuts may make multiplayer a bit hard for newcomers (Dirt 3 players seem pretty nice though) -Essentially requires some sort of analog input -May confuse players who aren't familiar with the rules and conditions of rally-style racing Other thoughts: -You need to plug your youtube credentials into the game to let it export your highlights to YouTube. I don't think CodeMasters wants to rip you off, but if it makes you uncomfortable, create a new Google identity for use with Dirt 3. Easy! -LEARN THE TRACKS. Learn the tracks, learn the tracks, learn the tracks. -If you want to win, employ a Fin!",An overlooked gem in the Forza/GT treasure trove


(348621, 8)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Count occurrences") \
    .getOrCreate()

# Group by 'reviewerID' and 'asin', and count occurrences
grouped_counts = test_df.groupBy('reviewerID', 'asin').agg(count('*').alias('count'))

# Convert to Pandas DataFrame for printing (only recommended for small result sets)
grouped_counts_pandas = grouped_counts.toPandas()
#display(grouped_counts_pandas)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import split, col
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DoubleType
import re
import nltk
from pyspark.sql.types import IntegerType
nltk.download('punkt')
from nltk.tokenize import sent_tokenize
from pyspark.sql.functions import col, regexp_replace

# Initialize Spark Session
spark = SparkSession.builder.appName("SentenceCount").getOrCreate()

# Download NLTK punkt tokenizer models
nltk.download('punkt')

# Define a Python function that uses NLTK for sentence tokenization
def nltk_sentence_count(text):
    if text and isinstance(text, str):
        sentences = sent_tokenize(text)
        return len(sentences)
    return 0


# Custom UDF to estimate the number of syllables
def syllable_count_estimation(text):
    count = 0
    vowels = "aeiouy"
    text = text.lower()
    if text == "":
        return 0
    if text[0] in vowels:
        count += 1
    for index in range(1, len(text)):
        if text[index] in vowels and text[index - 1] not in vowels:
            count += 1
    if text.endswith("e"):
        count -= 1
    if count == 0:
        count += 1
    return count

# Register the Python function as a PySpark UDF
sentence_count_udf = udf(nltk_sentence_count, IntegerType())


syllable_count_udf = udf(syllable_count_estimation, IntegerType())


# Calculating sentence, syllable, and word counts
test_df = test_df.withColumn("sentence_count", sentence_count_udf(col("reviewText")))
test_df = test_df.withColumn("syllable_count", syllable_count_udf(col("reviewText")))
test_df = test_df.withColumn("word_count", udf(lambda text: len(re.findall(r'\w+', text)), IntegerType())(col("reviewText")))

test_df = test_df.withColumn("sentence_count_summary", sentence_count_udf(col("summary")))


# Assuming your DataFrame is named df
test_df = (test_df
      .withColumn("reviewTime", split(col("reviewTime"), " "))
      .withColumn("reviewtime_month", col("reviewTime")[0])
      .withColumn("reviewtime_year", col("reviewTime")[2]))

test_df = (test_df
      .withColumn("reviewtime_day", regexp_replace(col("reviewTime")[1], ",", ""))
      .withColumn("reviewtime_day", regexp_replace(col("reviewtime_day"), " ", "")))


# Download the NLTK data (if not already downloaded)
nltk.download('words')

# Set of English words from NLTK
english_words = set(nltk.corpus.words.words())

# Function to count misspelled words
def count_misspelled_words(text):
    # Tokenize the text into words
    words = nltk.word_tokenize(text)
    
    # Find misspelled words
    misspelled = [word for word in words if word.lower() not in english_words]
    
    # Return the count of misspelled words
    return len(misspelled)

# Register the UDF
count_misspelled_udf = udf(count_misspelled_words, IntegerType())

# Apply the UDF to create a new column "misspelledWordsCount"
test_df = test_df.withColumn("misspelledWordsCount", count_misspelled_udf(col("reviewText")))


column_to_drop = "reviewTime"
test_df = test_df.drop(column_to_drop)

# Show the first few rows of the DataFrame
#df.show()
#display(test_df)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Package words is already up-to-date!


In [0]:
from pyspark.sql.functions import col

# Assuming you have a DataFrame named 'df' with a column 'your_string_column' of type string
test_df = test_df.withColumn("reviewtime_month", col("reviewtime_month").cast("int"))
test_df = test_df.withColumn("reviewtime_year", col("reviewtime_year").cast("int"))
test_df = test_df.withColumn("reviewtime_day", col("reviewtime_day").cast("int"))


In [0]:

# Apply the UDF to the "summary" column
test_df = test_df.withColumn("character_count_summary", count_characters_udf(col("summary")))

# Apply the UDF to the "reviewText" column
test_df = test_df.withColumn("character_count_reviewText", count_characters_udf(col("reviewText")))

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

expand_contractions_udf = udf(expand_contractions, StringType())
test_df = test_df.withColumn('reviewText_expanded', expand_contractions_udf(test_df['reviewText']))

test_df.select("reviewText", "reviewText_expanded")

#display(test_df)


DataFrame[reviewText: string, reviewText_expanded: string]

In [0]:
#pipelineFit = pipeline.fit(test_df)
test_df = pipelineFit.transform(test_df)
#test_df.show(5)

In [0]:
test_df = pipelineFit2.transform(test_df)
#test_df.show(5)

In [0]:
#pipelineFit1 = pipeline1.fit(test_df)
kaggle_pred = pipelineFit1.transform(test_df)
kaggle_pred.groupBy("prediction").count().show()


+----------+------+
|prediction| count|
+----------+------+
|       0.0|319557|
|       1.0| 29064|
+----------+------+



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = kaggle_pred.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
# Download this and submit to Kaggle!
display(submission_data.select(["reviewID", "label"]))

reviewID,label
80000001,0.06963447
80000002,0.16733617
80000003,0.115543015
80000004,0.69163173
80000005,0.25584042
80000006,0.27263528
80000007,0.16403653
80000008,0.7377266
80000009,0.15623783
80000010,0.73529387
