In [None]:
pip install pyspark

In [None]:
# importing required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.sql import Row

In [None]:
# Start Spark, or reuse the session that is already running in this kernel.
# getOrCreate() keeps this cell safe to re-run; constructing SparkContext directly
# raises an error when a context already exists.
spark = SparkSession.builder.appName("PySparkShell").getOrCreate()

# Keep the `sc` handle available: the final cell of the notebook calls sc.stop().
sc = spark.sparkContext
    


In [None]:
# Explicit schema for the headerless tweets CSV; fields are listed in file order.
# Only 'label' and 'tweet' are used afterwards -- the other four columns are
# dropped right after the file is loaded.
my_schema = tp.StructType([
    tp.StructField(name='label',    dataType=tp.IntegerType(), nullable=True),
    # 64-bit on purpose: a value that does not fit the declared type is treated as
    # malformed by the CSV reader. Assumes this is the Sentiment140 export, whose
    # tweet ids exceed the signed 32-bit maximum (2147483647) -- TODO confirm.
    tp.StructField(name='id',       dataType=tp.LongType(),    nullable=True),
    tp.StructField(name='date',     dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='NO_QUERY', dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='username', dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='tweet',    dataType=tp.StringType(),  nullable=True),
])
    
 

In [None]:
 
# Load the tweets CSV using the explicit schema; the file has no header row.
# NOTE: this is an absolute, machine-specific path -- adjust it before running elsewhere.
tweets_csv_path = '/Users/venkatavarunnelakuditi/Downloads/training1600000.csv'
my_data = spark.read.csv(tweets_csv_path, schema=my_schema, header=False)



In [None]:
# Discard the metadata columns; only 'label' and 'tweet' remain afterwards.
unused_columns = ["id", "date", "NO_QUERY", "username"]
my_data = my_data.drop(*unused_columns)

In [None]:
# Preview the first five rows of the tweet data.
my_data.show(n=5)

# Print the column names and types that remain after the drop.
my_data.printSchema()

In [None]:
# Stage 1: tokenize the tweet text using the \W regex pattern.
stage_1 = RegexTokenizer(
    inputCol='tweet',
    outputCol='tokens',
    pattern='\\W',
)

# Stage 2: remove stop words from the token list.
stage_2 = StopWordsRemover(
    inputCol='tokens',
    outputCol='filtered_words',
)

# Stage 3: Word2Vec features of size 100 built from the filtered words.
stage_3 = Word2Vec(
    inputCol='filtered_words',
    outputCol='vector',
    vectorSize=100,
)

# Stage 4: logistic regression classifier trained on the Word2Vec output.
model = LogisticRegression(
    featuresCol='vector',
    labelCol='label',
)

In [None]:

# Chain tokenizer, stop-word filter, Word2Vec and the classifier into one pipeline.
tweet_stages = [stage_1, stage_2, stage_3, model]
pipeline = Pipeline(stages=tweet_stages)

# Train all stages on the tweet data; the result is the fitted pipeline model.
pipelineFit = pipeline.fit(my_data)

In [None]:
# Persist the fitted tweet pipeline (preprocessing stages + logistic regression).
# overwrite() lets this cell be re-run: a plain save() raises an error when the
# target path already exists from an earlier run.
pipelineFit.write().overwrite().save("preprocessingAndLR")

In [None]:
# Load the BBC news training CSV; its first row supplies the column names.
# NOTE: this is an absolute, machine-specific path -- adjust it before running elsewhere.
bbc_csv_path = '/Users/venkatavarunnelakuditi/Downloads/BBCNewsTrain.csv'
bbc_data = spark.read.csv(bbc_csv_path, header=True)



In [None]:
# Preview the first ten rows of the BBC data as loaded.
bbc_data.show(n=10)

In [None]:
# Drop the article identifier and rename Text to 'tweet', the input column name
# that the tokenizer stage reads.
bbc_data = bbc_data.drop('ArticleId').withColumnRenamed("Text", "tweet")

In [None]:
from pyspark.sql.functions import when

# Numeric class code assigned to each BBC category name.
category_codes = {"tech": 0, "business": 1, "politics": 2, "sport": 3, "entertainment": 4}

# Encode only while Category still holds the raw strings. The column is overwritten
# in place, so re-running this cell on the already-encoded values would match none
# of the names and reset every label to 0.
if dict(bbc_data.dtypes)["Category"] == "string":
    category_expr = None
    for category_name, code in category_codes.items():
        is_category = bbc_data.Category == category_name
        if category_expr is None:
            category_expr = when(is_category, code)
        else:
            category_expr = category_expr.when(is_category, code)

    # Any value outside the five known names falls back to 0, the same code as "tech".
    bbc_data = bbc_data.withColumn("Category", category_expr.otherwise(0))

#indexers = [StringIndexer(inputCol="Category", outputCol="Category_index")]


#pipeline = Pipeline(stages=indexers)
#bbc_data = pipeline.fit(bbc_data).transform(bbc_data)


In [None]:
# Preview ten rows to check the numerically encoded Category column.
bbc_data.show(n=10)

In [None]:



# Stage 1: tokenize the article text (held in the 'tweet' column) using the \W regex pattern.
stage_1 = RegexTokenizer(
    inputCol='tweet',
    outputCol='tokens',
    pattern='\\W',
)

# Stage 2: remove stop words from the token list.
stage_2 = StopWordsRemover(
    inputCol='tokens',
    outputCol='filtered_words',
)

# Stage 3: bag-of-words term counts (vocabSize=20000, minDF=5).
stage_3 = CountVectorizer(
    inputCol="filtered_words",
    outputCol="cf",
    vocabSize=20000,
    minDF=5,
)

# Stage 4: IDF weighting applied to the term counts (minDocFreq=5).
stage_4 = IDF(
    inputCol="cf",
    outputCol="vector",
    minDocFreq=5,
)

# Final stage: multinomial Naive Bayes classifier predicting the numeric Category.
model = NaiveBayes(
    featuresCol='vector',
    labelCol='Category',
    smoothing=1.0,
    modelType='multinomial',
)




In [None]:
# Chain tokenizer, stop-word filter, CountVectorizer, IDF and Naive Bayes into one pipeline.
bbc_stages = [stage_1, stage_2, stage_3, stage_4, model]
pipelineBBC = Pipeline(stages=bbc_stages)



In [None]:
# Split the BBC rows into roughly 75% training and 25% test data, using a fixed seed.
split_weights = [0.75, 0.25]
trainDF, testDF = bbc_data.randomSplit(split_weights, seed=2000)

# Train every pipeline stage on the training portion only.
BBCpipelineFit = pipelineBBC.fit(trainDF)

In [None]:
# Run the fitted BBC pipeline over the held-out rows to obtain predictions.
test_df = BBCpipelineFit.transform(dataset=testDF)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Rename Category to "label", the column the evaluator reads the true class from.
test_df = test_df.withColumnRenamed("Category", "label")

# labelCol and metricName are spelled out at their PySpark default values
# ("label" and "f1") so the reported number is unambiguous.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="f1",
)
evaluator.evaluate(test_df)

In [None]:
# Persist the fitted BBC pipeline (preprocessing stages + Naive Bayes classifier).
# overwrite() lets this cell be re-run: a plain save() raises an error when the
# target path already exists from an earlier run.
BBCpipelineFit.write().overwrite().save("preprocessingAndCategory")

In [None]:
# Display the first ten rows of the BBC data once more.
bbc_data.show(n=10)

In [None]:
# Shut down the SparkContext held in `sc` now that the notebook is finished.
sc.stop()