In [2]:
import findspark
findspark.init()

import pyspark

import pyspark.sql.types as typ

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()

In [3]:
flights = spark.read.csv("/home/shivam/intern/departuredelays.csv",header=True,inferSchema=True)

In [4]:
flights.show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [5]:
train = spark.read.csv("/home/shivam/intern/2018_08.csv",header = True, inferSchema = True )

In [6]:
air = sc.textFile("/home/shivam/intern/airport-codes-na.txt").map(lambda x : x.split("\t"))

In [7]:
air.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [8]:
sch = typ.StructType([
    typ.StructField('City',typ.StringType(),True)
    ,typ.StructField('State',typ.StringType(),True)
    ,typ.StructField('Country',typ.StringType(),True)
    ,typ.StructField('IATA',typ.StringType(),True)
])
header = air.first()

In [9]:
a = air.filter(lambda row: row != header)
a.take(5)

[['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK'],
 ['Alamosa', 'CO', 'USA', 'ALS']]

In [10]:
airport = spark.createDataFrame(a,schema = sch)

In [11]:
airport.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [12]:
airport.show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows



In [62]:
variables_to_remove = ['date','stop_sequence','from_id','to_id','status','type','from','to']
train = train.drop(*variables_to_remove)

In [63]:
train.show()

+--------+-------------------+-------------------+--------------+----------------+
|train_id|     scheduled_time|        actual_time| delay_minutes|            line|
+--------+-------------------+-------------------+--------------+----------------+
|    4631|2018-08-01 15:19:00|2018-08-01 15:19:14|0.233333333333|  Atl. City Line|
|    4631|2018-08-01 15:40:00|2018-08-01 15:40:18|           0.3|  Atl. City Line|
|    4631|2018-08-01 15:47:00|2018-08-01 15:48:22| 1.36666666667|  Atl. City Line|
|    4631|2018-08-01 15:58:00|2018-08-01 16:03:15|          5.25|  Atl. City Line|
|    4631|2018-08-01 16:05:00|2018-08-01 16:10:24|           5.4|  Atl. City Line|
|    4631|2018-08-01 16:18:00|2018-08-01 16:29:19| 11.3166666667|  Atl. City Line|
|    4631|2018-08-01 16:29:00|2018-08-01 16:31:41| 2.68333333333|  Atl. City Line|
|    4631|2018-08-01 16:39:00|2018-08-01 16:44:25| 5.41666666667|  Atl. City Line|
|    4631|2018-08-01 16:52:00|2018-08-01 16:56:00|           4.0|  Atl. City Line|
|   

In [61]:
def get_stats(group):
    return {'min': group.min(), 'max': group.max(),
            'count': group.count(), 'mean': group.mean()}

t = train.select("departure_delays").groupBy('train_id').apply(get_stats).unstack()

AnalysisException: "cannot resolve '`departure_delays`' given input columns: [actual_time, to, line, from, train_id, delay_minutes, scheduled_time];;\n'Project ['departure_delays]\n+- Project [train_id#52, from#54, to#56, scheduled_time#58, actual_time#59, delay_minutes#60, line#62]\n   +- Project [train_id#52, from#54, to#56, scheduled_time#58, actual_time#59, delay_minutes#60, line#62, type#63]\n      +- Relation[date#51,train_id#52,stop_sequence#53,from#54,from_id#55,to#56,to_id#57,scheduled_time#58,actual_time#59,delay_minutes#60,status#61,line#62,type#63] csv\n"