In [22]:
import pandas as pd
import boto3
import os
import psycopg2
from pyspark.sql.functions import when,col,udf
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

In [2]:
# Initializing PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# Spark Config
conf = SparkConf().setAppName("ML_App")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName('ML_App').getOrCreate()
sqlContext = SQLContext(sc)

In [3]:
# Connect to DB
engine = psycopg2.connect(
    database="kikipng",
    user="postgres",
    password="qwerty123",
    host="kikipng.ckoss6hrgu4d.eu-west-2.rds.amazonaws.com",
    port=5432)
query = """select * from airline_postgres_schema.skytrax_reviews"""
df_skytrax  = pd.read_sql(query, engine)

# Convert pandas datasframe to spark dataframe
dfs_skytrax = spark.createDataFrame(df_skytrax)

# Drop missing values
dfs_skytrax = dfs_skytrax.dropna()
dfs_skytrax.show(5)

  """)


+---------+---------------+--------------+--------------------+------------------+----------------+---------------------+----------+--------------+-----------+--------------------+----------+---------------+----------------------+--------------+------------+------------------+-------------------+---------------------+--------------+
|review_id|        airline|overall_rating|        review_title|     review_author|review_sentiment|review_date_published|  aircraft|traveller_type|cabin_flown|               route|date_flown|value_for_money|inflight_entertainment|ground_service|seat_comfort|food_and_beverages|cabin_staff_service|wifi_and_connectivity|recommendation|
+---------+---------------+--------------+--------------------+------------------+----------------+---------------------+----------+--------------+-----------+--------------------+----------+---------------+----------------------+--------------+------------+------------------+-------------------+---------------------+-----------

In [4]:
# Drop review_author column to anonymize data 
dfs_skytrax = dfs_skytrax.drop("review_author")

# Drop review_title, review_date_published as they are not going to be used in the analysis
dfs_skytrax = dfs_skytrax.drop("review_title","airline","review_id")



In [5]:
# Create a column that has 1 if the flight is direct and 0 if not
direct_flight = when(col("route").contains("via"), 0).otherwise(1)

In [6]:
# drop route column
dfs_skytrax = dfs_skytrax.withColumn("direct_flight",direct_flight)\
            .drop("route")


In [7]:
# Create function that does one hot encodings
def get_one_hot_encodings(dfs, column_name):   
    '''
    Input: spark dataframe and name of column we want to one-hot-encode
    Output: spark dataframe with one-hot-encoding on the column requested
    '''
    unique_values = dfs.select(column_name)\
                        .distinct()\
                        .rdd\
                        .flatMap(lambda x: x).collect()

    # for each of the gathered values create a new column 
    for unique_value in unique_values:
        function = udf(lambda item: 
                       1 if item == unique_value else 0, 
                       IntegerType())
        new_column_name = column_name + '_' + unique_value.lower().replace(' ','_')
        dfs = dfs.withColumn(new_column_name, function(col(column_name)))
    dfs = dfs.drop(column_name)
    return dfs

In [8]:
# One-Hot-Encode cabin_flown, traveller_type and airline
column_names = ['cabin_flown','traveller_type', 'aircraft']
for column_name in column_names:
    dfs_skytrax = get_one_hot_encodings(dfs_skytrax,column_name )


In [21]:
# Now that all categorical columns have been converted to numeric assemble all numeric columns
numericCols = [item[0] for item in dfs_skytrax.dtypes if item[1]== 'bigint' or item[1]== 'int' or item[1]== 'double' or item[1]== 'float' ]#[1:]
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
label_stringIdx = StringIndexer(inputCol = 'recommendation', outputCol = 'labelIndex')
pipeline = Pipeline(stages=[assembler, label_stringIdx])
preprocessing_pipeline_model = pipeline.fit(dfs_skytrax)
dfs_skytrax = preprocessing_pipeline_model.transform(dfs_skytrax)


train, test= dfs_skytrax.randomSplit([0.8, 0.2], seed=42)

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'labelIndex')
rfModel = rf.fit(train)

In [19]:

def uploadDirectory(path):
    session = boto3.Session(
        aws_access_key_id='AKIA2X2ER6BK2I4N3NO4',
        aws_secret_access_key='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'
    )

    s3 = session.resource('s3').Bucket('airline-project-kikipng')
    bucketname = 'airline-project-kikipng'
    for root,dirs,files in os.walk(path):
        for file in files:
            print(file)
            s3.upload_file(os.path.join(root,file),os.path.join(root,file))


rfModel.write().overwrite().save('model')
uploadDirectory('model')


part-00001-00445863-aeeb-49ca-9c99-18dd38218288-c000.snappy.parquet
_SUCCESS
._SUCCESS.crc
part-00000-00445863-aeeb-49ca-9c99-18dd38218288-c000.snappy.parquet
.part-00001-00445863-aeeb-49ca-9c99-18dd38218288-c000.snappy.parquet.crc
.part-00000-00445863-aeeb-49ca-9c99-18dd38218288-c000.snappy.parquet.crc
part-00000
.part-00000.crc
_SUCCESS
._SUCCESS.crc
part-00001-4173302a-7d20-429f-982b-7ccfb7377054-c000.snappy.parquet
_SUCCESS
._SUCCESS.crc
.part-00000-4173302a-7d20-429f-982b-7ccfb7377054-c000.snappy.parquet.crc
part-00000-4173302a-7d20-429f-982b-7ccfb7377054-c000.snappy.parquet
.part-00001-4173302a-7d20-429f-982b-7ccfb7377054-c000.snappy.parquet.crc


In [23]:
predictions = rfModel.transform(test)
predictions.select('labelIndex', 'rawPrediction', 'prediction', 'probability').show(10)


+----------+-------------+----------+-----------+
|labelIndex|rawPrediction|prediction|probability|
+----------+-------------+----------+-----------+
|       0.0|   [20.0,0.0]|       0.0|  [1.0,0.0]|
|       0.0|   [20.0,0.0]|       0.0|  [1.0,0.0]|
|       0.0|   [20.0,0.0]|       0.0|  [1.0,0.0]|
|       0.0|   [19.0,1.0]|       0.0|[0.95,0.05]|
|       1.0|   [11.0,9.0]|       0.0|[0.55,0.45]|
|       0.0|   [20.0,0.0]|       0.0|  [1.0,0.0]|
|       0.0|   [15.0,5.0]|       0.0|[0.75,0.25]|
|       0.0|   [20.0,0.0]|       0.0|  [1.0,0.0]|
|       0.0|   [16.0,4.0]|       0.0|  [0.8,0.2]|
|       1.0|   [1.0,19.0]|       1.0|[0.05,0.95]|
+----------+-------------+----------+-----------+



In [24]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))


Accuracy = 0.8862745098039215
Test Error = 0.11372549019607847


In [25]:
preds_and_labels = predictions.select(['prediction','labelIndex']).withColumn('labelIndex', F.col('labelIndex').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','labelIndex'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())


[[8. 0.]
 [1. 1.]]
