In [0]:
# install java libs and spark.
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
! tar xf spark-2.4.4-bin-hadoop2.7.tgz
! pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [4]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 25kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 42.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=3085eb27f4cc2cd3ee917724bf130a8ec65164d87d88e01065dab78e24dc171d
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession



In [6]:
#Data files in My Drive
!ls "/content/drive/My Drive/data"

raw_1987.csv.bz2  raw_1993.csv.bz2  raw_1999.csv.bz2  raw_2005.csv.bz2
raw_1988.csv.bz2  raw_1994.csv.bz2  raw_2000.csv.bz2  raw_2006.csv.bz2
raw_1989.csv.bz2  raw_1995.csv.bz2  raw_2001.csv.bz2  raw_2007.csv.bz2
raw_1990.csv.bz2  raw_1996.csv.bz2  raw_2002.csv.bz2  raw_2008.csv.bz2
raw_1991.csv.bz2  raw_1997.csv.bz2  raw_2003.csv.bz2
raw_1992.csv.bz2  raw_1998.csv.bz2  raw_2004.csv.bz2


In [0]:


spark = SparkSession.builder.appName("Flight Carrier Count").master("local[*]").getOrCreate()


In [0]:
# Dataframe Creation and Loading data
carrier_rdd = spark.read.options(header="true",inferschema = "true").csv('/content/drive/My Drive/data/*.csv.bz2')


In [7]:
#Dataset Columns types
carrier_rdd.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'string'),
 ('CRSDepTime', 'int'),
 ('ArrTime', 'string'),
 ('CRSArrTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'string'),
 ('CRSElapsedTime', 'string'),
 ('AirTime', 'string'),
 ('ArrDelay', 'string'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'string'),
 ('TaxiIn', 'string'),
 ('TaxiOut', 'string'),
 ('Cancelled', 'int'),
 ('CancellationCode', 'string'),
 ('Diverted', 'int'),
 ('CarrierDelay', 'string'),
 ('WeatherDelay', 'string'),
 ('NASDelay', 'string'),
 ('SecurityDelay', 'string'),
 ('LateAircraftDelay', 'string')]

In [9]:
# UniqueCarrier in Dataset
carrier_rdd.select("UniqueCarrier").distinct().show(100)

+-------------+
|UniqueCarrier|
+-------------+
|           UA|
|           EA|
|           PI|
|           PS|
|           AA|
|           NW|
|           EV|
|           B6|
|           HP|
|           TW|
|           DL|
|           OO|
|           F9|
|           YV|
|           TZ|
|           US|
|           AQ|
|           MQ|
|           OH|
|           HA|
|       ML (1)|
|           XE|
|           DH|
|           AS|
|           FL|
|           CO|
|       PA (1)|
|           WN|
|           9E|
+-------------+



In [0]:
carrierView = carrier_rdd.createOrReplaceTempView("carriers")

In [11]:
query = "select count(*) from carriers where UniqueCarrier='EA'";

spark.sql(query).show();

+--------+
|count(1)|
+--------+
|  919785|
+--------+



In [0]:
#UniqueCarrier Count
query =  "SELECT UniqueCarrier ,COUNT (*) FROM carriers group by UniqueCarrier";
unique_carriers_df =spark.sql(query);

In [13]:
unique_carriers_df.show(29)

+-------------+--------+
|UniqueCarrier|count(1)|
+-------------+--------+
|           UA|13299817|
|           EA|  919785|
|           PI|  873957|
|           PS|   83617|
|           AA|14984647|
|           NW|10292627|
|           EV| 1697172|
|           B6|  811341|
|           HP| 3636682|
|           TW| 3757747|
|           DL|16547870|
|           OO| 3090853|
|           F9|  336958|
|           YV|  854056|
|           TZ|  208420|
|           US|14075530|
|           AQ|  154381|
|           MQ| 3954895|
|           OH| 1464176|
|           HA|  274265|
|       ML (1)|   70622|
|           XE| 2350309|
|           DH|  693047|
|           AS| 2878021|
|           FL| 1265138|
|           CO| 8145788|
|       PA (1)|  316167|
|           WN|15976022|
|           9E|  521059|
+-------------+--------+

