# Computer practicum №1 (Python)

Work with graph structures


In [0]:
from functools import reduce
from pyspark.sql import functions as F
from graphframes import GraphFrame

## Data preparation



Download data:

In [0]:
display(dbutils.fs.ls("/FileStore/tables/"))

#dbutils.fs.rm("dbfs:/FileStore/tables/airlines.dat", True)
#dbutils.fs.rm("dbfs:/FileStore/tables/airports_extended.dat", True)

path,name,size,modificationTime
dbfs:/FileStore/tables/0555635f-8985-4c28-be84-d8bbc0761baf/,0555635f-8985-4c28-be84-d8bbc0761baf/,0,1729446702428
dbfs:/FileStore/tables/140cad6a-0ce3-4d13-a9e5-9f41377c9103/,140cad6a-0ce3-4d13-a9e5-9f41377c9103/,0,1729446702428
dbfs:/FileStore/tables/24fc2232-cf7a-4dc1-ab57-d23c230dddb6/,24fc2232-cf7a-4dc1-ab57-d23c230dddb6/,0,1729446702428
dbfs:/FileStore/tables/2f3f2f5a-6b94-4467-8e66-3fe62f16de4e/,2f3f2f5a-6b94-4467-8e66-3fe62f16de4e/,0,1729446702428
dbfs:/FileStore/tables/38d64430-f669-4cb4-b8a7-5f541638536d/,38d64430-f669-4cb4-b8a7-5f541638536d/,0,1729446702428
dbfs:/FileStore/tables/5262273f-13e7-4027-8a47-5bf1cade06c8/,5262273f-13e7-4027-8a47-5bf1cade06c8/,0,1729446702428
dbfs:/FileStore/tables/66e293bb-e490-481d-bc14-e0d9790a55fd/,66e293bb-e490-481d-bc14-e0d9790a55fd/,0,1729446702428
dbfs:/FileStore/tables/734cdd6b-e2f5-46f7-8ffc-486fc441149a/,734cdd6b-e2f5-46f7-8ffc-486fc441149a/,0,1729446702428
dbfs:/FileStore/tables/880d904e-91bd-4b58-847f-f61d8a572b9a/,880d904e-91bd-4b58-847f-f61d8a572b9a/,0,1729446702428
dbfs:/FileStore/tables/airlines.dat,airlines.dat,396908,1729377227000


In [0]:

airlines_location = "dbfs:/FileStore/tables/airlines.dat"
routes_location = "dbfs:/FileStore/tables/routes.dat"
airports_location = "dbfs:/FileStore/tables/airports_extended.dat"


airlines_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load(airlines_location)

airlines_df.show(5)
print(airlines_df.count())

routes_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load(routes_location)

routes_df.show(5)
print(routes_df.count())

airports_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load(airports_location)

airports_df.show(5)
print(airports_df.count())


+---+--------------------+-----+----+----+--------+--------------+------+
| id|                name|alias|iata|icao|callsign|       country|active|
+---+--------------------+-----+----+----+--------+--------------+------+
|  1|      Private flight|   \N|   -| N/A|    NULL|          NULL|     Y|
|  2|         135 Airways|   \N|NULL| GNL| GENERAL| United States|     N|
|  3|       1Time Airline|   \N|  1T| RNX| NEXTIME|  South Africa|     Y|
|  4|2 Sqn No 1 Elemen...|   \N|NULL| WYT|    NULL|United Kingdom|     N|
|  5|     213 Flight Unit|   \N|NULL| TFU|    NULL|        Russia|     N|
+---+--------------------+-----+----+----+--------+--------------+------+
only showing top 5 rows

6161
+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|airline|airline_id|source_airport|source_airport_id|destination_airport|destination_airport_id|codeshare|stops|equipment|
+-------+----------+--------------+-----------------+-----

Delete rows with null value in columns source_airport_id, destination_airport_id, airline_id in routes_df:

In [0]:
from pyspark.sql.functions import col

