**QUESTION 1**


In [0]:
# Import required libraries

from graphframes import *
from graphframes import GraphFrame
from pyspark.sql import SparkSession


In [0]:
# Load the data into a DataFrame
df = spark.read.option("header", "true").csv("dbfs:/FileStore/T_T100D_SEGMENT_US_CARRIER_ONLY_with_optional-1.csv")

# Create vertices DataFrame. Here, vertices should be the airports. So we use origin which represents the airports
vertices = df.select("ORIGIN", "ORIGIN_CITY_NAME").distinct()
vertices = vertices.withColumnRenamed("ORIGIN", "id").withColumnRenamed("ORIGIN_CITY_NAME", "name")

# Create edges DataFrame. Here we use the destination as the edge because it represents the connection between the vertices (airport) => represents flights between the airports
edges = df.select("ORIGIN", "DEST")
edges = edges.withColumnRenamed("ORIGIN", "src").withColumnRenamed("DEST", "dst")

# Create GraphFrame with the created vertices and edges
g = GraphFrame(vertices, edges)


In [0]:
g.vertices.show()

+---+--------------------+
| id|                name|
+---+--------------------+
|ABQ|     Albuquerque, NM|
|AEX|      Alexandria, LA|
|ABL|          Ambler, AK|
|ACT|            Waco, TX|
|ADK|     Adak Island, AK|
|6AK|          Newtok, AK|
|ADS|          Dallas, TX|
|ABR|        Aberdeen, SD|
|A27|      Pogo Mines, AK|
|ACV|   Arcata/Eureka, CA|
|ABE|Allentown/Bethleh...|
|AAF|    Apalachicola, FL|
|AET|       Allakaket, AK|
|ABY|          Albany, GA|
|ADQ|          Kodiak, AK|
|1G4|   Peach Springs, AZ|
|ACY|   Atlantic City, NJ|
|7AK|            Akun, AK|
|ACK|       Nantucket, MA|
|ABI|         Abilene, TX|
+---+--------------------+
only showing top 20 rows



In [0]:
g.edges.show()

+---+---+
|src|dst|
+---+---+
|1G4|1G4|
|1G4|BLD|
|1G4|BLD|
|1NY|BED|
|6AK|BET|
|6AK|WWT|
|7AK|CDB|
|7AK|DUT|
|7AK|DUT|
|7AK|KQA|
|7AK|KQA|
|A27|FAI|
|AAF|OCF|
|ABE|AFW|
|ABE|ATL|
|ABE|AVP|
|ABE|BDR|
|ABE|BNA|
|ABE|CLT|
|ABE|CLT|
+---+---+
only showing top 20 rows



**a) Top 5 nodes with the highest outdegree and the count of the number of outgoing edges in each**

ORD (Chicago O'Hare Intl Airport) has the highest outdegree => it is highly connected and has a large number of direct flights departing to various destinations. Similar inferences can be made for the rest (DENver Intl Airport, ATLanta Intl Airport, Dallas/FortWorth Intl Airport, IAH: George Bush Intercontinental Airport in Houston, TX)

In [0]:
# Calculate outdegrees using the graphframes library
outdeg = g.outDegrees

# Find top 5 nodes with the highest outdegree and display count of each
top_outdeg = outdeg.orderBy('outDegree', ascending=False).limit(5)

# Display the results
top_outdeg.show()


+---+---------+
| id|outDegree|
+---+---------+
|ORD|     1266|
|DEN|     1105|
|ATL|      962|
|DFW|      875|
|IAH|      697|
+---+---------+



**b) Top 5 nodes with the highest indegree and the count of the number of incoming edges in each**

ORD (Chicago O'Hare Intl Airport) has the highest indegree => it is highly connected and has a large number of direct flights arriving from various destinations. Similar inferences can be made for the rest (DENver Intl Airport, ATLanta Intl Airport, Dallas/FortWorth Intl Airport, IAH: George Bush Intercontinental Airport in Houston, TX). Notice that these airports were also the ones with highest outdegrees implying that these airports probbaly handle a high volume of both arrivals and departures and could be connecting different regions.

In [0]:
# Calculate indegrees using the graphframes library
indeg = g.inDegrees

# Find top 5 nodes with the highest indegree and display count of each
top_indeg = indeg.orderBy('inDegree', ascending=False).limit(5)

# Display the results
top_indeg.show()


+---+--------+
| id|inDegree|
+---+--------+
|ORD|    1267|
|DEN|    1098|
|ATL|     965|
|DFW|     879|
|IAH|     712|
+---+--------+



**c) Calculation of PageRank for each of the nodes and output the top 5 nodes with the highest PageRank values**

We see that the top 4 nodes are same as the results obtained for indegrees and outdegrees above indicating that these airports could be very important and influential with many incoming and outgoing connections. Despite ANC (Alaska) having low indegrees and outdegrees compared to the rest of the airports on our list, it may have a higher pagerank than IAH because of the quality of connections or its position in the network of airports. IAH may not have the same influence as ANC and this could be the reason why it has lower pagerank than ANC. If we display the top 10 nodes we, see that IAH is not far behind on the pagerank list (top 9th) and the value is also very close to ANC.


