Configure the Spark Environment

In [None]:
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local").getOrCreate()
sc = SparkContext.getOrCreate()

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-440
Use 'apt autoremove' to remove it.
Suggested packages:
  libnss-mdns fonts-dejavu-extra fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jre-headless
0 upgraded, 1 newly installed, 0 to remove and 35 not upgraded.
Need to get 27.5 MB of archives.
After this operation, 101 MB of additional disk space will be used.
Ign:1 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u252-b09-1~18.04
Err:1 http://security.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u252-b09-1~18.04
  404  Not Found [IP: 91.189.88.142 80]
E: Failed to fetch http://security.ubuntu.com/ubuntu/pool/universe/o/openjdk-8/openjdk-8-jre-headless_8u252-b09-1~1

In [None]:
import pyspark.sql.types
import pyspark
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType,IntegerType,StringType
# Explicit schema for the cleaned-tweet CSV files: an integer id, the tweet text,
# and an integer target label. Every column is nullable.
Schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('id', IntegerType()),
        ('text', StringType()),
        ('target', IntegerType()),
    )
])

In [None]:
def read_tweet_csv(path, schema=None, encoding="gbk"):
    """Load one cleaned-tweet CSV file into a DataFrame.

    Args:
        path: CSV file path. The first line is treated as a header and skipped.
        schema: Spark StructType to apply; defaults to the notebook's `Schema`.
        encoding: file encoding passed to the CSV reader.
            NOTE(review): gbk is unusual for English tweets; confirm the files really are GBK-encoded.

    Returns:
        DataFrame with the columns declared in `schema`.
    """
    return (spark.read.format("csv")
            .option("encoding", encoding)
            .option("header", True)
            .load(path, schema=Schema if schema is None else schema))

# The four shards clean_tweet1.csv .. clean_tweet4.csv, loaded identically.
TWEET_FILES = ["clean_tweet{0}.csv".format(i) for i in range(1, 5)]
df1, df2, df3, df4 = (read_tweet_csv(path) for path in TWEET_FILES)

In [None]:
# Peek at the first three rows of each shard to sanity-check the parse.
df1.show(n=3)
df2.show(n=3)
df3.show(n=3)
df4.show(n=3)

+---+--------------------+------+
| id|                text|target|
+---+--------------------+------+
|  0|awww bummer shoul...|     0|
|  1|upset can not upd...|     0|
|  2|dive mani time ba...|     0|
+---+--------------------+------+
only showing top 3 rows

+------+--------------------+------+
|    id|                text|target|
+------+--------------------+------+
|400000|    mutha effin bore|     0|
|400001|not spent afterno...|     0|
|400002|My entir friggin ...|     0|
+------+--------------------+------+
only showing top 3 rows

+------+--------------------+------+
|    id|                text|target|
+------+--------------------+------+
|800000|       love guy best|     4|
|800001|im meet one besti...|     4|
|800002|thank twitter add...|     4|
+------+--------------------+------+
only showing top 3 rows

+-------+--------------------+------+
|     id|                text|target|
+-------+--------------------+------+
|1200000|daniel best thing...|     4|
|1200001|need new

In [None]:
# merge 
# Stack the four shards into one DataFrame. union() matches columns by position,
# and all shards share the same Schema.
data = df1.union(df2).union(df3).union(df4)

In [None]:
# Total rows across the four merged shards, before dropping the id column and null rows.
data.count()

1599996

In [None]:
# Drop the id column (not a feature), then remove every row that still has a null
# in any remaining column (dropna's default how='any').
data = data.drop('id').dropna()
data.count()

1593372

In [None]:
# 80/20 train/test split with a fixed seed for reproducibility.
(train_set, test_set) = data.randomSplit([0.8, 0.2], seed = 2000)

# Class balance of the training split. One grouped aggregation replaces the five
# separate count() jobs the report used to trigger (train_set.count() three times
# plus one filtered count per class). groupBy keeps a null key as its own group,
# so the sum of the group counts equals train_set.count().
class_counts = dict(train_set.groupBy('target').count().collect())
n_train = sum(class_counts.values())
# Target 0 is reported as negative and 4 as positive; guard against an empty split.
neg_pct = 100.0 * class_counts.get(0, 0) / n_train if n_train else 0.0
pos_pct = 100.0 * class_counts.get(4, 0) / n_train if n_train else 0.0
print("Train set has total {0} entries with {1:.2f}% negative, {2:.2f}% positive".format(n_train, neg_pct, pos_pct))


