In [0]:
display(dbutils.fs.ls("/FileStore/tables/"))

path,name,size
dbfs:/FileStore/tables/Corona_NLP_test.csv,Corona_NLP_test.csv,1002494
dbfs:/FileStore/tables/Corona_NLP_train.csv,Corona_NLP_train.csv,10500262
dbfs:/FileStore/tables/Finance_news.csv,Finance_news.csv,672006
dbfs:/FileStore/tables/Spark_assignment_dag.jpg,Spark_assignment_dag.jpg,4035928
dbfs:/FileStore/tables/allnodeslist.txt,allnodeslist.txt,55741422
dbfs:/FileStore/tables/mnm_dataset-1.csv,mnm_dataset-1.csv,1284872
dbfs:/FileStore/tables/mnm_dataset.csv,mnm_dataset.csv,1284872
dbfs:/FileStore/tables/sentiment140.csv,sentiment140.csv,238803811


In [0]:
# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FinanceNewsClassification").getOrCreate()

In [0]:
# read dataset
df = spark.read.csv('dbfs:/FileStore/tables/Finance_news.csv',inferSchema=True)

In [0]:
# Formatting the data
df = df.withColumnRenamed('_c0','Sentiment').withColumnRenamed('_c1','News')
df.show()

+---------+--------------------+
|Sentiment|                News|
+---------+--------------------+
|  neutral|According to Gran...|
|  neutral|Technopolis plans...|
| negative|The international...|
| positive|With the new prod...|
| positive|According to the ...|
| positive|FINANCING OF ASPO...|
| positive|For the last quar...|
| positive|In the third quar...|
| positive|Operating profit ...|
| positive|Operating profit ...|
| positive|TeliaSonera TLSN ...|
| positive|STORA ENSO , NORS...|
| positive|A purchase agreem...|
| positive|Finnish Talentum ...|
| positive|Clothing retail c...|
| positive|Consolidated net ...|
| positive|Foundries divisio...|
| positive|HELSINKI ( AFX ) ...|
| positive|Incap Contract Ma...|
| positive|Its board of dire...|
+---------+--------------------+
only showing top 20 rows



In [0]:
df.select("Sentiment").distinct().count()

Out[5]: 3

In [0]:
# this shows that there are no missing values in the data, I am good to go now
# df.toPandas()['News'].isnull().sum()

In [0]:
# Feature extraction from text 
# options: CountVectorizer , TFIDF (can be many , but I know only these)

# Pipeline: tokenizer -> StopWordsRemover -> CountVectorizer -> IDF -> LogisticRegression

In [0]:
# import packages
from pyspark.ml.feature import Tokenizer , StopWordsRemover , CountVectorizer , IDF
from pyspark.ml.feature import StringIndexer # for label encoding

In [0]:
# Pipeline stages

tokenizer = Tokenizer(inputCol="News",outputCol="tokenized_news") # we will tokenize the news text

stopwordsremover = StopWordsRemover(inputCol="tokenized_news",outputCol="new_without_stop_words") # remove stop words
 
countVectorizer = CountVectorizer(inputCol="new_without_stop_words",outputCol="news_vector")

idf = IDF(inputCol="news_vector",outputCol="news_idf")

In [0]:
# label encoding
labelEncoder = StringIndexer(inputCol="Sentiment",outputCol="Label").fit(df)

In [0]:
labelEncoder.labels
# so neutral is 0, positive is 1 and negative is 2

Out[11]: ['neutral', 'positive', 'negative']

In [0]:
df = labelEncoder.transform(df)
df.show(5)

+---------+--------------------+-----+
|Sentiment|                News|Label|
+---------+--------------------+-----+
|  neutral|According to Gran...|  0.0|
|  neutral|Technopolis plans...|  0.0|
| negative|The international...|  2.0|
| positive|With the new prod...|  1.0|
| positive|According to the ...|  1.0|
+---------+--------------------+-----+
only showing top 5 rows



In [0]:
sentiment_label = {0.0:"neutral",1.0:"positive",2.0:"negative"}

In [0]:
#df.count() -> 4846
# let us split the data into training and test

trainDF , testDF = df.randomSplit((0.7,0.3),seed = 43)

In [0]:
trainDF.show(5)

+---------+--------------------+-----+
|Sentiment|                News|Label|
+---------+--------------------+-----+
| negative|( ADP News ) - Fe...|  2.0|
| negative|( ADP News ) - Fe...|  2.0|
| negative|( ADP News ) - Ja...|  2.0|
| negative|( ADP News ) - Ja...|  2.0|
| negative|( ADPnews ) - Oct...|  2.0|
+---------+--------------------+-----+
only showing top 5 rows



In [0]:
# Estimator -> takes data , fits it, and gives model
from pyspark.ml.classification import LogisticRegression

In [0]:
logreg = LogisticRegression(featuresCol="news_idf",labelCol="Label")

In [0]:
# Joining the pipeline
from pyspark.ml import Pipeline

pipe = Pipeline(stages=[tokenizer,stopwordsremover,countVectorizer,idf,logreg])

In [0]:
pipe.stages

Out[19]: Param(parent='Pipeline_a7d7aae15a17', name='stages', doc='a list of pipeline stages')

In [0]:
# building model
log_reg_model = pipe.fit(trainDF)

In [0]:
# predictions on testDF
predictions = log_reg_model.transform(testDF) 
# we do not fit the model on testDF
# so the model has not seen the test data or the labels in test data

In [0]:
predictions.columns

Out[31]: ['Sentiment',
 'News',
 'Label',
 'tokenized_news',
 'new_without_stop_words',
 'news_vector',
 'news_idf',
 'rawPrediction',
 'probability',
 'prediction']

In [0]:
predictions.select('Sentiment','Label','prediction','probability').show(10)
# we can see there are 3 wrong predictions in these rows

+---------+-----+----------+--------------------+
|Sentiment|Label|prediction|         probability|
+---------+-----+----------+--------------------+
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       2.0|[0.0,1.1199121372...|
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       1.0|[0.0,1.0,3.245388...|
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       1.0|[0.0,0.9999945247...|
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       2.0|       [0.0,0.0,1.0]|
| negative|  2.0|       1.0|[1.45659621627210...|
+---------+-----+----------+--------------------+
only showing top 10 rows



In [0]:
# checking accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Label",predictionCol="prediction",metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

accuracy

Out[34]: 0.6852756454989533

In [0]:
# seems that my model is not very accurate !!

In [0]:
test = spark.createDataFrame(schema=df.schema,data=[('negative',"The bank collapsed,no money, bank has lost",2.0)])
test.show()

+---------+--------------------+-----+
|Sentiment|                News|Label|
+---------+--------------------+-----+
| negative|The bank collapse...|  1.0|
+---------+--------------------+-----+



In [0]:
p = log_reg_model.transform(test)

In [0]:
p.select('Sentiment','Label','prediction','probability').show()

# this is predicting nicely

+---------+-----+----------+-------------+
|Sentiment|Label|prediction|  probability|
+---------+-----+----------+-------------+
| negative|  1.0|       2.0|[0.0,0.0,1.0]|
+---------+-----+----------+-------------+



In [0]:
log_reg_model.save("dbfs:/FileStore/models/FinalcialNewsClassifier")

In [0]:
loaded = log_reg_model.load("dbfs:/FileStore/models/FinalcialNewsClassifier")

In [0]:
loaded

Out[23]: PipelineModel_8a63f04160a2