In [1]:
from pyspark import SparkContext
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import split, explode, col, desc, trim

In [6]:
sc = SparkContext('local', 'Top10WordCount')
text = sc.textFile('book.txt')

counts = (text.flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda x, y: x + y))
top10 = counts.takeOrdered(10, key=lambda x: -x[1])

for word, count in top10:
    print(f"{word}: {count}")
   
sc.stop()

                                                                                

the: 1691
to: 970
and: 968
of: 572
a: 516
that: 505
I: 467
in: 437
he: 432
not: 374


In [9]:
spark = SparkSession.builder.appName('flights').getOrCreate()

text = spark.read.text('book.txt')
words = (text.select(explode(split(col('value'), ' ')).alias('word')).filter(trim(col('word')) != ''))
wordscount = words.groupBy('word').count()

top10 = wordscount.orderBy(desc('count')).limit(10)
top10.show()


+----+-----+
|word|count|
+----+-----+
| the| 1691|
|  to|  970|
| and|  968|
|  of|  572|
|   a|  516|
|that|  505|
|   I|  467|
|  in|  437|
|  he|  432|
| not|  374|
+----+-----+



In [8]:
spark.stop()

In [10]:
airlines = spark.read.csv('2008.csv', header=True, inferSchema=True)
airports = spark.read.csv('airports-data.csv', header=True, inferSchema=True)

                                                                                

In [11]:
airlines.show(3)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

24/12/05 16:11:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [12]:
airports.show(3)

+--------------------+------+
|                name|Origin|
+--------------------+------+
|      Goroka Airport|   GKA|
|      Madang Airport|   MAG|
|Mount Hagen Kagam...|   HGU|
+--------------------+------+
only showing top 3 rows



In [13]:
merged = airports.join(airlines, on='Origin', how='inner')
merged.show(3)

+------+--------------------+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Origin|                name|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+------+--------------------+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|   IAD|Washington Dulles...|20

In [14]:
uniques = merged.groupBy('Cancelled').agg(F.count('CancellationCode').alias('Count'))
cancels = uniques.filter(uniques['Cancelled'] == 1)
cancels.show()



+---------+------+
|Cancelled| Count|
+---------+------+
|        1|137115|
+---------+------+



                                                                                

In [15]:
merged1 = merged.where(merged['Cancelled'] != 1)
merged1.show(3)

+------+--------------------+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Origin|                name|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+------+--------------------+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|   IAD|Washington Dulles...|20

In [16]:
perday = merged1.groupBy('Year', 'Month', 'DayofMonth', 'name').agg(F.count('FlightNum').alias('FlightsCount'))
avv = perday.groupBy('name').agg({'FlightsCount': 'avg'}).alias('AverageFlightsCount')

avv.show()
spark.stop()



+--------------------+------------------+
|                name| avg(FlightsCount)|
+--------------------+------------------+
|Melbourne Interna...| 5.352459016393443|
|     Eppley Airfield| 72.48907103825137|
|George Bush Inter...|  498.386301369863|
|     Kahului Airport| 53.69672131147541|
|Detroit Metropoli...|435.53551912568304|
|Port Columbus Int...| 91.22950819672131|
|St George Municip...|               9.0|
|Waco Regional Air...| 5.257534246575342|
|Sacramento Intern...|143.57650273224044|
|Fayetteville Regi...| 6.101092896174864|
|       Meadows Field|12.158469945355192|
|Durango La Plata ...| 9.789617486338798|
|Falls Internation...|               1.0|
|Fort Wayne Intern...|15.972677595628415|
|Rafael Hernandez ...|4.1256830601092895|
|Wilkes Barre Scra...| 6.251366120218579|
|Elmira Corning Re...|  3.57103825136612|
|Great Falls Inter...| 5.795081967213115|
|Charlotte Douglas...| 338.9590163934426|
|Atlantic City Int...| 1.036697247706422|
+--------------------+------------

                                                                                

In [17]:
spark = SparkSession.builder.appName('rdd').getOrCreate()

airports = spark.read.csv('airports-data.csv', header=True, inferSchema=True).rdd
airlines = spark.read.csv('2008.csv', header=True, inferSchema=True).rdd

airport_map = airports.map(lambda row: (row['Origin'], row['name']))
flights_per_day = airlines.map(lambda row: ((row['Origin'], row['Year'], row['Month'], row['DayofMonth']), 1))

daily_flight_count = flights_per_day.reduceByKey(lambda a, b: a + b)

flights_per_airport = daily_flight_count.map(lambda row: (row[0][0], (row[1], 1)))

flights_total_per_airport = flights_per_airport.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

average_flights_per_airport = flights_total_per_airport.mapValues(lambda totals: totals[0] / totals[1])

airport_avg_flights = average_flights_per_airport.join(airport_map)

results = airport_avg_flights.collect()
for airport, (avg_flights, name) in results:
    print(f"{name}: {avg_flights:.2f}")



Birmingham-Shuttlesworth International Airport: 64.70
Louis Armstrong New Orleans International Airport: 107.71
Dallas Fort Worth International Airport: 768.53
Brownsville South Padre Island International Airport: 4.00
Mc Allen Miller International Airport: 11.25
Port Columbus International Airport: 93.33
San Antonio International Airport: 123.69
Charleston Air Force Base-International Airport: 38.75
Aspen-Pitkin Co/Sardy Field: 14.50
South Bend Regional Airport: 14.53
Yuma MCAS/Yuma International Airport: 10.58
Tri Cities Airport: 6.94
Oxnard Airport: 3.54
El Paso International Airport: 56.64
Huntsville International Carl T Jones Field: 29.48
Billings Logan International Airport: 12.31
Bob Hope Airport: 85.81
Spokane International Airport: 42.51
Grand Junction Regional Airport: 14.08
Mahlon Sweet Field: 15.71
Gulfport Biloxi International Airport: 23.34
Lewiston Nez Perce County Airport: 1.79
Jack Mc Namara Field Airport: 2.91
Luis Munoz Marin International Airport: 59.22
Evansville R

                                                                                

In [18]:
spark.stop()