In [1]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
import numpy as np

conf = SparkConf().setMaster('local').setAppName('Mi Programa')
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

dfspark = sqlContext.read.format('csv').option('header','true').option('inferSchema','true').load('Z:/csv/2008.csv')
dfspark = dfspark.sample(fraction=0.001,withReplacement=False)
dfspark = dfspark.withColumn('ArrDelay',dfspark['ArrDelay'].cast('integer'))

df2 = dfspark.na.drop(subset=['ArrDelay','DepDelay','Distance'])
df2 = df2.filter('ArrDelay is not NULL')
df2 = df2.dropDuplicates()

In [2]:
df2.select('ArrDelay').filter('ArrDelay > 60').take(5)
#df2.select('ArrDelay').filter('ArrDelay > 60').take(1)[0]

[Row(ArrDelay=88),
 Row(ArrDelay=419),
 Row(ArrDelay=192),
 Row(ArrDelay=396),
 Row(ArrDelay=81)]

In [3]:
df2.filter('ArrDelay > 60').take(5)

[Row(Year=2008, Month=2, DayofMonth=5, DayOfWeek=2, DepTime='1443', CRSDepTime=1255, ArrTime='1608', CRSArrTime=1440, UniqueCarrier='WN', FlightNum=2474, TailNum='N368SW', ActualElapsedTime='85', CRSElapsedTime='105', AirTime='70', ArrDelay=88, DepDelay='108', Origin='PHL', Dest='CMH', Distance=405, TaxiIn='3', TaxiOut='12', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='88'),
 Row(Year=2008, Month=2, DayofMonth=12, DayOfWeek=2, DepTime='1607', CRSDepTime=1030, ArrTime='1723', CRSArrTime=1024, UniqueCarrier='YV', FlightNum=7340, TailNum='N511MJ', ActualElapsedTime='136', CRSElapsedTime='54', AirTime='122', ArrDelay=419, DepDelay='337', Origin='SBN', Dest='ORD', Distance=84, TaxiIn='6', TaxiOut='8', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='0', NASDelay='419', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=2, DayofMonth=25, DayOfWeek=1, DepTime

In [4]:
media = np.mean(df2.select('ArrDelay').collect())
df2.select('ArrDelay').rdd.map(lambda x: (x-media)**2).take(10)

[array([34.78908734]),
 array([221.95718187]),
 array([734.50607404]),
 array([221.95718187]),
 array([6416.29396178]),
 array([221.95718187]),
 array([146.45289827]),
 array([26.02808291]),
 array([259.26707847]),
 array([436.73591156])]

In [5]:
df2.groupBy('DayOfWeek').count().show()

+---------+-----+
|DayOfWeek|count|
+---------+-----+
|        1|  969|
|        6|  856|
|        3|  980|
|        5|  979|
|        4| 1010|
|        7|  987|
|        2|  989|
+---------+-----+



In [6]:
df2.groupBy('DayOfWeek').mean('ArrDelay').show()

+---------+------------------+
|DayOfWeek|     avg(ArrDelay)|
+---------+------------------+
|        1| 8.208462332301341|
|        6|3.1869158878504673|
|        3| 7.052040816326531|
|        5|11.204290091930542|
|        4| 7.691089108910891|
|        7| 9.267477203647417|
|        2| 8.082912032355916|
+---------+------------------+



In [7]:
df2.select('Origin').rdd.distinct().take(5)

[Row(Origin='PIT'),
 Row(Origin='FSM'),
 Row(Origin='SMF'),
 Row(Origin='CWA'),
 Row(Origin='SPI')]

In [8]:
df2.select('Origin').rdd.distinct().count()

249

In [9]:
df2.orderBy(df2.ArrDelay.desc()).take(5)

[Row(Year=2008, Month=6, DayofMonth=5, DayOfWeek=4, DepTime='816', CRSDepTime=2229, ArrTime='937', CRSArrTime=2336, UniqueCarrier='NW', FlightNum=445, TailNum='N8945E', ActualElapsedTime='81', CRSElapsedTime='67', AirTime='65', ArrDelay=601, DepDelay='587', Origin='MSP', Dest='GFK', Distance=284, TaxiIn='4', TaxiOut='12', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='391', NASDelay='14', SecurityDelay='0', LateAircraftDelay='196'),
 Row(Year=2008, Month=5, DayofMonth=27, DayOfWeek=2, DepTime='1650', CRSDepTime=935, ArrTime='1948', CRSArrTime=1215, UniqueCarrier='AA', FlightNum=1648, TailNum='N4XTAA', ActualElapsedTime='118', CRSElapsedTime='100', AirTime='84', ArrDelay=453, DepDelay='435', Origin='ABQ', Dest='DFW', Distance=569, TaxiIn='12', TaxiOut='22', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='435', NASDelay='18', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=8, DayofMonth=14, DayOfWeek=4, Dep

In [10]:
df2.select('ArrDelay').describe().show()

+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|              6770|
|   mean| 7.898227474150665|
| stddev|36.249465747597114|
|    min|               -59|
|    max|               601|
+-------+------------------+



In [11]:
df2.select('Origin').rdd.countByValue()

defaultdict(int,
            {Row(Origin='SJC'): 57,
             Row(Origin='MKE'): 29,
             Row(Origin='ATL'): 417,
             Row(Origin='BDL'): 32,
             Row(Origin='PHL'): 94,
             Row(Origin='MSP'): 151,
             Row(Origin='IAD'): 86,
             Row(Origin='TUL'): 24,
             Row(Origin='PNS'): 10,
             Row(Origin='ORD'): 329,
             Row(Origin='SMX'): 3,
             Row(Origin='DCA'): 78,
             Row(Origin='DFW'): 268,
             Row(Origin='SFO'): 125,
             Row(Origin='MIA'): 76,
             Row(Origin='SYR'): 14,
             Row(Origin='RIC'): 15,
             Row(Origin='JFK'): 99,
             Row(Origin='SLC'): 128,
             Row(Origin='DTW'): 181,
             Row(Origin='IAH'): 180,
             Row(Origin='BWI'): 107,
             Row(Origin='CVG'): 89,
             Row(Origin='HOU'): 46,
             Row(Origin='DEN'): 235,
             Row(Origin='CMH'): 24,
             Row(Origin='STL'): 51,
  

In [12]:
df2.select('Origin').rdd.max() #df2.select('Origin').rdd.max()[0]

Row(Origin='YUM')

In [13]:
df2.select('Origin').rdd.collect()

[Row(Origin='SJC'),
 Row(Origin='MKE'),
 Row(Origin='ATL'),
 Row(Origin='BDL'),
 Row(Origin='PHL'),
 Row(Origin='MSP'),
 Row(Origin='IAD'),
 Row(Origin='TUL'),
 Row(Origin='PNS'),
 Row(Origin='ORD'),
 Row(Origin='SMX'),
 Row(Origin='DCA'),
 Row(Origin='DFW'),
 Row(Origin='SFO'),
 Row(Origin='MIA'),
 Row(Origin='SYR'),
 Row(Origin='RIC'),
 Row(Origin='JFK'),
 Row(Origin='ATL'),
 Row(Origin='ORD'),
 Row(Origin='SLC'),
 Row(Origin='DTW'),
 Row(Origin='IAH'),
 Row(Origin='ATL'),
 Row(Origin='BWI'),
 Row(Origin='SLC'),
 Row(Origin='ATL'),
 Row(Origin='CVG'),
 Row(Origin='DTW'),
 Row(Origin='HOU'),
 Row(Origin='DEN'),
 Row(Origin='ATL'),
 Row(Origin='ATL'),
 Row(Origin='MIA'),
 Row(Origin='SJC'),
 Row(Origin='CMH'),
 Row(Origin='STL'),
 Row(Origin='LAX'),
 Row(Origin='ATL'),
 Row(Origin='PHX'),
 Row(Origin='MSY'),
 Row(Origin='SBN'),
 Row(Origin='DFW'),
 Row(Origin='IAH'),
 Row(Origin='LAS'),
 Row(Origin='DEN'),
 Row(Origin='LAS'),
 Row(Origin='ORD'),
 Row(Origin='SFO'),
 Row(Origin='BWI'),


In [14]:
df2.crosstab('DayOfWeek','Origin').take(2)

[Row(DayOfWeek_Origin='5', ABE=1, ABI=0, ABQ=6, ACT=0, ACV=0, AEX=0, AGS=0, ALB=2, AMA=2, ANC=3, ASE=1, ATL=67, ATW=2, AUS=7, AVL=0, AZO=3, BDL=6, BET=1, BFL=1, BGM=0, BHM=1, BIL=1, BIS=1, BMI=0, BNA=7, BOI=2, BOS=22, BQK=0, BQN=1, BRO=1, BTR=2, BTV=0, BUF=5, BUR=7, BWI=12, BZN=0, CAE=0, CAK=0, CDC=0, CEC=0, CHA=1, CHS=0, CIC=1, CID=1, CLD=1, CLE=10, CLL=0, CLT=17, CMH=5, CMI=0, COD=0, COS=2, CPR=0, CRP=0, CRW=0, CSG=0, CVG=14, CWA=1, DAB=2, DAL=7, DAY=5, DBQ=1, DCA=10, DEN=36, DFW=30, DHN=0, DLG=0, DRO=1, DSM=3, DTW=33, EKO=0, ELP=3, ERI=0, EUG=1, EVV=2, EWN=0, EWR=20, EYW=0, FAI=0, FAR=0, FAT=4, FAY=0, FCA=0, FLG=1, FLL=5, FLO=0, FNT=0, FSD=3, FSM=0, FWA=0, GCC=0, GEG=3, GFK=0, GJT=0, GNV=0, GPT=1, GRB=1, GRK=0, GRR=2, GSO=3, GSP=3, GUC=0, HDN=1, HLN=1, HNL=10, HOU=12, HPN=1, HRL=0, HSV=1, IAD=11, IAH=30, ICT=3, IDA=1, ILM=0, IND=4, ISP=0, ITO=0, IYK=0, JAC=1, JAN=1, JAX=2, JFK=11, JNU=0, KOA=3, KTN=0, LAN=0, LAS=21, LAW=0, LAX=35, LBB=0, LEX=0, LFT=0, LGA=16, LGB=0, LIH=1, LIT=3, LM

In [15]:
lista = sc.parallelize(range(1,1000000))
lista.reduce(lambda x,y:x+y)

499999500000

In [17]:
lista.sum()

499999500000

In [16]:
from pyspark.sql.functions import mean, stddev, col
media = df2.select(mean(col('ArrDelay'))).collect()
std = df2.select(stddev(col('ArrDelay'))).collect()

In [18]:
media

[Row(avg(ArrDelay)=7.898227474150665)]

In [19]:
std[0][0]

36.249465747597114

In [20]:
df2.withColumn('Diferencia',df2['ArrDelay'] - df2['DepDelay']).collect()

[Row(Year=2008, Month=1, DayofMonth=7, DayOfWeek=1, DepTime='1140', CRSDepTime=1125, ArrTime='1242', CRSArrTime=1240, UniqueCarrier='WN', FlightNum=321, TailNum='N606SW', ActualElapsedTime='62', CRSElapsedTime='75', AirTime='47', ArrDelay=2, DepDelay='15', Origin='SJC', Dest='LAX', Distance=308, TaxiIn='5', TaxiOut='10', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Diferencia=-13.0),
 Row(Year=2008, Month=1, DayofMonth=13, DayOfWeek=7, DepTime='1140', CRSDepTime=1148, ArrTime='1403', CRSArrTime=1410, UniqueCarrier='OH', FlightNum=5341, TailNum='N716CA', ActualElapsedTime='83', CRSElapsedTime='82', AirTime='69', ArrDelay=-7, DepDelay='-8', Origin='MKE', Dest='CVG', Distance=318, TaxiIn='3', TaxiOut='11', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Diferencia=1.0),
 Row(Year=2008, Month=1, 

In [21]:
df2.withColumn('Standard',(df2['ArrDelay'] - media[0][0])/std[0][0]).collect()

[Row(Year=2008, Month=1, DayofMonth=7, DayOfWeek=1, DepTime='1140', CRSDepTime=1125, ArrTime='1242', CRSArrTime=1240, UniqueCarrier='WN', FlightNum=321, TailNum='N606SW', ActualElapsedTime='62', CRSElapsedTime='75', AirTime='47', ArrDelay=2, DepDelay='15', Origin='SJC', Dest='LAX', Distance=308, TaxiIn='5', TaxiOut='10', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Standard=-0.16271212147565633),
 Row(Year=2008, Month=1, DayofMonth=13, DayOfWeek=7, DepTime='1140', CRSDepTime=1148, ArrTime='1403', CRSArrTime=1410, UniqueCarrier='OH', FlightNum=5341, TailNum='N716CA', ActualElapsedTime='83', CRSElapsedTime='82', AirTime='69', ArrDelay=-7, DepDelay='-8', Origin='MKE', Dest='CVG', Distance=318, TaxiIn='3', TaxiOut='11', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Standard=-0.41099164268754035