## **PySpark Installation**

In [132]:
#from google.colab import drive
#drive.mount('/content/drive')

In [133]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [134]:
#!wget https://downloads.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz

In [135]:
#!tar -xvzf /content/spark-3.2.2-bin-hadoop2.7.tgz

In [136]:
#!pip install -q findspark

In [137]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop2.7"

In [138]:
import findspark
findspark.init()

In [139]:
findspark.find()

'/content/spark-3.2.2-bin-hadoop2.7'

In [140]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark

In [141]:
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import col,when,isnan,count,lit
from pyspark.ml.feature import RegexTokenizer,StopWordsRemover,HashingTF,IDF,StringIndexer,VectorAssembler,OneHotEncoder
from pyspark.ml.classification import LogisticRegression, NaiveBayes, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

In [142]:
data=spark.read.format('csv').load('/content/airline.csv',header=True)
data.show()

+-------------+--------------------+--------------------+----------------+--------------+----------+--------------------+--------+--------------+--------------+-----+--------------+-------------------+------------------+---------------------+-----------------------------+---------------------+------------------------+------------------+-----------+
| airline_name|                link|               title|          author|author_country|      date|             content|aircraft|type_traveller|   cabin_flown|route|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|ground_service_rating|wifi_connectivity_rating|value_money_rating|recommended|
+-------------+--------------------+--------------------+----------------+--------------+----------+--------------------+--------+--------------+--------------+-----+--------------+-------------------+------------------+---------------------+-----------------------------+---------------------+----

In [143]:
data.collect()[0][2]

'Adria Airways customer review'

In [144]:
data.count()

41455

In [145]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+------------+----+-----+------+--------------+----+-------+--------+--------------+-----------+-----+--------------+-------------------+------------------+---------------------+-----------------------------+---------------------+------------------------+------------------+-----------+
|airline_name|link|title|author|author_country|date|content|aircraft|type_traveller|cabin_flown|route|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|ground_service_rating|wifi_connectivity_rating|value_money_rating|recommended|
+------------+----+-----+------+--------------+----+-------+--------+--------------+-----------+-----+--------------+-------------------+------------------+---------------------+-----------------------------+---------------------+------------------------+------------------+-----------+
|           0|  14|   16|    15|          1610|  18|     13|   40045|         39107|       2956|39144|          4629|               7770|  

In [146]:
cols_to_drop=['link','title','author','author_country','date','aircraft','type_traveller','route','ground_service_rating','wifi_connectivity_rating']
data=data.drop(*cols_to_drop)
data.show()

+-------------+--------------------+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
| airline_name|             content|   cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|
+-------------+--------------------+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|adria-airways|Outbound flight F...|       Economy|           7.0|                4.0|               4.0|                  4.0|                          0.0|               4.0|          1|
|adria-airways|Two short hops ZR...|Business Class|          10.0|                4.0|               5.0|                  4.0|                          1.0|               5.0|          1|
|adria-airways|Flew Zurich-Ljubl...|       Economy|    

In [147]:
data.select(data.columns).distinct().show()

+--------------------+--------------------+---------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|        airline_name|             content|    cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|
+--------------------+--------------------+---------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|       adria-airways|DUB-NAP. Was expe...|           null|          null|               null|              null|                 null|                         null|              null|          0|
|aeroflot-russian-...|Excellent service...|        Economy|           9.0|                4.0|               3.0|                  4.0|                          3.0|               5.0|          1|
|aeroflot-russi

In [148]:
data.count()

41455

In [149]:
data.columns

['airline_name',
 'content',
 'cabin_flown',
 'overall_rating',
 'seat_comfort_rating',
 'cabin_staff_rating',
 'food_beverages_rating',
 'inflight_entertainment_rating',
 'value_money_rating',
 'recommended']

