In [32]:
sc.install_pypi_package("pandas==0.25.1")
# sc.list_packages()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Downloading https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl (10.4MB)
Installing collected packages: pandas
Successfully installed pandas-0.25.1

You are using pip version 9.0.1, however version 22.0.4 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

## Read CSV File and Get DataFrame (pyspark)

In [3]:
import pyspark.sql.functions as F
import re
from pyspark.sql.types import *

train_path = "s3://myprojectsentiment/data/train.csv"
test_path = "s3://myprojectsentiment/data/test.csv"

def readCSV(path):
    df = (spark.read
      .option("charset", "utf-8")
      .option("multiline", "true")
      .option("quote", '"')
      .option("header", "true")
      .option("escape", "\\")
      .option("escape", '"')
      .csv(path))
    df = df.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in df.columns])
    df = df.withColumn('label', F.col('label').cast(DoubleType()))
    return df

train_df = readCSV(train_path)
test_df = readCSV(test_path)

train_df.printSchema()
train_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- content: string (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-----+
|             content|label|
+--------------------+-----+
|Working with one ...|  0.0|
|Well...tremors I,...|  0.0|
|Ouch! This one wa...|  0.0|
|I've seen some cr...|  0.0|
|"Carriers" follow...|  0.0|
|I had been lookin...|  0.0|
|Effect(s) without...|  0.0|
|This picture star...|  0.0|
|I chose to see th...|  0.0|
|This film has to ...|  0.0|
|I felt brain dead...|  0.0|
|A young scientist...|  0.0|
|Inept, boring, an...|  0.0|
|From the first ti...|  0.0|
|I find it hard to...|  0.0|
|I actually saw Ch...|  0.0|
|I went to school ...|  0.0|
|I haven't seen th...|  0.0|
|I haven't seen an...|  0.0|
|One would think t...|  0.0|
+--------------------+-----+
only showing top 20 rows

## Convert to TF IDF model

- **Term-Frequency Inverse Document Frequency (TF-IDF)**
  - some words are common among many documents
    - common words are less informative because they appear in both classes.
  - **inverse document frequency (IDF)** - measure rarity of each word
    - $IDF(j) = \log \frac{N}{N_j}$
      - $N$ is the number of documents.
      - $N_j$ is the number of documents with word $j$.
    - IDF is:
      - 0 when a word is common to all documents
      - large value when the word appears in few documents
  - **TF-IDF vector:** downscale words that are common in many documents
    - multiply TF and IDF terms
    - $x_j = \frac{w_j}{|D|} \log \frac{N}{N_j}$

In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

def converttf_idf(df, feature_num):
    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    wordsData = tokenizer.transform(df)

    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=feature_num)
    featurizedData = hashingTF.transform(wordsData)
    # alternatively, CountVectorizer can also be used to get term frequency vectors

    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)
    return rescaledData

tran_xtf = converttf_idf(train_df, 500)
test_xtf = converttf_idf(test_df, 500)

train = tran_xtf.select("label", "features")
test = test_xtf.select("label", "features")

