In [None]:
## Big Data Management & Analytics
## Part 1 - PageRank for Airports
# Mounting S3 Bucket
import os

# Credentials are read from the environment so real keys never have to be pasted into
# (and saved with) the notebook. The empty-string fallback matches the previous
# placeholder values when the variables are not set.
access_key = os.environ.get("AWS_ACCESS_KEY_ID", "")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
# "/" is percent-encoded because the secret is embedded in the s3a:// URI below.
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "stanley-assignment2"
mount_name = "S3_Assignment2"

# Uncomment the next line to (re)mount the bucket.
#dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), "/mnt/%s" % mount_name)
# List the mount point to confirm the input files are reachable.
display(dbutils.fs.ls("/mnt/%s" % mount_name))

path,name,size
dbfs:/mnt/S3_Assignment2/AirportData.csv,AirportData.csv,856130
dbfs:/mnt/S3_Assignment2/AirportPageRankInput.txt,AirportPageRankInput.txt,80
dbfs:/mnt/S3_Assignment2/Tweets.csv,Tweets.csv,3421431


In [None]:
# The parameter file holds three lines: input location, iteration count, output location.
pageRankInputParams = sc.textFile("/mnt/S3_Assignment2/AirportPageRankInput.txt")
# Fetch the three lines with a single Spark job instead of calling take(3) once per value.
pageRankInputLines = pageRankInputParams.take(3)
airportSourceDataLocation = pageRankInputLines[0] # S3 location of input file
pageRankIterations = int(pageRankInputLines[1]) # Input file has iteration count = 10
airportRankSaveDestination = pageRankInputLines[2] # S3 location to save output file

In [None]:
# Load the airport route CSV from the location given in the parameter file.
# The first row is used as the header and column types are inferred from the data.
S3DF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(airportSourceDataLocation)
)
display(S3DF)

ORIGIN_AIRPORT_ID,ORIGIN,DEST_AIRPORT_ID,DEST,_c4
10006,06A,10056,A43,
10006,06A,10056,A43,
10009,09A,13934,ORI,
10009,09A,10170,ADQ,
10009,09A,13934,ORI,
10010,1B1,10154,ACK,
10011,1G4,10661,BLD,
10011,1G4,10661,BLD,
10016,A03,10056,A43,
10016,A03,10056,A43,


In [None]:
# Minimizing dataframe to only contain nodes (airports) and edges (outlinks)
minimizedAirportDF = S3DF.select(["ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID"])
# Each RDD element is a two-field Row: (origin airport id, destination airport id).
inputAirportRDD = minimizedAirportDF.rdd
# Preview only: take(10) avoids pulling every route to the driver the way collect() does.
inputAirportRDD.take(10)

In [None]:
def to_list(a):
    """Wrap a single value in a new one-element list.

    Passed to combineByKey as the createCombiner function: it starts the
    per-key list from the first value seen for that key.
    """
    combined = list()
    combined.append(a)
    return combined

def append(a, b):
    """Add the single value ``b`` to the end of list ``a`` in place and return ``a``.

    Passed to combineByKey as the mergeValue function: it folds one more value
    into an existing per-key list.
    """
    a += [b]  # in-place for lists, so the caller's list object is the one returned
    return a

def extend(a, b):
    """Append every element of ``b`` to list ``a`` in place and return ``a``.

    Passed to combineByKey as the mergeCombiners function: it concatenates two
    per-key lists.
    """
    a += b  # in-place for lists, so the caller's list object is the one returned
    return a

# Creating pair RDDs containing origin airport as key and list of all outlinks as value.
# Duplicate routes are kept, so a destination appears once per row in the input.
# The result is cached because the PageRank loop joins against it on every iteration.
groupedAirportRDD = inputAirportRDD.combineByKey(to_list, append, extend).cache()
# Preview only: take(10) avoids pulling every adjacency list to the driver.
groupedAirportRDD.take(10)

In [None]:
# Getting count of distinct airports for later use in PageRank computation.
# Both endpoints of every route are counted, so an airport that only ever appears as a
# destination is still included in N.
distinctAirportCount = inputAirportRDD\
    .flatMap(lambda route: (route[0], route[1]))\
    .distinct()\
    .count()