In [150]:
data=data.withColumn('overall_rating',col('overall_rating').cast(IntegerType()))
data=data.withColumn('seat_comfort_rating',col('seat_comfort_rating').cast(IntegerType()))
data=data.withColumn('cabin_staff_rating',col('cabin_staff_rating').cast(IntegerType()))
data=data.withColumn('food_beverages_rating',col('food_beverages_rating').cast(IntegerType()))
data=data.withColumn('inflight_entertainment_rating',col('inflight_entertainment_rating').cast(IntegerType()))
data=data.withColumn('value_money_rating',col('value_money_rating').cast(IntegerType()))
data=data.withColumn('recommended',col('recommended').cast(IntegerType()))
data=data.na.drop()
data.show()

+---------------+--------------------+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|   airline_name|             content|   cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|
+---------------+--------------------+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|  adria-airways|Outbound flight F...|       Economy|             7|                  4|                 4|                    4|                            0|                 4|          1|
|  adria-airways|Two short hops ZR...|Business Class|            10|                  4|                 5|                    4|                            1|                 5|          1|
|  adria-airways|Flew Zurich-Ljubl...|       

In [151]:
# Data Split
airline=data.select('airline_name')
review_data=data.select(['content','recommended'])
rating_data=data.select(['cabin_flown','overall_rating','seat_comfort_rating','cabin_staff_rating','food_beverages_rating','inflight_entertainment_rating','value_money_rating','recommended'])

In [152]:
airline.show()

+---------------+
|   airline_name|
+---------------+
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|  adria-airways|
|aegean-airlines|
|aegean-airlines|
|aegean-airlines|
+---------------+
only showing top 20 rows



In [153]:
review_data.show()

+--------------------+-----------+
|             content|recommended|
+--------------------+-----------+
|Outbound flight F...|          1|
|Two short hops ZR...|          1|
|Flew Zurich-Ljubl...|          1|
|Adria serves this...|          1|
|"WAW-SKJ Economy....|          0|
|Sarajevo-Frankfur...|          1|
|I had flights fro...|          1|
|LJU to FRA and ba...|          1|
|On my Ljubljana -...|          1|
|Flights from LJU ...|          1|
|I was very satisf...|          1|
|I was on JP650 th...|          1|
|VIE-LJU LJU-MUC A...|          1|
|If I have to fly ...|          1|
|Istanbul-Ljubljan...|          1|
|Return flight Par...|          1|
|BEG-LJU-BEG with ...|          1|
|Flight on time, r...|          1|
|We flew on flight...|          1|
|Multiple trip wit...|          1|
+--------------------+-----------+
only showing top 20 rows



In [154]:
rating_data.show()

+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|   cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|
+--------------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|       Economy|             7|                  4|                 4|                    4|                            0|                 4|          1|
|Business Class|            10|                  4|                 5|                    4|                            1|                 5|          1|
|       Economy|             9|                  5|                 5|                    4|                            0|                 5|          1|
|Business Class|             8|                  4|                 4|      

In [155]:
rating_data=rating_data.withColumn('cabin_flown',when(rating_data.cabin_flown=='Economy',lit(0)).otherwise(lit(1)))
rating_data=rating_data.withColumn('recommended',col('recommended').cast(StringType()))
rating_data.show()

+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|
+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+
|          0|             7|                  4|                 4|                    4|                            0|                 4|          1|
|          1|            10|                  4|                 5|                    4|                            1|                 5|          1|
|          0|             9|                  5|                 5|                    4|                            0|                 5|          1|
|          1|             8|                  4|                 4|                    3|     

# **NLP**

In [156]:
def cross_validator_results(model,train_data,test_data):
  
  # Building a parameter grid
  grid=ParamGridBuilder().addGrid(model.maxIter,[0,1]).build()

  # Building a cross validator and fitting the training data
  cv=CrossValidator(estimator=model,estimatorParamMaps=grid,evaluator=BinaryClassificationEvaluator())
  cvModel=cv.fit(train_data)

  #Predictiion upon the testing data 
  predictions=cvModel.transform(test_data)
  
  return predictions

## **For Reviews**