Train set has total 1274792 entries with 50.05% negative, 49.95% positive


Define Different Pipelines  

1. Compare different features  


*   TFIDF Vectorizer *vs*  Count Vectorizer
*   1-gram *vs* 2-gram *vs* 3-gram *vs* combination

2. Compare different models



In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer

# setup the pipeline
# stage: transform the category to numeric
# Label stage: index the numeric "target" column into the "label" column that the
# classifiers and evaluator read by default. Kept at module level for backward
# compatibility; tweet_pipeline builds its own indexer unless one is passed in.
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

def tweet_pipeline(feature_stages, model, train_set, test_set, label_indexer=None):
    """Fit [label indexer] + feature_stages + [model] on train_set and score test_set.

    Args:
        feature_stages: sequence of unfitted pipeline stages turning "text" into a
            feature vector column (the classifier's featuresCol, "features" by default).
        model: an unfitted Spark ML estimator, e.g. LogisticRegression.
        train_set: DataFrame used to fit every stage.
        test_set: DataFrame transformed by the fitted pipeline.
        label_indexer: optional stage producing the "label" column. Defaults to a
            fresh StringIndexer("target" -> "label"). The module-level global
            `label_stringIdx` is deliberately not read here, because a later cell
            rebinds that name to a list, which would corrupt the stage list.

    Returns:
        DataFrame: test_set with the fitted pipeline's output columns added.
    """
    if label_indexer is None:
        label_indexer = StringIndexer(inputCol="target", outputCol="label")
    stages = [label_indexer] + list(feature_stages) + [model]
    # Fit on the training data only; the test data is just transformed.
    fitted_pipeline = Pipeline(stages=stages).fit(train_set)
    return fitted_pipeline.transform(test_set)


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

# Shared first stage: split raw tweet text into a "words" array column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Option A: vocabulary-based term counts (CountVectorizer), re-weighted by IDF.
cv = CountVectorizer(inputCol="words", outputCol='cv', vocabSize=1 << 16)
cv_idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)

# Option B: hashed term frequencies (HashingTF), re-weighted by IDF.
hashtf = HashingTF(inputCol="words", outputCol='tf', numFeatures=1 << 16)
tf_idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
# minDocFreq=5: terms that appear in fewer than 5 training documents get an IDF weight of 0.

# Candidate classifiers. The random-forest comparison is currently not run.
lr = LogisticRegression(maxIter=100)
rf = RandomForestClassifier(numTrees=10)

# Same classifier, two feature pipelines.
lr_cvidf_prediction = tweet_pipeline([tokenizer, cv, cv_idf], lr, train_set, test_set)
lr_tfidf_prediction = tweet_pipeline([tokenizer, hashtf, tf_idf], lr, train_set, test_set)



In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Binary evaluator with the default metric (area under ROC). It reads the
# "rawPrediction" and "label" columns.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(dataset=lr_cvidf_prediction)


0.8552051787564793

In [None]:
# Same metric for the HashingTF + IDF variant.
evaluator.evaluate(dataset=lr_tfidf_prediction)

0.8493910477487672

Feature Extraction  
1. Count Vectorizer (unigram/ bigram/ trigram)
2. TFIDF Vectorizer (unigram/ bigram/ trigram)
3. Word2Vec