routes_count = routes_df.count()

routes_df = routes_df.filter((col('source_airport_id') != "\\N") & (col('destination_airport_id') != "\\N") & (col('airline_id') != "\\N"))
                                                                  
print(f"{routes_count - routes_df.count()} rows have been deleted")
                                                                       

898 rows have been deleted


Change type columns source_airport_id, destination_airport_id, airline_id

In [0]:
routes_df = routes_df.withColumn("airline_id", routes_df["airline_id"].cast("integer"))
routes_df = routes_df.withColumn("source_airport_id", routes_df["source_airport_id"].cast("integer"))
routes_df = routes_df.withColumn("destination_airport_id", routes_df["destination_airport_id"].cast("integer"))

function for calculate distance


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from math import radians, cos, sin, asin, sqrt

def distance(lat1, lon1, lat2, lon2):
  lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    #haversine formula
  delta_lon = lon2 - lon1
  delta_lat = lat2 - lat1
  hav = sin(delta_lat/2)**2 + cos(lat1) * cos(lat2) * sin(delta_lon/2)**2

  earth_radius = 6371

  return 2 * earth_radius * asin(sqrt(hav))

distance_udf = udf(distance, FloatType())


add column distance to routes_df


In [0]:
from pyspark.sql.functions import lit

routes_df = routes_df.alias("routes") \
    .join(airports_df.alias("src_airports"), col("routes.source_airport_id") == col("src_airports.id"), "inner") \
    .join(airports_df.alias("dst_airports"), col("routes.destination_airport_id") == col("dst_airports.id"), "inner") \
    .select("routes.*",
            col("src_airports.latitude").alias("source_latitude"),
            col("src_airports.longitude").alias("source_longitude"),
            col("dst_airports.latitude").alias("destination_latitude"),
            col("dst_airports.longitude").alias("destination_longitude")
    )


routes_df = routes_df \
    .withColumn("distance",
                lit(distance_udf(routes_df["source_latitude"], 
                     routes_df["source_longitude"], 
                     routes_df["destination_latitude"], 
                     routes_df["destination_longitude"]
                ))) \
    .select("routes.*", col("distance"))


display(routes_df)


airline,airline_id,source_airport,source_airport_id,destination_airport,destination_airport_id,codeshare,stops,equipment,distance
2B,410,AER,2965,KZN,2990,,0,CR2,1506.8257
2B,410,ASF,2966,KZN,2990,,0,CR2,1040.4384
2B,410,ASF,2966,MRV,2962,,0,CR2,448.16492
2B,410,CEK,2968,KZN,2990,,0,CR2,770.5085
2B,410,CEK,2968,OVB,4078,,0,CR2,1338.6315
2B,410,DME,4029,KZN,2990,,0,CR2,715.64935
2B,410,DME,4029,NBC,6969,,0,CR2,892.3828
2B,410,DME,4029,UUA,6160,,0,CR2,951.4322
2B,410,EGO,6156,KGD,2952,,0,CR2,1171.8815
2B,410,EGO,6156,KZN,2990,,0,CR2,1008.2531


Tasks:




## 1) 
Find the airline with the largest sum of all flight distances. The same with the smallest amount.
<br> Знайти авіакомпанію з найбільшою сумою відстаней всіх рейсів. Теж саме з найменшою сумою.

In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id",
                         "distance")
graph = GraphFrame(vertices, edges)


total_distances = graph.edges.groupBy("airline_id").agg({"distance" : "sum"}).withColumnRenamed("sum(distance)", "total_distance")


max_distance_airline = total_distances.orderBy("total_distance", ascending=False).limit(1)

max_distance_airline_id = max_distance_airline.select("airline_id").collect()[0][0]
max_distance_airline_fullname = airlines_df.filter(f"id == {max_distance_airline_id}").select("name").collect()[0][0]
max_distance = max_distance_airline.select("total_distance").collect()[0][0]