In [157]:
# Creating a pipeline: 
def nlp_dataset_creation(data):

  data=data.withColumn('recommended',col('recommended').cast(StringType()))

  # Tokenizing using Regular Expression
  tokenize=RegexTokenizer(inputCol='content',outputCol='words',pattern='\w')

  # Removing the stop words
  stop_words=['a','the','an','is','at','on','in','.','!','$','%','&','(',')',',',' ','http','https','amp','rt','t','c']
  remove_stop_words=StopWordsRemover(inputCol='words',outputCol='filteredWords').setStopWords(stop_words)

  # Using TFID Vectorizer
  tf=HashingTF(inputCol='filteredWords',outputCol='rawFeatures')
  idf=IDF(inputCol='rawFeatures',outputCol='features')

  # Creating Labels
  label=StringIndexer(inputCol='recommended',outputCol='label')

  # Pipeline
  pipeline=Pipeline(stages=[tokenize,remove_stop_words,tf,idf,label])
  fit=pipeline.fit(data)
  dataset=fit.transform(data)
  return dataset

In [158]:
nlp_dataset=nlp_dataset_creation(review_data)
nlp_dataset.show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+
|             content|recommended|               words|       filteredWords|         rawFeatures|            features|label|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+
|Outbound flight F...|          1|[ ,  , /,  , . , ...|[/, . , . , /, . ...|(262144,[182899,1...|(262144,[182899,1...|  0.0|
|Two short hops ZR...|          1|[ ,  ,  , -,  ,  ...|  [-, -, . , . , . ]|(262144,[38640,18...|(262144,[38640,18...|  0.0|
|Flew Zurich-Ljubl...|          1|[ , -,  ,  ,  ,  ...| [-, . , . , . , . ]|(262144,[38640,18...|(262144,[38640,18...|  0.0|
|Adria serves this...|          1|[ ,  ,  ,  ,  ,  ...| [. , . , . , . , ']|(262144,[182899,1...|(262144,[182899,1...|  0.0|
|"WAW-SKJ Economy....|          0|[", -,  , . ,  , ...|[", -, . , . , . ...|(262144,[38640,77...|(262144,[38640,77...|  1.0|


In [159]:
# Data Split
(training_data,testing_data)=nlp_dataset.randomSplit([0.8,0.2],seed=100)
training_data.show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+
|             content|recommended|               words|       filteredWords|         rawFeatures|            features|label|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+
| I fly up to Lisb...|          0|[ ,  ,  ,  ,  ,  ...|[  , . , ', . , '...|(262144,[36696,38...|(262144,[36696,38...|  1.0|
| The aircraft was...|          1|[ ,  ,  ,  ,  ,  ...|   [. ,  - , . , . ]|(262144,[106237,1...|(262144,[106237,1...|  0.0|
| it took 3 attemp...|          0|[ ,  ,  ,  ,  ,  ...|               [. "]|(262144,[89746],[...|(262144,[89746],[...|  1.0|
|"(YVR-YYC: Vancou...|          0|["(, -, : ,  ,  ,...|["(, -, : , -, /,...|(262144,[38640,44...|(262144,[38640,44...|  1.0|
|"09/09/14 VS016 f...|          0|[", /, /,  ,  ,  ...|[", /, /,  (, ). ...|(262144,[44646,72...|(262144,[44646,72...|  1.0|


In [160]:
logReg=LogisticRegression(maxIter=20,elasticNetParam=0,regParam=0.3)

In [161]:
# Logistic Regression Model
predictions_logReg_nlp=cross_validator_results(logReg,training_data,testing_data)
predictions_logReg_nlp.show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|             content|recommended|               words|       filteredWords|         rawFeatures|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|"(LAX to LHR on U...|          1|["(,  ,  ,  ,  , ...|["(, ) , -,  (, )...|(262144,[38640,44...|(262144,[38640,44...|  0.0|[-1.0574592825088...|[0.25779528841848...|       1.0|
|"11/11/2013 my co...|          1|[", /, /,  ,  ,  ...|[", /, /, . , . ,...|(262144,[6447,446...|(262144,[6447,446...|  0.0|[-0.3776523974061...|[0.40669323481348...|       1.0|
|"13-16 Sep 2013 E...|          0|[", -,  ,  ,  ,  ...|[", -, . , . ,  "...|(262144,[38640,90...|(262144,[3864

In [162]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')
evaluator.evaluate(predictions_logReg_nlp)

0.5520061184178414

## **For Ratings**

In [163]:
print(rating_data.columns)

['cabin_flown', 'overall_rating', 'seat_comfort_rating', 'cabin_staff_rating', 'food_beverages_rating', 'inflight_entertainment_rating', 'value_money_rating', 'recommended']


In [164]:
def classification_data(data):
  
  # Casting recommended as string type
  data=data.withColumn('recommended',col('recommended').cast(StringType()))

  # Vectorizing the data
  vec_transform=VectorAssembler().setInputCols(['cabin_flown', 'overall_rating', 'seat_comfort_rating', 'cabin_staff_rating', 'food_beverages_rating', 'inflight_entertainment_rating', 'value_money_rating']).setOutputCol('features')

  # Setting the labels
  labelIndexer=StringIndexer(inputCol='recommended',outputCol='label')

  # Pipeline 
  pipeline=Pipeline(stages=[vec_transform,labelIndexer])
  fit=pipeline.fit(data)
  dataset=fit.transform(data)
  return dataset
  

In [165]:
clDF=classification_data(rating_data)
clDF.show()

+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+--------------------+-----+
|cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|            features|label|
+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+--------------------+-----+
|          0|             7|                  4|                 4|                    4|                            0|                 4|          1|[0.0,7.0,4.0,4.0,...|  0.0|
|          1|            10|                  4|                 5|                    4|                            1|                 5|          1|[1.0,10.0,4.0,5.0...|  0.0|
|          0|             9|                  5|                 5|                    4|                     

In [166]:
# Data Split
(training_data,testing_data)=clDF.randomSplit([0.8,0.2],seed=100)
training_data.show()

+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+-------------+-----+
|cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|     features|label|
+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+-------------+-----+
|          0|             1|                  0|                 0|                    0|                            0|                 0|          0|(7,[1],[1.0])|  1.0|
|          0|             1|                  0|                 0|                    0|                            0|                 0|          0|(7,[1],[1.0])|  1.0|
|          0|             1|                  0|                 0|                    0|                            0|                 0|       

In [167]:
# Logistic Regression Model
logReg=LogisticRegression(maxIter=20,elasticNetParam=0,regParam=0.3)

In [168]:
predictions_logReg=cross_validator_results(logReg,training_data,testing_data)
predictions_logReg.show()

+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+--------------------+-----+--------------------+--------------------+----------+
|cabin_flown|overall_rating|seat_comfort_rating|cabin_staff_rating|food_beverages_rating|inflight_entertainment_rating|value_money_rating|recommended|            features|label|       rawPrediction|         probability|prediction|
+-----------+--------------+-------------------+------------------+---------------------+-----------------------------+------------------+-----------+--------------------+-----+--------------------+--------------------+----------+
|          0|             1|                  0|                 0|                    0|                            0|                 0|          0|       (7,[1],[1.0])|  1.0|[-4.5392448249160...|[0.01056858189912...|       1.0|
|          0|             1|                  0|                 0|         

In [169]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')
evaluator.evaluate(predictions_logReg)

0.9290076390683782

## **Random Forest Classifier**

In [170]:
rf = RandomForestClassifier(featuresCol='features',labelCol='label')
rfModel = rf.fit(training_data)
predictions = rfModel.transform(testing_data)

In [171]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')
evaluator.evaluate(predictions)

0.9488602423824705

## **Naive Bayes**

In [172]:
nb=NaiveBayes(featuresCol='features',labelCol='label')
nbModel=nb.fit(training_data)
predictions=nbModel.transform(testing_data)

In [173]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')
evaluator.evaluate(predictions)

0.7153642916630769