distinctAirportCount

In [None]:
# Setting initial page ranks to 10 as specified in assignment directions.
# groupedAirportRDD already holds exactly one record per origin airport (it is the output of
# combineByKey), so no distinct() is needed; mapValues swaps each outlink list for the
# starting rank while leaving the keys untouched.
ranks = groupedAirportRDD.mapValues(lambda outLinks: 10)
# Preview only: take(10) avoids pulling every airport id to the driver.
ranks.keys().take(10)

In [None]:
N = distinctAirportCount
teleport = 0.15
dampFactor = (1-teleport)
# Constant amount of rank every airport receives through teleportation.
# float(N) keeps this a true division regardless of Python version.
teleportShare = teleport / float(N)

def spreadRank(airportEntry):
  """Split one airport's current rank evenly across its outlinks.

  airportEntry is one record of groupedAirportRDD.join(ranks):
  (airport, (list of outlinks, current rank)).
  Returns a list of (destination airport, contribution) pairs, one per outlink.
  """
  outLinks, rank = airportEntry[1]
  contribution = float(rank) / len(outLinks)
  return [(outLink, contribution) for outLink in outLinks]

# Map 1 - For each airport determine page rank contributions and push to connected airports
# Reduce - For every airport get summation of incoming pagerank contributions
# Map 2 - Apply teleportation and damping factor:
#         newRank = teleport / N + dampFactor * (sum of incoming contributions)
# NOTE: join() is an inner join and reduceByKey only yields airports that received at least
# one contribution, so an airport with no inbound routes drops out of `ranks` after the
# first iteration and stops contributing to its destinations.
for i in range(pageRankIterations):
  ranks = groupedAirportRDD.join(ranks)\
    .flatMap(spreadRank)\
    .reduceByKey(lambda x, y: x + y)\
    .mapValues(lambda incoming: teleportShare + dampFactor * incoming)

# Highest-ranked airports first.
sortedRanks = ranks.sortBy(lambda x : -x[1]).toDF(["AIRPORT_ID", "PAGE_RANK"])
display(sortedRanks)

AIRPORT_ID,PAGE_RANK
11292,50.97767417145509
13930,50.1564880132859
10397,38.94840139254797
11298,36.11721041385771
10299,31.62427035098836
12892,30.36342128901541
13204,28.795214814133036
12889,28.392642062753
14107,28.37695385360836
12266,27.557401981018742


In [None]:
# Write to Amazon S3 Bucket
# The ranking is collapsed to a single partition before being saved as CSV at the
# destination read from the parameter file.
sortedRanks.coalesce(1).write.format("csv").save(airportRankSaveDestination)

In [None]:
### Part 2 - Tweet Processing & Classification using Pipelines
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, StringIndexer, Tokenizer

## Providing the 2 input parameters required for input and output file - files stored in mounted AMZN S3 bucket
tweetInputFile = "/mnt/S3_Assignment2/Tweets.csv" # S3 location to pull input file via mount
tweetOutputFile = "dbfs:/mnt/S3_Assignment2/TweetClassificationMetrics.txt" # S3 location to save output file

# Load the tweets and keep only the rows that actually have text
df = (
    spark.read.csv(tweetInputFile, header=True, inferSchema=True)
    .where(col("text").isNotNull())
)

# Tokenizer - doing this before StopWordRemover as it expects array of strings as input
tokenizeThem = Tokenizer(inputCol="text", outputCol="tokenText")

# Remove stop words from the tokenized text
removeThem = StopWordsRemover(
    inputCol=tokenizeThem.getOutputCol(),
    outputCol="filteredText",
)

# TERM HASHING - hashed term frequencies, then re-weighted by inverse document frequency
hashingTF = HashingTF(
    inputCol=removeThem.getOutputCol(),
    outputCol="termFreqRawVector",
    numFeatures=1e5,
)
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="textFeatures")

# LABEL CONVERSION - sentiment string to numeric index
indexer = StringIndexer(inputCol="airline_sentiment", outputCol="sentimentIndex")