print(f"""
    Airline with the largest total distance:
    \t ID: {max_distance_airline_id}
    \t name: {max_distance_airline_fullname}
    \t total distance: {max_distance}
""")


min_distance_airline = total_distances.orderBy("total_distance", ascending=True).limit(1)

min_distance_airline_id = min_distance_airline.select("airline_id").collect()[0][0]
min_distance_airline_fullname = airlines_df.filter(f"id == {min_distance_airline_id}").select("name").collect()[0][0]
min_distance = min_distance_airline.select("total_distance").collect()[0][0]

print(f"""
    Airline with the smallest total distance:
    \t ID: {min_distance_airline_id}
    \t name: {min_distance_airline_fullname}
    \t total distance: {min_distance}
""")



    Airline with the largest total distance:
    	 ID: 24
    	 name: American Airlines
    	 total distance: 5433778.017547607


    Airline with the smallest total distance:
    	 ID: 18700
    	 name: SOCHI AIR CHATER
    	 total distance: 38.16073989868164



## 2)
Find all possible flights between Poland and Belgium (with no more than 2 connections). Use motif and other solutions, compare their results in terms of speed and complexity of implementation.
<br> Знайти всі можливі рейси між Польщею та Бельгією (при не більше ніж 2-х стиковках). Використайте motif та інші варіанти рішень, порівняйте їх результати по швидкодії та складності реалізації.  


In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id")
graph = GraphFrame(vertices, edges)


flights_0_transfer = graph.find("(a)-[e]->(b)") \
    .filter("(a.country = 'Poland') and (b.country = 'Belgium')")

print("0 transfer:")
flights_0_transfer.select("a.country", "a.name", "b.country", "b.name").show(truncate=False)


flights_1_transfer = graph.find("(a)-[e1]->(b); (b)-[e2]->(c)") \
    .filter("(a.country = 'Poland') and (c.country = 'Belgium') and (b.country != 'Belgium') and (b.country != 'Poland')")

print("1 transfer:")
flights_1_transfer.select("a.country", "a.name", "b.country", "b.name", "c.country", "c.name").show(truncate=False)


flights_2_transfer = graph.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)") \
    .filter("(a.country = 'Poland') and (d.country = 'Belgium') and (b.country != 'Belgium') and (b.country != 'Poland') and (c.country != 'Belgium') and (c.country != 'Poland')")

print("2 transfers:")
flights_2_transfer.select("a.country", "a.name", "b.country", "a.name", "c.country", "c.name", "d.country", "d.name").show()


0 transfer:
+-------+--------------------------------------------------------+-------+--------------------------------+
|country|name                                                    |country|name                            |
+-------+--------------------------------------------------------+-------+--------------------------------+
|Poland |John Paul II International Airport Kraków-Balice Airport|Belgium|Brussels South Charleroi Airport|
|Poland |Modlin Airport                                          |Belgium|Brussels South Charleroi Airport|
|Poland |Gdańsk Lech Wałęsa Airport                              |Belgium|Brussels Airport                |
|Poland |Copernicus Wrocław Airport                              |Belgium|Brussels Airport                |
|Poland |Warsaw Chopin Airport                                   |Belgium|Brussels Airport                |
|Poland |Warsaw Chopin Airport                                   |Belgium|Brussels South Charleroi Airport|
+-------+-------

In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id")

graph = GraphFrame(vertices, edges)

bfs_result = graph.bfs(
    fromExpr="country = 'Poland'",
    toExpr="country = 'Belgium'",
    maxPathLength=3
)
bfs_result.show(truncate=False)


