# 初始 Spark 設定(SPARK_HOME 路徑每台機器不同)

In [1]:
import os
import sys
# Default Spark installation (CDH parcel path). setdefault keeps a SPARK_HOME that is
# already exported in the environment, so a machine with a different installation can
# point at it without editing this cell; the CDH path is only used as a fallback.
os.environ.setdefault('SPARK_HOME', '/opt/cloudera/parcels/CDH/lib/spark/')

In [2]:
import findspark
# Make pyspark importable from this kernel. With no arguments, findspark presumably
# locates Spark through the SPARK_HOME environment variable set in the previous cell
# — confirm against the findspark version in use.
findspark.init()

# 欄位資訊(讀取 col_info.txt:每欄的編號、名稱、型別、中文說明、是否選用 O/X 與備註)

In [190]:
# Column metadata. The first line is a header; every following line describes one column
# as space-separated fields: index, name, type ('int'/'str'), Chinese description,
# O/X flag, and optional notes.
# - The file contains Chinese text, so it is decoded explicitly (presumably UTF-8 —
#   confirm) instead of relying on the platform's default encoding.
# - Blank lines (e.g. a trailing newline) are skipped; otherwise they would become ['']
#   entries and break the ci[1] / ci[2] / ci[4] lookups in later cells.
with open("col_info.txt", "r", encoding="utf-8") as f:
    head = f.readline()
    col_info = [line.strip().split(" ") for line in f.read().split("\n") if line.strip()]

In [191]:
# Preview the metadata of the first five columns.
col_info[0:5]

[['0', 'Year', 'int', '飛行日期_年', 'X'],
 ['1', 'Month', 'str', '飛行日期_月', 'O', '一月和十二月特別容易取消班機'],
 ['2', 'DayofMonth', 'int', '飛行日期_日', 'O', '11號和13號', '特別容易取消班機'],
 ['3', 'DayOfWeek', 'int', '飛行日期_星期', 'O', '星期二特別容易取消班機,星期日最不容易取消班機'],
 ['4', 'DepTime', 'str', '飛行時間_起飛時間', 'X', "如果班機取消,這個值必為'NA',不可選"]]

# 讀取資料

In [5]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Obtain the Spark session on YARN together with its SparkContext.
# getOrCreate() returns the already-running session when this cell is re-run; constructing
# a second SparkContext directly in the same process fails, which breaks
# "Restart & Run All" style re-execution.
spark = SparkSession.builder.master('yarn').getOrCreate()
sc = spark.sparkContext

In [6]:
from pyspark.sql import SQLContext
# SQLContext wrapper over the shared SparkContext; the CSV load below goes through it.
# NOTE(review): SQLContext is deprecated in newer Spark releases in favour of SparkSession.
sqlContext = SQLContext(sparkContext=sc)

In [7]:
from pyspark.sql.types import *

# Spark schema built from col_info: columns typed 'int' become IntegerType, all others
# StringType. File-based sources treat every column as nullable regardless of this flag,
# so nullable=True describes what Spark actually does.
schema = StructType([
    StructField(ci[1], IntegerType() if ci[2] == 'int' else StringType(), True)
    for ci in col_info
])

# With an explicit schema, inferSchema is ignored, so it is not passed.
# 'csv' is the built-in reader that the legacy 'com.databricks.spark.csv' name maps to.
# NOTE(review): missing values in this file appear as the literal 'NA' (see take(1) below);
# without nullValue='NA' they remain the string 'NA' in StringType columns — confirm intended.
inspections = sqlContext.read.format('csv') \
        .options(header='true') \
        .schema(schema) \
        .load('2000.csv')

# Expose the DataFrame to spark.sql(...) queries under the name "inspections".
inspections.createOrReplaceTempView("inspections")

In [99]:
# First row of the data, returned as a one-element list of Row objects.
inspections.head(1)