In [None]:
# Tokenize -> remove stop words -> hash term frequencies -> IDF weighting -> index the label
preprocessPipeline = Pipeline(stages =[tokenizeThem, removeThem, hashingTF, idf, indexer])
# NOTE(review): the pipeline is fitted on the full dataset before the train/test split in a
# later cell, so the IDF weights and label index are also learned from rows that end up in
# the test set. Consider fitting on the training split only.
preprocessModel = preprocessPipeline.fit(df)
# Keep only the model inputs, renamed to the column names LogisticRegression is configured with.
processedData = preprocessModel.transform(df).select("textFeatures", "sentimentIndex").toDF("features", "label")
# Display the DataFrame itself rather than processedData.collect(), which would first
# materialise every feature vector on the driver.
display(processedData)

features,label
"Map(vectorType -> sparse, length -> 100000, indices -> List(13018, 64098, 72341), values -> List(3.402770408009837, 8.204740169972537, 8.897887350532482))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(13018, 28946, 43307, 54290, 81517, 86860), values -> List(3.402770408009837, 6.595302257538435, 8.897887350532482, 7.981596618658326, 8.897887350532482, 5.6592088983681))",2.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(10909, 13018, 16537, 20855, 41023, 46248, 48166, 76037), values -> List(5.92747288496278, 3.402770408009837, 3.3295428467713846, 4.0419584461972065, 6.1252986282926996, 7.193139258294056, 4.030352900076899, 7.981596618658326))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(2377, 9972, 32055, 35536, 36216, 45345, 51590, 55208, 59706, 64112, 65993), values -> List(7.981596618658326, 8.897887350532482, 8.492422242424317, 3.2349268703965355, 8.492422242424317, 8.897887350532482, 5.386341911701461, 8.204740169972537, 8.897887350532482, 4.018880498914662, 7.645124382037113))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(13018, 13048, 64112, 83857, 97956), values -> List(3.402770408009837, 5.342539289043067, 4.018880498914662, 5.67901152566428, 4.60742790938409))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(13018, 17123, 24099, 37029, 52509, 84897, 91695), values -> List(3.402770408009837, 4.580399236996171, 6.295197665088097, 1.6394751999371748, 8.897887350532482, 4.746847444633835, 7.981596618658326))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(2866, 13018, 13737, 23234, 25944, 29681, 36414, 45262, 52682, 78157, 81210, 87701, 96451), values -> List(6.882984329990216, 3.402770408009837, 5.6397908125109995, 8.897887350532482, 3.7562237940298213, 4.899686648863283, 7.393809953756207, 5.286969437888257, 4.635207473491166, 3.312513106926582, 8.897887350532482, 7.51159298941259, 4.049770985934001))",2.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(4043, 13018, 27118, 36976, 60208, 64112, 83585, 85804, 88310, 93383, 95171), values -> List(4.9370741809349035, 3.402770408009837, 8.897887350532482, 7.288449438098381, 4.73900426717281, 4.018880498914662, 8.204740169972537, 6.258830020917222, 8.897887350532482, 8.204740169972537, 8.204740169972537))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(1078, 3678, 8996, 13018, 89340), values -> List(6.412980700744481, 7.106127881304427, 8.897887350532482, 3.402770408009837, 8.492422242424317))",2.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(13018, 35177, 50527, 52369, 73753, 86168, 97304), values -> List(3.402770408009837, 4.449370974589766, 3.6121487661913045, 5.784372041322107, 6.951977201477168, 4.10209680493574, 8.204740169972537))",2.0


In [None]:
# Base estimator; maxIter, regParam and elasticNetParam are also listed in the search grid below.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=1000,
    regParam=0.1,
    elasticNetParam=0.0
)

# Hyper-parameter search space: 3 x 3 x 2 = 18 combinations
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(lr.maxIter, [10, 100, 1000])
gridBuilder.addGrid(lr.regParam, [0.0, 0.1, 0.3])
gridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5])
paramGrid = gridBuilder.build()

In [None]:
# 80/20 train/test split with a fixed seed
train, test = processedData.randomSplit(weights=[0.80, 0.20], seed=101)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# 5-fold cross-validation over paramGrid, fitting up to 2 models in parallel.
# Candidates are scored with MulticlassClassificationEvaluator using its default metric.
# The explicit seed fixes the fold assignment so model selection is reproducible across runs.
crossValidate = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction"),
    numFolds=5,
    parallelism=2,
    seed=101,
)
crossValidatedModel = crossValidate.fit(train)

