####Install

In [0]:
%sh
pip install nltk
pip install stop-words
pip install pyspellchecker

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.
Collecting stop-words
  Downloading stop-words-2018.7.23.tar.gz (31 kB)
Building wheels for collected packages: stop-words
  Building wheel for stop-words (setup.py): started
  Building wheel for stop-words (setup.py): finished with status 'done'
  Created wheel for stop-words: filename=stop_words-2018.7.23-py3-none-any.whl size=32916 sha256=2f2d9d9e4413a34c0df26834157e74c59d3e21f565a357acfe8217009eb63217
  Stored in directory: /root/.cache/pip/wheels/eb/03/0d/3bd31c983789aeb0b4d5e2ca48590288d9db1586cf5f225062
Successfully built stop-words
Installing collected packages: stop-words
Successfully installed stop-words-2018.7.23
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.
Collecting pyspellchecker
  Downloading pyspellchecker-0.6.2-py3-none-any.whl (2.7 MB)
Installing collected packages: pyspellchecker
Successfully inst

####TBD

1. Tokenization into words
2. Stop words removal
3. Noise reduction (e.g., removal of punctuation)
4. Stemmin

#### 1. Load Data

In [0]:
import pandas as pd
import numpy as np
# File location and type
file_location = "/FileStore/tables/additional.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", "true").option("sep", delimiter).load(file_location)


pandasDF_news = df.select('news').toPandas()
pandasDF_target = df.select('target').toPandas()

In [0]:
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
import numpy as np
import re
from pyspark.sql import SQLContext

categories = ['rec.autos', 'rec.sport.baseball', 'comp.graphics', 'comp.sys.mac.hardware', 
              'sci.space', 'sci.crypt', 'talk.politics.guns', 'talk.religion.misc']
newsgroup = fetch_20newsgroups(subset='train',categories= categories , shuffle=True, random_state=42)

df_news = pd.DataFrame(data=newsgroup.data, columns=['news']) 

df_news = df_news.append(pandasDF_news, ignore_index=True)

df_news = df_news.replace(re.compile(r"From: \S*@\S*\s?"),"")
df_news = df_news.replace(re.compile('\s+')," ")
df_news = df_news.replace(re.compile("\'"),"")

#df_news = df_news.dropna()

df_target = pd.DataFrame(data=newsgroup.target, columns=['target'])

df_target = df_target.append(pandasDF_target, ignore_index=True)
#df_target = df_target.dropna()

df_target['target']=df_target.target.astype('int64')

df_binary_labels = pd.DataFrame(np.where (df_target < 10, 0, 1), columns=['Binary Label'])

sqlContext = SQLContext(sc)
df_newsgroup = sqlContext.createDataFrame(pd.concat([df_news, df_target, df_binary_labels], axis=1))



#### 2. Pipeline

In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline


regexTokenizer = RegexTokenizer(inputCol="news", outputCol="news_words", pattern="\\W")
add_stopwords = ["http","https","amp","rt","t","c","the","subject","re",'.',',','', 'i i','?','\'\'',"''",'y','*','out','==','df','e.g.','\'m','\[',"'m",':', ')', '(','n\'t', '\'','``','``','\'s', 'https://','-'] 
stopwordsRemover = StopWordsRemover(inputCol="news_words", outputCol="filtered").setStopWords(add_stopwords)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
string_indexer = StringIndexer(inputCol = "target", outputCol = "target_indexed")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, string_indexer])
pipelineFit = pipeline.fit(df_newsgroup)

dataset = pipelineFit.transform(df_newsgroup)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
#lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
rf = RandomForestClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol(), maxDepth=10)
rf_mod = rf.fit(trainingData)
#lrModel = lr.fit(trainingData)
#predictions = lrModel.transform(testData)
predictions = rf_mod.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="target_indexed", predictionCol="prediction")
evaluator.evaluate(predictions)

Out[9]: 0.7032946941465295

#### 3. Evaluate ML Model

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="target_indexed", predictionCol="prediction")
evaluator.evaluate(predictions)

accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.7032946941465295
Test Error = 0.29670530585347055


#### 4. Parameter tuning

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

trainingData1 = trainingData.drop("news_words","news_tf","news_tfidf","rawPrediction","probability","prediction","filtered","rawFeatures","CrossValidator_2b30ebf36fbb_rand")
testData1 = testData.drop("news_words","news_tf","news_tfidf","rawPrediction","probability","prediction","filtered","rawFeatures","CrossValidator_2b30ebf36fbb_rand")


trainingData1.show(5)

#grid for randomforest
grid = (ParamGridBuilder().baseOn([evaluator.metricName, 'precision']).addGrid(rf.maxDepth, [10, 20]).build())


# Instanciation of a CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)

# Transform the data and train the classifier on the training set
cv_model = cv.fit(trainingData1)

# Transform the data and perform predictions on the test set
df_test_pred1 = cv_model.transform(testData1)

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred1)

+--------------------+------+------------+--------------------+--------------+
|                news|target|Binary Label|            features|target_indexed|
+--------------------+------+------------+--------------------+--------------+
| (Peter van der V...|     0|           0|(10000,[42,66,120...|           5.0|
|( Nikan B Firoozy...|     5|           0|(10000,[55,222,26...|           4.0|
|( Phil Mueller ) ...|     2|           0|(10000,[15,78,207...|           1.0|
|("Imaging Club") ...|     0|           0|(10000,[78,452,48...|           5.0|
|("RWTMS2::MUNIZB"...|     5|           0|(10000,[66,78,86,...|           4.0|
+--------------------+------+------------+--------------------+--------------+
only showing top 5 rows

 To try the new MLflow PySpark ML autologging feature, which will be enabled by default in an upcoming release, call `mlflow.pyspark.ml.autolog()`.
MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to s