[Row(Year=2000, Month='1', DayofMonth=28, DayOfWeek=5, DepTime='1647', CRSDepTime='1647', ArrTime='1906', CRSArrTime='1859', UniqueCarrier='HP', FlightNum='154', TailNum='N808AW', ActualElapsedTime='259', CRSElapsedTime='252', AirTime='233', ArrDelay=7, DepDelay=0, Origin='ATL', Dest='PHX', Distance=1587, TaxiIn='15', TaxiOut='11', Cancelled=0, CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')]

# 查看資料:各欄位值的取消班機數(Qty)與取消率(Ratio, %)

In [141]:
# Spark SQL column functions. NOTE(review): importing `round` this way shadows Python's
# builtin round() for the rest of the notebook. The SQL query strings in this section
# call Spark SQL's own round()/count() and do not use these Python names.
from pyspark.sql.functions import UserDefinedFunction, count, countDistinct, round, concat, length, sin

In [143]:
import numpy as np

In [179]:
# Template for per-category cancellation stats; fill in with .format(colname=<column>).
#   Qty   = number of cancelled flights (Cancelled=1) in the category
#   Ratio = Qty / number of flights in the category * 100, rounded to 2 decimals
# A single GROUP BY (instead of joining a cancelled-only aggregate to an all-rows
# aggregate) scans the table once and keeps categories that had no cancellations
# (Qty=0, Ratio=0.0) rather than silently dropping them. Rows whose category is null
# are excluded. Sorted by cancellation rate, highest first.
sql_text = """select {colname}, sum(case when Cancelled=1 then 1 else 0 end) as Qty,
                     round(sum(case when Cancelled=1 then 1 else 0 end)/count({colname})*100,2) as Ratio
              from inspections
              where {colname} is not null
              group by {colname}
            order by Ratio DESC"""

In [180]:
# Cancellation count and rate per carrier (first 20 rows).
spark.sql(sql_text.format(colname="UniqueCarrier")).show(n=20)

+-------------+-----+-----+
|UniqueCarrier|  Qty|Ratio|
+-------------+-----+-----+
|           UA|44159| 5.69|
|           AS| 7506| 4.87|
|           HP| 9422|  4.3|
|           AA|29677|  4.0|
|           US|28055| 3.75|
|           DL|31569| 3.48|
|           NW|15340| 2.78|
|           TW| 5254| 1.97|
|           CO| 7296| 1.86|
|           AQ|  173| 1.57|
|           WN| 9039| 0.99|
+-------------+-----+-----+



In [181]:
# Cancellation count and rate per origin airport (first 20 rows).
spark.sql(sql_text.format(colname="Origin")).show(n=20)

+------+-----+-----+
|Origin|  Qty|Ratio|
+------+-----+-----+
|   DUT|  166|25.08|
|   PIA|    3|16.67|
|   MRY|   51|14.96|
|   HPN|  604|12.07|
|   SCC|   42|10.99|
|   BRW|  105|10.95|
|   DLG|   32| 10.6|
|   AKN|   32| 9.88|
|   BET|   92| 8.88|
|   ADQ|   58| 7.97|
|   LGA| 7591| 7.29|
|   BOS| 7689|  6.8|
|   CID|  342| 6.65|
|   ORD|19318| 6.51|
|   TRI|   70| 6.08|
|   TOL|   54| 5.99|
|   OME|   56| 5.96|
|   BQN|    1| 5.88|
|   RST|  131| 5.86|
|   SWF|   74| 5.79|
+------+-----+-----+
only showing top 20 rows



In [182]:
# Cancellation count and rate per destination airport (first 20 rows).
spark.sql(sql_text.format(colname="Dest")).show(n=20)

+----+-----+-----+
|Dest|  Qty|Ratio|
+----+-----+-----+
| DUT|  164|24.77|
| MRY|   49|14.37|
| HPN|  579|11.58|
| SCC|   41|10.73|
| BRW|   96|10.01|
| DLG|   30| 9.93|
| AKN|   31| 9.57|
| BET|   91| 8.78|
| ADQ|   58| 7.97|
| TRI|   83|  7.2|
| LGA| 7287|  7.0|
| ORD|20409| 6.87|
| BOS| 7502| 6.63|
| PIA|    1| 6.25|
| CID|  320| 6.22|
| TOL|   55|  6.1|
| OME|   56| 5.96|
| ANC| 1222| 5.88|
| SWF|   73| 5.71|
| BQN|    1| 5.56|
+----+-----+-----+
only showing top 20 rows



In [183]:
# Cancellation count and rate per day of the month (first 20 rows).
spark.sql(sql_text.format(colname="DayofMonth")).show(n=20)

+----------+----+-----+
|DayofMonth| Qty|Ratio|
+----------+----+-----+
|        25|9010| 4.88|
|        11|9014| 4.81|
|        18|8961| 4.78|
|        13|8383| 4.45|
|        31|4695| 4.28|
|        30|7036| 4.14|
|        14|7708| 4.07|
|        20|7294| 3.87|
|        19|7070| 3.78|
|        12|6965| 3.73|
|        17|6953| 3.68|
|        10|6600|  3.5|
|        16|6362| 3.42|
|        26|6323| 3.39|
|        28|6079| 3.24|
|        15|5981| 3.22|
|        21|5886| 3.11|
|        27|5467| 2.91|
|        29|5307| 2.87|
|         2|5248| 2.86|
+----------+----+-----+
only showing top 20 rows



In [168]:
# Same per-category cancellation stats (Qty = cancelled flights, Ratio = % cancelled),
# but ordered by the category value itself. Month is stored as a string (see col_info),
# so int() makes the ordering numeric (1, 2, ..., 12 rather than 1, 10, 11, 12, 2, ...).
# A single GROUP BY scans the table once and keeps categories with no cancellations
# (Qty=0) instead of dropping them through an inner join; null categories are excluded.
sql_text = """select {colname}, sum(case when Cancelled=1 then 1 else 0 end) as Qty,
                     round(sum(case when Cancelled=1 then 1 else 0 end)/count({colname})*100,2) as Ratio
              from inspections
              where {colname} is not null
              group by {colname}
            order by int({colname})"""

In [169]:
# Cancellation count and rate per month, in calendar order.
spark.sql(sql_text.format(colname="Month")).show(n=20)

+-----+-----+-----+
|Month|  Qty|Ratio|
+-----+-----+-----+
|    1|24515| 5.21|
|    2|15188| 3.42|
|    3|10237| 2.12|
|    4|11642| 2.51|
|    5|16513| 3.45|
|    6|18632| 3.95|
|    7|15526| 3.21|
|    8|14991| 3.05|
|    9|10365| 2.24|
|   10|10369| 2.13|
|   11|10912| 2.34|
|   12|28600| 5.95|
+-----+-----+-----+



In [171]:
# Cancellation count and rate per day of the week, in day order.
# The schema column is `DayOfWeek`; the previous spelling `DayofWeek` only resolved
# because Spark SQL matches column names case-insensitively by default.
spark.sql(sql_text.format(colname="DayOfWeek")).show()

+---------+-----+-----+
|DayofWeek|  Qty|Ratio|
+---------+-----+-----+
|        1|27812| 3.37|
|        2|31724| 3.83|
|        3|29199| 3.51|
|        4|27804| 3.36|
|        5|27173| 3.28|
|        6|23137| 3.12|
|        7|20641| 2.58|
+---------+-----+-----+



In [187]:
# Template for cancellation stats per scheduled hour; fill in with
# .format(colname=<time column>). Times appear to be HHMM values (e.g. CRSDepTime='1647'),
# so int({colname}/100) yields the hour bucket HR.
# One GROUP BY over HR computes both counts in a single scan and keeps hours with no
# cancellations (Qty=0) rather than dropping them via an inner join; rows whose time
# cannot be converted (HR is null) are excluded. Sorted by cancellation rate, highest first.
sql_text = """select HR, sum(case when Cancelled=1 then 1 else 0 end) as Qty,
                     round(sum(case when Cancelled=1 then 1 else 0 end)/count(HR)*100,2) as Ratio
              from (select int({colname}/100) as HR, Cancelled from inspections) T
              where HR is not null
              group by HR
            order by Ratio DESC"""

In [188]:
# Cancellation count and rate per scheduled departure hour (first 20 rows).
spark.sql(sql_text.format(colname="CRSDepTime")).show(n=20)

+---+-----+-----+
| HR|  Qty|Ratio|
+---+-----+-----+
| 19|12364| 3.98|
| 17|15933| 3.96|
| 16|11375| 3.58|
| 15|12609| 3.55|
| 18|12136| 3.54|
| 20|10197|  3.4|
|  0|  728| 3.34|
| 14|10754| 3.34|
| 13|12282| 3.32|
|  5|  799| 3.26|
| 10|10902| 3.26|
| 11|11133| 3.18|
| 21| 5459| 3.16|
|  6|11187| 3.06|
| 12|10759| 3.06|
|  7|12214| 2.99|
|  9|10544| 2.92|
|  8|11345|  2.9|
|  2|   34| 2.88|
| 22| 3505| 2.77|
+---+-----+-----+
only showing top 20 rows



In [189]:
# Cancellation count and rate per scheduled arrival hour (first 20 rows).
spark.sql(sql_text.format(colname="CRSArrTime")).show(n=20)

+---+-----+-----+
| HR|  Qty|Ratio|
+---+-----+-----+
|  4|  127| 4.51|
| 20|13339| 3.88|
| 18|13034| 3.79|
| 21|13628| 3.78|
| 19|13782| 3.61|
| 12|11324| 3.36|
| 16|13131| 3.33|
| 23| 6524| 3.32|
|  0| 2381| 3.27|
| 17|11308| 3.27|
|  9| 9653|  3.2|
| 22| 9444| 3.19|
| 14|10837| 3.15|
| 15|10205| 3.13|
| 10|11291| 3.12|
| 13|10160| 3.05|
| 11|10755|  3.0|
|  8| 8433| 2.98|
|  7| 5316| 2.86|
|  1|  474| 2.84|
+---+-----+-----+
only showing top 20 rows



# 截取部分欄位:選用 col_info 中標記為 O 的欄位(排除時間與距離欄位),並以 Cancelled 作為 label

In [192]:
# Feature candidates: columns flagged "O" in col_info, minus time and distance columns.
# (The name's spelling is kept because later cells refer to it.)
seleced_cols = [
    ci[1]
    for ci in col_info
    if ci[4] == "O" and not any(word in ci[1] for word in ("Time", "Distance"))
]
seleced_cols

['Month', 'DayofMonth', 'DayOfWeek', 'UniqueCarrier', 'Origin', 'Dest']

In [193]:
# Selected feature columns plus the Cancelled flag, renamed to 'label'.
df = inspections.select(list(seleced_cols) + [inspections['Cancelled'].alias('label')])

In [194]:
# Column names of the feature/label DataFrame.
list(df.columns)

['Month',
 'DayofMonth',
 'DayOfWeek',
 'UniqueCarrier',
 'Origin',
 'Dest',
 'label']