In [None]:
# Score the held-out test split with the best model found by cross-validation.
predictionCrossValidate = crossValidatedModel.bestModel.transform(test)
# Display the DataFrame itself rather than predictionCrossValidate.collect(), which would
# first materialise every prediction row on the driver.
display(predictionCrossValidate)

features,label,rawPrediction,probability,prediction
"Map(vectorType -> sparse, length -> 100000, indices -> List(13, 10519, 11780, 14485, 16415, 17123, 37029, 39279, 45015, 45948, 50527, 65117, 83735), values -> List(8.897887350532482, 8.897887350532482, 5.401379789066001, 6.700662773196262, 1.6615480077781375, 4.580399236996171, 1.6394751999371748, 8.492422242424317, 5.463900146047335, 7.51159298941259, 3.6121487661913045, 8.897887350532482, 8.492422242424317))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(6.013616723342231, 1.3544941386482698, -7.3681108619905))","Map(vectorType -> dense, length -> 3, values -> List(0.9906126424093155, 0.009385828992224786, 1.5285984595863864E-6))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(59, 10157, 18560, 32336, 40269, 47948, 52682, 62707, 64531, 64928, 81850, 83544, 83882, 88869, 92089), values -> List(8.204740169972537, 4.89055416530001, 1.3943221238793562, 7.026085173630889, 7.645124382037113, 5.328354654051111, 4.635207473491166, 5.184315283828173, 5.620742617540304, 3.4907155790723623, 4.333539159064645, 8.897887350532482, 4.73900426717281, 6.15704732660728, 7.51159298941259))",2.0,"Map(vectorType -> dense, length -> 3, values -> List(-3.9936923636175736, 3.4355691649092477, 0.5581231987083267))","Map(vectorType -> dense, length -> 3, values -> List(5.61681791541909E-4, 0.9461884267813019, 0.05324989142715614))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(73, 5160, 18560, 18750, 43273, 47520, 70666, 78244, 87577, 89391, 93372, 93980, 95113), values -> List(8.897887350532482, 7.799275061864371, 1.3943221238793562, 5.953448371366041, 8.897887350532482, 5.980116618448202, 5.05843503793917, 5.953448371366041, 8.492422242424317, 3.9706336653752765, 2.2145264047662065, 8.492422242424317, 6.295197665088097))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(6.366135192696553, -1.4843081159655842, -4.881827076730969))","Map(vectorType -> dense, length -> 3, values -> List(0.9995975489758698, 3.8942243956248163E-4, 1.3028584567664368E-5))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(87, 7330, 16458, 45331, 63939, 71921, 86000, 94947), values -> List(7.51159298941259, 1.8378402612080855, 8.897887350532482, 5.247229109238742, 5.829834415398864, 7.981596618658326, 8.492422242424317, 7.393809953756207))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-6.289576456616868, 6.17439192605497, 0.11518453056189903))","Map(vectorType -> dense, length -> 3, values -> List(3.854359301462517E-6, 0.9976653480979402, 0.0023307975427583815))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(87, 7330, 25317, 26332, 34781, 41485, 44110, 50594, 55619, 62114, 70815, 93372, 94947), values -> List(7.51159298941259, 1.8378402612080855, 7.106127881304427, 6.75782118703621, 7.51159298941259, 8.897887350532482, 5.90215507697849, 4.54760941417318, 3.7591520538089096, 4.908903303968207, 3.402770408009837, 2.2145264047662065, 7.393809953756207))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(0.9793573186143247, 0.5383243261726837, -1.5176816447870087))","Map(vectorType -> dense, length -> 3, values -> List(0.5794750307974744, 0.3728177080868489, 0.047707261115676566))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(94, 7330, 13079, 16369, 23174, 35146, 50988, 54693, 61707, 66405, 76010, 85276, 86403, 89391, 94073, 99256), values -> List(6.3721587062242255, 1.8378402612080855, 3.597073103785857, 6.951977201477168, 5.67901152566428, 3.579767356688265, 6.223738701105953, 5.221586678625405, 7.981596618658326, 5.513497087186707, 3.5133922877433923, 4.91820569663052, 4.122974389957295, 3.9706336653752765, 5.102398161360287, 5.67901152566428))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(6.835322175044029, -2.846234249063399, -3.98908792598063))","Map(vectorType -> dense, length -> 3, values -> List(0.9999176749306133, 6.241913040531868E-5, 1.990593898138496E-5))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(94, 7341, 8661, 16415, 17712, 19451, 27969, 37029, 42392, 86971, 93372, 94422), values -> List(6.3721587062242255, 5.14838327460211, 2.6862837611305883, 1.6615480077781375, 4.065581591960643, 7.51159298941259, 7.288449438098381, 1.6394751999371748, 5.080175024575577, 6.332937993070944, 4.429052809532413, 5.806844897174165))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(8.44001430161986, -3.069182530781149, -5.370831770838711))","Map(vectorType -> dense, length -> 3, values -> List(0.9999889580906183, 1.0037245094099191E-5, 1.0046642876179785E-6))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(94, 11626, 13018, 16537, 32388, 44630, 49535, 70387, 70577, 72290, 91751, 92092), values -> List(6.3721587062242255, 3.8195934079624108, 3.402770408009837, 6.659085693542769, 6.332937993070944, 7.981596618658326, 8.204740169972537, 8.00009510116314, 7.193139258294056, 6.4999920777341105, 5.069245954043386, 7.106127881304427))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(3.60022643566319, 4.953150865462507, -8.553377301125696))","Map(vectorType -> dense, length -> 3, values -> List(0.20539245226236602, 0.7946064654531256, 1.0822845082437493E-6))",1.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(97, 8861, 18560, 23901, 42049, 55619, 58889, 59855, 60831, 64520, 67417, 75693, 84985, 99783), values -> List(8.897887350532482, 8.492422242424317, 1.3943221238793562, 4.081646194464449, 7.106127881304427, 3.7591520538089096, 8.897887350532482, 8.492422242424317, 6.035686469603013, 6.75782118703621, 4.820349906626761, 7.026085173630889, 4.899686648863283, 8.897887350532482))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(4.181058367440338, -0.004901883442052425, -4.176156483998285))","Map(vectorType -> dense, length -> 3, values -> List(0.9847925450619535, 0.014976326992876779, 2.311279451696945E-4))",0.0
"Map(vectorType -> sparse, length -> 100000, indices -> List(99, 1120, 13525, 18560, 35073, 36406, 38771, 49458, 54119, 61929, 70436, 73384, 77209), values -> List(6.951977201477168, 8.897887350532482, 3.89059095770174, 1.3943221238793562, 8.204740169972537, 3.931552315332805, 6.75782118703621, 8.897887350532482, 6.818445808852645, 5.11369771661422, 5.806844897174165, 7.799275061864371, 6.882984329990216))",0.0,"Map(vectorType -> dense, length -> 3, values -> List(10.642743641542111, -7.653774091932292, -2.9889695496098185))","Map(vectorType -> dense, length -> 3, values -> List(0.9999987869071879, 1.1321990430241967E-8, 1.2017708216129848E-6))",0.0


In [None]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order.
# Passing (label, prediction) transposes the confusion matrix, which swaps precision and recall.
predictionAndLabels = predictionCrossValidate.select(['prediction', 'label'])\
    .rdd.map(lambda row: (float(row[0]), float(row[1])))
metrics = MulticlassMetrics(predictionAndLabels)

# Accuracy is computed over all classes; precision, recall and F1 are reported for
# label index 0.0 only.
accuracy = metrics.accuracy
precision = metrics.precision(0.0)
recall = metrics.recall(0.0)
f1Score = metrics.fMeasure(0.0)
print("Summary Stats")
print("Accuracy = %s" % accuracy)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

In [None]:
# Writing txt file with evaluation metrics to S3 bucket
# Assemble the report text first, then write it in one call.
metricsReport = "".join([
    "Evaluation Metrics\n Accuracy = ", str(accuracy),
    " \n Precision = ", str(precision),
    "\n Recall = ", str(recall),
    "\n F1 Score = ", str(f1Score),
])
dbutils.fs.put(tweetOutputFile, metricsReport)