In [0]:
%pyspark
# HDFS location (localhost:9000) of the airline CSV input.
file_path = 'hdfs://localhost:9000/input/airline'

# Read the CSV with the first row as the header and with Spark inferring
# each column's type from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(file_path)
)


In [1]:
%pyspark
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

In [2]:
%pyspark
# Number of rows in the loaded DataFrame (an action: runs a Spark job).
df.count()

In [3]:
%pyspark
# Display the DataFrame through the notebook's `z` helper
# (presumably the Zeppelin context, which renders it as a table -- confirm).
z.show(df)

In [4]:
%pyspark
from pyspark.sql.functions import col

# Narrow the DataFrame to the columns used by the analysis cells below.
# The first nine columns are kept as-is; the time/delay columns are cast to
# int (presumably because schema inference did not read them as integers --
# confirm against the printed schema).
df = df.select(
    'Month', 'DayOfWeek', 'DayofMonth',
    'Origin', 'Dest',
    'Cancelled', 'UniqueCarrier', 'Distance', 'FlightNum',
    *[
        col(name).cast('int')
        for name in (
            'AirTime',
            'ArrTime',
            'ArrDelay',
            'DepTime',
            'DepDelay',
            'ActualElapsedTime',
            'CRSElapsedTime',
        )
    ]
)

In [5]:
%pyspark
# Print the schema again to check the selected columns and their types.
df.printSchema()

In [6]:
%pyspark
# Register the DataFrame as the temp view `airline` so the %sql cells can query it.
df.createOrReplaceTempView('airline')

In [7]:
%sql

-- Preview the first 10 rows of the airline view.
SELECT *
FROM airline
LIMIT 10


In [8]:
%sql

-- List every distinct carrier code present in the data.
SELECT DISTINCT UniqueCarrier
FROM airline


In [9]:
%pyspark
# DataFrame-API version of the distinct-carrier SQL query.
(
    df
    .select('UniqueCarrier')
    .distinct()
    .show()
)

In [10]:
%pyspark
# Import only the Spark SQL functions this notebook uses, instead of a
# wildcard import, so it is explicit which names enter the namespace.
# NOTE: `sum` and `abs` shadow the Python builtins of the same name from here
# on; the aggregation cells below rely on the Spark versions.
from pyspark.sql.functions import abs, avg, col, count, desc, sum, when

# Number of flights per carrier.
df.groupBy('UniqueCarrier').agg(count('*')).show()

In [11]:
%sql

-- Average departure and arrival delay for each day of the week.
SELECT
    dayofweek,
    avg(DepDelay),
    avg(arrdelay)
FROM airline
GROUP BY dayofweek
ORDER BY dayofweek;


In [12]:
%pyspark
# DataFrame-API version: average departure/arrival delay per day of week,
# sorted by day of week.
(
    df
    .groupBy('DayOfWeek')
    .agg(avg('DepDelay'), avg('ArrDelay'))
    .sort('DayOfWeek')
    .show()
)

In [13]:
%sql
-- Flight count and average departure delay per carrier and month.
SELECT
    uniquecarrier,
    month,
    count(*),
    avg(depdelay)
FROM airline
GROUP BY
    uniquecarrier,
    month;

In [14]:
%pyspark
# DataFrame-API version: flight count and average departure delay
# per (carrier, month) pair.
(
    df
    .groupBy('UniqueCarrier', 'Month')
    .agg(
        count('*'),
        avg('DepDelay'),
    )
    .show()
)

In [15]:
%pyspark
# Per-carrier cancellation statistics.
# `sum`, `when`, `count` and `col` are the pyspark.sql.functions versions
# imported in earlier cells, not the Python builtins.
(
    df.groupBy('UniqueCarrier')
    .agg(
        # Summing `Cancelled` gives the cancelled-flight count
        # (assumes the column is a 0/1 flag -- confirm).
        sum('Cancelled').alias('flight_cancelled_count'),
        # Flights with Cancelled == 0; aliased so the output column has a
        # readable name instead of the auto-generated expression text.
        sum(when(col('Cancelled') == 0, 1).otherwise(0)).alias('flight_not_cancelled_count'),
        count('*').alias('total_count'),
    )
    # Cancellation rate as a percentage of all of the carrier's flights.
    .withColumn('cancel_rate', col('flight_cancelled_count') / col('total_count') * 100)
    .show()
)

In [16]:
%sql
-- Per-carrier cancellation statistics, with the cancellation rate in percent.
SELECT
    *,
    (flight_cancelled_count / total_count * 100) AS cancel_rate
FROM
(SELECT
    UniqueCarrier,
    -- Summing Cancelled gives the cancelled-flight count (assumes a 0/1 flag).
    SUM(Cancelled) AS flight_cancelled_count,
    -- Aliased so the outer SELECT * exposes a readable column name instead of
    -- the auto-generated expression text.
    SUM(CASE WHEN Cancelled = 0 THEN 1 ELSE 0 END) AS flight_not_cancelled_count,
    COUNT(*) AS total_count
FROM airline
GROUP BY UniqueCarrier)

In [17]:
%pyspark

# Number of departures per origin airport and arrivals per destination airport.
origin_df = df.groupBy('Origin').count()

dest_df = df.groupBy('Dest').count()

# Combine both counts per airport and rank airports by total traffic.
# The two `count` columns are renamed before joining so the result does not
# contain two columns with the same name; the new names match the SQL version
# of this query.
# NOTE: this is an inner join, so an airport that appears only as an origin
# or only as a destination is dropped from the result.
(
    origin_df.withColumnRenamed('count', 'origin_count')
    .join(
        dest_df.withColumnRenamed('count', 'dest_count'),
        col('Origin') == col('Dest'),
    )
    .withColumn('total', col('origin_count') + col('dest_count'))
    .orderBy(desc('total'))
    .show()
)


In [18]:
%sql

-- Top 10 airports by total traffic (departures + arrivals).
-- Inner join: only airports that appear both as an origin and as a
-- destination are included.
WITH origin_airline AS (
    SELECT Origin, COUNT(*) AS origin_count
    FROM airline
    GROUP BY Origin
),
dest_airline AS (
    SELECT Dest, COUNT(*) AS dest_count
    FROM airline
    GROUP BY Dest
)
SELECT *, origin_count + dest_count AS total
FROM origin_airline
JOIN dest_airline
    ON origin_airline.Origin = dest_airline.Dest
ORDER BY total DESC
LIMIT 10;


In [19]:
%pyspark

# For every (origin, destination) route, compare the average actual elapsed
# time with the average CRS elapsed time, and list the routes with the
# largest absolute gap first.
# `abs` here is the pyspark.sql.functions version imported in an earlier cell.
(
    df
    .groupBy('Origin', 'Dest')
    .agg(
        avg('ActualElapsedTime').alias('real_time'),
        avg('CRSElapsedTime').alias('crs_time'),
    )
    .withColumn('diff_time', abs(col('real_time') - col('crs_time')))
    .orderBy(col('diff_time').desc())
    .show()
)

In [20]:
%sql

-- Per-route average actual vs. CRS elapsed time, ordered by the largest
-- absolute difference between the two.
WITH route_times AS (
    SELECT
        Origin,
        Dest,
        AVG(ActualElapsedTime) AS real_time,
        AVG(CRSElapsedTime) AS crs_time
    FROM airline
    GROUP BY Origin, Dest
)
SELECT
    *,
    ABS(real_time - crs_time) AS diff_time
FROM route_times
ORDER BY diff_time DESC