In [0]:
# Calculate PageRank with resetProb = 0.15 (15% chance that a random internet surfer will jump to a random node instead of following a link) and tolerance = 0.01 (stop iterating when the change in values of pagerank between iteration is < 0.01)
pg_r = g.pageRank(resetProbability=0.15, tol=0.01)

# Find top 5 nodes with the highest PageRank values
top_pg_r = pg_r.vertices.orderBy('pagerank', ascending=False).limit(5)

# Show the result
top_pg_r.show()

+---+--------------------+------------------+
| id|                name|          pagerank|
+---+--------------------+------------------+
|ORD|         Chicago, IL|19.542944816964106|
|DEN|          Denver, CO|18.123234208092107|
|ATL|         Atlanta, GA|14.365696396005108|
|DFW|Dallas/Fort Worth...|14.036839277723828|
|ANC|       Anchorage, AK|11.940414424514335|
+---+--------------------+------------------+



In [0]:
# Find top 10 nodes with the highest PageRank values
top_pg_r10 = pg_r.vertices.orderBy('pagerank', ascending=False).limit(10)

# Show the result
top_pg_r10.show()

+---+--------------------+------------------+
| id|                name|          pagerank|
+---+--------------------+------------------+
|ORD|         Chicago, IL|19.542944816964106|
|DEN|          Denver, CO|18.123234208092107|
|ATL|         Atlanta, GA|14.365696396005108|
|DFW|Dallas/Fort Worth...|14.036839277723828|
|ANC|       Anchorage, AK|11.940414424514335|
|PHX|         Phoenix, AZ| 11.40992939916564|
|BET|          Bethel, AK| 11.31216649757965|
|SEA|         Seattle, WA|10.620334485874292|
|IAH|         Houston, TX|10.370943908067842|
|LAS|       Las Vegas, NV|10.344745141714421|
+---+--------------------+------------------+



**d) Strongly connected components algorithm tofind the top 5 components with the largest number of nodes.**

Component 0 is the largest strongly connected component with 760 nodes. This means that a large portion of the airport network in January, 2025 was highly interconnected with many airports having direct or indirect connections to each other. The rest of the components have realtively smaller nodes implying that a small subgraph where these airports are mutually reachable.

In [0]:

# Run Strongly Connected Components algorithm from the graphframes library
scc = g.stronglyConnectedComponents(maxIter=10)

# Count the number of nodes in each strongly connected component
scc_counts = scc.groupBy("component").count()

# Find top 5 components with the largest number of nodes and display them
top_scc = scc_counts.orderBy("count", ascending=False).limit(5)
top_scc.show()

+------------+-----+
|   component|count|
+------------+-----+
|           0|  760|
|523986010114|    3|
|154618822658|    2|
|300647710721|    2|
|163208757249|    2|
+------------+-----+



If we want to take a deeper look into which airports contribute to the resulst obtained above, we look at the airport ids. A lot of airports like AFW (Fortworth Alliance Airport), COS (Colorado Springs Airport), etc. are part of the largest strongly connected component indicating that these airports have stong mutual connectivity with other airports. Component 163208757249 has 2 strongly connected components which implies that this small size suggests connetions between 2 airports. LEE airport is located in Maryland which is the ninth smallest state in the US; which could be the reason why it has a very few strongly connected components.

In [0]:
# Display the top 5 components with their node IDs to see which airports are stongly connected and display the result
top_scc_ids = scc.join(top_scc, "component").select("component", "id", "count")
display(top_scc_ids)

component,id,count
0,AFW,760
0,COS,760
0,HNM,760
0,SDF,760
0,TKF,760
0,TOG,760
0,CLL,760
0,PIR,760
0,CMI,760
0,EVV,760


**e)  Triangle counts algorithm on each of the vertices and output the top 5 vertices with the largest triangle count.**

DFW has the highest triangle counts of 3481 followed by ORD, ATL, DEN, and LAS. Therfore, we can conclude that DFW is part of 3,481 such triangles (three airports where each airport has flights to the other two) and similar conclusions can be made for the other airports on the list. Therefore, these airports have lots of flights connecting it to other airports that help travellers reach many different desired destinations.

In [0]:

# Run Triangle Count algorithm using the graphframes library
tc = g.triangleCount()

# Find top 5 vertices with the largest triangle count and display the results
top_tc = tc.orderBy('count', ascending=False).limit(5)
top_tc.show()



+-----+---+--------------------+
|count| id|                name|
+-----+---+--------------------+
| 3481|DFW|Dallas/Fort Worth...|
| 3325|ORD|         Chicago, IL|
| 3303|ATL|         Atlanta, GA|
| 3246|DEN|          Denver, CO|
| 2937|LAS|       Las Vegas, NV|
+-----+---+--------------------+



**QUESTION 2**