+-----------------------------------------------------------------------+-----------------+------------------------------------------------+
|from                                                                   |e0               |to                                              |
+-----------------------------------------------------------------------+-----------------+------------------------------------------------+
|{669, John Paul II International Airport Kraków-Balice Airport, Poland}|{669, 304, 4296} |{304, Brussels South Charleroi Airport, Belgium}|
|{8414, Modlin Airport, Poland}                                         |{8414, 304, 4296}|{304, Brussels South Charleroi Airport, Belgium}|
|{668, Gdańsk Lech Wałęsa Airport, Poland}                              |{668, 302, 2245} |{302, Brussels Airport, Belgium}                |
|{680, Copernicus Wrocław Airport, Poland}                              |{680, 302, 2245} |{302, Brussels Airport, Belgium}                |
|{679, Warsaw

## 3)
Find the airports with the least and most flights.
<br> Знайти аеропорти з найменшою та найбільшою кількістю рейсів. 


In [0]:
vertices = airports_df.distinct()
edges = routes_df \
        .withColumnRenamed("source_airport", "src") \
        .withColumnRenamed("destination_airport", "dst")
graph = GraphFrame(vertices, edges)


flight_counts = edges.groupBy("src").count().withColumnRenamed("count", "outgoing_flights") \
                .join(edges.groupBy("dst").count().withColumnRenamed("count", "incoming_flights"), col("src") == col("dst"), "outer") \
                .fillna(0, subset=["outgoing_flights", "incoming_flights"])

flight_counts = flight_counts.withColumn("total_flights", col("outgoing_flights") + col("incoming_flights"))


most_flights_df = flight_counts.orderBy(col("total_flights").desc()).limit(1)
airoports_with_most_flights_df = vertices.where(col("iata") == most_flights_df.select("dst").first()[0])

print(f"""
    Airport with the most flights:
    \t ID: {airoports_with_most_flights_df.select("id").first()[0]}
    \t name: {airoports_with_most_flights_df.select("name").first()[0]}
    \t country: {airoports_with_most_flights_df.select("country").first()[0]}
    \t flights: {most_flights_df.select("total_flights").first()[0]}
""")


least_flights_df = flight_counts.orderBy(col("total_flights").asc()).limit(1)
airoports_with_least_flights_df = vertices.where(col("iata") == least_flights_df.select("dst").first()[0])

print(f"""
    Airport with the least flights:
    \t ID: {airoports_with_least_flights_df.select("id").first()[0]}
    \t name: {airoports_with_least_flights_df.select("name").first()[0]}
    \t country: {airoports_with_least_flights_df.select("country").first()[0]}
    \t flights: {least_flights_df.select("total_flights").first()[0]}
""")



    Airport with the most flights:
    	 ID: 3682
    	 name: Hartsfield Jackson Atlanta International Airport
    	 country: United States
    	 flights: 1826


    Airport with the least flights:
    	 ID: 7370
    	 name: Breves Airport
    	 country: Brazil
    	 flights: 1



## 4) 
Find the shortest and longest route between 2 given airports. We take the distance between airports as the length of the flight. Routes with transfers (no more than 4) must be taken into account.
<br> Знайти найкоротший та найдовший маршрут між 2 заданими аеропортами. За довжину рейсу беремо відстань між аеропортами. Необхідно врахувати маршрути з пересадками (не більше 4).


In [0]:

vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id",
                         "distance")
graph = GraphFrame(vertices, edges)


def build_motif_transfer(transfer_num):
    return f" (c{transfer_num})-[e{transfer_num + 1}]->(c{transfer_num + 1});"

def build_motif_to_dst(route_num):
    return f" (c{route_num})-[e{route_num + 1}]->(b)"

def build_motif(transfers_count):
    if (transfers_count == 0):
        return "(a)-[e1]->(b)"
    
    route = "(a)-[e1]->(c1);"
    for transfer_num in range(1, transfers_count + 1):
        if(transfer_num == transfers_count):
            route += build_motif_to_dst(transfer_num)
        else:
            route += build_motif_transfer(transfer_num)
    
    return route

def build_find_filter(src_id: int, dst_id: int, column_names: str, shortest_route_distance: float, less: bool):
    distances_sum_in_string_format = ' + '.join(column_names)
    if (less):
        op = "<"
    else:
        op = ">"
    return f"a.id == '{src_id}' AND b.id == '{dst_id}' AND {distances_sum_in_string_format} {op} {shortest_route_distance}"

