In [None]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
# configuración local de pyspark

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [2]:
#from pyspark import SparkContext
#sc = SparkContext("local", "airlines") 

In [3]:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.appName('airlines').getOrCreate()

In [4]:
#df=spark.read.csv("gdrive/MyDrive/st1800-2266/datasets/airlines.csv", inferSchema=True, header=True)
df=spark.read.csv("../datasets/airlines.csv", inferSchema=True, header=True)

df.show(10)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
|10006|Delta Air Lines|17-Jun-14|     USA|     9|Business|    5|        YES|Narita - Bangkok ...|
|10007|Delta Air Lines|14-Jun-14|      UK|     0| Economy|    1|         NO|Flight from NY La...|
|10008|Delta Air Lin

In [5]:
df.createOrReplaceTempView("train_df")
sqlDF = spark.sql("SELECT * FROM train_df")
sqlDF.show()

+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date| location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14| Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|      USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|      USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|      USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14|  Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
|10006|Delta Air Lines|17-Jun-14|      USA|     9|Business|    5|        YES|Narita - Bangkok ...|
|10007|Delta Air Lines|14-Jun-14|       UK|     0| Economy|    1|         NO|Flight from NY La...|
|10008|Del

In [6]:
import nltk
import pandas as pd
import numpy as np
import re
import codecs
import matplotlib.pyplot as plt

In [7]:
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /Users/emontoya/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/emontoya/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [8]:

# stopwords en nltk
from nltk.corpus import stopwords
 
stop_words_nltk = set(stopwords.words('english'))

In [9]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [10]:
rawdata = spark.read.load("../datasets/airlines.csv", format="csv", header=True)
rawdata.printSchema()
rawdata[0]
rawdata = rawdata.fillna({'review': ''})                               # Replace nulls with blank string

# Add Unique ID
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())     # Create Unique ID

# Generate YYYY-MM variable
rawdata = rawdata.withColumn("year_month", rawdata.date.substr(1,6))

# Show rawdata (as DataFrame)
rawdata.show(10)

# Print data types
for type in rawdata.dtypes:
    print(type)

target = rawdata.select(rawdata['rating'].cast(IntegerType()))
target.dtypes

root
 |-- id: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- value: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- review: string (nullable = true)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|uid|year_month|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|  0|    21-Jun|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|  1|    19-Jun|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Websi

[('rating', 'int')]

In [11]:
################################################################################################
#
#   Text Pre-processing (consider using one or all of the following):
#       - Remove common words (with stoplist)
#       - Handle punctuation
#       - lowcase/upcase
#       - Stemming
#       - Part-of-Speech Tagging (nouns, verbs, adj, etc.)
#
################################################################################################
from pyspark.sql.functions import udf,struct
#from pyspark.sql.types import StructType
def textprep(record):
    text  = record[1]
    uid   = record[0]
    tokens = text.split()
       
    # Custom List of Stopwords - Add your own here
    tokens = [re.sub('[^a-zA-Z0-9]','',word) for word in tokens]                                       # Remove special characters
    tokens = [word.lower() for word in tokens if len(word)>2 and word.lower() not in stop_words_nltk]     # Remove stopwords and words under X length
    return tokens

udf_textprep = udf(textprep , ArrayType(StringType()))
df = df.withColumn("words", udf_textprep(struct([df[x] for x in df.columns])))

#tokenizer = Tokenizer(inputCol="description", outputCol="words")
#wordsData = tokenizer.transform(text)
df.show(5)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+-------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|              words|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+-------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|[delta, air, lines]|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|[delta, air, lines]|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|[delta, air, lines]|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|[delta, air, lines]|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|[delta, air, lines]|
+-----+---------------+---------+--------+------+-------

In [14]:
# Term Frequency Vectorization  - Option 1 (Using hashingTF): 
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)

# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(df)
featurizedData = cvmodel.transform(df)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show(10)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+-------------------+--------------------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|              words|         rawFeatures|            features|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+-------------------+--------------------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|[delta, air, lines]|(8,[1,2,3],[1.0,1...|(8,[1,2,3],[1.387...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|[delta, air, lines]|(8,[1,2,3],[1.0,1...|(8,[1,2,3],[1.387...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|[delta, air, lines]|(8,[1,2,3],[1.0,1...|(8,[1,2,3],[1.387...|
|10004|Delta Air Lines|17-Jun-14|     US