In [0]:
# loading the dataset and converting it into a dataframe
tweets = spark.read.csv("dbfs:/FileStore/Tweets.csv", header=True, inferSchema=True)

# filter/drop the rows that have null as text column
tweets_clean = tweets.filter(tweets.text.isNotNull())


In [0]:
display(tweets_clean)

tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
570301130888122368,positive,0.3486,,0.0,Virgin America,,jnardino,,0,@VirginAmerica plus you've added commercials to the experience... tacky.,,2015-02-24 11:15:59 -0800,,Pacific Time (US & Canada)
570301083672813571,neutral,0.6837,,,Virgin America,,yvonnalynn,,0,@VirginAmerica I didn't today... Must mean I need to take another trip!,,2015-02-24 11:15:48 -0800,Lets Play,Central Time (US & Canada)
570301031407624196,negative,1.0,Bad Flight,0.7033,Virgin America,,jnardino,,0,"""@VirginAmerica it's really aggressive to blast obnoxious """"entertainment"""" in your guests' faces & they have little recourse""",,2015-02-24 11:15:36 -0800,,Pacific Time (US & Canada)
570300817074462722,negative,1.0,Can't Tell,1.0,Virgin America,,jnardino,,0,@VirginAmerica and it's a really big bad thing about it,,2015-02-24 11:14:45 -0800,,Pacific Time (US & Canada)
570300767074181121,negative,1.0,Can't Tell,0.6842,Virgin America,,jnardino,,0,@VirginAmerica seriously would pay $30 a flight for seats that didn't have this playing.,,,,
570300616901320704,positive,0.6745,,0.0,Virgin America,,cjmcginnis,,0,"@VirginAmerica yes, nearly every time I fly VX this “ear worm” won’t go away :)",,2015-02-24 11:13:57 -0800,San Francisco CA,Pacific Time (US & Canada)
570300248553349120,neutral,0.634,,,Virgin America,,pilot,,0,"@VirginAmerica Really missed a prime opportunity for Men Without Hats parody, there. https://t.co/mWpG7grEZP",,2015-02-24 11:12:29 -0800,Los Angeles,Pacific Time (US & Canada)
570299953286942721,positive,0.6559,,,Virgin America,,dhepburn,,0,"@virginamerica Well, I didn't…but NOW I DO! :-D",,2015-02-24 11:11:19 -0800,San Diego,Pacific Time (US & Canada)
570295459631263746,positive,1.0,,,Virgin America,,YupitsTate,,0,"@VirginAmerica it was amazing, and arrived an hour early. You're too good to me.",,2015-02-24 10:53:27 -0800,Los Angeles,Eastern Time (US & Canada)


In [0]:
# import all the necessary libraries - tokenizer, stop words removed, hasher
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, StringIndexer
from pyspark.ml import Pipeline

# Split (tokenize) the input (text col) and output the tokenized text
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Stop Word Removal: Removing common stop-words using the lib provided
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# Term Hashing: Converting words into term-frequency vectors based on the input col
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="features")

# Label Conversion: Mapping sentiment labels to numerical values for ease 
label_indexer = StringIndexer(inputCol="airline_sentiment", outputCol="label")

# Creating the pipeline with all transformations - tokenizer, stopwords remover, hasher, and label indexer
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, label_indexer])


In [0]:
# fit the above created pipeline to the cleaned tweet data set (apply transformations to the dataset together instead of doing it individually)
processed_tweets = pipeline.fit(tweets_clean).transform(tweets_clean)


In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the transformed data into training and testing set for evaluation. Fix the seed for reproducibility
train_data, test_data = processed_tweets.randomSplit([0.8, 0.2], seed=2)

# Define the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Define a parameter grid for tuning
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()



# Define a cross validator (with 3 folds) for model selection
cross_val = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=MulticlassClassificationEvaluator(), numFolds=3)

# Train model using cross-validation on training data
cv_model = cross_val.fit(train_data)

# Make predictions on the testing set using the training model
predictions = cv_model.transform(test_data)

# Define the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculat eht accuracy of the mabove model
accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)

# Calculate other metrics like precision, recall and f1 score that are important classification metrics
precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)
f1 = evaluator.setMetricName("f1").evaluate(predictions)

# Print evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Accuracy: 0.7305
Weighted Precision: 0.7168
Weighted Recall: 0.7305
F1 Score: 0.7198


In [0]:
from pyspark.sql.functions import col

# Cast label and prediction columns to integer 
predictions = predictions.withColumn("label", col("label").cast("int"))
predictions = predictions.withColumn("prediction", col("prediction").cast("int"))

# Pivot the counts to get a matrix-like format
confusion_df = predictions.groupBy("label").pivot("prediction").count().na.fill(0).orderBy("label")

# Show the confusion metrics
confusion_df.show()


+-----+----+---+---+
|label|   0|  1|  2|
+-----+----+---+---+
|    0|1582|169| 46|
|    1| 286|267| 64|
|    2| 134| 71|238|
+-----+----+---+---+