def find_route_between_2_airports(src_id: int, dst_id: int, max_transfer_count: int, less_route: bool):
    route = None
    if(less_route):
        route_distance = 1000000
    else:
        route_distance = -1

    for transfer_num in range (0, max_transfer_count + 1):
        motif = build_motif(transfer_num)
        routes = graph.find(motif)

        edge_columns = [f"{col}.distance" for col in routes.columns if col.startswith('e')]
        edge_distance_exprs = [F.col(col) for col in edge_columns]

        find_filter = build_find_filter(src_id, dst_id, edge_columns, route_distance, less_route)
        routes = routes.filter(find_filter).withColumn('route_distance', sum(edge_distance_exprs)).orderBy('route_distance').limit(1)

        routes_row = routes.first()
        if(not routes_row is None):
            route = routes
            route_distance = routes_row['route_distance']

    return route, route_distance


src_airport_id = 3682
dst_airport_id = 4335
max_count_transfer = 3

shortest_route, shortest_route_distance = find_route_between_2_airports(src_airport_id, dst_airport_id, max_count_transfer, True)
if(shortest_route is None):
    print(f"The shortest route between airport_id={src_airport_id} and airport_id={dst_airport_id} hasn't been found")
else:
    display(shortest_route)

longest_route, longest_route_distance = find_route_between_2_airports(src_airport_id, dst_airport_id, max_count_transfer, False)
if(longest_route is None):
    print(f"The longest route between airport_id={src_airport_id} and airport_id={dst_airport_id} hasn't been found")
else:
    display(longest_route)


a,e1,b,route_distance
"List(3682, Hartsfield Jackson Atlanta International Airport, United States)","List(3682, 4335, 20710, 428.5227)","List(4335, Key Field, United States)",428.5227


a,e1,c1,e2,c2,e3,c3,e4,b,route_distance
"List(3682, Hartsfield Jackson Atlanta International Airport, United States)","List(3682, 3876, 5347, 364.56708)","List(3876, Charlotte Douglas International Airport, United States)","List(3876, 4034, 24, 121.40196)","List(4034, Greenville Spartanburg International Airport, United States)","List(4034, 3682, 3090, 246.59177)","List(3682, Hartsfield Jackson Atlanta International Airport, United States)","List(3682, 4335, 20710, 428.5227)","List(4335, Key Field, United States)",1161.0835


## 5) 
Find the airline with the largest sum of all flight distances.
<br> Знайти авіакомпанію з найбільшою сумою відстаней всіх рейсів.


In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id",
                         "distance")
graph = GraphFrame(vertices, edges)


total_distances_by_airline = edges.groupBy("airline_id").agg(F.sum("distance").alias("total_distance"))
largest_airline_distance = total_distances_by_airline.orderBy(F.desc("total_distance")).limit(1)

id = largest_airline_distance.select("airline_id").first()[0]
print(f"""
    Airline with the largest sum of all flight distances:
    \t ID: {id}
    \t name: {airlines_df.where(col("id") == id).select("name").first()[0]}
    \t distance: {largest_airline_distance.select("total_distance").first()[0]}
""")



    Airline with the largest sum of all flight distances:
    	 ID: 24
    	 name: American Airlines
    	 distance: 5433778.017547607



## 6) 
Highlight large clusters of airports (at least 5, based on flight information). Analyze the results of creating these clusters.
<br> Виділити великі кластери аеропортів (не менше 5, на основі інформації про рейси). Проаналізувати результати створення цих кластерів. 


In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"), 
                         F.col("destination_airport_id").alias("dst"), 
                         "airline_id",
                         "distance")
graph = GraphFrame(vertices, edges)


sc.setCheckpointDir("dbfs:/FileStore/tables")

