In [1]:
# Subject: Pyspark
# Author: Saran G
# File Created On: 01 December 2023
# Date of Submission: 27 January 2023

# Necessary Files

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=d4eb6e6e9c062eb76350df9e1cf93c7cfa9da91ef752c5d568c1ae85e89f986e
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Saran Assignment').getOrCreate()
spark

In [5]:
import gdown

def path(link, filename):
  url = f"https://drive.google.com/uc?id={link}"
  gdown.download(url, filename, quiet=False)


path("1fzb-Nhdb_dmn3oMQrkzmnKZMFAX00IZW", "2014-summary.csv")
path("1lUfnGU_v1ZUw1dRrovASH9M5BgYeAyxQ","2015-summary.csv")
df = spark.read.format("csv").option("header", "true").load("2015-summary.csv")
df2 = spark.read.format("csv").option("header", "true").load("2014-summary.csv")

Downloading...
From: https://drive.google.com/uc?id=1fzb-Nhdb_dmn3oMQrkzmnKZMFAX00IZW
To: /content/2014-summary.csv
100%|██████████| 6.73k/6.73k [00:00<00:00, 9.84MB/s]
Downloading...
From: https://drive.google.com/uc?id=1lUfnGU_v1ZUw1dRrovASH9M5BgYeAyxQ
To: /content/2015-summary.csv
100%|██████████| 7.08k/7.08k [00:00<00:00, 5.96MB/s]


# Day 1 (chapters 1 & 2 from Spark: The Definitive Guide)

**1. What is big data?**

Big data refers to datasets that are too large, too fast-moving, or too varied to store and process comfortably on a single machine. Organizations generate and collect this data and analyze it to gain insights and drive business growth.

**For example,** a retail company might have big data that includes customer purchase history, website traffic data, and social media interactions. A healthcare provider might have big data that includes patient medical records, insurance claims, and sensor data from medical devices.



**2. Why Spark?**

Apache Spark is an engine for processing big data quickly and efficiently across a cluster of machines. It handles large volumes of data, works with many data formats and sources, and offers approachable APIs in several languages. It also integrates with other tools and has an active open-source community that keeps improving it.



**3. What is Spark?**

Imagine you are a retailer and you have millions of customer purchase records, website traffic data, and social media interactions stored in your data warehouse. You would like to analyze this data to gain insights about your customers, such as their purchasing habits, what products they like, and what promotions they respond to.

Analyzing this much data on a single machine would be slow and difficult. With Spark, you can write simple code to filter, aggregate, and analyze the data in parallel across many machines, which can cut processing time substantially. This lets you gain insights from your data sooner and make better-informed business decisions.

In short, Spark is a tool for processing and analyzing big data, whether it is stored in a data warehouse, a NoSQL database, or a distributed file system such as HDFS. It supports everything from simple filtering and aggregation to advanced analytics.


**4. Internals of Spark**

Apache Spark is a distributed computing system made up of a driver program, which plans and coordinates the work, and executors on worker nodes, which run it. RDDs (Resilient Distributed Datasets) are Spark's fundamental data structure: they split data into partitions that can be processed in parallel. Mechanisms such as the task scheduler, data shuffling between nodes, and caching of intermediate results are central to Spark's performance.
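
To make the driver/worker split concrete, here is a minimal plain-Python sketch (not Spark code). A "driver" function splits a list into partitions, each partition is summed independently the way a worker task would handle it, and the partial results are combined. The names `split_into_partitions` and `run_job` are hypothetical, and the thread pool only stands in for a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal, contiguous chunks."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions take one additional element.
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_job(data, num_partitions=4):
    """Play the driver: hand one partition to each task, then merge the results."""
    partitions = split_into_partitions(data, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_sums = list(pool.map(sum, partitions))  # one task per partition
    return sum(partial_sums)  # final combine step on the "driver"


total = run_job(list(range(1, 101)))
print(total)
```

The point of the sketch is only the shape of the computation: independent work per partition followed by a small merge step.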

**5. High-level API of Spark: SparkSession, DataFrame, Partitions, Transformations, Actions, Lazy Evaluation**

Spark's high-level APIs include the DataFrame API and the Dataset API, which provide a convenient interface for working with structured data. Both are built on top of the RDD API and offer a more expressive way of working, along with features such as schema inference and query optimization for improved performance. The Dataset API adds compile-time type checking, but it is available only in Scala and Java; PySpark uses DataFrames.


SparkSession is the entry point to Spark programming. A DataFrame is a distributed collection of data organized into named columns. Partitions are the smaller chunks into which a DataFrame's data is split across the cluster. Transformations are operations that describe a new DataFrame from an existing one, while actions are operations that return a value or produce a side effect. Lazy evaluation means that transformations are not executed until an action is called.
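
The difference between transformations and actions can be sketched in plain Python. The `LazyFrame` class below is a hypothetical toy, not Spark's implementation: its `filter` and `map` methods only record steps, and nothing runs until the `collect` action is called.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: records transformations, runs them on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of pending (kind, function) steps

    # Transformations: return a new LazyFrame and do no work yet.
    def filter(self, predicate):
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def map(self, func):
        return LazyFrame(self._rows, self._plan + (("map", func),))

    # Action: executes the recorded plan and returns a concrete result.
    def collect(self):
        rows = iter(self._rows)
        for kind, func in self._plan:
            # Inside a method, `filter` and `map` here are the Python builtins.
            rows = filter(func, rows) if kind == "filter" else map(func, rows)
        return list(rows)


calls = []


def double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2


lf = LazyFrame([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 1).map(double)
pending = len(calls)   # still 0: the plan has only been described
result = lf.collect()  # the action triggers execution
print(pending, result)
```

Because the whole plan is known before anything runs, a real engine has the chance to optimize it first; the toy above only demonstrates the deferred execution.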

# Day 2

## SCHEMA

In [6]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False)
])
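# Note: no .option("header", "true") is set here, so the CSV header line is read as a
# data row and its "count" value becomes null (visible in the outputs that follow).
# printSchema() also reports nullable = true for "count" despite the False above.
df = spark.read.format("csv")\
.schema(myManualSchema)\
.load("2015-summary.csv")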
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
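
The read above applies a schema but does not set the header option, which is consistent with the header line showing up as the first data row, with a null count, in the outputs that follow. The plain-Python sketch below imitates that behavior with the standard `csv` module. The helpers `to_long` and `read_rows` are hypothetical, and returning `None` for an unparseable value is an assumption chosen to mirror the nulls seen in this notebook.

```python
import csv
import io

# The header line plus the first two data rows, as displayed elsewhere in this notebook.
raw = (
    "DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count\n"
    "United States,Romania,15\n"
    "United States,Croatia,1\n"
)


def to_long(text):
    """Return an int, or None when the text is not a valid integer."""
    try:
        return int(text)
    except ValueError:
        return None


def read_rows(text, header):
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        rows = rows[1:]  # treat the first line as column names, not data
    return [(dest, origin, to_long(count)) for dest, origin, count in rows]


without_header = read_rows(raw, header=False)
with_header = read_rows(raw, header=True)
print(without_header[0])
print(with_header[0])
```

With the header treated as data, row counts and distinct counts are each inflated by one.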



## COLUMNS AND EXPRESSIONS

In [7]:
from pyspark.sql.functions import col, column
df.select(col("count")).show()

+-----+
|count|
+-----+
| null|
|   15|
|    1|
|  344|
|   15|
|   62|
|    1|
|   62|
|  588|
|   40|
|    1|
|  325|
|   39|
|   64|
|    1|
|   41|
|   30|
|    6|
|    4|
|  230|
+-----+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import expr
expr("(((count + 5) * 200) - 6) < count")

Column<'((((count + 5) * 200) - 6) < count)'>

In [9]:
spark.read.format("csv")\
.load("2015-summary.csv")\
.columns

['_c0', '_c1', '_c2']


## RECORDS AND ROWS

In [10]:
df.first()

Row(DEST_COUNTRY_NAME='DEST_COUNTRY_NAME', ORIGIN_COUNTRY_NAME='ORIGIN_COUNTRY_NAME', count=None)

## CREATE ROWS

In [11]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [12]:
myRow[0]


'Hello'

In [13]:
myRow[2]

1

## CREATING DATAFRAME

In [14]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType,\
 StringType, LongType
myManualSchema = StructType([
StructField("col1", StringType(), True),
StructField("col2", StringType(), True),
StructField("col3", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+----+
| col1|col2|col3|
+-----+----+----+
|Hello|null|   1|
+-----+----+----+



## SELECT & SELECTEXPR

In [15]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|DEST_COUNTRY_NAME|
|    United States|
+-----------------+
only showing top 2 rows



In [16]:
df.select(
"DEST_COUNTRY_NAME",
"ORIGIN_COUNTRY_NAME")\
.show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
|    United States|            Romania|
+-----------------+-------------------+
only showing top 2 rows



In [17]:
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [18]:
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME").show(2)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
|    United States|    United States|
+-----------------+-----------------+
only showing top 2 rows



In [19]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-----------------+
|      destination|
+-----------------+
|DEST_COUNTRY_NAME|
|    United States|
+-----------------+
only showing top 2 rows



In [20]:
df.select(
expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")
).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|DEST_COUNTRY_NAME|
|    United States|
+-----------------+
only showing top 2 rows



In [21]:
df.selectExpr(
"DEST_COUNTRY_NAME as newColumnName",
"DEST_COUNTRY_NAME"
).show(2)

+-----------------+-----------------+
|    newColumnName|DEST_COUNTRY_NAME|
+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
|    United States|    United States|
+-----------------+-----------------+
only showing top 2 rows



In [22]:
df.selectExpr(
"*", 
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry" )\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|        false|
|    United States|            Romania|   15|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [23]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              133|
+-----------+---------------------------------+



In [24]:
from pyspark.sql.functions import lit
df.select(
expr("*"),
lit(1).alias("One")
).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|  1|
|    United States|            Romania|   15|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



## ADDING COLUMNS

In [25]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|        1|
|    United States|            Romania|   15|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [26]:
df.withColumn(
"withinCountry",
expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|        false|
|    United States|            Romania|   15|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## RENAMING COLUMNS

In [27]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [28]:
dfWithLongColName = df\
.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|  ORIGIN_COUNTRY_NAME|
|    United States|            Romania|   15|              Romania|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [29]:
dfWithLongColName\
.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`" )\
.show(2)

+---------------------+-------------------+
|This Long Column-Name|            new col|
+---------------------+-------------------+
|  ORIGIN_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
|              Romania|            Romania|
+---------------------+-------------------+
only showing top 2 rows



In [30]:
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

['This Long Column-Name']

In [31]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [32]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

DataFrame[count: bigint, This Long Column-Name: string]

## CHANGING A COLUMN'S TYPE

In [33]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [34]:
df.withColumn("count",col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



## FILTERING ROWS

In [35]:
colCondition = df.filter(col("count") < 2).take(2)
conditional = df.where("count < 2").take(2)

In [36]:
print(colCondition)
print(conditional)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]


In [37]:
df.where(col("count") < 2)\
.where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



## GETTING UNIQUE ROWS

In [38]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").count()

257

In [39]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

126

## SORTING

In [40]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|       United States|          Singapore|    1|
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [41]:
from pyspark.sql.functions import desc, asc
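# Caution: expr("count desc") appears to be parsed as the column `count` aliased as
# `desc`, so this sorts ascending, as the first output shows. Use desc(...) instead.
df.orderBy(expr("count desc")).show(2)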
df.orderBy(desc(col("count")), asc(col("DEST_COUNTRY_NAME"))).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



## LIMIT

In [42]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
+-----------------+-------------------+-----+



In [43]:
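# Note: expr("count desc") aliases the column rather than sorting descending, so the
# output is in ascending order; col("count").desc() gives a descending sort.
df.orderBy(expr("count desc")).limit(6).show()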

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|       United States|          Singapore|    1|
|Saint Vincent and...|      United States|    1|
|               Malta|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
+--------------------+-------------------+-----+



## REPARTITION AND COALESCE

In [44]:
df.rdd.getNumPartitions()

1

In [45]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [46]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [47]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [48]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
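
Repartitioning by a column sends all rows that share a key to the same partition. The plain-Python sketch below illustrates the idea with a hypothetical `hash_partition` helper. It uses CRC32 purely for illustration, which is an assumption and not the hash Spark uses. The sample rows are taken from the outputs in this notebook.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, key_index, num_partitions):
    """Assign each row to a partition by hashing its key column."""
    partitions = defaultdict(list)
    for row in rows:
        key = row[key_index].encode("utf-8")
        partition_id = zlib.crc32(key) % num_partitions  # illustrative hash only
        partitions[partition_id].append(row)
    return dict(partitions)


rows = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
]
parts = hash_partition(rows, key_index=0, num_partitions=5)
for partition_id, members in sorted(parts.items()):
    print(partition_id, members)
```

Note that `repartition` and `coalesce` return new DataFrames rather than modifying `df`, so the results in the cells above would need to be assigned to a variable to be used.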

## AGGREGATION FUNCTIONS


1)count

In [49]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").count()

257

2)CountDistinct

In [50]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

126

3)first and last

In [51]:
df.first()

Row(DEST_COUNTRY_NAME='DEST_COUNTRY_NAME', ORIGIN_COUNTRY_NAME='ORIGIN_COUNTRY_NAME', count=None)

In [52]:
df.tail(1)

[Row(DEST_COUNTRY_NAME='Greece', ORIGIN_COUNTRY_NAME='United States', count=30)]

4)min and max

In [53]:
from pyspark.sql.functions import min
df.select(min("count")).take(1)

[Row(min(count)=1)]

In [54]:
from pyspark.sql.functions import max
df.select(max("count")).take(1)

[Row(max(count)=370002)]

5)sum

In [55]:
from pyspark.sql.functions import sum
df.select(sum("count")).show()

+----------+
|sum(count)|
+----------+
|    453316|
+----------+



6)sumDistinct

In [56]:
from pyspark.sql.functions import sum_distinct

df.select(sum_distinct("count")).show()

+-------------------+
|sum(DISTINCT count)|
+-------------------+
|             450718|
+-------------------+



In [57]:
from pyspark.sql.functions import *

df.select(avg("count")).show()

+-----------+
| avg(count)|
+-----------+
|1770.765625|
+-----------+



7)grouping

In [58]:
df1= df.groupBy("DEST_COUNTRY_NAME").count()
df2 = df.groupBy("ORIGIN_COUNTRY_NAME").count()
df3 = df.groupBy("count").count()

In [59]:
df1.show(3)
df2.show(3)
df3.show(3)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
+-----------------+-----+
only showing top 3 rows

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|           Paraguay|    1|
|             Russia|    1|
|           Anguilla|    1|
+-------------------+-----+
only showing top 3 rows

+-----+-----+
|count|count|
+-----+-----+
|   26|    2|
|  442|    1|
|   19|    3|
+-----+-----+
only showing top 3 rows



8)Window Function

In [60]:
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [61]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)


+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In [62]:
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



In [63]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [64]:
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



In [65]:
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



In [66]:
from pyspark.sql.functions import cume_dist    
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
   .show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+
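
The ranking functions above differ only in how they treat ties. As a cross-check, the plain-Python sketch below recomputes them for the Sales partition, already sorted by salary. It is an illustration of the definitions, not how Spark evaluates windows, and it assumes the partition has more than one row so that `percent_rank` does not divide by zero.

```python
salaries = [3000, 3000, 4100, 4100, 4600]  # Sales department, ascending

n = len(salaries)

# row_number: position in the ordering, ties broken arbitrarily.
row_number = list(range(1, n + 1))

# rank: position of the first row with the same value, so ties leave gaps.
rank = [salaries.index(s) + 1 for s in salaries]

# dense_rank: like rank, but without gaps after ties.
distinct = sorted(set(salaries))
dense_rank = [distinct.index(s) + 1 for s in salaries]

# percent_rank: (rank - 1) / (rows in partition - 1).
percent_rank = [(r - 1) / (n - 1) for r in rank]

# cume_dist: fraction of rows with a value less than or equal to the current one.
cume_dist = [sum(1 for other in salaries if other <= s) / n for s in salaries]

print(row_number, rank, dense_rank, percent_rank, cume_dist, sep="\n")
```

The values agree with the Sales rows in the `row_number`, `rank`, `dense_rank`, `percent_rank`, and `cume_dist` outputs above.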



In [67]:
from pyspark.sql.functions import lag    
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
      .show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [68]:
from pyspark.sql.functions import lead    
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+
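
`lag` and `lead` look a fixed number of rows backward or forward within the ordered partition and yield null when that row does not exist. A minimal plain-Python sketch of the same idea, using the sorted Sales salaries, follows. The helper names mirror the Spark functions but are ordinary list functions written for this illustration.

```python
def lag(values, offset):
    """Value `offset` rows earlier in the partition, or None if there is none."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]


def lead(values, offset):
    """Value `offset` rows later in the partition, or None if there is none."""
    return [values[i + offset] if i + offset < len(values) else None for i in range(len(values))]


sales = [3000, 3000, 4100, 4100, 4600]  # Sales department, ordered by salary

lagged = lag(sales, 2)
led = lead(sales, 2)
print(lagged)
print(led)
```

These lists line up with the Sales rows in the `lag` and `lead` outputs above.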



In [69]:
windowSpecAgg  = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number 
df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+
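
The cell above computes per-department aggregates over a window and then keeps one row per department. As a sanity check on the numbers, the same aggregates can be reproduced in plain Python from the sample data. This is a sketch for verification only; the variable names `by_department` and `summary` are hypothetical.

```python
from collections import defaultdict

data = [
    ("James", "Sales", 3000), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]

# Group salaries by department, which plays the role of partitionBy("department").
by_department = defaultdict(list)
for _name, department, salary in data:
    by_department[department].append(salary)

# One summary row per department.
summary = {
    department: {
        "avg": sum(values) / len(values),
        "sum": sum(values),
        "min": min(values),
        "max": max(values),
    }
    for department, values in by_department.items()
}

for department in sorted(summary):
    print(department, summary[department])
```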



# Day 4 (chapter 8)


## 1. Joins

In [76]:

df = spark.read.format("csv").option("header", "true").load("2015-summary.csv")
df2 = spark.read.format("csv").option("header", "true").load("2014-summary.csv")

### 1. Inner Join:

In [77]:
inner_join = df.join(df2, ["DEST_COUNTRY_NAME"], "inner")
inner_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Romania|   15|               Haiti|  193|
|    United States|            Romania|   15|Saint Kitts and N...|  123|
|    United States|            Romania|   15|       French Guiana|    4|
|    United States|            Romania|   15|             Bolivia|   14|
|    United States|            Romania|   15| Trinidad and Tobago|  175|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



### 2. Left Outer Join:

In [78]:
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [79]:
left_outer_join = df.join(df2, ["DEST_COUNTRY_NAME"], "left_outer")
left_outer_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Romania|   15|               Haiti|  193|
|    United States|            Romania|   15|Saint Kitts and N...|  123|
|    United States|            Romania|   15|       French Guiana|    4|
|    United States|            Romania|   15|             Bolivia|   14|
|    United States|            Romania|   15| Trinidad and Tobago|  175|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



### 3. Right Outer Join:

In [80]:
right_outer_join = df.join(df2, ["DEST_COUNTRY_NAME"], "right_outer")
right_outer_join.show(5)

+-----------------+--------------------+-----+-------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+-------------------+-----+
|    United States|               Haiti|  225|       Saint Martin|    1|
|    United States|Saint Kitts and N...|  145|       Saint Martin|    1|
|    United States|             Bolivia|   13|       Saint Martin|    1|
|    United States| Trinidad and Tobago|  217|       Saint Martin|    1|
|    United States|             Namibia|    1|       Saint Martin|    1|
+-----------------+--------------------+-----+-------------------+-----+
only showing top 5 rows



### 4. Full Outer Join:

In [81]:
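# Note: both sides of this join are df (2015), so every key has a match; pass df2 as
# the right side to compare 2015 against 2014.
full_outer_join = df.join(df, ["DEST_COUNTRY_NAME"], "full_outer")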
full_outer_join.show(5)

+-------------------+-------------------+-----+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+-------------------+-----+
|            Algeria|      United States|    4|      United States|    4|
|             Angola|      United States|   15|      United States|   15|
|           Anguilla|      United States|   41|      United States|   41|
|Antigua and Barbuda|      United States|  126|      United States|  126|
|          Argentina|      United States|  180|      United States|  180|
+-------------------+-------------------+-----+-------------------+-----+
only showing top 5 rows
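
The four join types above differ only in what happens to rows without a match on the other side. The plain-Python sketch below shows this on two tiny lists of `(key, value)` pairs. The `join` helper and the toy keys "A", "B", and "C" are made up for illustration and are not taken from the flight data.

```python
def join(left, right, how="inner"):
    """Join two lists of (key, value) pairs on key; missing sides become None."""
    left_keys = {key for key, _ in left}
    right_keys = {key for key, _ in right}

    # Matching pairs appear in every join type.
    out = [(lk, lv, rv) for lk, lv in left for rk, rv in right if lk == rk]

    # Outer joins additionally keep the unmatched rows of one or both sides.
    if how in ("left_outer", "full_outer"):
        out += [(key, value, None) for key, value in left if key not in right_keys]
    if how in ("right_outer", "full_outer"):
        out += [(key, None, value) for key, value in right if key not in left_keys]
    return out


left = [("A", 1), ("B", 2)]
right = [("B", 20), ("C", 30)]

for how in ("inner", "left_outer", "right_outer", "full_outer"):
    print(how, join(left, right, how))
```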



### 5. Left Semi Join:

In [82]:
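# Note: df is joined with itself here, so every row with a non-null key has a match and
# the semi join returns df's rows unchanged; use df2 on the right to compare years.
left_semi_join = df.join(df, df.DEST_COUNTRY_NAME == df.DEST_COUNTRY_NAME, "leftsemi")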
left_semi_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



### 6. Left Anti Join:

In [83]:
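# Note: df2 is joined with itself here, so every row has a match and the anti join
# returns no rows, as the empty output shows.
left_anti_join = df2.join(df2, df2.DEST_COUNTRY_NAME == df2.DEST_COUNTRY_NAME, "leftanti")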
left_anti_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
+-----------------+-------------------+-----+



### 7. Cross Join:

In [84]:
cross_join = df2.crossJoin(df)
cross_join.show(5)

+-----------------+-------------------+-----+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
|    United States|       Saint Martin|    1|    United States|            Romania|   15|
|    United States|            Romania|   12|    United States|            Romania|   15|
|    United States|            Croatia|    2|    United States|            Romania|   15|
|    United States|            Ireland|  291|    United States|            Romania|   15|
|    United States|              India|   62|    United States|            Romania|   15|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
only showing top 5 rows



## Handling Duplicate column names

**1:** Renaming the columns: You can use the withColumnRenamed method to rename the duplicate columns before the join. For example:

In [85]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_1").show(5)
df2.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_2").show(5)


+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_1|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|            Romania|   15|
|      United States|            Croatia|    1|
|      United States|            Ireland|  344|
|              Egypt|      United States|   15|
|      United States|              India|   62|
+-------------------+-------------------+-----+
only showing top 5 rows

+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_2|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|       Saint Martin|    1|
|      United States|            Romania|   12|
|      United States|            Croatia|    2|
|      United States|            Ireland|  291|
|      United States|              India|   62|
+-------------------+-------------------+-----+
only showing top 5 rows



**2:** Using the as keyword: When selecting columns, you can use the alias method or the as keyword to give a new name to the duplicate column. For example:

In [86]:
df.select(df["DEST_COUNTRY_NAME"].alias("DEST_COUNTRY_NAME_1")).show(5)
df2.selectExpr("DEST_COUNTRY_NAME as DEST_COUNTRY_NAME_1").show(5)

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|              Egypt|
|      United States|
+-------------------+
only showing top 5 rows

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|      United States|
|      United States|
+-------------------+
only showing top 5 rows



**3:** Using the withColumn method: You can use the withColumn method to add a new column with a new name, and then drop the original column. For example:

In [87]:
summ= df.withColumn("DEST_COUNTRY_NAME_1", df["DEST_COUNTRY_NAME"])
summ.drop("DEST_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1|
+-------------------+-----+-------------------+
|            Romania|   15|      United States|
|            Croatia|    1|      United States|
|            Ireland|  344|      United States|
|      United States|   15|              Egypt|
|              India|   62|      United States|
+-------------------+-----+-------------------+
only showing top 5 rows



**4:** Using the select method: You can use the select method to select only the columns you need from the DataFrame, which will remove the duplicate columns. For example:

In [88]:
summ.select("DEST_COUNTRY_NAME_1","count","ORIGIN_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|DEST_COUNTRY_NAME_1|count|ORIGIN_COUNTRY_NAME|
+-------------------+-----+-------------------+
|      United States|   15|            Romania|
|      United States|    1|            Croatia|
|      United States|  344|            Ireland|
|              Egypt|   15|      United States|
|      United States|   62|              India|
+-------------------+-----+-------------------+
only showing top 5 rows



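**5:** Using the drop method: You can use the drop method to remove duplicate columns after joining the DataFrames. Note that dropping by name removes every column with that name, so both DEST_COUNTRY_NAME columns disappear in the output below. For example: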

In [89]:
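# Avoid naming the result `sum`: that would shadow the built-in and pyspark.sql.functions.sum.
joined = summ.join(df2, summ.DEST_COUNTRY_NAME == df2.DEST_COUNTRY_NAME, "inner")
joined.drop("DEST_COUNTRY_NAME").show(5)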

+-------------------+-----+-------------------+--------------------+-----+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1| ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+-------------------+--------------------+-----+
|            Romania|   15|      United States|               Haiti|  193|
|            Romania|   15|      United States|Saint Kitts and N...|  123|
|            Romania|   15|      United States|       French Guiana|    4|
|            Romania|   15|      United States|             Bolivia|   14|
|            Romania|   15|      United States| Trinidad and Tobago|  175|
+-------------------+-----+-------------------+--------------------+-----+
only showing top 5 rows



## 3. How Spark performs joins

In PySpark, joins are performed with the join method of a DataFrame. It takes the other DataFrame, a join condition (a column name, a list of column names, or a Column expression), and the join type as a string. The basic syntax for joining two DataFrames is as follows:

In [90]:
df.join(df, df.DEST_COUNTRY_NAME == df.DEST_COUNTRY_NAME, "inner").show(5)

+-----------------+-------------------+-----+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
|    United States|            Romania|   15|    United States|               Haiti|  225|
|    United States|            Romania|   15|    United States|Saint Kitts and N...|  145|
|    United States|            Romania|   15|    United States|             Bolivia|   13|
|    United States|            Romania|   15|    United States| Trinidad and Tobago|  217|
|    United States|            Romania|   15|    United States|             Namibia|    1|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
only showing top 5 rows



# Day 5 (chapter 9)

## 1. Datasources


### 1.1. Basics of reading data


The usual entry point for reading data in PySpark is the read attribute of the SparkSession object, which returns a DataFrameReader.

Here is an example of how to read a CSV file:

In [91]:
#sum1 = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

## 2. Basics of writing data


The usual entry point for writing data in PySpark is the write attribute of the DataFrame object, which returns a DataFrameWriter.

Here is an example of how to write a DataFrame to a CSV file:

In [92]:
#df.write.csv("path/to/new_file.csv", header=True)

## 3. CSV files - reading, writing


In [93]:
#df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

#df.write.csv("path/to/new_file.csv", header=True)

## 4. Reading and writing json files


In [94]:
#df = spark.read.json("path/to/file.json")

#df.write.json("path/to/new_file.json")

## 5. Parquet files - important

Parquet is a columnar storage format that is widely used in the Apache Hadoop ecosystem and is supported by many big data processing frameworks, including PySpark. There are several reasons why Parquet is important in PySpark:

**Efficiency:** Parquet stores data in a columnar format, which means that only the required columns are read and processed, rather than reading and processing the entire row. This leads to significant performance improvements when working with large datasets.

**Compression:** Parquet supports various compression algorithms, such as Snappy and Gzip, which can greatly reduce the storage space required for large datasets.

**Schema evolution:** Parquet supports schema evolution, which means that a dataset's schema can be changed over time without having to rewrite the entire dataset. This is particularly useful when working with data that is constantly changing or evolving.

**Predicate pushdown:** Parquet supports predicate pushdown, which means that filtering conditions can be pushed down to the storage layer, rather than being applied in the query layer. This leads to further performance improvements when working with large datasets.

**Interoperability:** Parquet is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.

## 6. Reading and Writing parquet files


To read and write Parquet files in PySpark, use spark.read.parquet() and DataFrame.write.parquet() respectively. Here is an example:

In [95]:
#df = spark.read.parquet("path/to/file.parquet")

#df.write.parquet("path/to/new_file.parquet")

## 7. ORC - optional


ORC (Optimized Row Columnar) is a file format that is similar to Parquet and is also widely used in the Apache Hadoop ecosystem. Like Parquet, ORC is a columnar storage format that is designed to improve the performance and storage efficiency of big data processing frameworks, such as PySpark.

Here are some of the key benefits of using ORC in PySpark:

**Performance:** ORC stores data in a columnar format, which leads to significant performance improvements when working with large datasets.

**Compression:** ORC supports various compression algorithms, such as Snappy, Zlib, and LZO, which can greatly reduce the storage space required for large datasets.

**Schema evolution:** ORC supports schema evolution, which means that a dataset's schema can be changed over time without having to rewrite the entire dataset.

**Predicate pushdown:** ORC supports predicate pushdown, which means that filtering conditions can be pushed down to the storage layer, rather than being applied in the query layer.

**Interoperability:** ORC is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.

## 8. Splittable File Types and Compression

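When working with PySpark, you can specify the file format and compression codec when reading and writing data. For example, you can use the read.parquet() method to read a Parquet file and write.format("avro") to write an Avro file (Avro support comes from the external spark-avro module, so there is no built-in write.avro() method). Splittability matters as well: a splittable file can be read by several tasks in parallel, whereas a gzip-compressed CSV or JSON file must be read by a single task. Parquet and ORC files stay splittable when compressed because compression is applied inside the file rather than around it.

Here is an example of writing a Parquet file with Snappy compression: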

In [96]:
#df.write.format("parquet").option("compression", "snappy").save("path/to/new_file.parquet")

## 9. Managing File size


Managing file size in PySpark is an important consideration when working with large datasets. There are several strategies that can be used to manage the file size of your data, including:

**Partitioning:** Partitioning is the process of dividing a large dataset into smaller, more manageable chunks. In PySpark, you can partition your data using the partitionBy method when writing data to a file. This allows you to split your data into multiple smaller files based on a specific column, such as a date or a category.

**Compression:** As mentioned previously, compression reduces the size of the stored data. You choose the codec, such as Snappy, Gzip, LZO, or Zlib, when writing data in PySpark; when reading, Spark detects the codec automatically.

**File format:** Choosing the appropriate file format for your data can also help manage file size. Columnar file formats, such as Parquet and ORC, are often more efficient than row-based formats, such as CSV or JSON, when working with large datasets because they require less storage space and are more easily compressible.

**Filtering:** Filtering is the process of removing unnecessary data from your dataset. In PySpark, you can filter your data using the filter method to remove rows that do not meet specific criteria. This can help reduce the file size of your data by removing unneeded information.

**Sampling:** Sampling is the process of selecting a random subset of your data to work with. In PySpark, the sample method randomly selects approximately a given fraction of rows (it does not take an exact row count). This lets you work with a smaller, more manageable subset of a large dataset, although a small sample may not represent rare values well.

# Day 6 (chapter 10)
Entire chapter of Spark SQL


Spark SQL is a module for structured data processing in Apache Spark that provides a programming interface for working with structured data using SQL (Structured Query Language). It allows you to run SQL queries on DataFrames and manipulate structured data using SQL-like syntax.

In [97]:
df.createOrReplaceTempView("summary")

In [98]:
spark.sql('SELECT * FROM summary').show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

1. How many rows (origin-destination pairs) are in the dataset?

In [99]:
spark.sql('SELECT COUNT(*) AS ROWS_count FROM summary').show()

+----------+
|ROWS_count|
+----------+
|       256|
+----------+



2. What are the top 10 destination countries by count?

In [100]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
          ").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|   411352.0|
|            Canada|     8399.0|
|            Mexico|     7140.0|
|    United Kingdom|     2025.0|
|             Japan|     1548.0|
|           Germany|     1468.0|
|Dominican Republic|     1353.0|
|       South Korea|     1048.0|
|       The Bahamas|      955.0|
|            France|      935.0|
+------------------+-----------+



3. How many flights originated from the United States?

In [101]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411966.0|
+----------+



4. What are the top 5 origin countries for flights to Japan?

In [102]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'Japan'\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 5\
          ").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|     1548.0|
+-------------------+-----------+



5. What is the total number of flights to the United States?

In [103]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411352.0|
+----------+



6. What is the total number of flights from the United States?


In [104]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|  411966.0|
+----------+



7. What are the top 10 origin and destination pairs by count?

In [105]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
").show()

+-------------------+-----------------+-----------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|total_count|
+-------------------+-----------------+-----------+
|      United States|    United States|   370002.0|
|             Canada|    United States|     8483.0|
|      United States|           Canada|     8399.0|
|             Mexico|    United States|     7187.0|
|      United States|           Mexico|     7140.0|
|      United States|   United Kingdom|     2025.0|
|     United Kingdom|    United States|     1970.0|
|      United States|            Japan|     1548.0|
|              Japan|    United States|     1496.0|
|      United States|          Germany|     1468.0|
+-------------------+-----------------+-----------+



8. How many flights originated from each country?

In [106]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|   411966.0|
|             Canada|     8483.0|
|             Mexico|     7187.0|
|     United Kingdom|     1970.0|
|              Japan|     1496.0|
| Dominican Republic|     1420.0|
|            Germany|     1336.0|
|        The Bahamas|      986.0|
|             France|      952.0|
|              China|      920.0|
|           Colombia|      867.0|
|        South Korea|      827.0|
|            Jamaica|      712.0|
|        Netherlands|      660.0|
|             Brazil|      619.0|
|         Costa Rica|      608.0|
|        El Salvador|      508.0|
|               Cuba|      478.0|
|             Panama|      465.0|
|              Spain|      442.0|
+-------------------+-----------+
only showing top 20 rows



9. How many flights went to each country?

In [107]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|   411352.0|
|            Canada|     8399.0|
|            Mexico|     7140.0|
|    United Kingdom|     2025.0|
|             Japan|     1548.0|
|           Germany|     1468.0|
|Dominican Republic|     1353.0|
|       South Korea|     1048.0|
|       The Bahamas|      955.0|
|            France|      935.0|
|          Colombia|      873.0|
|            Brazil|      853.0|
|       Netherlands|      776.0|
|             China|      772.0|
|           Jamaica|      666.0|
|        Costa Rica|      588.0|
|       El Salvador|      561.0|
|            Panama|      510.0|
|              Cuba|      466.0|
|             Spain|      420.0|
+------------------+-----------+
only showing top 20 rows



10. What is the total number of flights between the United States and Canada?

In [108]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'Canada') OR (ORIGIN_COUNTRY_NAME = 'Canada' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|   16882.0|
+----------+



11. What are the 5 most common origin countries for flights to the United Kingdom?

In [109]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE DEST_COUNTRY_NAME = 'United Kingdom'\
  GROUP BY ORIGIN_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 5\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|     2025.0|
+-------------------+-----------+



12. What are the top 10 destination countries for flights from China?


In [110]:
spark.sql("\
  SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'China'\
  GROUP BY DEST_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 10\
").show()

+-----------------+-----------+
|DEST_COUNTRY_NAME|total_count|
+-----------------+-----------+
|    United States|      920.0|
+-----------------+-----------+



13. What is the total number of flights between United States and New Zealand?

In [111]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'New Zealand') OR (ORIGIN_COUNTRY_NAME = 'New Zealand' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|     185.0|
+----------+



14. What is the total number of flights from India?

In [112]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'India'\
").show()

+----------+
|sum(count)|
+----------+
|      62.0|
+----------+



15. How do the destination countries rank by total number of flights?

In [113]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  GROUP BY DEST_COUNTRY_NAME\
").show()

+------------------+-----------+----+
| DEST_COUNTRY_NAME|total_count|rank|
+------------------+-----------+----+
|     United States|   411352.0|   1|
|            Canada|     8399.0|   2|
|            Mexico|     7140.0|   3|
|    United Kingdom|     2025.0|   4|
|             Japan|     1548.0|   5|
|           Germany|     1468.0|   6|
|Dominican Republic|     1353.0|   7|
|       South Korea|     1048.0|   8|
|       The Bahamas|      955.0|   9|
|            France|      935.0|  10|
|          Colombia|      873.0|  11|
|            Brazil|      853.0|  12|
|       Netherlands|      776.0|  13|
|             China|      772.0|  14|
|           Jamaica|      666.0|  15|
|        Costa Rica|      588.0|  16|
|       El Salvador|      561.0|  17|
|            Panama|      510.0|  18|
|              Cuba|      466.0|  19|
|             Spain|      420.0|  20|
+------------------+-----------+----+
only showing top 20 rows



16. How do the destination countries rank by number of flights from France?

In [114]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'France'\
  GROUP BY DEST_COUNTRY_NAME\
").show()

+-----------------+-----------+----+
|DEST_COUNTRY_NAME|total_count|rank|
+-----------------+-----------+----+
|    United States|      952.0|   1|
+-----------------+-----------+----+



17. What is the cumulative sum of flights to each destination country, ordered by the number of flights?

In [115]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  SUM(SUM(count)) OVER (ORDER BY SUM(count) DESC) as cumulative_sum\
  FROM summary\
  GROUP BY DEST_COUNTRY_NAME\
  ORDER BY total_count DESC\
").show()

+------------------+-----------+--------------+
| DEST_COUNTRY_NAME|total_count|cumulative_sum|
+------------------+-----------+--------------+
|     United States|   411352.0|      411352.0|
|            Canada|     8399.0|      419751.0|
|            Mexico|     7140.0|      426891.0|
|    United Kingdom|     2025.0|      428916.0|
|             Japan|     1548.0|      430464.0|
|           Germany|     1468.0|      431932.0|
|Dominican Republic|     1353.0|      433285.0|
|       South Korea|     1048.0|      434333.0|
|       The Bahamas|      955.0|      435288.0|
|            France|      935.0|      436223.0|
|          Colombia|      873.0|      437096.0|
|            Brazil|      853.0|      437949.0|
|       Netherlands|      776.0|      438725.0|
|             China|      772.0|      439497.0|
|           Jamaica|      666.0|      440163.0|
|        Costa Rica|      588.0|      440751.0|
|       El Salvador|      561.0|      441312.0|
|            Panama|      510.0|      44