In [None]:
from pyspark.ml.feature import NGram, VectorAssembler
def build_ngrams_wocs(inputCol=("text", "target"), n=3, vocabSize=5460, minDocFreq=5, maxIter=100):
    """Build an unfitted n-gram TF-IDF + logistic-regression Pipeline.

    Stages: Tokenizer -> NGram(1..n) -> CountVectorizer per order -> IDF per order
    -> VectorAssembler("features") -> StringIndexer("label") -> LogisticRegression.

    Args:
        inputCol: (text column, target column) pair. Previously this was accepted
            but ignored; the default reproduces the old hard-coded names.
        n: highest n-gram order. One count/IDF branch is built for each order 1..n.
        vocabSize: maximum vocabulary size of each per-order CountVectorizer.
        minDocFreq: IDF minDocFreq for each per-order IDF stage.
        maxIter: LogisticRegression iteration limit.

    Returns:
        pyspark.ml.Pipeline (not yet fitted).

    Raises:
        ValueError: if inputCol is not exactly two column names, or n < 1.
    """
    cols = list(inputCol)
    if len(cols) != 2:
        raise ValueError("inputCol must be a (text column, target column) pair")
    if n < 1:
        raise ValueError("n must be >= 1")
    text_col, target_col = cols
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    cv = [
        CountVectorizer(vocabSize=vocabSize, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=minDocFreq)
        for i in orders
    ]
    # Concatenate the per-order TF-IDF vectors into one "features" column.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol=target_col, outputCol="label")]
    lr = [LogisticRegression(maxIter=maxIter)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)


In [None]:
# Stage list (not a bare stage) so it can be concatenated with the other stage lists.
tokenizer = []
tokenizer.append(Tokenizer(outputCol="words", inputCol="text"))

In [None]:
from pyspark.ml.feature import NGram, VectorAssembler
n = 3  # highest n-gram order: builds 1-, 2- and 3-gram columns
ngrams = list(map(
    lambda order: NGram(n=order, inputCol="words", outputCol=str(order) + "_grams"),
    range(1, n + 1),
))

In [None]:
# One CountVectorizer per n-gram order, each capped at 5460 vocabulary entries.
cv = [
    CountVectorizer(inputCol=str(order) + "_grams",
                    outputCol=str(order) + "_tf",
                    vocabSize=5460)
    for order in range(1, n + 1)
]

In [None]:
# IDF re-weighting per n-gram order; terms in fewer than 5 documents get weight 0.
idf = [
    IDF(minDocFreq=5, inputCol=str(order) + "_tf", outputCol=str(order) + "_tfidf")
    for order in range(1, n + 1)
]

In [None]:
# Concatenate every per-order TF-IDF vector into the single "features" column.
assembler = [
    VectorAssembler(
        outputCol="features",
        inputCols=[str(order) + "_tfidf" for order in range(1, n + 1)],
    )
]

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
# One-element stage lists for concatenation. NOTE(review): these rebind names that
# earlier cells used for bare stages (label_stringIdx, lr), so re-running those
# earlier cells after this one depends on execution order.
label_stringIdx = [StringIndexer(outputCol="label", inputCol="target")]
lr = [LogisticRegression(maxIter=100)]
# NOTE(review): nb is not used below. Spark's Bernoulli NaiveBayes expects 0/1
# feature values, while the assembled "features" column holds TF-IDF weights.
# Confirm before swapping it in.
nb = [NaiveBayes(modelType="bernoulli", smoothing=1.0)]

In [None]:
# Fit the 1-3-gram TF-IDF + logistic-regression pipeline on the training split.
# build_ngrams_wocs produces the same stages as the step-by-step cells above, but
# does not depend on globals (tokenizer, cv, lr, label_stringIdx) that other cells
# rebind between bare stages and lists. Re-running the earlier comparison cell
# would otherwise make the stage concatenation fail with a TypeError.
trigramwocs_pipelineFit = build_ngrams_wocs(n=3).fit(train_set)

In [None]:
# Score the held-out split with the fitted trigram pipeline.
predictions_wocs = trigramwocs_pipelineFit.transform(dataset=test_set)

In [None]:
# Area under ROC for the trigram model (same evaluator as the unigram runs).
evaluator.evaluate(dataset=predictions_wocs)

0.8731309130073437

In [None]:
# save trained model
from pyspark.ml import PipelineModel

# Persist every fitted stage; reload later with PipelineModel.load(modelPath).
modelPath = 'LogisticPipelineModel_3_gram'
(trigramwocs_pipelineFit
    .write()
    .overwrite()  # replace any model already saved at this path
    .save(path=modelPath))