train.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(500,[3,14,17,64,...|
|  0.0|(500,[3,5,7,8,10,...|
|  0.0|(500,[0,7,8,11,17...|
|  0.0|(500,[3,7,12,16,1...|
|  0.0|(500,[0,10,11,12,...|
|  0.0|(500,[1,3,8,16,17...|
|  0.0|(500,[6,11,15,17,...|
|  0.0|(500,[3,8,12,17,1...|
|  0.0|(500,[0,7,10,14,1...|
|  0.0|(500,[3,6,14,17,2...|
|  0.0|(500,[1,3,4,5,17,...|
|  0.0|(500,[2,3,17,22,2...|
|  0.0|(500,[3,13,17,33,...|
|  0.0|(500,[8,10,11,12,...|
|  0.0|(500,[3,10,11,17,...|
|  0.0|(500,[14,17,18,22...|
|  0.0|(500,[0,12,17,24,...|
|  0.0|(500,[3,8,17,24,2...|
|  0.0|(500,[3,8,12,17,2...|
|  0.0|(500,[3,10,11,12,...|
+-----+--------------------+
only showing top 20 rows

## Naive Bayes Multinomial

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(500,[3,17,23,26,...|[-622.13189632295...|[0.21479398803266...|       1.0|
|  0.0|(500,[2,17,32,35,...|[-639.74585979688...|[0.95308340974305...|       0.0|
|  0.0|(500,[1,6,7,10,17...|[-1008.1710972640...|[0.86299582670018...|       0.0|
|  0.0|(500,[0,3,7,10,11...|[-2724.6127297877...|[0.94078884648410...|       0.0|
|  0.0|(500,[3,9,10,11,1...|[-1189.8561792820...|[0.06849454795089...|       1.0|
|  0.0|(500,[3,4,8,17,26...|[-1035.7808488607...|[0.78948265063038...|       0.0|
|  0.0|(500,[8,13,17,23,...|[-764.49258409310...|[0.98224276054769...|       0.0|
|  0.0|(500,[17,18,22,27...|[-520.50285576704...|[0.20497001582073...|       1.0|
|  0.0|(500,[0,2,10,13,1...|[-1685.8780175841...|[0.97722965807648...|       0.0|
|  0.0|(500,[0,3

## Logistic Regression

In [55]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# build model and fit
lr = LogisticRegression(maxIter=10)
lrModel = lr.fit(train)

# select example rows to display.
predictions = lrModel.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(500,[3,17,23,26,...|[-0.7165276591984...|[0.32815807383877...|       1.0|
|  0.0|(500,[2,17,32,35,...|[2.15321023530749...|[0.89596838042190...|       0.0|
|  0.0|(500,[1,6,7,10,17...|[1.80688427705520...|[0.85898488917680...|       0.0|
|  0.0|(500,[0,3,7,10,11...|[5.09146906636550...|[0.99388859889409...|       0.0|
|  0.0|(500,[3,9,10,11,1...|[-1.6907287047053...|[0.15568003240277...|       1.0|
|  0.0|(500,[3,4,8,17,26...|[-0.8380469483374...|[0.30194627862082...|       1.0|
|  0.0|(500,[8,13,17,23,...|[2.56740341310094...|[0.92873402647590...|       0.0|
|  0.0|(500,[17,18,22,27...|[-0.3371114133701...|[0.41651131948666...|       1.0|
|  0.0|(500,[0,2,10,13,1...|[1.09957381600853...|[0.75018024303529...|       0.0|
|  0.0|(500,[0,3

In [5]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

objectiveHistory:
0.6931471805599471
+---+---+
|FPR|TPR|
+---+---+
|0.0|0.0|
|1.0|1.0|
|1.0|1.0|
+---+---+

areaUnderROC: 0.5
LogisticRegression_77a1159abbdd

## Logistic Regression with Cross-Validation

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="content", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawfeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")

lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF,idf, lr])


# this grid will have 3*3 = 9 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [200, 800, 1600]) \
    .addGrid(lr.regParam, [0.1, 0.05, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_df)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test_df)
selected = prediction.select( "content", "label", "prediction")
selected.show()

# compute accuracy on the test set
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
accuracy = evaluator.evaluate(selected)
print("Test set accuracy = " + str(accuracy))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+----------+
|             content|label|prediction|
+--------------------+-----+----------+
|Alan Rickman & Em...|  0.0|       1.0|
|I have seen this ...|  0.0|       1.0|
|In Los Angeles, t...|  0.0|       0.0|
|This film is bund...|  0.0|       0.0|
|I only comment on...|  0.0|       1.0|
|When you look at ...|  0.0|       0.0|
|Rollerskating vam...|  0.0|       0.0|
|Technically abomi...|  0.0|       1.0|
|When Hollywood is...|  0.0|       0.0|
|Respected western...|  0.0|       1.0|
|Worst movie ever ...|  0.0|       0.0|
|I was forced to w...|  0.0|       1.0|
|Well it is about ...|  0.0|       0.0|
|Man with the Scre...|  0.0|       1.0|
|I never read the ...|  0.0|       0.0|
|One of the movies...|  0.0|       1.0|
|Well I had the ch...|  0.0|       0.0|
|This is a movie t...|  0.0|       0.0|
|Dark Harvest is a...|  0.0|       0.0|
|A handful of nubi...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows

Test set accur

## Random Forest

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(train)

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=20)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(train)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("accuracy = %g" % (accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  0.0|(500,[3,17,23,26,...|
|           0.0|  0.0|(500,[2,17,32,35,...|
|           1.0|  0.0|(500,[1,6,7,10,17...|
|           0.0|  0.0|(500,[0,3,7,10,11...|
|           1.0|  0.0|(500,[3,9,10,11,1...|
+--------------+-----+--------------------+
only showing top 5 rows

accuracy = 0.67364
RandomForestClassificationModel: uid=RandomForestClassifier_f7add8a63034, numTrees=20, numClasses=2, numFeatures=500

## Multilayer perceptron classifier

In [10]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# specify layers for the neural network:
# input layer of size 500 (features), four intermediate layer
# and output of size 2 (classes)
layers = [500, 128, 32, 8, 2]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)

predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test set accuracy = 0.72296

## Save Result

In [53]:
import datetime

# write prediction file with timestamp file name
def savePrediction(lines, path):
    to_day = datetime.datetime.today()
    to_day = str(to_day)[0:19]
    to_day = re.sub("[^0-9a-zA-Z$]+","_",to_day)
    file_path = path + to_day
    file_path.strip()
    lines.coalesce(1).write.csv(file_path, header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
test_df = (spark.read
      .option("charset", "utf-8")
      .option("header", "true")
      .csv(test_path))

pdf = test_df.toPandas()
pred = predictions.toPandas()
pdf['predictedLabel'] = pred['prediction']
sdf = spark.createDataFrame(pdf)

savePrediction(sdf,'s3://janev/output/project/')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…