clusters = graph.labelPropagation(maxIter=5)
cluster_sizes = clusters.groupBy("label").count()
large_clusters = cluster_sizes.filter(F.col("count") >= 5).orderBy(F.col("count").desc())

airports_with_cluster = clusters.alias("clusters")\
    .join(large_clusters.alias("size"), col("clusters.label") == col("size.label"), "inner") \
    .select("id", "name", "country", "clusters.label", "count")


print("Airoports: ")
airports_with_cluster.orderBy(F.col("count").desc()).show()

print("Airoports clusters {cluster name, count items in this cluster}: ")
large_clusters.show()

print(f"Count clusters: {large_clusters.count()}")


Airoports: 
+----+--------------------+-------------+-----+-----+
|  id|                name|      country|label|count|
+----+--------------------+-------------+-----+-----+
|3877|McCarran Internat...|United States| 3682|  684|
|2893|George F. L. Char...|  Saint Lucia| 3682|  684|
|1813|Lázaro Cárdenas A...|       Mexico| 3682|  684|
|1780|Sangster Internat...|      Jamaica| 3682|  684|
|2718|   Yariguíes Airport|     Colombia| 3682|  684|
|2787| Andahuaylas Airport|         Peru| 3682|  684|
|3722|Palm Beach Intern...|United States| 3682|  684|
|3488|Cincinnati Northe...|United States| 3682|  684|
|4016|Roanoke–Blacksbur...|United States| 3682|  684|
| 120|Regina Internatio...|       Canada| 3682|  684|
|3653|Austin Straubel I...|United States| 3682|  684|
|6066|Mayor General FAP...|         Peru| 3682|  684|
|4103|       Roberts Field|United States| 3682|  684|
|3846|Baton Rouge Metro...|United States| 3682|  684|
| 117|Greater Moncton I...|       Canada| 3682|  684|
|1795|Ingeniero 

## 7)
Find subgraphs with at least 2 connected nodes that do not have connections to other airports.
<br> Знайти підграфи, з кількістю вузлів не менше 2 зв’язаних між собою, які не мають зв’язків з іншими аеропортами.    

In [0]:
vertices = airports_df.select(F.col("id"), "name", "country")
edges = routes_df.select(F.col("source_airport_id").alias("src"),
                         F.col("destination_airport_id").alias("dst"),
                         "airline_id")
graph = GraphFrame(vertices, edges)


sc.setCheckpointDir("dbfs:/FileStore/tables")

components = graph.connectedComponents()
component_counts = components.groupBy("component").count().filter(F.col("count") >= 2)
subgraph_vertices = components.join(component_counts, "component").select("id", "name", "country", "component")


subgraph_vertices.show()
subgraph_vertices.where(col("component") == 3860).show()

print("Subgraphs {name subgraph, count vertices in this subgraph}: ")
component_counts.show()
print(f"Count subgraphs: {component_counts.count()}")


+----+--------------------+----------------+---------+
|  id|                name|         country|component|
+----+--------------------+----------------+---------+
|6448|Grand Canyon West...|   United States|     6448|
|6449|Boulder City Muni...|   United States|     6448|
|   1|      Goroka Airport|Papua New Guinea|        1|
|   2|      Madang Airport|Papua New Guinea|        1|
|   3|Mount Hagen Kagam...|Papua New Guinea|        1|
|   4|      Nadzab Airport|Papua New Guinea|        1|
|   5|Port Moresby Jack...|Papua New Guinea|        1|
|   6|Wewak Internation...|Papua New Guinea|        1|
|   7|  Narsarsuaq Airport|       Greenland|        1|
|   8|Godthaab / Nuuk A...|       Greenland|        1|
|   9|Kangerlussuaq Air...|       Greenland|        1|
|  10|      Thule Air Base|       Greenland|        1|
|  11|    Akureyri Airport|         Iceland|        1|
|  12| Egilsstaðir Airport|         Iceland|        1|
|  15|  Ísafjörður Airport|         Iceland|        1|
|  16|Kefl